Skip to content

RabbitMQ Plugin

对应模块: io.gitee.lcgyl:lcgyl-mq-rabbitmq-plugin

基于 RabbitMQ 的可靠消息队列实现,支持复杂的路由模型。

✨ 特性

  • 可靠传输 - 完整的 Publisher Confirm 和 Consumer Ack 机制
  • Exchange 支持 - 支持 Direct, Topic, Fanout, Headers 等多种交换机
  • 延迟队列 - 基于 TTL + DLX 实现精确的延迟消息
  • 虚拟主机 - 支持 VHost 级别的资源隔离

🚀 快速开始

💡 提示:如果使用 Spring Boot,推荐引入 lcgyl-mq-starter 即可获得开箱即用的 RabbitMQ 支持。

依赖引入

gradle
implementation 'io.gitee.lcgyl:lcgyl-mq-rabbitmq-plugin:2.2.0'

配置

yaml
lcgyl:
  mq:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtual-host: /
      publisher-confirms: true
      publisher-returns: true

发送消息

java
@Service
public class OrderEventPublisher {
    
    @Inject
    private RabbitMessageProducer producer;
    
    // Direct 模式:发送到指定队列
    public void sendOrderCreated(Order order) {
        producer.send("order.queue", order);
    }
    
    // Topic 模式:基于路由键发送
    public void sendOrderEvent(String eventType, Order order) {
        // 路由键: order.created, order.paid, order.shipped
        producer.send("order.exchange", "order." + eventType, order);
    }
    
    // 延迟消息:订单超时自动取消
    public void sendOrderTimeout(Order order, Duration delay) {
        Message<Order> message = Message.of(order)
            .delay(delay);  // 30分钟后执行
        producer.send("order.delay.queue", message);
    }
    
    // 可靠发送:带确认回调
    public void sendWithConfirm(Order order) {
        producer.sendWithConfirm("order.queue", order, (success, cause) -> {
            if (success) {
                log.info("消息发送成功: orderId={}", order.getId());
            } else {
                log.error("消息发送失败: orderId={}", order.getId(), cause);
                // 重试或告警
            }
        });
    }
}

消费消息

java
@Component
public class OrderEventConsumer {
    
    // 简单消费
    @RabbitListener(queue = "order.queue")
    public void onOrderCreated(Order order) {
        log.info("收到订单: {}", order.getId());
        orderService.process(order);
    }
    
    // Topic 模式消费:监听多个路由键
    @RabbitListener(exchange = "order.exchange", routingKey = "order.*")
    public void onOrderEvent(Order order, @Header("routingKey") String routingKey) {
        log.info("收到订单事件: type={}, orderId={}", routingKey, order.getId());
    }
    
    // 手动 Ack
    @RabbitListener(queue = "payment.queue", ackMode = AckMode.MANUAL)
    public void onPayment(Payment payment, Channel channel, @Header("deliveryTag") long tag) {
        try {
            paymentService.process(payment);
            channel.basicAck(tag, false);  // 确认
        } catch (Exception e) {
            channel.basicNack(tag, false, true);  // 拒绝并重新入队
        }
    }
}

Released under the Apache License 2.0