Skip to content

Messaging Plugin

对应模块: io.gitee.lcgyl:lcgyl-mq-plugin

统一消息队列抽象层,屏蔽底层中间件差异,提供标准化的消息收发接口。

✨ 特性

  • 统一模型 - 标准化的 Message, Topic, Subscription 定义
  • 多协议支持 - 适配 Kafka, RabbitMQ, RocketMQ 等多种中间件
  • 事务消息 - 支持基于本地事务表模式的分布式事务消息
  • 死信队列 - 内置自动重试机制和死信处理策略

🚀 快速开始

依赖引入

gradle
// 引入 Messaging Starter (自动包含 Kafka 和 RabbitMQ 支持)
implementation 'io.gitee.lcgyl:lcgyl-mq-starter:2.2.0'

配置

yaml
lcgyl:
  mq:
    default-provider: kafka  # kafka, rabbitmq
    retry:
      max-attempts: 3
      delay: 1s

发送消息

java
@Service
public class NotificationService {
    
    @Inject
    private MessageProducer producer;
    
    // 发送简单消息
    public void sendNotification(String userId, String content) {
        producer.send("notifications", new Notification(userId, content));
    }
    
    // 发送延迟消息
    public void sendDelayedNotification(String userId, String content, Duration delay) {
        Message<Notification> message = Message.of(new Notification(userId, content))
            .delay(delay);
        producer.send("notifications", message);
    }
    
    // 发送带 Headers 的消息
    public void sendWithHeaders(Order order) {
        Message<Order> message = Message.of(order)
            .header("source", "order-service")
            .header("priority", "high");
        producer.send("order-events", message);
    }
}

消费消息

java
@Component
public class NotificationConsumer {
    
    @MessageListener(topic = "notifications", group = "notification-service")
    public void onNotification(Notification notification) {
        log.info("收到通知: userId={}, content={}", 
            notification.getUserId(), notification.getContent());
        // 处理通知逻辑
        notificationService.push(notification);
    }
    
    // 获取消息元数据
    @MessageListener(topic = "order-events", group = "analytics")
    public void onOrderEvent(Message<Order> message) {
        Order order = message.getPayload();
        String source = message.getHeader("source");
        log.info("订单事件来源: {}, 订单ID: {}", source, order.getId());
    }
}

事务消息

java
@Service
public class OrderService {
    
    @Inject
    private TransactionalMessageProducer txProducer;
    
    @Transactional
    public void createOrderWithMessage(OrderRequest request) {
        // 1. 本地事务
        Order order = orderRepository.save(new Order(request));
        
        // 2. 发送事务消息(与本地事务绑定,保证最终一致性)
        txProducer.send("order-created", order);
    }
}

Released under the Apache License 2.0