要查看Kafka的偏移量,您可以使用Kafka的命令行工具或编程API。以下是两种常用方法:
方法1:使用Kafka命令行工具
Kafka提供了一个名为kafka-consumer-groups.sh
的命令行工具,可以用来查看消费者组的偏移量。请按照以下步骤操作:
- 打开命令行终端。
- 使用以下命令格式运行
kafka-consumer-groups.sh
工具:
./kafka-consumer-groups.sh --bootstrap-server--describe --group
将
替换为您的Kafka代理地址(例如:localhost:9092
),将
替换为您要查询的消费者组ID。
运行此命令后,您将看到类似以下的输出:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-consumer-group my-topic 0 100 200 100 consumer-1-86753f8a-1234-4567-89ab-cdef12345678 /127.0.0.1 consumer-1
在这个例子中,您可以查看消费者组my-consumer-group
在主题my-topic
的每个分区的当前偏移量(CURRENT-OFFSET
)、日志结束偏移量(LOG-END-OFFSET
)和lag(LAG
)。
方法2:使用编程API
您还可以使用Kafka客户端库(如Java、Python、Go等)编写程序来查询偏移量。以下是使用Java API的示例:
import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaOffsetViewer { public static void main(String[] args) { String kafkaBootstrapServer = ""; String consumerGroupId = " "; Properties props = new Properties(); props.put("bootstrap.servers", kafkaBootstrapServer); props.put("group.id", consumerGroupId); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(" ")); consumer.seekToEnd(consumer.assignment()); while (true) { consumer.poll(Duration.ofMillis(1000)); consumer.position(consumer.assignment().iterator().next()); System.out.println("Topic: " + consumer.topic() + ", Partition: " + consumer.partition() + ", Offset: " + consumer.position()); } } }
将
替换为您的Kafka代理地址,将
替换为您要查询的消费者组ID,将
替换为您要查询的主题名称。
运行此Java程序后,您将看到类似以下的输出:
Topic: my-topic, Partition: 0, Offset: 200
在这个例子中,您可以查看主题my-topic
在分区0的当前偏移量。