Apache Kafka 消费者可以通过提交偏移量来跟踪它们已经处理过的消息。偏移量是消费者在消费消息时记录的一个位置,用于表示该消费者已经读取到了哪一条消息。当消费者处理完一条消息后,它会提交这条消息的偏移量到 Kafka 的内置主题中,以便 Kafka 知道该消费者已经处理了哪些消息。
以下是使用 offset 进行偏移量提交的基本步骤:
- 创建消费者配置:在创建 Kafka 消费者时,需要配置一些参数,例如 Kafka 代理地址、消费者组 ID、密钥和反序列化器等。这些参数将用于连接到 Kafka 代理并消费消息。
- 订阅主题:使用
subscribe
方法订阅一个或多个 Kafka 主题。这将使消费者开始监听这些主题上的消息。 - 消费消息:使用循环来持续消费消息。在每次循环中,消费者将从 Kafka 代理拉取一条或多条消息,并对这些消息进行处理。
- 提交偏移量:在处理完一条消息后,需要调用
commitSync
或commitAsync
方法来提交这条消息的偏移量。commitSync
方法将立即提交偏移量,而commitAsync
方法则将偏移量提交到 Kafka,但不会等待提交完成。
以下是使用 offset 进行偏移量提交的示例代码(Java):
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "my-group"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { // 处理消息 System.out.printf("offset = %d, key = %s, value = https://www.yisu.com/ask/%s%n", record.offset(), record.key(), record.value()); // 提交偏移量 consumer.commitSync(); } }
在上面的示例中,我们首先创建了一个 Kafka 消费者,并订阅了名为 “my-topic” 的主题。然后,我们使用一个无限循环来持续消费消息。在每次循环中,我们从 Kafka 代理拉取一条或多条消息,并对这些消息进行处理。最后,我们调用 commitSync
方法来提交每条消息的偏移量。
需要注意的是,提交偏移量应该在处理完消息之后进行,以确保消息不会被重复消费。另外,可以根据实际需求选择使用 commitSync
或 commitAsync
方法来提交偏移量。如果使用 commitAsync
方法,需要注意处理提交失败的情况,以避免消息丢失或重复消费。