要确认消息已被正确处理,您可以使用 PHP RdKafka 扩展的 ack
方法
set('group.id', 'myGroup'); $conf->set('bootstrap.servers', 'localhost:9092'); $consumer = new \RdKafka\KafkaConsumer($conf); // 订阅主题 $consumer->subscribe(['myTopic']); while (true) { // 拉取消息 $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "Reached end of partition event\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; case RD_KAFKA_RESP_ERR__PARTITION_NOT_FOUND: echo "Partition not found\n"; break; case RD_KAFKA_RESP_ERR__UNKNOWN: throw new \Exception($message->errstr(), $message->err); default: if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { // 消息已处理,确认 $consumer->ack($message); echo "Message consumed and acknowledged\n"; } else { throw new \Exception($message->errstr(), $message->err); } break; } }
在这个示例中,我们创建了一个 Kafka 消费者,订阅了名为 “myTopic” 的主题。然后,我们进入一个无限循环,不断从 Kafka 拉取消息。当成功拉取到消息时($message->err == RD_KAFKA_RESP_ERR_NO_ERROR
),我们调用 ack
方法来确认消息已被正确处理。如果发生错误,我们将抛出异常。