在Spring Kafka中,消费者组是通过KafkaListenerContainerFactory
和ConsumerFactory
来实现的。要实现消费者组,你需要遵循以下步骤:
- 配置
KafkaListenerContainerFactory
:
首先,你需要配置一个KafkaListenerContainerFactory
bean,这个bean将用于创建Kafka消费者。在这个bean的配置中,你可以设置消费者组的ID。例如:
@Bean public KafkaListenerContainerFactorykafkaListenerContainerFactory( ConsumerFactory consumerFactory) { KafkaListenerContainerFactory factory = new KafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(3); // 设置并发消费者数量 return factory; }
- 配置
ConsumerFactory
:
接下来,你需要配置一个ConsumerFactory
bean,这个bean将用于创建Kafka消费者实例。在这个bean的配置中,你可以设置消费者组ID。例如:
@Bean public ConsumerFactoryconsumerFactory() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); }
在这个例子中,bootstrapServers
是你的Kafka集群的地址,groupId
是你的消费者组ID。
- 创建Kafka消费者监听器:
现在你可以创建一个Kafka消费者监听器,并使用@KafkaListener
注解来指定要订阅的主题。例如:
@Service public class KafkaConsumerListener { @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}") public void listen(ConsumerRecordrecord) { System.out.printf("Received message: key = %s, value = https://www.yisu.com/ask/%s, partition = %d, offset = %d%n", record.key(), record.value(), record.partition(), record.offset()); } }
在这个例子中,kafka.topic
和kafka.groupId
是应用程序的配置属性,它们分别表示要订阅的主题和消费者组ID。
- 启动应用程序:
最后,启动你的Spring Boot应用程序。Spring Kafka将自动创建消费者组,并根据配置的并发消费者数量创建相应的消费者实例。消费者实例将根据消费者组ID和主题进行分组,并从Kafka集群中消费消息。