事件总线
LCGYL Framework 提供了基于 Java 21 虚拟线程的高性能事件总线,支持同步和异步事件处理。
核心概念
事件(Event)
表示系统中发生的某个动作或状态变化。
事件监听器(EventListener)
监听并处理特定类型的事件。
事件总线(EventBus)
负责事件的发布和分发。
定义事件
使用 Record(推荐)
java
import com.lcgyl.framework.core.event.Event;
public record UserCreatedEvent(
String userId,
String username,
String email,
long timestamp
) implements Event {
}使用普通类
java
import com.lcgyl.framework.core.event.Event;
public class OrderPlacedEvent implements Event {
private final String orderId;
private final String userId;
private final double amount;
private final long timestamp;
public OrderPlacedEvent(String orderId, String userId, double amount) {
this.orderId = orderId;
this.userId = userId;
this.amount = amount;
this.timestamp = System.currentTimeMillis();
}
// Getters...
}创建事件监听器
同步监听器
java
import com.lcgyl.framework.core.annotation.Component;
import com.lcgyl.framework.core.event.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component
public class UserCreatedListener implements EventListener<UserCreatedEvent> {
private static final Logger logger = LoggerFactory.getLogger(UserCreatedListener.class);
@Override
public void onEvent(UserCreatedEvent event) {
logger.info("用户创建: {} - {}", event.userId(), event.username());
// 发送欢迎邮件
sendWelcomeEmail(event.email());
}
private void sendWelcomeEmail(String email) {
// 发送邮件逻辑
}
}异步监听器
java
import com.lcgyl.framework.core.event.AsyncEventListener;
@Component
public class UserCreatedAsyncListener implements AsyncEventListener<UserCreatedEvent> {
private static final Logger logger = LoggerFactory.getLogger(UserCreatedAsyncListener.class);
@Override
public void onEvent(UserCreatedEvent event) {
// 在虚拟线程中异步执行
logger.info("异步处理用户创建事件: {}", event.userId());
// 执行耗时操作
performHeavyTask(event);
}
private void performHeavyTask(UserCreatedEvent event) {
// 耗时操作,如数据分析、第三方API调用等
}
}发布事件
在服务中发布
java
import com.lcgyl.framework.core.annotation.Component;
import com.lcgyl.framework.core.annotation.Inject;
import com.lcgyl.framework.core.event.EventBus;
@Component
public class UserService {
@Inject
private EventBus eventBus;
public void createUser(String username, String email) {
// 创建用户逻辑
String userId = generateUserId();
saveUser(userId, username, email);
// 发布事件
UserCreatedEvent event = new UserCreatedEvent(
userId,
username,
email,
System.currentTimeMillis()
);
eventBus.publish(event);
}
}同步发布
java
// 同步发布,等待所有监听器执行完成
eventBus.publishSync(event);异步发布
java
// 异步发布,立即返回
eventBus.publishAsync(event);事件优先级
设置监听器优先级
java
import com.lcgyl.framework.core.event.EventListener;
import com.lcgyl.framework.core.event.Priority;
@Component
@Priority(100) // 数字越大优先级越高
public class HighPriorityListener implements EventListener<UserCreatedEvent> {
@Override
public void onEvent(UserCreatedEvent event) {
// 高优先级处理
}
}
@Component
@Priority(50)
public class LowPriorityListener implements EventListener<UserCreatedEvent> {
@Override
public void onEvent(UserCreatedEvent event) {
// 低优先级处理
}
}事件过滤
条件监听
java
@Component
public class VIPUserListener implements EventListener<UserCreatedEvent> {
@Override
public void onEvent(UserCreatedEvent event) {
// 只处理 VIP 用户
if (isVIPUser(event.userId())) {
sendVIPWelcomeGift(event.userId());
}
}
private boolean isVIPUser(String userId) {
// 判断是否为 VIP 用户
return false;
}
}错误处理
监听器异常处理
java
@Component
public class SafeEventListener implements EventListener<UserCreatedEvent> {
private static final Logger logger = LoggerFactory.getLogger(SafeEventListener.class);
@Override
public void onEvent(UserCreatedEvent event) {
try {
processEvent(event);
} catch (Exception e) {
logger.error("处理事件失败: {}", event, e);
// 可以选择重试或记录到死信队列
}
}
}全局异常处理器
java
import com.lcgyl.framework.core.event.EventExceptionHandler;
@Component
public class GlobalEventExceptionHandler implements EventExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(GlobalEventExceptionHandler.class);
@Override
public void handleException(Event event, EventListener<?> listener, Throwable throwable) {
logger.error("事件处理异常 - Event: {}, Listener: {}",
event.getClass().getSimpleName(),
listener.getClass().getSimpleName(),
throwable
);
// 记录到监控系统
// 发送告警
}
}完整示例:订单系统
定义事件
java
// 订单创建事件
public record OrderCreatedEvent(
String orderId,
String userId,
List<OrderItem> items,
double totalAmount,
long timestamp
) implements Event {
}
// 订单支付事件
public record OrderPaidEvent(
String orderId,
String paymentId,
double amount,
long timestamp
) implements Event {
}
// 订单发货事件
public record OrderShippedEvent(
String orderId,
String trackingNumber,
long timestamp
) implements Event {
}订单服务
java
@Component
public class OrderService {
@Inject
private EventBus eventBus;
@Inject
private OrderRepository orderRepository;
public String createOrder(String userId, List<OrderItem> items) {
// 创建订单
Order order = new Order(userId, items);
orderRepository.save(order);
// 发布事件
eventBus.publish(new OrderCreatedEvent(
order.getId(),
userId,
items,
order.getTotalAmount(),
System.currentTimeMillis()
));
return order.getId();
}
public void payOrder(String orderId, String paymentId) {
// 处理支付
Order order = orderRepository.findById(orderId);
order.setPaid(true);
orderRepository.save(order);
// 发布事件
eventBus.publish(new OrderPaidEvent(
orderId,
paymentId,
order.getTotalAmount(),
System.currentTimeMillis()
));
}
}事件监听器
java
// 库存监听器
@Component
public class InventoryListener implements EventListener<OrderCreatedEvent> {
@Inject
private InventoryService inventoryService;
@Override
public void onEvent(OrderCreatedEvent event) {
// 扣减库存
inventoryService.deductStock(event.items());
}
}
// 通知监听器
@Component
public class NotificationListener implements AsyncEventListener<OrderPaidEvent> {
@Inject
private NotificationService notificationService;
@Override
public void onEvent(OrderPaidEvent event) {
// 发送支付成功通知
notificationService.sendPaymentSuccessNotification(event.orderId());
}
}
// 物流监听器
@Component
@Priority(100) // 高优先级
public class LogisticsListener implements EventListener<OrderPaidEvent> {
@Inject
private LogisticsService logisticsService;
@Override
public void onEvent(OrderPaidEvent event) {
// 创建物流订单
String trackingNumber = logisticsService.createShipment(event.orderId());
// 发布发货事件
eventBus.publish(new OrderShippedEvent(
event.orderId(),
trackingNumber,
System.currentTimeMillis()
));
}
}性能优化
使用虚拟线程
java
// 异步监听器自动使用虚拟线程
@Component
public class HeavyTaskListener implements AsyncEventListener<DataProcessEvent> {
@Override
public void onEvent(DataProcessEvent event) {
// 在虚拟线程中执行,不会阻塞主线程
performHeavyComputation(event.data());
}
}批量处理
java
@Component
public class BatchEventListener implements EventListener<UserActivityEvent> {
private final List<UserActivityEvent> buffer = new CopyOnWriteArrayList<>();
@Override
public void onEvent(UserActivityEvent event) {
buffer.add(event);
if (buffer.size() >= 100) {
processBatch(new ArrayList<>(buffer));
buffer.clear();
}
}
private void processBatch(List<UserActivityEvent> events) {
// 批量处理
}
}最佳实践
1. 事件命名规范
java
// ✅ 推荐:使用过去式 + Event 后缀
public record UserCreatedEvent(...) implements Event {}
public record OrderPaidEvent(...) implements Event {}
// ❌ 避免:使用现在时或动词
public record CreateUserEvent(...) implements Event {}
public record PayOrderEvent(...) implements Event {}2. 事件不可变
java
// ✅ 推荐:使用 Record 或 final 字段
public record UserCreatedEvent(String userId, String username) implements Event {}
// ❌ 避免:可变事件
public class UserCreatedEvent implements Event {
private String userId; // 可变字段
public void setUserId(String userId) { this.userId = userId; }
}3. 避免事件链过长
java
// ❌ 避免:A -> B -> C -> D -> E(事件链过长)
// ✅ 推荐:使用编排服务协调多个步骤4. 合理使用同步/异步
java
// 同步:需要立即执行且影响业务流程
@Component
public class CriticalListener implements EventListener<PaymentEvent> {
// 同步处理支付
}
// 异步:耗时操作或非关键流程
@Component
public class EmailListener implements AsyncEventListener<PaymentEvent> {
// 异步发送邮件
}