DolphinDB的kafka插件默认是从最新的数据中开始读取。已经往topic中发布的数据,这样的代码是消费不到的,可以设置偏移量以获取之前的数据,加上这一行代码即可:
consumerCfg["auto.offset.reset"] = "earliest";
插件底层来源于librdkafka,其他参数可以参考:
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
现有一个topic里面已经灌入了数据,只有1个分区,然后kafka插件去接入数据发现一直没有数据,请问是哪里使用有问题吗?
代码如下:
brokerList = "....." groupId = "Test001" topics=["TEST1"] consumerCfg = dict(string, any); consumerCfg["metadata.broker.list"] = brokerList; consumerCfg["group.id"] = groupId; consumer = kafka::consumer(consumerCfg); kafka::subscribe(consumer, topics);
然后用:
kafka::pollByteStream(consumer)
调试没有得到数据
DolphinDB的kafka插件默认是从最新的数据中开始读取。已经往topic中发布的数据,这样的代码是消费不到的,可以设置偏移量以获取之前的数据,加上这一行代码即可:
consumerCfg["auto.offset.reset"] = "earliest";
插件底层来源于librdkafka,其他参数可以参考:
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md