在 Spring Boot 中整合 Kafka 并实现消息优先级,你需要遵循以下步骤:
- 添加依赖
在你的 pom.xml
文件中添加 Kafka 和 Spring Kafka 相关的依赖:
org.springframework.kafka spring-kafka org.springframework.kafka spring-kafka-support
- 配置 Kafka
在你的 application.yml
或 application.properties
文件中配置 Kafka 相关参数:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 创建消息优先级配置类
创建一个配置类,用于设置 Kafka 生产者的消息优先级属性:
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.producer.properties.priority.enable}") private boolean priorityEnable; @Bean public MapproducerConfigs() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); if (priorityEnable) { configProps.put(ProducerConfig.PRIORITY_CONFIG, 1); // 设置消息优先级,数值越大优先级越高 } return configProps; } }
- 创建 Kafka 生产者
创建一个 Kafka 生产者类,使用上面配置的属性发送消息:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class KafkaProducerService { @Autowired private KafkaTemplatekafkaTemplate; public void sendMessage(String topic, String message, int priority) { ProducerRecord record = new ProducerRecord<>(topic, message); record.headers().add("priority", String.valueOf(priority).getBytes()); kafkaTemplate.send(record); } }
- 创建 Kafka 消费者
创建一个 Kafka 消费者类,订阅指定主题并处理消息:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.time.Duration; import java.util.Collections; import java.util.Properties; @Component public class KafkaConsumerService { @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.consumer.key-deserializer}") private String keyDeserializer; @Value("${spring.kafka.consumer.value-deserializer}") private String valueDeserializer; @Bean public ConsumerConfig consumerConfig() { return new ConsumerConfig( Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, groupId), Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"), Collections.singletonMap(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer), Collections.singletonMap(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer), Duration.ofMillis(3000) ); } @KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = groupId) public void listen(ConsumerRecordsrecords) { records.forEach(record -> { // 处理消息逻辑 System.out.println("Received message: " + record.value() + " with priority: " + record.headers().get("priority")); }); } }
现在,你已经成功整合了 Kafka 并实现了消息优先级。当你使用 KafkaProducerService
发送消息时,可以通过设置 priority
参数来指定消息优先级。消费者在处理消息时,可以从消息头中获取优先级信息。