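When integrating Kafka with Spring, message conversion is an important step. Spring for Apache Kafka supports it at two levels. At the listener level, the MessageConverter abstraction in org.springframework.kafka.support.converter has ready-made implementations such as StringJsonMessageConverter and ByteArrayJsonMessageConverter. On the consumer side, however, we usually handle conversion one step earlier, with a Kafka Deserializer (org.apache.kafka.common.serialization.Deserializer) that turns the raw record bytes into an object.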
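To make the deserialization step concrete, here is a minimal, dependency-free sketch of the contract a value deserializer fulfils: raw record bytes go in, a typed value comes out. SimpleDeserializer and Utf8StringDeserializer are hypothetical stand-ins written for this illustration. They only mirror the shape of Kafka's deserialize(String topic, byte[] data) method and are not part of any library.

```java
import java.nio.charset.StandardCharsets;

class DeserializerContractSketch {

    /** Hypothetical stand-in that mirrors the shape of Kafka's Deserializer#deserialize. */
    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Turns UTF-8 bytes into a String; a null payload stays null. */
    static class Utf8StringDeserializer implements SimpleDeserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // a record without a value, for example a tombstone
            }
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        SimpleDeserializer<String> deserializer = new Utf8StringDeserializer();
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserializer.deserialize("my-topic", payload));
        System.out.println(deserializer.deserialize("my-topic", null));
    }
}
```

A deserializer for a custom type follows the same pattern; only the step that turns bytes into a value changes, for instance by parsing JSON.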
The following simple example shows how to convert messages when integrating Kafka with Spring:
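- First, create a custom Kafka Deserializer, MyKafkaDeserializer, to deserialize the message. In this example, it converts the JSON payload of each record into a MyMessage object: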
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.io.IOException;

    public class MyKafkaDeserializer implements Deserializer<MyMessage> {

        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public MyMessage deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // record without a value, e.g. a tombstone
            }
            try {
                // Map the JSON bytes onto the MyMessage class
                return objectMapper.readValue(data, MyMessage.class);
            } catch (IOException e) {
                throw new SerializationException(
                        "Failed to deserialize MyMessage from topic " + topic, e);
            }
        }
    }
- In your Kafka consumer configuration class, register the custom MyKafkaDeserializer as the value deserializer of the consumer factory:
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    import java.util.HashMap;
    import java.util.Map;

    // @EnableKafka makes Spring discover @KafkaListener methods automatically,
    // so no manual endpoint registration is needed.
    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MyMessage> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<String, MyMessage> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Record values are turned into MyMessage objects by the custom deserializer
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaDeserializer.class);
            return props;
        }
    }
- Create a Kafka consumer listener class, MyKafkaConsumer, to handle the received messages:
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyKafkaConsumer {

        // The payload arrives already deserialized as a MyMessage
        @KafkaListener(id = "myListener", topics = "my-topic", groupId = "my-group")
        public void listen(MyMessage message) {
            System.out.println("Received message: " + message);
        }
    }
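- Finally, make sure your MyMessage class can be serialized to and deserialized from JSON. With Jackson, this means the class needs a no-argument constructor and getters and setters for its fields: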
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MyMessage {

        // ObjectMapper is thread-safe once configured, so one shared instance is enough
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        private String content;

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        public static MyMessage fromJson(String json) throws JsonProcessingException {
            return OBJECT_MAPPER.readValue(json, MyMessage.class);
        }

        public String toJson() throws JsonProcessingException {
            return OBJECT_MAPPER.writeValueAsString(this);
        }

        @Override
        public String toString() {
            return "MyMessage{content='" + content + "'}";
        }
    }
Now, when your Spring application consumes messages from Kafka, each message is converted into a MyMessage object and then handled by MyKafkaConsumer.
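The deserialize-then-handle flow described above can be sketched without any Kafka or Spring dependency. Everything in this block is a simplified stand-in written for illustration: the nested MyMessage class, the toy deserialize method (which treats the whole payload as the content instead of parsing JSON), and the dispatch helper are assumptions, not library APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ConsumeFlowSketch {

    /** Simplified stand-in for the MyMessage payload type. */
    static class MyMessage {
        final String content;

        MyMessage(String content) {
            this.content = content;
        }
    }

    /**
     * Toy deserializer: treats the whole payload as the message content.
     * A real deserializer would parse JSON, for example with Jackson.
     */
    static MyMessage deserialize(byte[] data) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("empty payload");
        }
        return new MyMessage(new String(data, StandardCharsets.UTF_8));
    }

    /**
     * Deserializes each raw record and passes the result to the listener.
     * Returns the number of records that failed to deserialize and were skipped.
     */
    static int dispatch(List<byte[]> records,
                        Function<byte[], MyMessage> deserializer,
                        Consumer<MyMessage> listener) {
        int skipped = 0;
        for (byte[] raw : records) {
            MyMessage message;
            try {
                message = deserializer.apply(raw);
            } catch (RuntimeException e) {
                skipped++; // the listener never sees a record that cannot be converted
                continue;
            }
            listener.accept(message);
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(
                "first".getBytes(StandardCharsets.UTF_8),
                new byte[0], // malformed record
                "second".getBytes(StandardCharsets.UTF_8));
        List<String> received = new ArrayList<>();
        int skipped = dispatch(records, ConsumeFlowSketch::deserialize,
                message -> received.add(message.content));
        System.out.println("Received: " + received + ", skipped: " + skipped);
    }
}
```

Skipping on failure is a simplification here. In a real consumer, a record that cannot be deserialized makes the poll fail repeatedly unless it is handled; in Spring Kafka, wrapping the delegate deserializer in ErrorHandlingDeserializer is the usual way to keep one malformed record from blocking the listener.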