Skip to content

事件总线

事件总线是 LCGYL Framework 的核心特性,基于 Java 21 虚拟线程提供高性能的事件驱动编程。

什么是事件总线?

事件总线是一种发布-订阅模式的实现,允许组件之间通过事件进行松耦合的通信。

优势

  • 解耦:发布者和订阅者互不依赖
  • 异步:支持异步事件处理
  • 高性能:基于虚拟线程,轻量高效
  • 类型安全:编译时类型检查

定义事件

使用 Record

推荐使用 Java 21 的 Record 定义事件。

java
public record UserCreatedEvent(
    User user,
    LocalDateTime timestamp
) implements Event {
}

public record OrderPlacedEvent(
    Order order,
    User user,
    BigDecimal totalAmount
) implements Event {
}

使用类

也可以使用普通类。

java
public class UserLoginEvent implements Event {
    private final User user;
    private final String ipAddress;
    private final LocalDateTime loginTime;
    
    public UserLoginEvent(User user, String ipAddress) {
        this.user = user;
        this.ipAddress = ipAddress;
        this.loginTime = LocalDateTime.now();
    }
    
    // Getters
}

发布事件

同步发布

java
@Component
public class UserService {
    
    @Inject
    private EventBus eventBus;
    
    public User createUser(String name, String email) {
        User user = new User(name, email);
        userRepository.save(user);
        
        // 同步发布事件
        eventBus.publish(new UserCreatedEvent(user, LocalDateTime.now()));
        
        return user;
    }
}

异步发布

java
@Component
public class OrderService {
    
    @Inject
    private EventBus eventBus;
    
    public Order placeOrder(Order order) {
        orderRepository.save(order);
        
        // 异步发布事件(使用虚拟线程)
        eventBus.publishAsync(new OrderPlacedEvent(order, user, totalAmount));
        
        return order;
    }
}

订阅事件

使用 @Subscribe

java
@Component
public class UserEventListener {
    
    private static final Logger logger = LoggerFactory.getLogger(UserEventListener.class);
    
    @Subscribe
    public void onUserCreated(UserCreatedEvent event) {
        logger.info("用户已创建: {}", event.user().getName());
        
        // 发送欢迎邮件
        sendWelcomeEmail(event.user());
    }
    
    @Subscribe
    public void onUserLogin(UserLoginEvent event) {
        logger.info("用户登录: {} from {}", 
            event.getUser().getName(), 
            event.getIpAddress());
    }
}

多个监听器

同一个事件可以有多个监听器。

java
@Component
public class EmailNotificationListener {
    
    @Subscribe
    public void onUserCreated(UserCreatedEvent event) {
        // 发送邮件通知
        emailService.sendWelcomeEmail(event.user());
    }
}

@Component
public class StatisticsListener {
    
    @Subscribe
    public void onUserCreated(UserCreatedEvent event) {
        // 更新统计数据
        statisticsService.incrementUserCount();
    }
}

@Component
public class AuditListener {
    
    @Subscribe
    public void onUserCreated(UserCreatedEvent event) {
        // 记录审计日志
        auditService.log("USER_CREATED", event.user().getId());
    }
}

异步处理

异步监听器

java
@Component
public class AsyncEventListener {
    
    @Subscribe(async = true)
    public void onOrderPlaced(OrderPlacedEvent event) {
        // 在虚拟线程中异步执行
        processOrder(event.order());
    }
}

手动异步

java
@Component
public class ManualAsyncListener {
    
    @Subscribe
    public void onOrderPlaced(OrderPlacedEvent event) {
        // 手动创建异步任务
        CompletableFuture.runAsync(() -> {
            processOrder(event.order());
        });
    }
}

事件优先级

设置优先级

java
@Component
public class PriorityListener {
    
    @Subscribe(priority = 100)  // 高优先级
    public void onUserCreated1(UserCreatedEvent event) {
        // 先执行
    }
    
    @Subscribe(priority = 50)   // 中优先级
    public void onUserCreated2(UserCreatedEvent event) {
        // 后执行
    }
    
    @Subscribe(priority = 1)    // 低优先级
    public void onUserCreated3(UserCreatedEvent event) {
        // 最后执行
    }
}

事件过滤

条件订阅

java
@Component
public class ConditionalListener {
    
    @Subscribe
    @ConditionalOnProperty(name = "notifications.enabled", havingValue = "true")
    public void onUserCreated(UserCreatedEvent event) {
        // 只在配置启用时执行
    }
}

自定义过滤

java
@Component
public class FilteredListener {
    
    @Subscribe
    public void onOrderPlaced(OrderPlacedEvent event) {
        // 只处理大额订单
        if (event.totalAmount().compareTo(new BigDecimal("1000")) > 0) {
            notifyManager(event);
        }
    }
}

错误处理

捕获异常

java
@Component
public class SafeEventListener {
    
    private static final Logger logger = LoggerFactory.getLogger(SafeEventListener.class);
    
    @Subscribe
    public void onUserCreated(UserCreatedEvent event) {
        try {
            sendEmail(event.user());
        } catch (Exception e) {
            logger.error("发送邮件失败", e);
            // 不影响其他监听器
        }
    }
}

全局错误处理

java
@Component
public class GlobalEventErrorHandler implements EventErrorHandler {
    
    @Override
    public void handleError(Event event, Throwable error) {
        logger.error("处理事件失败: {}", event.getClass().getName(), error);
        
        // 记录到监控系统
        metricsService.recordEventError(event, error);
    }
}

事件继承

父事件

java
public interface UserEvent extends Event {
    User getUser();
}

public record UserCreatedEvent(User user) implements UserEvent {
}

public record UserUpdatedEvent(User user) implements UserEvent {
}

public record UserDeletedEvent(User user) implements UserEvent {
}

监听父事件

java
@Component
public class UserEventListener {
    
    // 监听所有 UserEvent
    @Subscribe
    public void onUserEvent(UserEvent event) {
        logger.info("用户事件: {}", event.getClass().getSimpleName());
    }
    
    // 监听特定事件
    @Subscribe
    public void onUserCreated(UserCreatedEvent event) {
        logger.info("用户创建: {}", event.user().getName());
    }
}

事件链

事件触发事件

java
@Component
public class OrderEventListener {
    
    @Inject
    private EventBus eventBus;
    
    @Subscribe
    public void onOrderPlaced(OrderPlacedEvent event) {
        // 处理订单
        processOrder(event.order());
        
        // 触发新事件
        eventBus.publish(new OrderProcessedEvent(event.order()));
    }
}

@Component
public class PaymentEventListener {
    
    @Inject
    private EventBus eventBus;
    
    @Subscribe
    public void onOrderProcessed(OrderProcessedEvent event) {
        // 处理支付
        processPayment(event.order());
        
        // 触发新事件
        eventBus.publish(new PaymentCompletedEvent(event.order()));
    }
}

事件重播

记录事件

java
@Component
public class EventRecorder {
    
    private final List<Event> eventHistory = new CopyOnWriteArrayList<>();
    
    @Subscribe
    public void recordEvent(Event event) {
        eventHistory.add(event);
    }
    
    public void replay() {
        eventHistory.forEach(eventBus::publish);
    }
}

性能优化

批量发布

java
@Component
public class BatchEventPublisher {
    
    @Inject
    private EventBus eventBus;
    
    public void publishBatch(List<Event> events) {
        eventBus.publishBatch(events);
    }
}

事件缓冲

java
@Component
public class BufferedEventPublisher {
    
    private final BlockingQueue<Event> buffer = new LinkedBlockingQueue<>(1000);
    
    @PostConstruct
    public void startPublisher() {
        Thread.ofVirtual().start(() -> {
            while (true) {
                try {
                    Event event = buffer.take();
                    eventBus.publish(event);
                } catch (InterruptedException e) {
                    break;
                }
            }
        });
    }
    
    public void bufferEvent(Event event) {
        buffer.offer(event);
    }
}

最佳实践

1. 事件命名

java
// ✅ 推荐:使用过去式
public record UserCreatedEvent(User user) implements Event {
}

public record OrderPlacedEvent(Order order) implements Event {
}

// ❌ 不推荐:使用现在式
public record UserCreateEvent(User user) implements Event {
}

2. 事件不可变

java
// ✅ 推荐:使用 Record(不可变)
public record UserCreatedEvent(User user, LocalDateTime timestamp) implements Event {
}

// ❌ 不推荐:可变事件
public class UserCreatedEvent implements Event {
    private User user;  // 可变
    
    public void setUser(User user) {
        this.user = user;
    }
}

3. 避免在监听器中修改事件数据

java
// ❌ 不推荐
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
    event.user().setName("Modified");  // 不要修改
}

4. 监听器保持简单

java
// ✅ 推荐:简单快速
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
    logger.info("用户创建: {}", event.user().getName());
    emailService.sendWelcomeEmail(event.user());
}

// ❌ 不推荐:复杂耗时
@Subscribe
public void onUserCreated(UserCreatedEvent event) {
    // 大量复杂逻辑
    // 长时间数据库操作
    // 外部 API 调用
}

5. 使用异步处理耗时操作

java
// ✅ 推荐:异步处理
@Subscribe(async = true)
public void onOrderPlaced(OrderPlacedEvent event) {
    // 耗时操作在虚拟线程中执行
    generateInvoice(event.order());
    sendNotification(event.order());
}

常见问题

Q: 事件是同步还是异步?

A: 默认同步,可以使用 @Subscribe(async = true)publishAsync() 实现异步。

Q: 事件处理顺序如何保证?

A: 使用优先级 @Subscribe(priority = 100),数字越大优先级越高。

Q: 如果监听器抛出异常会怎样?

A: 不影响其他监听器,但建议捕获异常并记录日志。

Q: 虚拟线程的性能如何?

A: 虚拟线程非常轻量,可以创建数百万个,适合高并发场景。

下一步

Released under the Apache License 2.0