Kafka ProducerRecord 是 Apache Kafka 中用于发送消息到 Kafka 主题(Topic)的一个类。要处理 Kafka ProducerRecord,你需要遵循以下步骤:
- 创建一个 KafkaProducer 实例:首先,你需要创建一个 KafkaProducer 实例,以便与 Kafka 集群进行通信。在创建实例时,需要配置一些基本属性,如 bootstrap.servers(Kafka 集群地址)、key.serializer 和 value.serializer(消息键和值的序列化器)等。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducerproducer = new KafkaProducer<>(props);
- 创建一个 ProducerRecord 实例:接下来,你需要创建一个 ProducerRecord 实例,指定要发送的主题、键(Key)和值(Value)。
String topic = "my-topic"; String key = "my-key"; String value = "https://www.yisu.com/ask/my-value"; ProducerRecordrecord = new ProducerRecord<>(topic, key, value);
- 发送消息:使用 KafkaProducer 实例的
send()
方法发送 ProducerRecord 实例。这个方法是一个异步方法,它将立即返回一个 Future 对象。你可以选择等待这个操作完成,或者继续执行其他任务。
Futurefuture = producer.send(record);
如果你希望等待消息发送完成并获取一个确认消息,可以调用 future.get()
方法。这将抛出 ExecutionException,你需要捕获并处理它。同时,你还可以获取一个 RecordMetadata 对象,其中包含消息的元数据(如分区、偏移量等)。
try { RecordMetadata metadata = https://www.yisu.com/ask/future.get();"Message sent to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
- 关闭 KafkaProducer:在完成所有发送操作后,记得关闭 KafkaProducer 实例,以释放资源。
producer.close();
总结一下,处理 Kafka ProducerRecord 的步骤如下:
- 创建 KafkaProducer 实例并配置属性。
- 创建 ProducerRecord 实例,指定主题、键和值。
- 使用 KafkaProducer 实例的
send()
方法发送消息。 - (可选)等待消息发送完成并获取确认消息。
- 关闭 KafkaProducer 实例。