要实现通过C语言向Kafka传输数据,可以使用librdkafka这个开源的C语言库。下面是一个简单的示例代码,演示如何使用librdkafka来向Kafka发送消息:
#include#include #include int main(int argc, char *argv[]) { rd_kafka_t *rk; rd_kafka_topic_t *rkt; rd_kafka_conf_t *conf; char errstr[512]; // 创建Kafka配置对象 conf = rd_kafka_conf_new(); // 设置Kafka配置项,例如bootstrap.servers(Kafka集群的地址) if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "Error configuring Kafka: %s\n", errstr); return 1; } // 创建Kafka生产者对象 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "Error creating Kafka producer: %s\n", errstr); return 1; } // 创建Kafka主题对象 rkt = rd_kafka_topic_new(rk, "test_topic", NULL); // 发送消息到Kafka char *message = "Hello, Kafka!"; if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, message, strlen(message), NULL, 0, NULL) == -1) { fprintf(stderr, "Error producing message: %s\n", rd_kafka_err2str(rd_kafka_last_error())); return 1; } // 等待消息发送完成 rd_kafka_flush(rk, 10*1000); // 清理资源 rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); return 0; }
以上代码示例中,我们通过librdkafka库创建了一个Kafka生产者对象,并向名为"test_topic"的主题发送了一条消息"Hello, Kafka!"。在实际使用中,你可以根据自己的需求配置更多的Kafka参数,并发送不同的消息内容。