如何在Java中操作Apache Kafka?

我想在Java应用程序中使用Apache Kafka,但对于如何操作不太熟悉。我已经成功地安装了Kafka并且熟悉了基本概念,但是不知道如何开始编写Java代码与Kafka交互。我需要一个简单的步骤指南,它能帮我建立一个与Kafka Broker连接并能够从Broker中获取消息的Java应用程序。希望有经验的开发人员能够提供一些示例代码和指导,以便我在实践中学习如何操作Kafka。

提问时间:2023-07-30 12:21:45 编辑时间:2023-07-30 12:21:47 提问者: Wild_Waterfall
  • 狂沙漫舞
    8

    在Java中操作Apache Kafka的步骤如下:

    1.下载并添加Kafka的Java客户端依赖项。

    2.创建一个Kafka Producer并设置Bootstrap服务器地址,然后发送消息到Broker。

    3.创建一个Kafka Consumer并设置Bootstrap服务器地址,并指定订阅的Topic名称,然后从Broker接收消息。

    下面是使用Java编写的操作Kafka的示例代码:

    // 发送消息到Kafka Broker String topicName = "sample-topic"; String message = "Hello Kafka!"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>(topicName, null, message)); producer.close();

    //从Kafka Broker接收消息 String topicName = "sample-topic"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", "sample-group"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(topicName)); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } consumer.close();

    希望这些代码能够帮助你开始在Java中操作Apache Kafka。

    回答时间:2023-07-30 12:21:50