在 Spring Boot 中整合 Kafka 进行异步发送非常简单。首先,确保你已经在项目中添加了 Kafka 依赖。在 pom.xml
文件中添加以下依赖:
org.springframework.kafka spring-kafka
接下来,配置 Kafka 生产者。在 application.yml
或 application.properties
文件中添加以下配置:
spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
创建一个配置类,用于创建 Kafka 生产者实例:
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactoryproducerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
现在,你可以在需要发送消息的类中注入 KafkaTemplate
并使用它异步发送消息。例如:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaMessageSender { @Autowired private KafkaTemplatekafkaTemplate; public void sendMessageAsync(String topic, String message) { kafkaTemplate.send(topic, message); } }
在上面的示例中,我们创建了一个名为 KafkaMessageSender
的服务类,并在其中注入了 KafkaTemplate
。sendMessageAsync
方法使用 kafkaTemplate.send()
方法异步发送消息到指定的 Kafka 主题。
现在,你可以在其他类中调用 KafkaMessageSender
的 sendMessageAsync
方法来异步发送 Kafka 消息。