Integrating Kafka with Spring Boot is fairly straightforward and takes only a few steps. The detailed configuration steps are as follows:
1. Add the Dependencies
First, add the Spring Boot and Kafka dependencies to your pom.xml file:
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```
2. Configure Kafka
Configure the Kafka-related parameters in the application.properties or application.yml file:
application.properties
```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```
application.yml
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
3. Create a Kafka Configuration Class
Create a configuration class that defines the Kafka-related beans. Note that Spring Boot's auto-configuration already creates equivalent beans from the spring.kafka.* properties in step 2, so an explicit configuration class is only needed when you want to customize the factories programmatically. Listener endpoints are registered through the @KafkaListener annotation in step 4, so no manual endpoint registration is required here:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    // Consumer properties: broker address, group id, offset reset policy, deserializers
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    // Producer properties: broker address and serializers
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    // KafkaTemplate is the entry point for sending messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Container factory used by @KafkaListener-annotated methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
4. Create a Consumer and a Producer
Create a consumer class and a producer class to handle messages:
Consumer example
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
```
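If you also need record metadata (key, partition, offset) rather than just the payload, the listener method can accept a ConsumerRecord instead of a String. This is a minimal sketch assuming the same my-topic and my-group names; the class name KafkaRecordConsumer is illustrative:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Illustrative variant of the consumer above; not required for the basic setup
@Service
public class KafkaRecordConsumer {

    // Receiving the full ConsumerRecord exposes key, partition, and offset alongside the value
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.printf("Received key=%s partition=%d offset=%d value=%s%n",
                record.key(), record.partition(), record.offset(), record.value());
    }
}
```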
Producer example
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the message to the given topic; delivery happens asynchronously
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
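Since spring-boot-starter-web is already on the classpath, a small REST controller is a convenient way to trigger the producer by hand. The sketch below is illustrative rather than part of the original setup; the /send endpoint, its request parameters, and the class name KafkaController are assumptions:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller for manual testing; endpoint path and parameters are illustrative
@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    // e.g. POST /send?topic=my-topic&message=hello
    @PostMapping("/send")
    public String send(@RequestParam String topic, @RequestParam String message) {
        kafkaProducer.sendMessage(topic, message);
        return "Message sent to topic " + topic;
    }
}
```

With the application running, a request such as `curl -X POST 'http://localhost:8080/send?topic=my-topic&message=hello'` should produce a record that the listener above prints.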
5. The Main Application Class
Create a main class to start the Spring Boot application:
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
```
6. Testing
After starting the application, you can use the Kafka command-line tools (such as kafka-console-producer.sh and kafka-console-consumer.sh) to test producing and consuming messages.
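For example, assuming a local broker on localhost:9092 and the my-topic topic used above, the console tools can be invoked roughly as follows (exact paths and flags depend on your Kafka version and installation):

```bash
# Produce test messages interactively
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

# Consume messages from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
```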
Summary
The steps above cover the basic configuration for integrating Kafka with Spring Boot. You can extend and tune this setup further to match your specific requirements.