Skip to content

事件总线

LCGYL Framework 提供了基于 Java 21 虚拟线程的高性能事件总线,支持同步和异步事件处理。

核心概念

事件(Event)

表示系统中发生的某个动作或状态变化。

事件监听器(EventListener)

监听并处理特定类型的事件。

事件总线(EventBus)

负责事件的发布和分发。

定义事件

使用 Record(推荐)

java
import com.lcgyl.framework.core.event.Event;

/**
 * Event describing a newly created user.
 *
 * @param userId    identifier of the created user
 * @param username  name of the created user
 * @param email     e-mail address of the created user
 * @param timestamp time value supplied by the publisher
 */
public record UserCreatedEvent(String userId, String username, String email, long timestamp)
        implements Event {}

使用普通类

java
import com.lcgyl.framework.core.event.Event;

/**
 * Event carrying the details of a placed order, written as a regular class
 * rather than a record.
 *
 * <p>Every field is {@code final} and assigned exactly once in the constructor,
 * and the class exposes read-only accessors only.
 */
public class OrderPlacedEvent implements Event {
    private final String orderId;
    private final String userId;
    // NOTE(review): double is kept for interface compatibility; consider BigDecimal for money.
    private final double amount;
    private final long timestamp;

    /**
     * Creates the event and stamps it with the current wall-clock time.
     *
     * @param orderId identifier of the placed order
     * @param userId  identifier of the user who placed the order
     * @param amount  order amount
     */
    public OrderPlacedEvent(String orderId, String userId, double amount) {
        this.orderId = orderId;
        this.userId = userId;
        this.amount = amount;
        this.timestamp = System.currentTimeMillis();
    }

    /** @return identifier of the placed order */
    public String getOrderId() {
        return orderId;
    }

    /** @return identifier of the user who placed the order */
    public String getUserId() {
        return userId;
    }

    /** @return order amount */
    public double getAmount() {
        return amount;
    }

    /** @return construction time in milliseconds since the epoch */
    public long getTimestamp() {
        return timestamp;
    }
}

创建事件监听器

同步监听器

java
import com.lcgyl.framework.core.annotation.Component;
import com.lcgyl.framework.core.event.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener for {@link UserCreatedEvent}: logs the new user, then passes the
 * user's e-mail address to the welcome-mail hook.
 */
@Component
public class UserCreatedListener implements EventListener<UserCreatedEvent> {
    private static final Logger logger = LoggerFactory.getLogger(UserCreatedListener.class);

    /**
     * Handles a user-created event.
     *
     * @param event the published event
     */
    @Override
    public void onEvent(UserCreatedEvent event) {
        final String userId = event.userId();
        final String username = event.username();
        logger.info("用户创建: {} - {}", userId, username);

        // Send the welcome e-mail
        final String recipient = event.email();
        sendWelcomeEmail(recipient);
    }

    /**
     * Placeholder for the mail delivery; intentionally empty in this example.
     *
     * @param email recipient address
     */
    private void sendWelcomeEmail(String email) {
        // mail-sending logic goes here
    }
}

异步监听器

java
import com.lcgyl.framework.core.event.AsyncEventListener;

/**
 * Asynchronous listener for {@link UserCreatedEvent}. According to this guide,
 * {@code AsyncEventListener} implementations are run on virtual threads.
 */
@Component
public class UserCreatedAsyncListener implements AsyncEventListener<UserCreatedEvent> {
    private static final Logger logger = LoggerFactory.getLogger(UserCreatedAsyncListener.class);

    /**
     * Logs the event and then runs the long-running follow-up work.
     *
     * @param event the published event
     */
    @Override
    public void onEvent(UserCreatedEvent event) {
        final String userId = event.userId();
        logger.info("异步处理用户创建事件: {}", userId);

        // Run the time-consuming part
        performHeavyTask(event);
    }

    /**
     * Placeholder for time-consuming work such as data analysis or third-party API calls.
     *
     * @param event the event being processed
     */
    private void performHeavyTask(UserCreatedEvent event) {
        // long-running work goes here
    }
}

发布事件

在服务中发布

java
import com.lcgyl.framework.core.annotation.Component;
import com.lcgyl.framework.core.annotation.Inject;
import com.lcgyl.framework.core.event.EventBus;

/**
 * Example service that publishes a {@link UserCreatedEvent} after creating a user.
 */
@Component
public class UserService {

    @Inject
    private EventBus eventBus;

    /**
     * Creates a user and announces it on the event bus.
     *
     * @param username name of the new user
     * @param email    e-mail address of the new user
     */
    public void createUser(String username, String email) {
        // User-creation logic
        final String userId = generateUserId();
        saveUser(userId, username, email);

        // Publish the event, stamped with the current time
        final long createdAt = System.currentTimeMillis();
        eventBus.publish(new UserCreatedEvent(userId, username, email, createdAt));
    }
}

同步发布

java
// Synchronous publish: waits until all listeners have finished
eventBus.publishSync(event);

异步发布

java
// Asynchronous publish: returns immediately
eventBus.publishAsync(event);

事件优先级

设置监听器优先级

java
import com.lcgyl.framework.core.event.EventListener;
import com.lcgyl.framework.core.event.Priority;

/**
 * Example listener for {@link UserCreatedEvent} registered with priority 100.
 */
@Component
@Priority(100) // the larger the number, the higher the priority
public class HighPriorityListener implements EventListener<UserCreatedEvent> {
    @Override
    public void onEvent(UserCreatedEvent event) {
        // high-priority handling
    }
}

/**
 * Example listener for {@link UserCreatedEvent} registered with priority 50,
 * i.e. lower than a listener annotated with {@code @Priority(100)}.
 */
@Component
@Priority(50)
public class LowPriorityListener implements EventListener<UserCreatedEvent> {
    @Override
    public void onEvent(UserCreatedEvent event) {
        // low-priority handling
    }
}

事件过滤

条件监听

java
/**
 * Conditional listener: reacts to {@link UserCreatedEvent} only when the user
 * is a VIP, filtering inside {@code onEvent}.
 */
@Component
public class VIPUserListener implements EventListener<UserCreatedEvent> {

    /**
     * Sends the VIP welcome gift when the created user is a VIP; otherwise does nothing.
     *
     * @param event the published event
     */
    @Override
    public void onEvent(UserCreatedEvent event) {
        // Only VIP users are handled
        if (isVIPUser(event.userId())) {
            sendVIPWelcomeGift(event.userId());
        }
    }

    /**
     * Placeholder VIP check; always returns {@code false} in this example.
     *
     * @param userId identifier of the user to check
     * @return whether the user is a VIP
     */
    private boolean isVIPUser(String userId) {
        // decide whether the user is a VIP
        return false;
    }

    /**
     * Placeholder for delivering the VIP welcome gift; intentionally empty in this example.
     *
     * @param userId identifier of the VIP user
     */
    private void sendVIPWelcomeGift(String userId) {
        // gift-delivery logic goes here
    }
}

错误处理

监听器异常处理

java
/**
 * Listener that contains its own failures: any exception raised while
 * processing is logged instead of being propagated to the caller.
 */
@Component
public class SafeEventListener implements EventListener<UserCreatedEvent> {
    private static final Logger logger = LoggerFactory.getLogger(SafeEventListener.class);

    /**
     * Processes the event and logs (rather than rethrows) any failure.
     *
     * @param event the published event
     */
    @Override
    public void onEvent(UserCreatedEvent event) {
        try {
            processEvent(event);
        } catch (Exception e) {
            // Broad catch is deliberate: this method is the listener boundary.
            logger.error("处理事件失败: {}", event, e);
            // Optionally retry here or record the event in a dead-letter queue
        }
    }

    /**
     * Placeholder for the actual event processing; intentionally empty in this example.
     *
     * @param event the event to process
     */
    private void processEvent(UserCreatedEvent event) {
        // event-processing logic goes here
    }
}

全局异常处理器

java
import com.lcgyl.framework.core.event.EventExceptionHandler;

/**
 * Global {@link EventExceptionHandler} that logs the event type, the listener
 * type and the failure.
 */
@Component
public class GlobalEventExceptionHandler implements EventExceptionHandler {
    private static final Logger logger = LoggerFactory.getLogger(GlobalEventExceptionHandler.class);

    /**
     * Logs an exception raised while a listener handled an event.
     *
     * @param event     the event being handled
     * @param listener  the listener that failed
     * @param throwable the failure
     */
    @Override
    public void handleException(Event event, EventListener<?> listener, Throwable throwable) {
        final String eventName = event.getClass().getSimpleName();
        final String listenerName = listener.getClass().getSimpleName();

        // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
        logger.error("事件处理异常 - Event: {}, Listener: {}", eventName, listenerName, throwable);

        // Record to the monitoring system
        // Send an alert
    }
}

完整示例:订单系统

定义事件

java
/**
 * Order-created event.
 *
 * @param orderId     identifier of the created order
 * @param userId      identifier of the user who owns the order
 * @param items       order items; stored as an unmodifiable copy
 * @param totalAmount total amount of the order
 * @param timestamp   time value supplied by the publisher
 */
public record OrderCreatedEvent(
    String orderId,
    String userId,
    List<OrderItem> items,
    double totalAmount,
    long timestamp
) implements Event {

    /**
     * Takes a defensive copy of {@code items} so later changes to the caller's
     * list cannot alter an already-published event.
     *
     * @throws NullPointerException if {@code items} or any of its elements is {@code null}
     */
    public OrderCreatedEvent {
        items = List.copyOf(items);
    }
}

/**
 * Order-paid event.
 *
 * @param orderId   identifier of the paid order
 * @param paymentId identifier of the payment
 * @param amount    paid amount
 * @param timestamp time value supplied by the publisher
 */
public record OrderPaidEvent(String orderId, String paymentId, double amount, long timestamp)
        implements Event {}

/**
 * Order-shipped event.
 *
 * @param orderId        identifier of the shipped order
 * @param trackingNumber tracking number of the shipment
 * @param timestamp      time value supplied by the publisher
 */
public record OrderShippedEvent(String orderId, String trackingNumber, long timestamp)
        implements Event {}

订单服务

java
/**
 * Order service: saves order changes through the repository and publishes the
 * matching event on the event bus afterwards.
 */
@Component
public class OrderService {

    @Inject
    private EventBus eventBus;

    @Inject
    private OrderRepository orderRepository;

    /**
     * Creates and saves an order, then publishes an {@link OrderCreatedEvent}.
     *
     * @param userId identifier of the ordering user
     * @param items  items of the order
     * @return identifier of the created order
     */
    public String createOrder(String userId, List<OrderItem> items) {
        // Create the order
        Order order = new Order(userId, items);
        orderRepository.save(order);

        // Publish the event
        eventBus.publish(new OrderCreatedEvent(
            order.getId(),
            userId,
            items,
            order.getTotalAmount(),
            System.currentTimeMillis()
        ));

        return order.getId();
    }

    /**
     * Marks an order as paid, saves it, then publishes an {@link OrderPaidEvent}.
     *
     * @param orderId   identifier of the order to pay
     * @param paymentId identifier of the payment
     * @throws IllegalArgumentException if no order is found for {@code orderId}
     */
    public void payOrder(String orderId, String paymentId) {
        // Handle the payment
        Order order = orderRepository.findById(orderId);
        // Fail with a clear message instead of a bare NullPointerException below.
        // NOTE(review): assumes findById returns null for an unknown id — confirm.
        if (order == null) {
            throw new IllegalArgumentException("订单不存在: " + orderId);
        }
        order.setPaid(true);
        orderRepository.save(order);

        // Publish the event
        eventBus.publish(new OrderPaidEvent(
            orderId,
            paymentId,
            order.getTotalAmount(),
            System.currentTimeMillis()
        ));
    }
}

事件监听器

java
/**
 * Inventory listener: deducts stock for the items of a newly created order.
 */
@Component
public class InventoryListener implements EventListener<OrderCreatedEvent> {

    @Inject
    private InventoryService inventoryService;

    /**
     * Deducts stock for every item carried by the event.
     *
     * @param event the order-created event
     */
    @Override
    public void onEvent(OrderCreatedEvent event) {
        final List<OrderItem> orderedItems = event.items();
        inventoryService.deductStock(orderedItems);
    }
}

/**
 * Notification listener: asynchronously sends a payment-success notification
 * when an order has been paid.
 */
@Component
public class NotificationListener implements AsyncEventListener<OrderPaidEvent> {

    @Inject
    private NotificationService notificationService;

    /**
     * Sends the payment-success notification for the paid order.
     *
     * @param event the order-paid event
     */
    @Override
    public void onEvent(OrderPaidEvent event) {
        final String paidOrderId = event.orderId();
        notificationService.sendPaymentSuccessNotification(paidOrderId);
    }
}

/**
 * Logistics listener: creates a shipment for a paid order and publishes an
 * {@link OrderShippedEvent} carrying the tracking number.
 */
@Component
@Priority(100) // high priority
public class LogisticsListener implements EventListener<OrderPaidEvent> {

    @Inject
    private LogisticsService logisticsService;

    // Required by onEvent to publish the follow-up shipped event.
    @Inject
    private EventBus eventBus;

    /**
     * Creates the shipment and announces it on the event bus.
     *
     * @param event the order-paid event
     */
    @Override
    public void onEvent(OrderPaidEvent event) {
        // Create the logistics order
        String trackingNumber = logisticsService.createShipment(event.orderId());

        // Publish the shipped event
        eventBus.publish(new OrderShippedEvent(
            event.orderId(),
            trackingNumber,
            System.currentTimeMillis()
        ));
    }
}

性能优化

使用虚拟线程

java
/**
 * Asynchronous listener for heavy work. According to this guide, asynchronous
 * listeners automatically run on virtual threads.
 */
@Component
public class HeavyTaskListener implements AsyncEventListener<DataProcessEvent> {

    /**
     * Runs the heavy computation on the event's data.
     *
     * @param event the data-processing event
     */
    @Override
    public void onEvent(DataProcessEvent event) {
        // Runs on a virtual thread, so the main thread is not blocked
        var payload = event.data();
        performHeavyComputation(payload);
    }
}

批量处理

java
/**
 * Buffers incoming {@link UserActivityEvent}s and hands them to
 * {@code processBatch} in groups of {@code BATCH_SIZE}.
 *
 * <p>The buffer is guarded by a lock and swapped for a fresh list when full,
 * so concurrent calls can neither lose an event nor process one twice.
 *
 * <p>Events are flushed only when the buffer fills up; a final partial batch
 * stays buffered. A batch whose processing throws is not re-queued.
 */
@Component
public class BatchEventListener implements EventListener<UserActivityEvent> {
    /** Number of buffered events that triggers a batch. */
    private static final int BATCH_SIZE = 100;

    private final Object lock = new Object();
    // Guarded by lock; replaced (not cleared) when a batch is taken.
    private List<UserActivityEvent> buffer = new ArrayList<>(BATCH_SIZE);

    /**
     * Buffers the event and processes a batch once {@code BATCH_SIZE} events have accumulated.
     *
     * @param event the event to buffer
     */
    @Override
    public void onEvent(UserActivityEvent event) {
        List<UserActivityEvent> batch = null;
        synchronized (lock) {
            buffer.add(event);
            if (buffer.size() >= BATCH_SIZE) {
                // Take ownership of the full buffer and start a new one atomically.
                batch = buffer;
                buffer = new ArrayList<>(BATCH_SIZE);
            }
        }

        // Process outside the lock so slow batch handling does not block other publishers.
        if (batch != null) {
            processBatch(batch);
        }
    }

    /**
     * Placeholder for the batch processing; intentionally empty in this example.
     *
     * @param events the events of one full batch
     */
    private void processBatch(List<UserActivityEvent> events) {
        // batch processing goes here
    }
}

最佳实践

1. 事件命名规范

java
// ✅ Recommended: past tense + "Event" suffix
public record UserCreatedEvent(...) implements Event {}
public record OrderPaidEvent(...) implements Event {}

// ❌ Avoid: present-tense or imperative verb names
public record CreateUserEvent(...) implements Event {}
public record PayOrderEvent(...) implements Event {}

2. 事件不可变

java
// ✅ Recommended: use a record or final fields
public record UserCreatedEvent(String userId, String username) implements Event {}

// ❌ Avoid: mutable events
public class UserCreatedEvent implements Event {
    private String userId; // mutable field
    public void setUserId(String userId) { this.userId = userId; }
}

3. 避免事件链过长

java
// ❌ 避免:A -> B -> C -> D -> E(事件链过长)
// ✅ 推荐:使用编排服务协调多个步骤

4. 合理使用同步/异步

java
// Synchronous: work that must run immediately and affects the business flow
@Component
public class CriticalListener implements EventListener<PaymentEvent> {
    // handle the payment synchronously
}

// Asynchronous: time-consuming work or non-critical flows
@Component
public class EmailListener implements AsyncEventListener<PaymentEvent> {
    // send the e-mail asynchronously
}

下一步

Released under the Apache License 2.0