在 Spring Boot 中整合 Kafka 并实现消息过滤,可以通过以下步骤来完成:
- 添加依赖
在 pom.xml
文件中添加 Kafka 和 Spring Boot Kafka 相关的依赖:
org.springframework.boot spring-boot-starter-kafka
- 配置 Kafka
在 application.yml
或 application.properties
文件中配置 Kafka 相关属性:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- 创建消息过滤器
创建一个实现 ConsumerInterceptor
接口的类,用于实现消息过滤逻辑:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.List; @Component public class MessageFilterInterceptor implements ConsumerInterceptor{ @Value("${kafka.topic}") private String topic; @Override public List > onConsume(ConsumerRecords records) { List > filteredRecords = Collections.emptyList(); for (ConsumerRecord record : records) { if (filter(record)) { filteredRecords.add(record); } } return filteredRecords; } @Override public void onAcknowledge(OffsetAndMetadata offsetAndMetadata) { // 无需实现 } @Override public void onCommitOffsets(OffsetAndMetadata offsetAndMetadata) { // 无需实现 } @Override public void onError(Exception e) { // 无需实现 } private boolean filter(ConsumerRecord record) { // 实现消息过滤逻辑,例如根据消息内容判断是否过滤 String messageValue = https://www.yisu.com/ask/record.value();"filtered"); } }
- 配置 Kafka 消费者
创建一个配置类,用于配置 Kafka 消费者,并将自定义的消息过滤器添加到消费者配置中:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar; import org.springframework.kafka.listener.config.KafkaListenerEndpoint; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig implements KafkaListenerConfigurer { @Value("${kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { MethodKafkaListenerEndpointRegistrar methodRegistrar = new MethodKafkaListenerEndpointRegistrar(registrar); methodRegistrar.setBean(this); methodRegistrar.setMethod(this.getClass().getDeclaredMethods()[0]); methodRegistrar.setTopics(Collections.singletonList(this.topic())); methodRegistrar.setConsumerFactory(consumerFactory()); methodRegistrar.setInterceptor(new MessageFilterInterceptor()); methodRegistrar.registerEndpoints(); } @Bean public String topic() { return "my-topic"; } }
- 创建 Kafka 监听器
创建一个类,用于处理接收到的消息:
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaMessageListener { @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.consumer.group-id}") public void listen(String message) { System.out.println("Received message: " + message); } }
现在,当消费者接收到消息时,MessageFilterInterceptor
会先对消息进行过滤,只有符合条件的消息才会被 KafkaMessageListener
处理。