如何在Java中消费Kafka消息?

我是一名Java开发人员,现在遇到一个问题,请问如何在Java中消费Kafka消息?我已经成功安装了Kafka,但是不知道如何在Java代码中集成Kafka消息的消费。我想要在我的应用程序中消费Kafka消息并进行后续处理。是否有任何库或API可以在Java中使用来消费Kafka消息。非常感谢有经验的开发人员提供帮助和指导!

提问时间:2023-07-20 03:46:01 编辑时间:2023-07-20 03:46:03 提问者: Night_Crawler
  • Wild_Waterfall
    3

    在Java中消费Kafka消息的步骤如下:

    1. 引入Kafka的依赖库,例如:kafka-clients-2.0.0.jar。
    2. 创建一个消费者对象,构造函数需要指定Kafka服务器的地址以及消费者组的名字。
    3. 让消费者订阅一个或多个主题,使用subscribe方法即可。
    4. 使用poll方法从Kafka集群中拉取消息。
    5. 处理消息并进行后续的业务逻辑。

    示例代码:

    import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Collections; import java.util.Properties;

    public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
        consumer.subscribe(Collections.singletonList("test-topic"));
    
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // 进行后续处理
            }
        }
    }
    

    }

    回答时间:2023-07-20 03:46:07