Kafka插件生产者,是否可以直接将一个表生产到kafka

比如在流表订阅时的handler,直接将msg作为参数传入produce

请先 登录 后评论

1 个回答

Yingnan Wang

可以,示例代码如下:

loadPlugin("/yourpath/PluginKafka.txt")

producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "localhost:9092";
producer = kafka::producer(producerCfg);

def sendMsgToKafkaFunc(producer, msg){
	try {
		kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)
	}
	catch(ex) {
		writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)
	}
}

subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{producer}, msgAsTable=true, reconnect=true)

kafka中消费到的信息如下:

...
{"id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...]}
{"id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...]}
...
请先 登录 后评论