Messaging Plugin
对应模块:
io.gitee.lcgyl:lcgyl-mq-plugin
统一消息队列抽象层,屏蔽底层中间件差异,提供标准化的消息收发接口。
✨ 特性
- ✅ 统一模型 - 标准化的
Message,Topic,Subscription定义 - ✅ 多协议支持 - 适配 Kafka, RabbitMQ, RocketMQ 等多种中间件
- ✅ 事务消息 - 支持基于本地事务表模式的分布式事务消息
- ✅ 死信队列 - 内置自动重试机制和死信处理策略
🚀 快速开始
依赖引入
gradle
// 引入 Messaging Starter (自动包含 Kafka 和 RabbitMQ 支持)
implementation 'io.gitee.lcgyl:lcgyl-mq-starter:2.2.0'配置
yaml
lcgyl:
mq:
default-provider: kafka # kafka, rabbitmq
retry:
max-attempts: 3
delay: 1s发送消息
java
@Service
public class NotificationService {
@Inject
private MessageProducer producer;
// 发送简单消息
public void sendNotification(String userId, String content) {
producer.send("notifications", new Notification(userId, content));
}
// 发送延迟消息
public void sendDelayedNotification(String userId, String content, Duration delay) {
Message<Notification> message = Message.of(new Notification(userId, content))
.delay(delay);
producer.send("notifications", message);
}
// 发送带 Headers 的消息
public void sendWithHeaders(Order order) {
Message<Order> message = Message.of(order)
.header("source", "order-service")
.header("priority", "high");
producer.send("order-events", message);
}
}消费消息
java
@Component
public class NotificationConsumer {
@MessageListener(topic = "notifications", group = "notification-service")
public void onNotification(Notification notification) {
log.info("收到通知: userId={}, content={}",
notification.getUserId(), notification.getContent());
// 处理通知逻辑
notificationService.push(notification);
}
// 获取消息元数据
@MessageListener(topic = "order-events", group = "analytics")
public void onOrderEvent(Message<Order> message) {
Order order = message.getPayload();
String source = message.getHeader("source");
log.info("订单事件来源: {}, 订单ID: {}", source, order.getId());
}
}事务消息
java
@Service
public class OrderService {
@Inject
private TransactionalMessageProducer txProducer;
@Transactional
public void createOrderWithMessage(OrderRequest request) {
// 1. 本地事务
Order order = orderRepository.save(new Order(request));
// 2. 发送事务消息(与本地事务绑定,保证最终一致性)
txProducer.send("order-created", order);
}
}