RabbitMQ Plugin
对应模块:
io.gitee.lcgyl:lcgyl-mq-rabbitmq-plugin
基于 RabbitMQ 的可靠消息队列实现,支持复杂的路由模型。
✨ 特性
- ✅ 可靠传输 - 完整的 Publisher Confirm 和 Consumer Ack 机制
- ✅ Exchange 支持 - 支持 Direct, Topic, Fanout, Headers 等多种交换机
- ✅ 延迟队列 - 基于 TTL + DLX 实现精确的延迟消息
- ✅ 虚拟主机 - 支持 VHost 级别的资源隔离
🚀 快速开始
💡 提示:如果使用 Spring Boot,推荐引入
lcgyl-mq-starter即可获得开箱即用的 RabbitMQ 支持。
依赖引入
gradle
implementation 'io.gitee.lcgyl:lcgyl-mq-rabbitmq-plugin:2.2.0'配置
yaml
lcgyl:
mq:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirms: true
publisher-returns: true发送消息
java
@Service
public class OrderEventPublisher {
@Inject
private RabbitMessageProducer producer;
// Direct 模式:发送到指定队列
public void sendOrderCreated(Order order) {
producer.send("order.queue", order);
}
// Topic 模式:基于路由键发送
public void sendOrderEvent(String eventType, Order order) {
// 路由键: order.created, order.paid, order.shipped
producer.send("order.exchange", "order." + eventType, order);
}
// 延迟消息:订单超时自动取消
public void sendOrderTimeout(Order order, Duration delay) {
Message<Order> message = Message.of(order)
.delay(delay); // 30分钟后执行
producer.send("order.delay.queue", message);
}
// 可靠发送:带确认回调
public void sendWithConfirm(Order order) {
producer.sendWithConfirm("order.queue", order, (success, cause) -> {
if (success) {
log.info("消息发送成功: orderId={}", order.getId());
} else {
log.error("消息发送失败: orderId={}", order.getId(), cause);
// 重试或告警
}
});
}
}消费消息
java
@Component
public class OrderEventConsumer {
// 简单消费
@RabbitListener(queue = "order.queue")
public void onOrderCreated(Order order) {
log.info("收到订单: {}", order.getId());
orderService.process(order);
}
// Topic 模式消费:监听多个路由键
@RabbitListener(exchange = "order.exchange", routingKey = "order.*")
public void onOrderEvent(Order order, @Header("routingKey") String routingKey) {
log.info("收到订单事件: type={}, orderId={}", routingKey, order.getId());
}
// 手动 Ack
@RabbitListener(queue = "payment.queue", ackMode = AckMode.MANUAL)
public void onPayment(Payment payment, Channel channel, @Header("deliveryTag") long tag) {
try {
paymentService.process(payment);
channel.basicAck(tag, false); // 确认
} catch (Exception e) {
channel.basicNack(tag, false, true); // 拒绝并重新入队
}
}
}