下面方法二为多个引擎级联使用,方法一和方法二虽然实现上不同,但最终结果相同。
方法一:通过中间表,连接两个引擎
//++++++++++++++++++++++++ 方法一:通过中间表kline,连接两个引擎 +++++++++++++++++++++++
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])
// trade表 >>> TimeSeriesEngine 引擎 >>> kline 表
Tengine=createTimeSeriesEngine(name="timeseries", windowSize=60000, step=60000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=kline, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
subscribeTable(tableName="trades", actionName="timeseries", offset=-1, handler=append!{Tengine}, msgAsTable=true)
// kline 表 >>> ReactiveStateEngine 引擎 >>> outputTable 表
Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")
subscribeTable(tableName="kline", actionName="reactive", offset=-1, handler=append!{Rengine}, msgAsTable=true)
方法二:引擎级联,中间结果不需要输出到中间表 (方法二中kline只提供表的schema,不会存储中间结果)
//++++++++++++++++++++++++ 方法二:引擎级联,不需要中间表+++++++++++++++++++++++++++
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])
Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")
Tengine=createTimeSeriesEngine(name="timeseries", windowSize=60000, step=60000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
//时间序列引擎的结果输入响应式状态引擎
subscribeTable(tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)
流表trades注入数据:上述方法一或者方法二提交之后,可以向流数据表trades中写入100条数据,以检查计算结果是否正确
def writeData(t, n){
timev = 2018.10.08T01:01:01.001 + timestamp(1..n*10000)
symv = take("SZ.00001", n)
pricev = 1..n
volumev = take(1, n)
insert into t values(timev, symv, pricev, volumev)
}
writeData(trades, 100)
环境清理:如果您首先执行了方法一的代码段,之后执行代码二的代码段前,可能需要下述代码
//取消订阅
unsubscribeTable(tableName="trades", actionName="timeseries")
unsubscribeTable(tableName="kline", actionName="reactive")
//释放流数据引擎
dropStreamEngine("timeseries")
dropStreamEngine("reactive")
//释放变量
undef(`trades, SHARED)
undef(`kline, SHARED)
undef(`outputTable)