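Working with Apache Kafka from Java involves the following steps:
1. Add the Kafka Java client (the org.apache.kafka:kafka-clients artifact) as a dependency of your project.
2. Create a Kafka producer, set the bootstrap server address, and send messages to the broker.
3. Create a Kafka consumer, set the bootstrap server address, specify the name of the topic to subscribe to, and receive messages from the broker.
The following sample code shows how to work with Kafka in Java: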
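    // Send a message to the Kafka broker
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SampleProducer {
        public static void main(String[] args) {
            String topicName = "sample-topic";
            String message = "Hello Kafka!";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            // The key is null, so the record carries only a value
            producer.send(new ProducerRecord<>(topicName, null, message));
            // close() waits for previously sent records to complete
            producer.close();
        }
    }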
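    // Receive messages from the Kafka broker
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SampleConsumer {
        public static void main(String[] args) {
            String topicName = "sample-topic";

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "sample-group");

            // try-with-resources closes the consumer if the loop exits with an exception;
            // a close() call placed after while (true) would be unreachable and would not compile
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topicName));
                // Poll until the process is stopped
                while (true) {
                    // poll(Duration) replaces the deprecated poll(long)
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }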
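The producer and consumer samples repeat most of the same configuration keys. As a minimal sketch, that setup could be collected in one small helper. The class name KafkaProps and its method names are hypothetical, chosen only for illustration; the configuration keys and the serializer and deserializer class names are the ones used in the samples. The helper uses only java.util.Properties, so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Kafka client library) that builds
// the Properties objects used by the producer and consumer samples.
final class KafkaProps {
    private static final String SERIALIZATION_PKG = "org.apache.kafka.common.serialization.";

    private KafkaProps() {}

    // Settings shared by producers and consumers.
    static Properties baseProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    // Producer settings: String keys and String values.
    static Properties producerProps(String bootstrapServers) {
        Properties props = baseProps(bootstrapServers);
        props.setProperty("key.serializer", SERIALIZATION_PKG + "StringSerializer");
        props.setProperty("value.serializer", SERIALIZATION_PKG + "StringSerializer");
        return props;
    }

    // Consumer settings: String keys and values, plus the consumer group id.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = baseProps(bootstrapServers);
        props.setProperty("key.deserializer", SERIALIZATION_PKG + "StringDeserializer");
        props.setProperty("value.deserializer", SERIALIZATION_PKG + "StringDeserializer");
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties consumer = consumerProps("localhost:9092", "sample-group");
        System.out.println(consumer.getProperty("bootstrap.servers"));
        System.out.println(consumer.getProperty("group.id"));
    }
}
```

With such a helper, the samples would pass KafkaProps.producerProps("localhost:9092") to the KafkaProducer constructor and KafkaProps.consumerProps("localhost:9092", "sample-group") to the KafkaConsumer constructor, which keeps the broker address in one place.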
Hopefully this code helps you get started with Apache Kafka in Java.