Skip to content

消息消费者

消息消费者负责接收和处理消息。

基础使用

简单消费者

java
import com.lcgyl.messaging.MessageListener;

@MessageListener(topic = "order.created")
public class OrderCreatedListener {
    
    public void onMessage(String message) {
        System.out.println("收到订单消息: " + message);
    }
}

对象消费者

java
@MessageListener(topic = "order.created")
public class OrderListener {
    
    public void onMessage(Order order) {
        System.out.println("收到订单: " + order.getId());
        
        // 处理订单
        processOrder(order);
    }
}

消息确认

自动确认

java
@MessageListener(topic = "order.created", ackMode = AckMode.AUTO)
public class AutoAckListener {
    
    public void onMessage(Order order) {
        // 方法执行成功后自动确认
        processOrder(order);
    }
}

手动确认

java
@MessageListener(topic = "order.created", ackMode = AckMode.MANUAL)
public class ManualAckListener {
    
    public void onMessage(Order order, Acknowledgment ack) {
        try {
            processOrder(order);
            
            // 手动确认
            ack.acknowledge();
        } catch (Exception e) {
            // 拒绝消息
            ack.reject();
        }
    }
}

并发消费

java
@MessageListener(
    topic = "order.created",
    concurrency = 5  // 5 个并发消费者
)
public class ConcurrentListener {
    
    public void onMessage(Order order) {
        processOrder(order);
    }
}

消息过滤

java
@MessageListener(
    topic = "order.created",
    filter = "status = 'PAID'"
)
public class FilteredListener {
    
    public void onMessage(Order order) {
        // 只处理已支付的订单
        processOrder(order);
    }
}

异常处理

java
@MessageListener(topic = "order.created")
public class ErrorHandlingListener {
    
    public void onMessage(Order order) {
        try {
            processOrder(order);
        } catch (BusinessException e) {
            // 业务异常,不重试
            logger.warn("业务异常: {}", e.getMessage());
        } catch (Exception e) {
            // 系统异常,重试
            throw new RetryableException(e);
        }
    }
}

最佳实践

  1. 幂等处理:确保消息重复消费不会产生副作用
  2. 异常处理:区分业务异常和系统异常
  3. 并发控制:根据业务特点设置并发数
  4. 消息确认:选择合适的确认模式
  5. 监控告警:监控消费延迟和失败率

下一步

Released under the Apache License 2.0