Kafka Plugin
对应模块:
io.gitee.lcgyl:lcgyl-mq-kafka-plugin
基于 Apache Kafka Client 的消息队列实现,专为高吞吐场景设计。
✨ 特性
- ✅ 高性能 - 原生 Kafka Client 封装,无性能损耗
- ✅ 批量消费 - 支持批量拉取和处理,提升吞吐量
- ✅ 顺序消息 - 严格的分区有序性保证
- ✅ Ack 机制 - 灵活的手动/自动 Offset 提交策略
🚀 快速开始
💡 提示:如果使用 Spring Boot,推荐引入
lcgyl-mq-starter即可获得开箱即用的 Kafka 支持。
依赖引入
gradle
implementation 'io.gitee.lcgyl:lcgyl-mq-kafka-plugin:2.2.0'配置
yaml
lcgyl:
mq:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
producer:
acks: all
retries: 3发送消息
java
@Service
public class OrderEventProducer {
@Inject
private MessageProducer producer;
// 发送简单消息
public void sendOrderCreated(Order order) {
producer.send("order-events", order);
}
// 发送带 Key 的消息(保证同一 Key 路由到同一分区,实现顺序消费)
public void sendOrderUpdate(Order order) {
producer.send("order-events", order.getId(), order);
}
// 异步发送并处理结果
public CompletableFuture<Void> sendAsync(String topic, Object payload) {
return producer.sendAsync(topic, payload)
.thenAccept(result -> log.info("消息发送成功: offset={}", result.offset()))
.exceptionally(ex -> {
log.error("消息发送失败", ex);
return null;
});
}
}消费消息
java
@Component
public class OrderEventConsumer {
// 单条消费
@MessageListener(topic = "order-events", group = "order-service")
public void onOrderEvent(Order order) {
log.info("收到订单事件: {}", order.getId());
orderService.process(order);
}
// 批量消费(提升吞吐量)
@MessageListener(topic = "order-events", group = "batch-processor", batch = true)
public void onOrderEventBatch(List<Order> orders) {
log.info("批量处理 {} 条订单", orders.size());
orderService.batchProcess(orders);
}
// 手动 Ack
@MessageListener(topic = "payment-events", group = "payment-service", ackMode = AckMode.MANUAL)
public void onPaymentEvent(PaymentEvent event, Acknowledgment ack) {
try {
paymentService.handle(event);
ack.acknowledge(); // 处理成功后手动确认
} catch (Exception e) {
// 不确认,消息将被重新投递
log.error("支付事件处理失败", e);
}
}
}