要连接Kafka,可以使用Kafka-Python库。以下是一个简单的示例代码,演示如何连接Kafka并发送消息:
from kafka import KafkaProducer, KafkaConsumer # 连接Kafka生产者 producer = KafkaProducer(bootstrap_servers='localhost:9092') # 发送消息 producer.send('my_topic', b'Hello, Kafka!') # 关闭生产者连接 producer.close() # 连接Kafka消费者 consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092') # 消费消息 for message in consumer: print(message.value.decode('utf-8')) # 关闭消费者连接 consumer.close()
在上面的示例中,我们首先使用KafkaProducer
类连接到Kafka,并使用send()
方法发送消息到名为my_topic
的主题。
然后,我们使用KafkaConsumer
类连接到Kafka,并使用consumer()
方法订阅my_topic
主题。然后,使用for
循环遍历消费者对象以接收消息,并使用value
属性获取消息的值。
最后,我们关闭生产者和消费者的连接。
请注意,上述示例假设Kafka服务器在本地运行,且端口号为9092。您需要根据您的Kafka服务器配置进行适当的更改。