要使用 Maven 实现 Kafka 消息消费,需要按照以下步骤进行操作:
- 在 Maven 项目的
pom.xml
文件中添加 Kafka 客户端的依赖:
org.apache.kafka kafka-clients 2.8.1
- 创建一个 Kafka 消费者:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { String bootstrapServers = "localhost:9092"; String groupId = "my-group"; String topic = "my-topic"; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); Consumerconsumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton(topic)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.println("Received message: " + record.value()); } } } }
在上述代码中,需要根据实际情况修改 bootstrapServers
、groupId
和 topic
的值。
- 运行上述代码,即可启动 Kafka 消费者并开始消费消息。
以上就是使用 Maven 实现 Kafka 消息消费的步骤。