Kafka 延迟队列的实现主要依赖于两个组件:KafkaDelayQueue
和 DelayedMessageListenerContainer
。下面是一个简单的配置示例,展示了如何在 Spring Boot 项目中配置 Kafka 延迟队列。
- 添加依赖
在 pom.xml
文件中添加 Kafka 和 Spring Kafka 相关的依赖:
org.apache.kafka kafka-clients 2.8.0 org.springframework.kafka spring-kafka 2.8.0
- 配置 Kafka 延迟队列
在 application.yml
或 application.properties
文件中配置 Kafka 相关的属性:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: delay-queue-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 创建延迟消息生产者
创建一个生产者,用于发送延迟消息到 Kafka 延迟队列:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class DelayMessageProducer { @Autowired private KafkaProducerkafkaProducer; public void sendDelayMessage(String topic, String message, long delay) { kafkaProducer.send(new ProducerRecord<>(topic, message)); } }
- 创建延迟消息消费者
创建一个消费者,用于监听 Kafka 延迟队列中的消息:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.time.Duration; import java.util.Collections; import java.util.Properties; @Component public class DelayMessageConsumer { @Autowired private KafkaConsumerkafkaConsumer; public DelayMessageConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "delay-queue-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "false"); kafkaConsumer.setProperties(props); } @KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.group-id}") public void listen(ConsumerRecord record) { // 处理延迟消息 } }
- 配置监听容器
创建一个监听容器,用于处理延迟消息的消费:
import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar; import org.springframework.kafka.listener.config.KafkaListenerEndpoint; import org.springframework.stereotype.Component; @Component public class DelayMessageListenerContainerRegistrar implements MethodKafkaListenerEndpointRegistrar { @Autowired private DelayMessageConsumer delayMessageConsumer; @Override public void registerEndpoints(MethodKafkaListenerEndpointRegistrar registrar) { KafkaListenerEndpointendpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setId("delayMessageEndpoint"); endpoint.setTopics(Collections.singletonList("${kafka.consumer.topic}")); endpoint.setMessageHandlerMethodFactory(new DefaultKafkaListenerEndpointMessageHandlerMethodFactory()); endpoint.setConsumer(delayMessageConsumer); registrar.registerEndpoint(endpoint); } }
现在,你已经配置了一个简单的 Kafka 延迟队列。你可以通过调用 DelayMessageProducer
的 sendDelayMessage
方法发送延迟消息,然后 DelayMessageConsumer
将监听并处理这些消息。