消息生产者
消息生产者负责发送消息到消息队列。
基础使用
发送简单消息
java
import com.lcgyl.messaging.MessageProducer;
public class SimpleProducer {
private final MessageProducer producer;
public void sendMessage() {
// 发送文本消息
producer.send("order.created", "Order #12345 created");
// 发送带键的消息
producer.send("order.created", "order-123", "Order data");
}
}发送对象消息
java
public class ObjectProducer {
private final MessageProducer producer;
public void sendOrder(Order order) {
// 自动序列化为 JSON
producer.send("order.created", order);
}
public void sendUser(User user) {
// 指定序列化方式
producer.send("user.created", user, SerializationType.PROTOBUF);
}
}高级特性
消息确认
java
public class ConfirmProducer {
private final MessageProducer producer;
public void sendWithConfirm() {
// 同步确认
boolean success = producer.sendAndWait("order.created", order);
if (success) {
System.out.println("消息发送成功");
}
}
public void sendWithCallback() {
// 异步确认
producer.sendAsync("order.created", order, new SendCallback() {
@Override
public void onSuccess() {
System.out.println("消息发送成功");
}
@Override
public void onFailure(Exception e) {
System.err.println("消息发送失败: " + e.getMessage());
}
});
}
}批量发送
java
public class BatchProducer {
private final MessageProducer producer;
public void batchSend(List<Order> orders) {
// 批量发送
producer.batchSend("order.created", orders);
}
}延迟消息
java
public class DelayProducer {
private final MessageProducer producer;
public void sendDelayMessage() {
// 延迟 5 秒发送
producer.sendDelay("order.timeout", order, Duration.ofSeconds(5));
// 定时发送
producer.sendAt("order.reminder", order,
LocalDateTime.now().plusHours(1));
}
}最佳实践
- 消息幂等:确保消息处理的幂等性
- 消息确认:重要消息使用确认机制
- 批量发送:提高吞吐量
- 异常处理:处理发送失败的情况
- 消息追踪:记录消息发送日志