消息队列
LCGYL Framework 提供统一的消息队列抽象,支持多种消息中间件,实现应用解耦和异步处理。
消息队列概述
什么是消息队列?
消息队列是一种异步通信机制,允许应用之间通过消息进行松耦合的通信。
核心概念
- 生产者(Producer):发送消息的应用
- 消费者(Consumer):接收消息的应用
- 消息(Message):传递的数据
- 队列(Queue):存储消息的容器
- 主题(Topic):发布/订阅模式的消息通道
支持的消息中间件
- RabbitMQ
- Apache Kafka
- Redis Pub/Sub
- 内存队列(开发测试)
配置
RabbitMQ 配置
properties
# application.properties
messaging.type=rabbitmq
messaging.rabbitmq.host=localhost
messaging.rabbitmq.port=5672
messaging.rabbitmq.username=guest
messaging.rabbitmq.password=guest
messaging.rabbitmq.virtual-host=/

Kafka 配置
properties
messaging.type=kafka
messaging.kafka.bootstrap-servers=localhost:9092
messaging.kafka.consumer.group-id=my-group
messaging.kafka.consumer.auto-offset-reset=earliest
messaging.kafka.producer.acks=all

Redis 配置
properties
messaging.type=redis
messaging.redis.host=localhost
messaging.redis.port=6379

发送消息
使用 MessageTemplate
java
/**
 * Example producer that sends order messages through {@code MessageTemplate}.
 */
@Component
public class OrderService {

    @Inject
    private MessageTemplate messageTemplate;

    /**
     * Saves the order, then sends an {@code OrderCreatedMessage} to {@code "order.created"}.
     *
     * @param order the order to save and announce
     */
    public void createOrder(Order order) {
        // Save the order before announcing it.
        orderRepository.save(order);

        // Send the message.
        OrderCreatedMessage created = new OrderCreatedMessage(order);
        messageTemplate.send("order.created", created);
    }

    /**
     * Sends a message to the given queue.
     *
     * @param queue   name of the target queue
     * @param message the object to send
     */
    public void sendToQueue(String queue, Object message) {
        messageTemplate.sendToQueue(queue, message);
    }

    /**
     * Sends a message to the given topic.
     *
     * @param topic   name of the target topic
     * @param message the object to send
     */
    public void sendToTopic(String topic, Object message) {
        messageTemplate.sendToTopic(topic, message);
    }

    /**
     * Sends a delayed message to the given queue.
     *
     * @param queue   name of the target queue
     * @param message the object to send
     * @param delay   the delay passed to {@code MessageTemplate.sendDelayed}
     */
    public void sendDelayed(String queue, Object message, Duration delay) {
        messageTemplate.sendDelayed(queue, message, delay);
    }
}

使用 @SendTo
java
/**
 * Example listener that consumes {@code "order.created"} and replies via {@code @SendTo}.
 */
@Component
public class OrderProcessor {

    /**
     * Processes the order carried by the message and returns the reply message.
     *
     * @param message the received order-created message
     * @return the reply message, annotated to go to {@code "order.processed"}
     */
    @MessageListener("order.created")
    @SendTo("order.processed")
    public OrderProcessedMessage processOrder(OrderCreatedMessage message) {
        // Process the order.
        Order received = message.getOrder();
        // NOTE(review): this calls a processOrder(Order) overload that is not shown here — confirm it exists.
        processOrder(received);

        // The return value is sent to order.processed automatically.
        return new OrderProcessedMessage(received);
    }
}

接收消息
使用 @MessageListener
java
/**
 * Example listeners showing the basic, concurrent and wildcard forms of {@code @MessageListener}.
 */
@Component
public class OrderEventListener {

    private static final Logger logger = LoggerFactory.getLogger(OrderEventListener.class);

    /**
     * Logs every message received on {@code "order.created"}.
     *
     * @param message the received order-created message
     */
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message) {
        // OrderCreatedMessage is a record, so its accessor is orderId(), not getOrderId().
        logger.info("收到订单创建消息: {}", message.orderId());
        // Handle the message here.
    }

    /**
     * Same destination, declared with {@code concurrency = "3"}.
     *
     * @param message the received order-created message
     */
    @MessageListener(value = "order.created", concurrency = "3")
    public void onOrderCreatedConcurrent(OrderCreatedMessage message) {
        // 3 concurrent consumers.
    }

    /**
     * Listens on the pattern {@code "order.*"} and logs the destination of each message.
     *
     * @param message the received message; typed as a wildcard because the payload type varies
     */
    @MessageListener(value = "order.*", pattern = true)
    public void onOrderEvent(Message<?> message) {
        // Wildcard match.
        logger.info("收到订单事件: {}", message.getDestination());
    }
}

消息确认
java
/**
 * Example listeners contrasting manual and automatic acknowledgment.
 */
@Component
public class OrderEventListener {

    /**
     * Acknowledges the message after processing and rejects it when an exception is thrown.
     *
     * @param message the received order-created message
     * @param ack     handle used to acknowledge or reject the message
     */
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message, Acknowledgment ack) {
        try {
            processOrder(message);
            // Acknowledge manually.
            ack.acknowledge();
        } catch (Exception failure) {
            // Reject the message.
            // NOTE(review): the exception is neither logged nor rethrown here.
            ack.reject();
        }
    }

    /**
     * Listener declared with {@code AckMode.AUTO}; it takes no {@code Acknowledgment} parameter.
     *
     * @param message the received order-created message
     */
    @MessageListener(value = "order.created", ackMode = AckMode.AUTO)
    public void onOrderCreatedAuto(OrderCreatedMessage message) {
        // Acknowledged automatically.
    }
}

消息类型
定义消息
java
/**
 * Serializable message payload for an order-created event.
 *
 * @param orderId     the order id
 * @param userId      the user id
 * @param totalAmount the order total
 * @param createdAt   the creation timestamp
 */
public record OrderCreatedMessage(Long orderId, Long userId, BigDecimal totalAmount, LocalDateTime createdAt)
        implements Serializable {}
/**
 * Serializable message payload for a payment-completed event.
 *
 * @param orderId       the order id
 * @param paymentId     the payment id
 * @param paymentMethod the payment method
 * @param paidAt        the payment timestamp
 */
public record PaymentCompletedMessage(Long orderId, Long paymentId, String paymentMethod, LocalDateTime paidAt)
        implements Serializable {}

消息头
java
/**
 * Example producer that attaches custom headers to an outgoing message.
 */
@Component
public class OrderService {

    @Inject
    private MessageTemplate messageTemplate;

    /**
     * Builds a message with the {@code orderId}, {@code priority} and {@code timestamp} headers
     * and sends it to {@code "order.created"}.
     *
     * @param order the order whose creation is announced
     */
    public void sendWithHeaders(Order order) {
        OrderCreatedMessage payload = new OrderCreatedMessage(order);

        Message<OrderCreatedMessage> outbound = MessageBuilder.withPayload(payload)
                .setHeader("orderId", order.getId())
                .setHeader("priority", "high")
                .setHeader("timestamp", System.currentTimeMillis())
                .build();

        messageTemplate.send("order.created", outbound);
    }
}
/**
 * Example listener that reads message headers through {@code @Header} parameters.
 */
@Component
public class OrderEventListener {

    /**
     * Logs the {@code orderId} and {@code priority} headers of each received message.
     *
     * @param payload  the message payload
     * @param orderId  value of the {@code "orderId"} header
     * @param priority value of the {@code "priority"} header
     */
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage payload,
                               @Header("orderId") Long orderId,
                               @Header("priority") String priority) {
        logger.info("订单 {} 优先级: {}", orderId, priority);
    }
}

发布/订阅
发布消息
java
/**
 * Example publisher for the {@code "notifications"} topic.
 */
@Component
public class NotificationService {

    @Inject
    private MessageTemplate messageTemplate;

    /**
     * Publishes the notification to the {@code "notifications"} topic.
     *
     * @param notification the notification to publish
     */
    public void broadcast(Notification notification) {
        // Published to a topic: every subscriber receives it.
        messageTemplate.publish("notifications", notification);
    }
}

订阅消息
java
/**
 * Subscriber of the {@code "notifications"} topic that forwards to {@code emailService}.
 */
@Component
public class EmailNotificationListener {

    /**
     * Passes each received notification to {@code emailService.send}.
     *
     * @param notification the received notification
     */
    @TopicListener("notifications")
    public void onNotification(Notification notification) {
        // Send an e-mail.
        emailService.send(notification);
    }
}
/**
 * Subscriber of the {@code "notifications"} topic that forwards to {@code smsService}.
 */
@Component
public class SmsNotificationListener {

    /**
     * Passes each received notification to {@code smsService.send}.
     *
     * @param notification the received notification
     */
    @TopicListener("notifications")
    public void onNotification(Notification notification) {
        // Send an SMS.
        smsService.send(notification);
    }
}
/**
 * Subscriber of the {@code "notifications"} topic that forwards to {@code pushService}.
 */
@Component
public class PushNotificationListener {

    /**
     * Passes each received notification to {@code pushService.send}.
     *
     * @param notification the received notification
     */
    @TopicListener("notifications")
    public void onNotification(Notification notification) {
        // Send a push notification.
        pushService.send(notification);
    }
}

延迟消息
发送延迟消息
java
/**
 * Example producer that schedules a delayed payment check when an order is created.
 */
@Component
public class OrderService {

    @Inject
    private MessageTemplate messageTemplate;

    /**
     * Saves the order and sends a delayed {@code OrderPaymentCheckMessage} to
     * {@code "order.payment.check"} with a 30-minute delay.
     *
     * @param order the order to save
     */
    public void createOrder(Order order) {
        orderRepository.save(order);

        // Check 30 minutes later whether the order has been paid.
        OrderPaymentCheckMessage check = new OrderPaymentCheckMessage(order.getId());
        Duration delay = Duration.ofMinutes(30);
        messageTemplate.sendDelayed("order.payment.check", check, delay);
    }
}

处理延迟消息
java
/**
 * Example consumer of the delayed {@code "order.payment.check"} message.
 */
@Component
public class OrderPaymentChecker {

    /**
     * Looks up the order named by the message and cancels it if it is still {@code PENDING}.
     *
     * @param message the delayed payment-check message
     */
    @MessageListener("order.payment.check")
    public void checkPayment(OrderPaymentCheckMessage message) {
        Order order = orderRepository.findById(message.getOrderId());
        if (order == null) {
            // Nothing to cancel. Assumes findById returns null when no order matches — TODO confirm.
            return;
        }
        if (order.getStatus() == OrderStatus.PENDING) {
            // Still unpaid: cancel the order.
            orderService.cancel(order);
        }
    }
}

消息重试
配置重试
properties
messaging.retry.enabled=true
messaging.retry.max-attempts=3
messaging.retry.initial-interval=1000
messaging.retry.multiplier=2.0
messaging.retry.max-interval=10000

自定义重试策略
java
/**
 * Example listener with a per-method retry policy and a recovery handler.
 */
@Component
public class OrderEventListener {

    /**
     * Processes the message; annotated for up to 3 attempts with a 1000 ms initial delay
     * and a multiplier of 2.
     *
     * @param message the received order-created message
     */
    @MessageListener("order.created")
    @Retryable(
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void onOrderCreated(OrderCreatedMessage message) {
        // A failure here is retried automatically.
        processOrder(message);
    }

    /**
     * Runs once retries are exhausted: logs the failure and forwards the message to
     * {@code "order.created.dlq"}.
     *
     * @param e       the exception that caused the failure
     * @param message the message that could not be processed
     */
    @Recover
    public void recover(Exception e, OrderCreatedMessage message) {
        // OrderCreatedMessage is a record, so its accessor is orderId(), not getOrderId().
        logger.error("处理订单失败: {}", message.orderId(), e);
        // Send to the dead-letter queue.
        messageTemplate.send("order.created.dlq", message);
    }
}

死信队列
配置死信队列
properties
messaging.dlq.enabled=true
messaging.dlq.suffix=.dlq

处理死信
java
/**
 * Example consumer of the {@code "order.created.dlq"} dead-letter queue.
 */
@Component
public class DeadLetterHandler {

    /**
     * Logs the dead letter, stores it through {@code deadLetterRepository} and sends an alert.
     *
     * @param message the dead-lettered message
     */
    @MessageListener("order.created.dlq")
    public void handleDeadLetter(Message<?> message) {
        logger.error("死信消息: {}", message);

        // Record it in the database.
        DeadLetter entry = new DeadLetter(
            message.getDestination(),
            message.getPayload(),
            message.getHeaders());
        deadLetterRepository.save(entry);

        // Send an alert.
        alertService.sendAlert("死信消息", message.toString());
    }
}

消息序列化
JSON 序列化
java
/**
 * {@code MessageConverter} that serializes messages as JSON using a Jackson {@code ObjectMapper}.
 */
@Component
public class JsonMessageConverter implements MessageConverter {

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Serializes the message to JSON bytes.
     *
     * @param message the object to serialize
     * @return the JSON encoding of {@code message}
     * @throws java.io.UncheckedIOException if Jackson fails to serialize the object
     */
    @Override
    public byte[] serialize(Object message) {
        try {
            return objectMapper.writeValueAsBytes(message);
        } catch (java.io.IOException e) {
            // Jackson's exceptions are checked; the overridden method declares none, so wrap them.
            throw new java.io.UncheckedIOException("Failed to serialize message to JSON", e);
        }
    }

    /**
     * Deserializes JSON bytes into an instance of the given type.
     *
     * @param data the JSON bytes
     * @param type the target class
     * @param <T>  the target type
     * @return the deserialized object
     * @throws java.io.UncheckedIOException if Jackson fails to parse or bind the data
     */
    @Override
    public <T> T deserialize(byte[] data, Class<T> type) {
        try {
            return objectMapper.readValue(data, type);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(
                "Failed to deserialize JSON message as " + type.getName(), e);
        }
    }
}

配置序列化器
properties
messaging.serializer=json
# 或
# messaging.serializer=protobuf

事务消息
本地事务
java
/**
 * Example producer that ties message sending to a local transaction.
 */
@Component
public class OrderService {

    @Inject
    private MessageTemplate messageTemplate;

    /**
     * Saves the order inside a {@code @Transactional} method and hands the message to
     * {@code MessageTemplate.sendAfterCommit}.
     *
     * @param order the order to save and announce
     */
    @Transactional
    public void createOrder(Order order) {
        // Save the order.
        orderRepository.save(order);

        // Send the message (sent after the transaction commits).
        OrderCreatedMessage created = new OrderCreatedMessage(order);
        messageTemplate.sendAfterCommit("order.created", created);
    }
}

分布式事务
java
/**
 * Example producer using the half-message pattern for a distributed transaction.
 */
@Component
public class OrderService {

    @Inject
    private TransactionalMessageTemplate messageTemplate;

    /**
     * Sends a half message, runs the local transaction, then commits the message; the message
     * is rolled back only when the local transaction itself fails.
     *
     * @param order the order to save and announce
     */
    public void createOrder(Order order) {
        // 1. Send the half message.
        SendResult result = messageTemplate.sendHalfMessage(
            "order.created",
            new OrderCreatedMessage(order)
        );

        try {
            // 2. Run the local transaction.
            orderRepository.save(order);
        } catch (Exception e) {
            // 3a. The local transaction failed: roll the message back.
            messageTemplate.rollback(result.getMessageId());
            throw e;
        }

        // 3b. The local transaction succeeded: commit the message. This stays outside the try
        // block so that a failing commit does not roll back a message whose order was saved.
        messageTemplate.commit(result.getMessageId());
    }
}

消息过滤
消费者端过滤
java
/**
 * Example listeners that declare a consumer-side {@code filter} expression.
 */
@Component
public class OrderEventListener {

    /**
     * Listener filtered by the expression {@code headers['priority'] == 'high'}.
     *
     * @param message the received order-created message
     */
    @MessageListener(value = "order.created", filter = "headers['priority'] == 'high'")
    public void onHighPriorityOrder(OrderCreatedMessage message) {
        // Handles high-priority orders only.
    }

    /**
     * Listener filtered by the expression {@code payload.totalAmount > 1000}.
     *
     * @param message the received order-created message
     */
    @MessageListener(value = "order.created", filter = "payload.totalAmount > 1000")
    public void onLargeOrder(OrderCreatedMessage message) {
        // Handles large orders only.
    }
}

消息追踪
添加追踪信息
java
/**
 * {@code MessageInterceptor} that carries trace identifiers in message headers.
 */
@Component
public class TracingMessageInterceptor implements MessageInterceptor {

    /**
     * Returns a copy of the message with {@code traceId} and {@code spanId} headers taken from
     * {@code TraceContext}.
     *
     * @param message the outgoing message
     * @return the rebuilt message including the two tracing headers
     */
    @Override
    public Message<?> preSend(Message<?> message) {
        return MessageBuilder
                .fromMessage(message)
                .setHeader("traceId", TraceContext.getTraceId())
                .setHeader("spanId", TraceContext.getSpanId())
                .build();
    }

    /**
     * Reads the {@code traceId} header of the received message and stores it in
     * {@code TraceContext}.
     *
     * @param message the received message
     */
    @Override
    public void postReceive(Message<?> message) {
        TraceContext.setTraceId(message.getHeaders().get("traceId", String.class));
    }
}

监控
消息指标
java
/**
 * Records messaging timers and error counters in a {@code MeterRegistry}.
 */
@Component
public class MessageMetrics {

    @Inject
    private MeterRegistry meterRegistry;

    /**
     * Records a send duration under the {@code messaging.send} timer.
     *
     * @param destination value of the {@code destination} tag
     * @param duration    elapsed time in milliseconds
     */
    public void recordSend(String destination, long duration) {
        recordTimer("messaging.send", destination, duration);
    }

    /**
     * Records a receive duration under the {@code messaging.receive} timer.
     *
     * @param destination value of the {@code destination} tag
     * @param duration    elapsed time in milliseconds
     */
    public void recordReceive(String destination, long duration) {
        recordTimer("messaging.receive", destination, duration);
    }

    /**
     * Increments the {@code messaging.error} counter.
     *
     * @param destination value of the {@code destination} tag
     * @param error       value of the {@code error} tag
     */
    public void recordError(String destination, String error) {
        meterRegistry
                .counter("messaging.error", "destination", destination, "error", error)
                .increment();
    }

    /** Records {@code duration} milliseconds on the named timer, tagged with the destination. */
    private void recordTimer(String metric, String destination, long duration) {
        meterRegistry.timer(metric, "destination", destination)
                .record(duration, TimeUnit.MILLISECONDS);
    }
}

最佳实践
1. 消息幂等性
java
/**
 * Example listener that skips messages already recorded as processed.
 */
@Component
public class OrderEventListener {

    @Inject
    private IdempotentService idempotentService;

    /**
     * Processes the message unless {@code idempotentService} reports its id as processed, and
     * marks the id as processed afterwards.
     *
     * @param message the received order-created message
     */
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message) {
        // NOTE(review): the OrderCreatedMessage record shown in this document declares no
        // messageId component — confirm where getMessageId() comes from.
        String messageId = message.getMessageId();

        // Check whether the message was already processed.
        if (idempotentService.isProcessed(messageId)) {
            logger.info("消息已处理: {}", messageId);
            return;
        }

        // If processing throws, the exception propagates and the id is not marked as processed.
        processOrder(message);
        idempotentService.markProcessed(messageId);
    }
}

2. 消息顺序性
java
// Use a partition key to keep messages in order.
messageTemplate.send("orders", order, order.getUserId().toString());

/**
 * Listener on {@code "orders"} declared with {@code concurrency = "1"}.
 *
 * @param order the received order
 */
// Single-threaded consumer.
@MessageListener(
    value = "orders",
    concurrency = "1"
)
public void onOrder(Order order) {
    // Process in order.
}

3. 消息大小控制
java
// ✅ 推荐:只发送必要的数据
messageTemplate.send("order.created", new OrderCreatedMessage(
order.getId(),
order.getUserId(),
order.getTotalAmount()
));
// ❌ 不推荐:发送大对象
messageTemplate.send("order.created", order); // 包含所有关联数据4. 错误处理
java
/**
 * Example listener that handles business exceptions locally and lets everything else propagate.
 */
@Component
public class OrderEventListener {

    /**
     * Processes the message, logging and swallowing {@code BusinessException}.
     *
     * @param message the received order-created message
     */
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message) {
        try {
            processOrder(message);
        } catch (BusinessException businessError) {
            // Business exception: do not retry.
            logger.warn("业务异常: {}", businessError.getMessage());
        }
        // Any other exception is not caught here and propagates to the caller (retry).
    }
}

常见问题
Q: 如何保证消息不丢失?
A:
- 生产者确认
- 消息持久化
- 消费者手动确认
- 死信队列
Q: 如何处理消息积压?
A:
- 增加消费者数量
- 优化消费逻辑
- 消息分片
- 限流降级
Q: 消息重复消费怎么办?
A:
- 实现幂等性
- 使用唯一消息 ID
- 数据库唯一约束