下面方法二为多个引擎级联使用,方法一和方法二虽然实现上不同,但最终结果相同。
方法一:通过中间表,连接两个引擎
//++++++++++++++++++++++++ 方法一:通过中间表kline,连接两个引擎 +++++++++++++++++++++++ share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE]) // trade表 >>> TimeSeriesEngine 引擎 >>> kline 表 Tengine=createTimeSeriesEngine(name="timeseries", windowSize=60000, step=60000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=kline, timeColumn=`time, useSystemTime=false, keyColumn=`sym) subscribeTable(tableName="trades", actionName="timeseries", offset=-1, handler=append!{Tengine}, msgAsTable=true) // kline 表 >>> ReactiveStateEngine 引擎 >>> outputTable 表 Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym") subscribeTable(tableName="kline", actionName="reactive", offset=-1, handler=append!{Rengine}, msgAsTable=true)
方法二:引擎级联,中间结果不需要输出到中间表 (方法二中kline只提供表的schema,不会存储中间结果)
//++++++++++++++++++++++++ 方法二:引擎级联,不需要中间表+++++++++++++++++++++++++++ share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE]) Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym") Tengine=createTimeSeriesEngine(name="timeseries", windowSize=60000, step=60000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym) //时间序列引擎的结果输入响应式状态引擎 subscribeTable(tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)
流表trades注入数据:上述方法一或者方法二提交之后,可以向流数据表trades中写入100条数据,以检查计算结果是否正确
def writeData(t, n){ timev = 2018.10.08T01:01:01.001 + timestamp(1..n*10000) symv = take("SZ.00001", n) pricev = 1..n volumev = take(1, n) insert into t values(timev, symv, pricev, volumev) } writeData(trades, 100)
环境清理:如果您首先执行了方法一的代码段,之后执行代码二的代码段前,可能需要下述代码
//取消订阅 unsubscribeTable(tableName="trades", actionName="timeseries") unsubscribeTable(tableName="kline", actionName="reactive") //释放流数据引擎 dropStreamEngine("timeseries") dropStreamEngine("reactive") //释放变量 undef(`trades, SHARED) undef(`kline, SHARED) undef(`outputTable)