在 Spring Boot 中整合 Kafka 进行消息持久化,你需要遵循以下步骤:
- 添加依赖
在你的 pom.xml
文件中添加 Spring Boot 和 Kafka 相关的依赖:
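<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<!-- 注意:spring-boot-starter-kafka 仅在 Spring Boot 4.0 及以上版本中提供;Spring Boot 3.x 及更早版本请改用 org.springframework.kafka:spring-kafka(版本由 Spring Boot 统一管理)。 -->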
- 配置 Kafka
在 application.properties
或 application.yml
文件中配置 Kafka 相关属性:
# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- 创建 Kafka 配置类
创建一个配置类,用于设置 Kafka 生产者和消费者的属性:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public MapconsumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @Bean public Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
- 创建 Kafka 生产者和消费者
创建一个生产者类,用于发送消息到 Kafka:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class KafkaProducer { @Autowired private KafkaTemplatekafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }
创建一个消费者类,用于从 Kafka 消费消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listener that prints every message received on {@code my-topic} and then
 * republishes it unchanged to {@code another-topic}.
 */
@Component
public class KafkaConsumer {

    private static final String SOURCE_TOPIC = "my-topic";
    private static final String CONSUMER_GROUP = "my-group";
    private static final String TARGET_TOPIC = "another-topic";

    /** Producer used to forward each received message. */
    @Autowired
    private KafkaProducer forwarder;

    /**
     * Handles one message from the source topic.
     *
     * @param message the received payload
     */
    @KafkaListener(topics = SOURCE_TOPIC, groupId = CONSUMER_GROUP)
    public void listen(String message) {
        final String logLine = "Received message: " + message;
        System.out.println(logLine);
        forwarder.sendMessage(TARGET_TOPIC, message);
    }
}
- 创建消息发送和接收的示例
创建一个类,用于发送和接收消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Startup hook that publishes a single greeting message to {@code my-topic}.
 */
@Component
public class KafkaExample implements CommandLineRunner {

    private static final String DEMO_TOPIC = "my-topic";
    private static final String DEMO_PAYLOAD = "Hello, Kafka!";

    /** Producer used to publish the demo message. */
    @Autowired
    private KafkaProducer producer;

    /**
     * Sends the demo payload once.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared by the {@link CommandLineRunner} contract
     */
    @Override
    public void run(String... args) throws Exception {
        producer.sendMessage(DEMO_TOPIC, DEMO_PAYLOAD);
    }
}
现在,当你运行你的 Spring Boot 应用时,它将发送一条消息到 my-topic
主题,然后从同一个主题消费消息并将其发送到另一个主题。Kafka 会把消息持久化到磁盘上的日志中;但要尽量避免消息丢失,还需要合理配置生产者的 acks(例如 acks=all)、主题的副本数以及消息保留策略。