Skip to content

Kafka Plugin

对应模块: io.gitee.lcgyl:lcgyl-mq-kafka-plugin

基于 Apache Kafka Client 的消息队列实现,专为高吞吐场景设计。

✨ 特性

  • 高性能 - 原生 Kafka Client 封装,无性能损耗
  • 批量消费 - 支持批量拉取和处理,提升吞吐量
  • 顺序消息 - 严格的分区有序性保证
  • Ack 机制 - 灵活的手动/自动 Offset 提交策略

🚀 快速开始

💡 提示:如果使用 Spring Boot,推荐引入 lcgyl-mq-starter 即可获得开箱即用的 Kafka 支持。

依赖引入

gradle
implementation 'io.gitee.lcgyl:lcgyl-mq-kafka-plugin:2.2.0'

配置

yaml
lcgyl:
  mq:
    kafka:
      bootstrap-servers: localhost:9092
      consumer:
        group-id: my-consumer-group
        auto-offset-reset: earliest
        enable-auto-commit: false
      producer:
        acks: all
        retries: 3

发送消息

java
@Service
public class OrderEventProducer {
    
    @Inject
    private MessageProducer producer;
    
    // 发送简单消息
    public void sendOrderCreated(Order order) {
        producer.send("order-events", order);
    }
    
    // 发送带 Key 的消息(保证同一 Key 路由到同一分区,实现顺序消费)
    public void sendOrderUpdate(Order order) {
        producer.send("order-events", order.getId(), order);
    }
    
    // 异步发送并处理结果
    public CompletableFuture<Void> sendAsync(String topic, Object payload) {
        return producer.sendAsync(topic, payload)
            .thenAccept(result -> log.info("消息发送成功: offset={}", result.offset()))
            .exceptionally(ex -> {
                log.error("消息发送失败", ex);
                return null;
            });
    }
}

消费消息

java
@Component
public class OrderEventConsumer {
    
    // 单条消费
    @MessageListener(topic = "order-events", group = "order-service")
    public void onOrderEvent(Order order) {
        log.info("收到订单事件: {}", order.getId());
        orderService.process(order);
    }
    
    // 批量消费(提升吞吐量)
    @MessageListener(topic = "order-events", group = "batch-processor", batch = true)
    public void onOrderEventBatch(List<Order> orders) {
        log.info("批量处理 {} 条订单", orders.size());
        orderService.batchProcess(orders);
    }
    
    // 手动 Ack
    @MessageListener(topic = "payment-events", group = "payment-service", ackMode = AckMode.MANUAL)
    public void onPaymentEvent(PaymentEvent event, Acknowledgment ack) {
        try {
            paymentService.handle(event);
            ack.acknowledge();  // 处理成功后手动确认
        } catch (Exception e) {
            // 不确认,消息将被重新投递
            log.error("支付事件处理失败", e);
        }
    }
}

Released under the Apache License 2.0