Java程序可以通过Kafka提供的Java客户端来获取Kafka的topic。以下是一个获取topic列表的示例代码:
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.KafkaFuture; import java.util.Collection; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaTopicExample { public static void main(String[] args) { // Kafka配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // 创建AdminClient对象 try (AdminClient adminClient = AdminClient.create(properties)) { // 获取topic列表 ListTopicsResult topicsResult = adminClient.listTopics(); // 获取Future对象 KafkaFuture> topicListingFuture = topicsResult.listings(); // 获取topic列表 Collection topicListings = topicListingFuture.get(); // 遍历输出每个topic for (TopicListing topicListing : topicListings) { System.out.println(topicListing.name()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
在上述代码中,我们首先创建了一个AdminClient
对象,并传入Kafka的配置。然后,我们通过listTopics
方法获取一个ListTopicsResult
对象,该对象包含了获取topic列表的方法。我们通过调用listings
方法获取一个KafkaFuture
对象,该对象代表了一个异步的获取topic列表的过程。最后,我们通过调用get
方法获取真正的topic列表,并遍历输出每个topic的名称。
请注意,这里的配置中使用了bootstrap.servers
参数来指定Kafka集群的地址,你需要根据你实际的Kafka集群配置来修改该参数。