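在Spring整合Kafka中,本文介绍两种在消费者端过滤消息的方法:使用Kafka Message Filter(即基于Kafka客户端ConsumerInterceptor接口实现的消费者拦截器),或者使用Spring Kafka的ConsumerAwareErrorHandler。另外,Spring Kafka本身还提供了专门用于过滤的RecordFilterStrategy(通过ConcurrentKafkaListenerContainerFactory的setRecordFilterStrategy方法注册),在多数场景下是更直接的选择。
下面是这两种方法的详细说明和示例代码。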
- 使用Kafka Message Filter
Kafka Message Filter允许你在消费者端对消息进行过滤。要实现这个功能,你需要创建一个实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口的类,并重写onConsume方法。该方法接收本次poll拉取到的一批消息,并返回最终交给消费者处理的消息,因此你可以在这个方法中把不需要的消息剔除。
首先,创建一个实现ConsumerInterceptor接口的类:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties; public class MessageFilterInterceptor implements ConsumerInterceptor{ @Override public void onConsume(Consumer consumer, ConsumerRecords records) { for (ConsumerRecord record : records) { // 在这里对消息进行过滤 if (shouldFilter(record)) { consumer.seekToCurrentPosition(new TopicPartition(record.topic(), record.partition())); } else { // 如果消息满足条件,则继续处理 System.out.printf("Consumed record: key = %s, value = https://www.yisu.com/ask/%s, partition = %d, offset = %d%n", record.key(), record.value(), record.partition(), record.offset()); } } } @Override public void onCommitOffsets(Consumer consumer, Map offsets) { // 不需要实现此方法 } @Override public void close() { // 不需要实现此方法 } @Override public void configure(Properties props) { // 不需要实现此方法 } private boolean shouldFilter(ConsumerRecord record) { // 在这里实现你的过滤逻辑 // 例如,只处理键为"exampleKey"的消息 return record.key().equals("exampleKey"); } }
接下来,在Spring配置类中注册这个拦截器:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.config.KafkaListenerEndpoint; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig implements KafkaListenerConfigurer { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public MapconsumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return props; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory 
= new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setId("exampleEndpoint"); endpoint.setTopics("exampleTopic"); endpoint.setMessageHandlerMethodFactory(kafkaListenerContainerFactory().getMessageHandlerMethodFactory()); endpoint.setBean(new KafkaMessageFilterConsumer()); endpoint.setMethod(KafkaMessageFilterConsumer.class.getDeclaredMethods()[0]); registrar.registerEndpoint(endpoint); } @Override public void configureMessageConverters(List endpoints) { // 不需要实现此方法 } }
最后,创建一个消费者类,并使用@KafkaListener注解声明监听方法(注解中需要通过topics属性指定要监听的主题):
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listener that prints every message delivered to it.
 */
@Component
public class KafkaMessageFilterConsumer {

    /**
     * Handles one message from {@code exampleTopic}.
     *
     * <p>The annotation names the topic explicitly: a listener that declares
     * neither topics, a topic pattern, nor topic partitions has nothing to
     * subscribe to.
     *
     * @param message the message payload
     */
    @KafkaListener(id = "exampleEndpoint", topics = "exampleTopic", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
- 使用ConsumerAwareErrorHandler
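ConsumerAwareErrorHandler允许你在发生错误时对消息进行处理。你可以在这个处理器中实现消息过滤逻辑。需要注意,它只会在监听器处理消息抛出异常时被调用,因此这里的“过滤”指的是决定哪些处理失败的消息可以直接跳过,而不是在消息到达监听器之前进行过滤。另外,该接口在Spring Kafka 2.8中已被标记为过时,并在3.0中移除,新版本应改用CommonErrorHandler(例如DefaultErrorHandler)。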
首先,创建一个实现org.springframework.kafka.listener.ConsumerAwareErrorHandler接口的类:
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaException; import org.springframework.kafka.listener.ConsumerAwareErrorHandler; import org.springframework.stereotype.Component; @Component public class MessageFilterErrorHandler implements ConsumerAwareErrorHandler { private final Consumerconsumer; public MessageFilterErrorHandler(Consumer consumer) { this.consumer = consumer; } @Override public void handle(Exception thrownException, ConsumerRecord, ?> data, Consumer, ?> consumer) { // 在这里对消息进行过滤 if (shouldFilter(data)) { consumer.seekToCurrentPosition(new TopicPartition(data.topic(), data.partition())); } else { // 如果消息满足条件,则抛出异常 throw new KafkaException("Error processing message: " + data, thrownException); } } private boolean shouldFilter(ConsumerRecord, ?> record) { // 在这里实现你的过滤逻辑 // 例如,只处理键为"exampleKey"的消息 return record.key().equals("exampleKey"); } }
接下来,在Spring配置类中注册这个错误处理器:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.listener.config.KafkaListenerEndpoint; import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConsumerConfig implements KafkaListenerConfigurer { // ... 其他配置 ... @Bean public ConsumerFactoryconsumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setErrorHandler(new MessageFilterErrorHandler(consumerFactory().getConsumer())); return factory; } // ... 其他配置 ... }
这样,当监听器处理消息发生错误时,MessageFilterErrorHandler会拦截错误并对出错的消息进行过滤。如果消息满足过滤条件,则忽略该消息;否则,抛出异常。