Kafka 消费者(Consumer)订阅主题(Topic)以消费消息。以下是使用 Java 客户端库进行订阅的步骤:
- 添加 Kafka 客户端依赖
在 Maven 项目的 pom.xml 文件中添加以下依赖:
org.apache.kafka kafka-clients 2.8.0
- 创建消费者配置
创建一个 Kafka 消费者配置对象,指定 Kafka 集群的地址、消费者组 ID 等属性。
Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- 创建消费者
使用消费者配置创建一个 Kafka 消费者实例。
KafkaConsumerconsumer = new KafkaConsumer<>(props);
- 订阅主题
使用 subscribe
方法订阅一个或多个主题。
consumer.subscribe(Arrays.asList("my-topic"));
- 消费消息
使用 poll
方法轮询消息,并使用 consume
方法处理消息。
while (true) { ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = https://www.yisu.com/ask/%s%n", record.offset(), record.key(), record.value()); // 处理消息的逻辑 } }
这是一个简单的 Kafka 消费者订阅消息的示例。实际应用中,你可能需要根据需求进行更多的配置和处理。