在Java中消费Kafka消息的步骤如下:
- 引入Kafka的依赖库,例如:kafka-clients-2.0.0.jar。
- 创建一个消费者对象,构造函数需要指定Kafka服务器的地址以及消费者组的名字。
- 让消费者订阅一个或多个主题,使用subscribe方法即可。
- 使用poll方法从Kafka集群中拉取消息。
- 处理消息并进行后续的业务逻辑。
示例代码:
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections; import java.util.Properties;
public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 进行后续处理
}
}
}
}