Kafka 消息发送的消息确认机制是通过生产者客户端来实现的。生产者客户端在发送消息到 Kafka 集群时,可以配置不同的确认策略来确保消息的可靠传输。以下是 Kafka 生产者提供的两种主要确认方式:
-
同步确认(Synchronous Acknowledgment):
- 在这种模式下,生产者在成功发送消息后,会等待 Kafka 集群的响应,确认消息已经被成功写入。
- 生产者会阻塞,直到收到 Kafka 的确认消息。
- 这种方式可以保证消息的可靠传输,但可能会降低生产者的吞吐量。
-
异步确认(Asynchronous Acknowledgment):
- 在这种模式下,生产者发送消息后不会等待 Kafka 的响应,而是立即返回成功。
- 生产者可以通过轮询或者回调的方式来检查消息是否已经被成功写入。
- 这种方式可以提高生产者的吞吐量,但可能会增加消息丢失的风险。
配置和使用示例
同步确认
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("acks", "all"); // 设置为 all 表示需要所有副本都确认 Producerproducer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset()); } } }); producer.close();
异步确认
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("acks", "1"); // 设置为 1 表示只需要 leader 确认 Producerproducer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset()); } } }); producer.close();
总结
- 同步确认:适用于对消息可靠性要求较高的场景,但可能会降低吞吐量。
- 异步确认:适用于对吞吐量要求较高的场景,但可能会增加消息丢失的风险。
根据具体的应用场景和需求,可以选择合适的确认策略来确保消息的可靠传输。