
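Message Queues

LCGYL Framework provides a unified message queue abstraction that supports multiple message brokers, so applications can be decoupled and work can be processed asynchronously.

Message Queue Overview

What Is a Message Queue?

A message queue is an asynchronous communication mechanism that lets applications exchange messages while remaining loosely coupled.

Core Concepts

  • Producer: the application that sends messages
  • Consumer: the application that receives messages
  • Message: the data being transferred
  • Queue: a container that stores messages
  • Topic: a message channel for the publish/subscribe pattern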
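
The difference between a queue and a topic can be sketched with a tiny in-memory broker. `TinyBroker` and its methods are hypothetical names used only for illustration and are not part of the LCGYL API. The sketch assumes the usual semantics: a queued message goes to exactly one consumer, while a published message goes to every subscriber.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory broker for illustration; not the framework's implementation.
final class TinyBroker {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<Consumer<String>> topicSubscribers = new ArrayList<>();

    // Producer side: enqueue a message that exactly one consumer will take.
    void sendToQueue(String message) {
        queue.addLast(message);
    }

    // Consumer side: take the oldest message, or null if the queue is empty.
    String receiveFromQueue() {
        return queue.pollFirst();
    }

    // Register a subscriber on the topic.
    void subscribe(Consumer<String> subscriber) {
        topicSubscribers.add(subscriber);
    }

    // Producer side: hand the message to every registered subscriber.
    void publish(String message) {
        for (Consumer<String> subscriber : topicSubscribers) {
            subscriber.accept(message);
        }
    }
}
```

A message taken with `receiveFromQueue()` is gone for other consumers, whereas `publish` reaches each subscriber once.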

Supported Message Brokers

  • RabbitMQ
  • Apache Kafka
  • Redis Pub/Sub
  • In-memory queue (for development and testing)

Configuration

RabbitMQ Configuration

properties
# application.properties
messaging.type=rabbitmq
messaging.rabbitmq.host=localhost
messaging.rabbitmq.port=5672
messaging.rabbitmq.username=guest
messaging.rabbitmq.password=guest
messaging.rabbitmq.virtual-host=/

Kafka Configuration

properties
messaging.type=kafka
messaging.kafka.bootstrap-servers=localhost:9092
messaging.kafka.consumer.group-id=my-group
messaging.kafka.consumer.auto-offset-reset=earliest
messaging.kafka.producer.acks=all

Redis Configuration

properties
messaging.type=redis
messaging.redis.host=localhost
messaging.redis.port=6379

Sending Messages

Using MessageTemplate

java
@Component
public class OrderService {
    
    @Inject
    private MessageTemplate messageTemplate;
    
    public void createOrder(Order order) {
        // Save the order
        orderRepository.save(order);
        
        // Send the message
        messageTemplate.send("order.created", new OrderCreatedMessage(order));
    }
    
    // Send to a specific queue
    public void sendToQueue(String queue, Object message) {
        messageTemplate.sendToQueue(queue, message);
    }
    
    // Send to a specific topic
    public void sendToTopic(String topic, Object message) {
        messageTemplate.sendToTopic(topic, message);
    }
    
    // Send a delayed message
    public void sendDelayed(String queue, Object message, Duration delay) {
        messageTemplate.sendDelayed(queue, message, delay);
    }
}

Using @SendTo

java
@Component
public class OrderProcessor {
    
    @MessageListener("order.created")
    @SendTo("order.processed")
    public OrderProcessedMessage processOrder(OrderCreatedMessage message) {
        // Process the order
        Order order = message.getOrder();
        processOrder(order);
        
        // The return value is sent to order.processed automatically
        return new OrderProcessedMessage(order);
    }
}

Receiving Messages

Using @MessageListener

java
@Component
public class OrderEventListener {
    
    private static final Logger logger = LoggerFactory.getLogger(OrderEventListener.class);
    
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message) {
        logger.info("Received order-created message: {}", message.orderId());
        // Handle the message
    }
    
    @MessageListener(value = "order.created", concurrency = "3")
    public void onOrderCreatedConcurrent(OrderCreatedMessage message) {
        // Three concurrent consumers
    }
    
    @MessageListener(value = "order.*", pattern = true)
    public void onOrderEvent(Message message) {
        // Wildcard match
        logger.info("Received order event: {}", message.getDestination());
    }
}
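
How a pattern such as `order.*` selects destinations can be sketched as follows. This is an illustration under a stated assumption: `*` is taken to match exactly one dot-separated segment, which is one common convention. The framework's actual matching rules are not described here, and `DestinationPattern` is a hypothetical name.

```java
import java.util.regex.Pattern;

// Hypothetical matcher; assumes '*' stands for exactly one dot-separated segment.
final class DestinationPattern {
    private final Pattern regex;

    DestinationPattern(String pattern) {
        // Split on '*', quote the literal pieces, and join them with "one segment".
        String[] literals = pattern.split("\\*", -1);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                sb.append("[^.]+");
            }
            sb.append(Pattern.quote(literals[i]));
        }
        this.regex = Pattern.compile(sb.toString());
    }

    // True if the whole destination name matches the pattern.
    boolean matches(String destination) {
        return regex.matcher(destination).matches();
    }
}
```

Under this assumption `order.*` matches `order.created` but not `order.payment.check`, because the latter has two segments after `order.`.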

Message Acknowledgment

java
@Component
public class OrderEventListener {
    
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message, Acknowledgment ack) {
        try {
            processOrder(message);
            ack.acknowledge();  // Acknowledge manually
        } catch (Exception e) {
            ack.reject();  // Reject the message
        }
    }
    
    @MessageListener(value = "order.created", ackMode = AckMode.AUTO)
    public void onOrderCreatedAuto(OrderCreatedMessage message) {
        // Acknowledged automatically
    }
}
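
The effect of manual acknowledgment can be sketched with a small in-memory queue that tracks in-flight deliveries. `AckQueue` is a hypothetical class, and the sketch assumes that a rejected message is put back at the head of the queue. Whether `ack.reject()` requeues or discards in the framework depends on the broker and its configuration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical queue with manual acknowledgment; assumes reject() requeues.
final class AckQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> inFlight = new HashMap<>();
    private long nextTag = 1;

    void send(String message) {
        ready.addLast(message);
    }

    // Hands out the next message and returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        inFlight.put(tag, message);
        return tag;
    }

    // Payload of a delivery that has not been acknowledged or rejected yet.
    String payload(long tag) {
        return inFlight.get(tag);
    }

    // Success: the message is removed for good.
    void acknowledge(long tag) {
        inFlight.remove(tag);
    }

    // Failure: the message goes back to the head of the queue for redelivery.
    void reject(long tag) {
        String message = inFlight.remove(tag);
        if (message != null) {
            ready.addFirst(message);
        }
    }

    // Messages that are still owed to a consumer.
    int pending() {
        return ready.size() + inFlight.size();
    }
}
```

Until `acknowledge` is called the message still counts as pending, which is what lets a broker redeliver it after a consumer failure.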

Message Types

Defining Messages

java
public record OrderCreatedMessage(
    Long orderId,
    Long userId,
    BigDecimal totalAmount,
    LocalDateTime createdAt
) implements Serializable {
}

public record PaymentCompletedMessage(
    Long orderId,
    Long paymentId,
    String paymentMethod,
    LocalDateTime paidAt
) implements Serializable {
}

Message Headers

java
@Component
public class OrderService {
    
    @Inject
    private MessageTemplate messageTemplate;
    
    public void sendWithHeaders(Order order) {
        Message<OrderCreatedMessage> message = MessageBuilder
            .withPayload(new OrderCreatedMessage(order))
            .setHeader("orderId", order.getId())
            .setHeader("priority", "high")
            .setHeader("timestamp", System.currentTimeMillis())
            .build();
        
        messageTemplate.send("order.created", message);
    }
}

@Component
public class OrderEventListener {
    
    @MessageListener("order.created")
    public void onOrderCreated(
            OrderCreatedMessage payload,
            @Header("orderId") Long orderId,
            @Header("priority") String priority) {
        logger.info("Order {} priority: {}", orderId, priority);
    }
}

Publish/Subscribe

Publishing Messages

java
@Component
public class NotificationService {
    
    @Inject
    private MessageTemplate messageTemplate;
    
    public void broadcast(Notification notification) {
        // Publish to the topic; every subscriber receives the message
        messageTemplate.publish("notifications", notification);
    }
}

Subscribing to Messages

java
@Component
public class EmailNotificationListener {
    
    @TopicListener("notifications")
    public void onNotification(Notification notification) {
        // Send an email
        emailService.send(notification);
    }
}

@Component
public class SmsNotificationListener {
    
    @TopicListener("notifications")
    public void onNotification(Notification notification) {
        // Send an SMS
        smsService.send(notification);
    }
}

@Component
public class PushNotificationListener {
    
    @TopicListener("notifications")
    public void onNotification(Notification notification) {
        // Send a push notification
        pushService.send(notification);
    }
}

Delayed Messages

Sending Delayed Messages

java
@Component
public class OrderService {
    
    @Inject
    private MessageTemplate messageTemplate;
    
    public void createOrder(Order order) {
        orderRepository.save(order);
        
        // Check after 30 minutes whether the order has been paid
        messageTemplate.sendDelayed(
            "order.payment.check",
            new OrderPaymentCheckMessage(order.getId()),
            Duration.ofMinutes(30)
        );
    }
}

Handling Delayed Messages

java
@Component
public class OrderPaymentChecker {
    
    @MessageListener("order.payment.check")
    public void checkPayment(OrderPaymentCheckMessage message) {
        Order order = orderRepository.findById(message.getOrderId());
        
        if (order.getStatus() == OrderStatus.PENDING) {
            // Still unpaid: cancel the order
            orderService.cancel(order);
        }
    }
}
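
The idea behind delayed delivery can be sketched as a store that keeps each message together with its due time and releases it only once that time has passed. `DelayedMessages` is a hypothetical class. The current time is passed in explicitly so the behavior is easy to follow, whereas a real broker would use its own clock or a native delay feature.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical delayed-message store; not the framework's implementation.
final class DelayedMessages {
    private record Entry(Instant dueAt, String payload) {}

    private static final Comparator<Entry> BY_DUE_TIME = Comparator.comparing(Entry::dueAt);

    // Min-heap ordered by due time, so the next message to release is always on top.
    private final PriorityQueue<Entry> heap = new PriorityQueue<>(BY_DUE_TIME);

    // Stores the message with due time = now + delay.
    void sendDelayed(String payload, Duration delay, Instant now) {
        heap.add(new Entry(now.plus(delay), payload));
    }

    // Removes and returns every message whose due time is at or before 'now'.
    List<String> pollDue(Instant now) {
        List<String> due = new ArrayList<>();
        while (!heap.isEmpty() && !heap.peek().dueAt().isAfter(now)) {
            due.add(heap.poll().payload());
        }
        return due;
    }
}
```

For the payment check above, the message would be stored with a 30-minute delay and handed to the listener by the first `pollDue` call made at or after that moment.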

Message Retry

Configuring Retry

properties
messaging.retry.enabled=true
messaging.retry.max-attempts=3
messaging.retry.initial-interval=1000
messaging.retry.multiplier=2.0
messaging.retry.max-interval=10000
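
How these settings could translate into wait times is sketched below. The sketch assumes the common exponential backoff convention, where the wait before retry n is `initial-interval × multiplier^(n-1)` in milliseconds, capped at `max-interval`. The framework's exact formula is not stated here, and `BackoffSchedule` is a hypothetical helper.

```java
// Hypothetical helper; assumes standard capped exponential backoff.
final class BackoffSchedule {
    // Wait in milliseconds before the given retry (1 = first retry).
    static long delayMillis(int retry, long initialInterval, double multiplier, long maxInterval) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        double raw = initialInterval * Math.pow(multiplier, retry - 1);
        return (long) Math.min(raw, (double) maxInterval);
    }
}
```

With the values above, the first four waits would be 1000, 2000, 4000, and 8000 ms, and every later wait would be capped at 10000 ms. With `max-attempts=3`, only the first two waits would occur if the first attempt counts toward the limit.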

Custom Retry Policy

java
@Component
public class OrderEventListener {
    
    @MessageListener("order.created")
    @Retryable(
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void onOrderCreated(OrderCreatedMessage message) {
        // Retried automatically on failure
        processOrder(message);
    }
    
    @Recover
    public void recover(Exception e, OrderCreatedMessage message) {
        // Runs after all retries are exhausted
        logger.error("Failed to process order: {}", message.orderId(), e);
        // Send to the dead-letter queue
        messageTemplate.send("order.created.dlq", message);
    }
}

Dead-Letter Queue

Configuring the Dead-Letter Queue

properties
messaging.dlq.enabled=true
messaging.dlq.suffix=.dlq

Handling Dead Letters

java
@Component
public class DeadLetterHandler {
    
    @MessageListener("order.created.dlq")
    public void handleDeadLetter(Message<?> message) {
        logger.error("Dead-letter message: {}", message);
        
        // Record it in the database
        deadLetterRepository.save(new DeadLetter(
            message.getDestination(),
            message.getPayload(),
            message.getHeaders()
        ));
        
        // Raise an alert
        alertService.sendAlert("Dead-letter message", message.toString());
    }
}
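
How retries and the dead-letter queue fit together can be sketched as a dispatcher that tries a handler a fixed number of times and then routes the message to `<destination><suffix>`. `RetryingDispatcher` is a hypothetical class. The naming rule simply mirrors the `messaging.dlq.suffix` setting, and backoff waits are left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical dispatcher; illustrates retry-then-dead-letter, not the framework's internals.
final class RetryingDispatcher {
    // Dead letters recorded as "<dlq name>:<payload>" for easy inspection.
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;
    private final String dlqSuffix;

    RetryingDispatcher(int maxAttempts, String dlqSuffix) {
        this.maxAttempts = maxAttempts;
        this.dlqSuffix = dlqSuffix;
    }

    // Returns the number of attempts that were made.
    int dispatch(String destination, String payload, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt; // success, no further attempts needed
            } catch (RuntimeException e) {
                // Fall through to the next attempt; a real system would log and back off here.
            }
        }
        // All attempts failed: park the message on the dead-letter destination.
        deadLetters.add(destination + dlqSuffix + ":" + payload);
        return maxAttempts;
    }
}
```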

Message Serialization

JSON Serialization

java
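// Assumes ObjectMapper is Jackson 2.x
import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class JsonMessageConverter implements MessageConverter {
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public byte[] serialize(Object message) {
        try {
            return objectMapper.writeValueAsBytes(message);
        } catch (JsonProcessingException e) {
            // writeValueAsBytes declares a checked exception, so wrap it
            throw new IllegalStateException("Failed to serialize message", e);
        }
    }
    
    @Override
    public <T> T deserialize(byte[] data, Class<T> type) {
        try {
            return objectMapper.readValue(data, type);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to deserialize message", e);
        }
    }
}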
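
The same serialize/deserialize contract can also be sketched with only the JDK, which makes the round trip easy to try without extra dependencies. This swaps in Java's built-in object serialization instead of JSON. `JdkSerialization` and the `Ping` record are hypothetical names, and nothing here is meant to describe how the framework's own converters work.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical converter based on built-in Java serialization (not JSON).
final class JdkSerialization {
    // Sample payload; records work as long as they implement Serializable.
    record Ping(long id, String text) implements Serializable {}

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static <T> T deserialize(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // cast() fails fast if the bytes hold a different type than expected
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown message class", e);
        }
    }
}
```

Built-in serialization should only be used on data from trusted producers, since deserializing untrusted bytes is a well-known security risk. That is one reason a text format such as JSON is often preferred between services.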

Configuring the Serializer

properties
# Choose one serializer
messaging.serializer=json
# messaging.serializer=protobuf

Transactional Messages

Local Transactions

java
@Component
public class OrderService {
    
    @Inject
    private MessageTemplate messageTemplate;
    
    @Transactional
    public void createOrder(Order order) {
        // Save the order
        orderRepository.save(order);
        
        // Send the message (it is sent after the transaction commits)
        messageTemplate.sendAfterCommit("order.created", new OrderCreatedMessage(order));
    }
}
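
The send-after-commit behavior can be sketched as a buffer that holds outgoing messages until the surrounding transaction finishes. `AfterCommitBuffer` is a hypothetical class. A real implementation would hook into the transaction manager's commit and rollback callbacks instead of exposing them as methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer; shows why a rolled-back transaction sends nothing.
final class AfterCommitBuffer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> sent = new ArrayList<>();

    // Called inside the transaction: only remember the message.
    void sendAfterCommit(String destination, String payload) {
        pending.add(destination + ":" + payload);
    }

    // Transaction committed: release everything that was buffered, in order.
    void commit() {
        sent.addAll(pending);
        pending.clear();
    }

    // Transaction rolled back: drop the buffered messages.
    void rollback() {
        pending.clear();
    }

    // Messages that actually left the application.
    List<String> sent() {
        return List.copyOf(sent);
    }
}
```

This ordering avoids announcing an order whose database row was never committed. It does not cover a crash between the commit and the send, which is the gap that the half-message approach in the next section addresses.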

Distributed Transactions

java
@Component
public class OrderService {
    
    @Inject
    private TransactionalMessageTemplate messageTemplate;
    
    public void createOrder(Order order) {
        // 1. Send a half message (not yet visible to consumers)
        SendResult result = messageTemplate.sendHalfMessage(
            "order.created", 
            new OrderCreatedMessage(order)
        );
        
        try {
            // 2. Run the local transaction
            orderRepository.save(order);
            
            // 3. Commit the message
            messageTemplate.commit(result.getMessageId());
        } catch (Exception e) {
            // 3. On failure, roll back the message instead
            messageTemplate.rollback(result.getMessageId());
            throw e;
        }
    }
}
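
The half-message flow can be sketched as a small state machine: a message starts in a half state that consumers cannot see, and becomes visible only once it is committed. `HalfMessageStore` is a hypothetical class that models the broker side. The recovery check a real broker performs for messages left in the half state is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical broker-side store for half messages.
final class HalfMessageStore {
    enum State { HALF, COMMITTED, ROLLED_BACK }

    private final Map<String, State> states = new LinkedHashMap<>();
    private final Map<String, String> payloads = new LinkedHashMap<>();
    private int nextId = 1;

    // Step 1: store the message in the HALF state and return its id.
    String sendHalfMessage(String payload) {
        String id = "msg-" + nextId++;
        states.put(id, State.HALF);
        payloads.put(id, payload);
        return id;
    }

    // Step 3 on success: make the message visible.
    void commit(String id) {
        transition(id, State.COMMITTED);
    }

    // Step 3 on failure: the message will never be delivered.
    void rollback(String id) {
        transition(id, State.ROLLED_BACK);
    }

    private void transition(String id, State target) {
        if (states.get(id) != State.HALF) {
            throw new IllegalStateException("message " + id + " is not in the HALF state");
        }
        states.put(id, target);
    }

    // Only committed messages are handed to consumers.
    List<String> visibleToConsumers() {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<String, State> e : states.entrySet()) {
            if (e.getValue() == State.COMMITTED) {
                visible.add(payloads.get(e.getKey()));
            }
        }
        return visible;
    }
}
```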

Message Filtering

Consumer-Side Filtering

java
@Component
public class OrderEventListener {
    
    @MessageListener(
        value = "order.created",
        filter = "headers['priority'] == 'high'"
    )
    public void onHighPriorityOrder(OrderCreatedMessage message) {
        // Handles high-priority orders only
    }
    
    @MessageListener(
        value = "order.created",
        filter = "payload.totalAmount > 1000"
    )
    public void onLargeOrder(OrderCreatedMessage message) {
        // Handles large orders only
    }
}
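
What the two filter expressions select can be sketched with plain predicates. `ListenerFilters` and its `Envelope` record are hypothetical stand-ins for a message with headers and a payload amount. The framework evaluates the string expressions itself, and its expression language is not described here.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical predicate versions of the two filter expressions above.
final class ListenerFilters {
    // Minimal stand-in for a message: headers plus the payload's total amount.
    record Envelope(Map<String, String> headers, BigDecimal totalAmount) {}

    // Equivalent of: headers['priority'] == 'high'
    static final Predicate<Envelope> HIGH_PRIORITY =
        e -> "high".equals(e.headers().get("priority"));

    // Equivalent of: payload.totalAmount > 1000
    static final Predicate<Envelope> LARGE_ORDER =
        e -> e.totalAmount().compareTo(new BigDecimal("1000")) > 0;

    // Returns the messages a listener with the given filter would receive.
    static List<Envelope> select(List<Envelope> messages, Predicate<Envelope> filter) {
        return messages.stream().filter(filter).toList();
    }
}
```

Because both listeners above subscribe to the same destination, a message that satisfies both conditions would presumably reach both methods, so their handling should not overlap in side effects.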

Message Tracing

Adding Trace Information

java
@Component
public class TracingMessageInterceptor implements MessageInterceptor {
    
    @Override
    public Message<?> preSend(Message<?> message) {
        return MessageBuilder.fromMessage(message)
            .setHeader("traceId", TraceContext.getTraceId())
            .setHeader("spanId", TraceContext.getSpanId())
            .build();
    }
    
    @Override
    public void postReceive(Message<?> message) {
        String traceId = message.getHeaders().get("traceId", String.class);
        TraceContext.setTraceId(traceId);
    }
}
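
The propagation step can be sketched without the framework types: the producer copies the current trace id into the headers, and the consumer restores it before handling the message. `TracePropagation` is a hypothetical class that uses a plain `Map` for headers and a `ThreadLocal` in place of `TraceContext`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical trace propagation; ThreadLocal stands in for TraceContext.
final class TracePropagation {
    private static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    static void setTraceId(String traceId) {
        CURRENT_TRACE.set(traceId);
    }

    static String getTraceId() {
        return CURRENT_TRACE.get();
    }

    static void clear() {
        CURRENT_TRACE.remove();
    }

    // Producer side: return a copy of the headers with the current trace id added.
    static Map<String, String> preSend(Map<String, String> headers) {
        Map<String, String> copy = new HashMap<>(headers);
        String traceId = getTraceId();
        if (traceId != null) {
            copy.put("traceId", traceId);
        }
        return copy;
    }

    // Consumer side: adopt the sender's trace id, or clear a stale one if none was sent.
    static void postReceive(Map<String, String> headers) {
        String traceId = headers.get("traceId");
        if (traceId != null) {
            setTraceId(traceId);
        } else {
            clear();
        }
    }
}
```

Clearing the context when no header is present keeps a pooled consumer thread from reusing the trace id of a previous message.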

Monitoring

Message Metrics

java
@Component
public class MessageMetrics {
    
    @Inject
    private MeterRegistry meterRegistry;
    
    public void recordSend(String destination, long duration) {
        meterRegistry.timer("messaging.send", "destination", destination)
            .record(duration, TimeUnit.MILLISECONDS);
    }
    
    public void recordReceive(String destination, long duration) {
        meterRegistry.timer("messaging.receive", "destination", destination)
            .record(duration, TimeUnit.MILLISECONDS);
    }
    
    public void recordError(String destination, String error) {
        meterRegistry.counter("messaging.error", 
            "destination", destination, 
            "error", error
        ).increment();
    }
}

Best Practices

1. Message Idempotency

java
@Component
public class OrderEventListener {
    
    @Inject
    private IdempotentService idempotentService;
    
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message) {
        String messageId = message.getMessageId();
        
        // Skip messages that were already processed
        if (idempotentService.isProcessed(messageId)) {
            logger.info("Message already processed: {}", messageId);
            return;
        }
        
        try {
            processOrder(message);
            idempotentService.markProcessed(messageId);
        } catch (Exception e) {
            // Processing failed: do not mark as processed, so a retry can run
            throw e;
        }
    }
}
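
One way such an idempotency service could work is sketched below with an in-memory set. `InMemoryIdempotency` is a hypothetical class, not the framework's `IdempotentService`. It claims the message id atomically before running the work, so two concurrent deliveries of the same message cannot both pass the check, and it releases the claim if the work fails.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory idempotency guard; a shared store would be needed across instances.
final class InMemoryIdempotency {
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    // Atomic check-and-mark: true only for the first caller with this id.
    boolean tryClaim(String messageId) {
        return claimed.add(messageId);
    }

    // Gives the id back so a later retry can process the message.
    void release(String messageId) {
        claimed.remove(messageId);
    }

    // Runs the work at most once per id; returns false for duplicates.
    boolean handleOnce(String messageId, Runnable work) {
        if (!tryClaim(messageId)) {
            return false;
        }
        try {
            work.run();
            return true;
        } catch (RuntimeException e) {
            release(messageId); // failed work must stay retryable
            throw e;
        }
    }
}
```

Because the set lives in memory, the guard is lost on restart and is not shared between application instances. A database unique constraint or a shared cache would be the usual backing store in production.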

2. Message Ordering

java
// Use a partition key so messages with the same key stay in order
messageTemplate.send("orders", order, order.getUserId().toString());

// Process with a single consumer thread
@MessageListener(value = "orders", concurrency = "1")
public void onOrder(Order order) {
    // Processed in order
}
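
Why a partition key preserves order can be sketched with a simple hash-based partitioner: every message with the same key lands in the same partition, and a single partition is consumed in order. `KeyPartitioner` is a hypothetical helper that uses `String.hashCode()` for brevity. Real brokers use their own hash functions, for example Kafka's default partitioner hashes the key bytes with murmur2.

```java
// Hypothetical partitioner; illustrates key-to-partition mapping only.
final class KeyPartitioner {
    // Maps a key to a partition index in the range [0, partitions).
    static int partitionFor(String key, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitions);
    }
}
```

Ordering holds only per key. Messages with different keys may land in different partitions and be processed in any relative order.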

3. Controlling Message Size

java
// ✅ Recommended: send only the data that consumers need
messageTemplate.send("order.created", new OrderCreatedMessage(
    order.getId(),
    order.getUserId(),
    order.getTotalAmount(),
    order.getCreatedAt()  // hypothetical accessor; the record declares four components
));

// ❌ Not recommended: send a large object
messageTemplate.send("order.created", order);  // carries all associated data

4. Error Handling

java
@Component
public class OrderEventListener {
    
    @MessageListener("order.created")
    public void onOrderCreated(OrderCreatedMessage message) {
        try {
            processOrder(message);
        } catch (BusinessException e) {
            // Business exception: do not retry
            logger.warn("Business exception: {}", e.getMessage());
        } catch (Exception e) {
            // Any other exception: rethrow so the message is retried
            throw e;
        }
    }
}

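FAQ

Q: How do I make sure messages are not lost?

A:

  1. Enable producer confirmations so the broker acknowledges each send
  2. Persist messages on the broker
  3. Acknowledge manually in the consumer, after processing succeeds
  4. Route messages that still fail to a dead-letter queue

Q: How do I deal with a message backlog?

A:

  1. Add more consumers
  2. Optimize the consumer logic
  3. Shard messages across partitions or queues
  4. Apply rate limiting and graceful degradation

Q: What if a message is consumed more than once?

A:

  1. Make the consumer idempotent
  2. Give every message a unique ID
  3. Enforce a unique constraint in the database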

Released under the Apache License 2.0