Yes, this is possible. Here is an example:
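// Load the Kafka plugin (replace /yourpath with the actual plugin directory)
loadPlugin("/yourpath/PluginKafka.txt")

// Create a producer that points at the Kafka broker
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "localhost:9092";
producer = kafka::producer(producerCfg);

// Handler: send each incoming batch to the topic as JSON, and log any failure
def sendMsgToKafkaFunc(producer, msg){
    try {
        kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)
    }
    catch(ex) {
        writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" + ex)
    }
}

// Subscribe to the stream table; each batch is passed to the handler as a table
subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{producer}, msgAsTable=true, reconnect=true)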
The messages consumed from Kafka look like this:
...
{"id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...]}
{"id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...]}
...
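Each message above is column-oriented: one JSON object per batch, with one array per column. A consumer that wants individual records has to turn those columns back into rows. The following is a minimal Python sketch of that step using only the standard library. The function name `columns_to_rows` is hypothetical, the sample payload is a two-row message trimmed from the output above, and the timestamp format is inferred from that output rather than taken from the plugin documentation.

```python
import json
from datetime import datetime


def columns_to_rows(payload):
    """Convert one column-oriented JSON message into a list of row dicts.

    Hypothetical helper: assumes every column array has the same length and
    that a "datetime" column, if present, uses the yyyy.MM.ddTHH:mm:ss form
    seen in the sample output.
    """
    columns = json.loads(payload)
    names = list(columns)

    # Every column must describe the same number of rows.
    lengths = {len(columns[name]) for name in names}
    if len(lengths) > 1:
        raise ValueError("columns have different lengths")

    rows = []
    for values in zip(*(columns[name] for name in names)):
        row = dict(zip(names, values))
        if "datetime" in row:
            row["datetime"] = datetime.strptime(
                row["datetime"], "%Y.%m.%dT%H:%M:%S"
            )
        rows.append(row)
    return rows


# Sample message: two rows trimmed from the consumed output (illustrative only).
sample = (
    '{"id":[1,1],'
    '"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13"],'
    '"val":[73,74]}'
)

for row in columns_to_rows(sample):
    print(row)
```

In a real consumer, the `payload` string would be the value of each Kafka message read from the `dolphindb-producer-test` topic; fetching it depends on the Kafka client library you choose and is not shown here.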