使用PHP的rdkafka扩展库来消费消息的步骤如下:
- 首先安装rdkafka扩展库,可以通过composer安装:
composer require edenhill/php-rdkafka
- 创建一个消费者实例:
set('group.id', 'myConsumerGroup'); $conf->set('metadata.broker.list', 'localhost:9092'); $consumer = new RdKafka\Consumer($conf); $consumer->subscribe(['myTopic']);
- 开始消费消息:
while (true) { $message = $consumer->consume(120 * 1000); // 120秒超时 if ($message->err) { echo "Error: {$message->errstr()}\n"; continue; } echo "Received message: {$message->payload}\n"; }
-
处理消费到的消息,可以根据业务需求进行处理。
-
最后,记得在结束时关闭消费者实例:
$consumer->close();
以上就是使用PHP的rdkafka扩展库来消费消息的基本步骤,可以根据实际情况进行适当的调整和优化。