Skip to content

消息生产者

消息生产者负责发送消息到消息队列。

基础使用

发送简单消息

java
import com.lcgyl.messaging.MessageProducer;

public class SimpleProducer {
    
    private final MessageProducer producer;
    
    public void sendMessage() {
        // 发送文本消息
        producer.send("order.created", "Order #12345 created");
        
        // 发送带键的消息
        producer.send("order.created", "order-123", "Order data");
    }
}

发送对象消息

java
public class ObjectProducer {
    
    private final MessageProducer producer;
    
    public void sendOrder(Order order) {
        // 自动序列化为 JSON
        producer.send("order.created", order);
    }
    
    public void sendUser(User user) {
        // 指定序列化方式
        producer.send("user.created", user, SerializationType.PROTOBUF);
    }
}

高级特性

消息确认

java
public class ConfirmProducer {
    
    private final MessageProducer producer;
    
    public void sendWithConfirm() {
        // 同步确认
        boolean success = producer.sendAndWait("order.created", order);
        
        if (success) {
            System.out.println("消息发送成功");
        }
    }
    
    public void sendWithCallback() {
        // 异步确认
        producer.sendAsync("order.created", order, new SendCallback() {
            @Override
            public void onSuccess() {
                System.out.println("消息发送成功");
            }
            
            @Override
            public void onFailure(Exception e) {
                System.err.println("消息发送失败: " + e.getMessage());
            }
        });
    }
}

批量发送

java
public class BatchProducer {
    
    private final MessageProducer producer;
    
    public void batchSend(List<Order> orders) {
        // 批量发送
        producer.batchSend("order.created", orders);
    }
}

延迟消息

java
public class DelayProducer {
    
    private final MessageProducer producer;
    
    public void sendDelayMessage() {
        // 延迟 5 秒发送
        producer.sendDelay("order.timeout", order, Duration.ofSeconds(5));
        
        // 定时发送
        producer.sendAt("order.reminder", order, 
            LocalDateTime.now().plusHours(1));
    }
}

最佳实践

  1. 消息幂等:确保消息处理的幂等性
  2. 消息确认:重要消息使用确认机制
  3. 批量发送:提高吞吐量
  4. 异常处理:处理发送失败的情况
  5. 消息追踪:记录消息发送日志

下一步

Released under the Apache License 2.0