消息消费者
消息消费者负责接收和处理消息。
基础使用
简单消费者
java
import com.lcgyl.messaging.MessageListener;
@MessageListener(topic = "order.created")
public class OrderCreatedListener {
public void onMessage(String message) {
System.out.println("收到订单消息: " + message);
}
}对象消费者
java
@MessageListener(topic = "order.created")
public class OrderListener {
public void onMessage(Order order) {
System.out.println("收到订单: " + order.getId());
// 处理订单
processOrder(order);
}
}消息确认
自动确认
java
@MessageListener(topic = "order.created", ackMode = AckMode.AUTO)
public class AutoAckListener {
public void onMessage(Order order) {
// 方法执行成功后自动确认
processOrder(order);
}
}手动确认
java
@MessageListener(topic = "order.created", ackMode = AckMode.MANUAL)
public class ManualAckListener {
public void onMessage(Order order, Acknowledgment ack) {
try {
processOrder(order);
// 手动确认
ack.acknowledge();
} catch (Exception e) {
// 拒绝消息
ack.reject();
}
}
}并发消费
java
@MessageListener(
topic = "order.created",
concurrency = 5 // 5 个并发消费者
)
public class ConcurrentListener {
public void onMessage(Order order) {
processOrder(order);
}
}消息过滤
java
@MessageListener(
topic = "order.created",
filter = "status = 'PAID'"
)
public class FilteredListener {
public void onMessage(Order order) {
// 只处理已支付的订单
processOrder(order);
}
}异常处理
java
@MessageListener(topic = "order.created")
public class ErrorHandlingListener {
public void onMessage(Order order) {
try {
processOrder(order);
} catch (BusinessException e) {
// 业务异常,不重试
logger.warn("业务异常: {}", e.getMessage());
} catch (Exception e) {
// 系统异常,重试
throw new RetryableException(e);
}
}
}最佳实践
- 幂等处理:确保消息重复消费不会产生副作用
- 异常处理:区分业务异常和系统异常
- 并发控制:根据业务特点设置并发数
- 消息确认:选择合适的确认模式
- 监控告警:监控消费延迟和失败率