事件总线
事件总线是 LCGYL Framework 的核心特性,基于 Java 21 虚拟线程提供高性能的事件驱动编程。
什么是事件总线?
事件总线是一种发布-订阅模式的实现,允许组件之间通过事件进行松耦合的通信。
优势
- 解耦:发布者和订阅者互不依赖
- 异步:支持异步事件处理
- 高性能:基于虚拟线程,轻量高效
- 类型安全:编译时类型检查
定义事件
使用 Record
推荐使用 Java 21 的 Record 定义事件。
java
public record UserCreatedEvent(
User user,
LocalDateTime timestamp
) implements Event {
}
public record OrderPlacedEvent(
Order order,
User user,
BigDecimal totalAmount
) implements Event {
}使用类
也可以使用普通类。
java
public class UserLoginEvent implements Event {
private final User user;
private final String ipAddress;
private final LocalDateTime loginTime;
public UserLoginEvent(User user, String ipAddress) {
this.user = user;
this.ipAddress = ipAddress;
this.loginTime = LocalDateTime.now();
}
// Getters
}发布事件
同步发布
java
@Component
public class UserService {
@Inject
private EventBus eventBus;
public User createUser(String name, String email) {
User user = new User(name, email);
userRepository.save(user);
// 同步发布事件
eventBus.publish(new UserCreatedEvent(user, LocalDateTime.now()));
return user;
}
}异步发布
java
@Component
public class OrderService {
@Inject
private EventBus eventBus;
public Order placeOrder(Order order) {
orderRepository.save(order);
// 异步发布事件(使用虚拟线程)
eventBus.publishAsync(new OrderPlacedEvent(order, user, totalAmount));
return order;
}
}订阅事件
使用 @Subscribe
java
@Component
public class UserEventListener {
private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
logger.info("用户已创建: {}", event.user().getName());
// 发送欢迎邮件
sendWelcomeEmail(event.user());
}
@Subscribe
public void onUserLogin(UserLoginEvent event) {
logger.info("用户登录: {} from {}",
event.getUser().getName(),
event.getIpAddress());
}
}多个监听器
同一个事件可以有多个监听器。
java
@Component
public class EmailNotificationListener {
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
// 发送邮件通知
emailService.sendWelcomeEmail(event.user());
}
}
@Component
public class StatisticsListener {
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
// 更新统计数据
statisticsService.incrementUserCount();
}
}
@Component
public class AuditListener {
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
// 记录审计日志
auditService.log("USER_CREATED", event.user().getId());
}
}异步处理
异步监听器
java
@Component
public class AsyncEventListener {
@Subscribe(async = true)
public void onOrderPlaced(OrderPlacedEvent event) {
// 在虚拟线程中异步执行
processOrder(event.order());
}
}手动异步
java
@Component
public class ManualAsyncListener {
@Subscribe
public void onOrderPlaced(OrderPlacedEvent event) {
// 手动创建异步任务
CompletableFuture.runAsync(() -> {
processOrder(event.order());
});
}
}事件优先级
设置优先级
java
@Component
public class PriorityListener {
@Subscribe(priority = 100) // 高优先级
public void onUserCreated1(UserCreatedEvent event) {
// 先执行
}
@Subscribe(priority = 50) // 中优先级
public void onUserCreated2(UserCreatedEvent event) {
// 后执行
}
@Subscribe(priority = 1) // 低优先级
public void onUserCreated3(UserCreatedEvent event) {
// 最后执行
}
}事件过滤
条件订阅
java
@Component
public class ConditionalListener {
@Subscribe
@ConditionalOnProperty(name = "notifications.enabled", havingValue = "true")
public void onUserCreated(UserCreatedEvent event) {
// 只在配置启用时执行
}
}自定义过滤
java
@Component
public class FilteredListener {
@Subscribe
public void onOrderPlaced(OrderPlacedEvent event) {
// 只处理大额订单
if (event.totalAmount().compareTo(new BigDecimal("1000")) > 0) {
notifyManager(event);
}
}
}错误处理
捕获异常
java
@Component
public class SafeEventListener {
private static final Logger logger = LoggerFactory.getLogger(SafeEventListener.class);
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
try {
sendEmail(event.user());
} catch (Exception e) {
logger.error("发送邮件失败", e);
// 不影响其他监听器
}
}
}全局错误处理
java
@Component
public class GlobalEventErrorHandler implements EventErrorHandler {
@Override
public void handleError(Event event, Throwable error) {
logger.error("处理事件失败: {}", event.getClass().getName(), error);
// 记录到监控系统
metricsService.recordEventError(event, error);
}
}事件继承
父事件
java
public interface UserEvent extends Event {
User getUser();
}
public record UserCreatedEvent(User user) implements UserEvent {
}
public record UserUpdatedEvent(User user) implements UserEvent {
}
public record UserDeletedEvent(User user) implements UserEvent {
}监听父事件
java
@Component
public class UserEventListener {
// 监听所有 UserEvent
@Subscribe
public void onUserEvent(UserEvent event) {
logger.info("用户事件: {}", event.getClass().getSimpleName());
}
// 监听特定事件
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
logger.info("用户创建: {}", event.user().getName());
}
}事件链
事件触发事件
java
@Component
public class OrderEventListener {
@Inject
private EventBus eventBus;
@Subscribe
public void onOrderPlaced(OrderPlacedEvent event) {
// 处理订单
processOrder(event.order());
// 触发新事件
eventBus.publish(new OrderProcessedEvent(event.order()));
}
}
@Component
public class PaymentEventListener {
@Inject
private EventBus eventBus;
@Subscribe
public void onOrderProcessed(OrderProcessedEvent event) {
// 处理支付
processPayment(event.order());
// 触发新事件
eventBus.publish(new PaymentCompletedEvent(event.order()));
}
}事件重播
记录事件
java
@Component
public class EventRecorder {
private final List<Event> eventHistory = new CopyOnWriteArrayList<>();
@Subscribe
public void recordEvent(Event event) {
eventHistory.add(event);
}
public void replay() {
eventHistory.forEach(eventBus::publish);
}
}性能优化
批量发布
java
@Component
public class BatchEventPublisher {
@Inject
private EventBus eventBus;
public void publishBatch(List<Event> events) {
eventBus.publishBatch(events);
}
}事件缓冲
java
@Component
public class BufferedEventPublisher {
private final BlockingQueue<Event> buffer = new LinkedBlockingQueue<>(1000);
@PostConstruct
public void startPublisher() {
Thread.ofVirtual().start(() -> {
while (true) {
try {
Event event = buffer.take();
eventBus.publish(event);
} catch (InterruptedException e) {
break;
}
}
});
}
public void bufferEvent(Event event) {
buffer.offer(event);
}
}最佳实践
1. 事件命名
java
// ✅ 推荐:使用过去式
public record UserCreatedEvent(User user) implements Event {
}
public record OrderPlacedEvent(Order order) implements Event {
}
// ❌ 不推荐:使用现在式
public record UserCreateEvent(User user) implements Event {
}2. 事件不可变
java
// ✅ 推荐:使用 Record(不可变)
public record UserCreatedEvent(User user, LocalDateTime timestamp) implements Event {
}
// ❌ 不推荐:可变事件
public class UserCreatedEvent implements Event {
private User user; // 可变
public void setUser(User user) {
this.user = user;
}
}3. 避免在监听器中修改事件数据
java
// ❌ 不推荐
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
event.user().setName("Modified"); // 不要修改
}4. 监听器保持简单
java
// ✅ 推荐:简单快速
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
logger.info("用户创建: {}", event.user().getName());
emailService.sendWelcomeEmail(event.user());
}
// ❌ 不推荐:复杂耗时
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
// 大量复杂逻辑
// 长时间数据库操作
// 外部 API 调用
}5. 使用异步处理耗时操作
java
// ✅ 推荐:异步处理
@Subscribe(async = true)
public void onOrderPlaced(OrderPlacedEvent event) {
// 耗时操作在虚拟线程中执行
generateInvoice(event.order());
sendNotification(event.order());
}常见问题
Q: 事件是同步还是异步?
A: 默认同步,可以使用 @Subscribe(async = true) 或 publishAsync() 实现异步。
Q: 事件处理顺序如何保证?
A: 使用优先级 @Subscribe(priority = 100),数字越大优先级越高。
Q: 如果监听器抛出异常会怎样?
A: 不影响其他监听器,但建议捕获异常并记录日志。
Q: 虚拟线程的性能如何?
A: 虚拟线程非常轻量,可以创建数百万个,适合高并发场景。