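What is the difference between pipelined processing across multiple engines and cascaded processing through multiple stream tables?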

Please show the code.

Please show some pseudocode, thanks!


2 Answers

alex

Below is the pipelined version. At present the two approaches differ only in how they are written.

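// Input schema of the cross-sectional engine: the columns produced by the reactive state engine below (key column sym, then time and the factor)
dummy = table(1:0, `sym`time`maxIndex, [SYMBOL, TIMESTAMP, DOUBLE])
// Final output: one row per symbol for each cross-section
resultTable = streamTable(10000:0, `time`sym`factor1, [TIMESTAMP, SYMBOL, DOUBLE])
// Cross-sectional engine: at each timestamp, rank maxIndex across symbols and rescale the rank to [-0.5, 0.5)
ccsRank = createCrossSectionalAggregator(name="alpha1CCS", metrics=<[sym, rank(maxIndex)\count(maxIndex) - 0.5]>, dummyTable=dummy, outputTable=resultTable, keyColumn=`sym, triggeringPattern='keyCount', triggeringInterval=3000, timeColumn=`time)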

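// Time-series part of WorldQuant Alpha#1; the @state decorator lets the reactive state engine compute it incrementally
@state
def wqAlpha1TS(close){
    ret = ratios(close) - 1                   // simple return
    v = iif(ret < 0, mstd(ret, 20), close)    // 20-row std of returns when the return is negative, else close
    return mimax(signum(v)*v*v, 5)            // index of the max of signed v^2 within the last 5 rows
}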

// Create the reactive state engine; its output goes to the cross-sectional engine ccsRank created above
input = table(1:0, `sym`time`close, [SYMBOL, TIMESTAMP, DOUBLE])
rse = createReactiveStateEngine(name="alpha1", metrics=<[time, wqAlpha1TS(close)]>, dummyTable=input, outputTable=ccsRank, keyColumn="sym")
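The script above only builds the pipeline: nothing is computed until rows are appended to `rse`, since no stream table is subscribed. A minimal sketch for feeding it, assuming the script above has just run in the same session. The symbols `A`, `B`, `C`, the start time and the random prices are made-up test data, not part of the original answer. Whether and when `ccsRank` writes rows into `resultTable` depends on its trigger settings (`triggeringPattern='keyCount'`, `triggeringInterval=3000`), so with only three symbols you may see few or no rows until you experiment with a smaller `triggeringInterval`.

```dolphindb
// Hypothetical test data: 3 symbols, one tick per symbol per second, for 30 seconds
syms = `A`B`C
startTime = 2022.01.01T09:30:00.000
for (i in 0:30) {
    // one row per symbol at the same timestamp; prices are kept positive
    // because wqAlpha1TS computes ratios(close) - 1
    batch = table(syms as sym, take(startTime + i * 1000, 3) as time, 1.0 + rand(100.0, 3) as close)
    // columns follow the dummy table `input` (sym, time, close)
    rse.append!(batch)
}
// Inspect whatever the cross-sectional engine has written so far
select * from resultTable
```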

Yating Xie

Method 2 below cascades multiple engines. Methods 1 and 2 are implemented differently, but they produce the same final result.

Method 1: connect the two engines through an intermediate table

//++++++++++++++++++++++++ Method 1: connect the two engines through the intermediate table kline +++++++++++++++++++++++
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])

// trades table >>> TimeSeriesEngine >>> kline table
Tengine=createTimeSeriesEngine(name="timeseries", windowSize=60000, step=60000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=kline, timeColumn=`time, useSystemTime=false, keyColumn=`sym)

subscribeTable(tableName="trades", actionName="timeseries", offset=-1, handler=append!{Tengine}, msgAsTable=true)   

// kline table >>> ReactiveStateEngine >>> outputTable table
Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")

subscribeTable(tableName="kline", actionName="reactive", offset=-1, handler=append!{Rengine}, msgAsTable=true)   


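Method 2: cascade the engines so that intermediate results do not need to be written to an intermediate table (here kline only supplies the table schema and stores no intermediate results)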

//++++++++++++++++++++++++ Method 2: cascade the engines, no intermediate table needed +++++++++++++++++++++++++++
share streamTable(1000:0, `time`sym`price`volume, [TIMESTAMP, SYMBOL, DOUBLE, INT]) as trades
share streamTable(1000:0, `time`sym`open`close`high`low`volume, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT]) as kline
outputTable=table(1000:0, `sym`factor1, [SYMBOL, DOUBLE])

Rengine=createReactiveStateEngine(name="reactive", metrics=<[mavg(open, 3)]>, dummyTable=kline, outputTable=outputTable, keyColumn="sym")

Tengine=createTimeSeriesEngine(name="timeseries", windowSize=60000, step=60000, metrics=<[first(price), last(price), max(price), min(price), sum(volume)]>, dummyTable=trades, outputTable=Rengine, timeColumn=`time, useSystemTime=false, keyColumn=`sym)
// The time-series engine's results are fed directly into the reactive state engine

subscribeTable(tableName="trades", actionName="timeseries", offset=0, handler=append!{Tengine}, msgAsTable=true)   
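One concrete way to see how the two methods differ is to list the active subscriptions after running either one. A sketch, assuming one of the methods above has just been run; it relies on the built-in `getStreamingStat()`, whose `subWorkers` table lists the subscribed topics (each topic string typically ends with the table name and actionName). Method 1 is expected to show two topics (`trades/timeseries` and `kline/reactive`), while method 2 should show only the one on `trades`, because the time-series engine hands its bars to `Rengine` directly.

```dolphindb
// Topics currently handled by this node's subscription workers
subs = getStreamingStat().subWorkers
// Method 1: expect topics for actionName "timeseries" and "reactive"
// Method 2: expect only the "timeseries" topic on trades
select topic from subs
```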


Writing data into the stream table trades: after running either method 1 or method 2, write 100 rows into trades to check whether the results are correct

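// Write n rows for one symbol: timestamps 10 seconds apart, prices 1..n, volume 1
def writeData(t, n){
    timev = 2018.10.08T01:01:01.001 + (1..n) * 10000    // TIMESTAMP + INT adds milliseconds
    symv = take("SZ.00001", n)
    pricev = 1..n
    volumev = take(1, n)
    insert into t values(timev, symv, pricev, volumev)
}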
writeData(trades, 100)
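To check the results and see the practical difference between the two methods, a sketch assuming `writeData(trades, 100)` has just been called; the one-second `sleep` is an arbitrary wait, since subscriptions are processed asynchronously. The factor values in `outputTable` should match between the two methods, while the row count of `kline` should not.

```dolphindb
// Subscriptions are processed asynchronously, so give the engines a moment
sleep(1000)
// Factor values: expected to be the same for method 1 and method 2
select * from outputTable
// Rows in the intermediate table: method 1 stores every 1-minute bar in kline;
// under method 2 kline only supplied the schema, so this is expected to be 0
size(kline)
```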


Environment cleanup: if you ran the method 1 code first, you may need to run the following before running the method 2 code

// Cancel the subscriptions
unsubscribeTable(tableName="trades", actionName="timeseries")
unsubscribeTable(tableName="kline", actionName="reactive")
// Release the streaming engines
dropStreamEngine("timeseries")
dropStreamEngine("reactive")
// Release the variables
undef(`trades, SHARED)
undef(`kline, SHARED)
undef(`outputTable)
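The cleanup above is written for method 1. After method 2 there is no subscription on `kline`, so unsubscribing it is unnecessary and may raise an error. A sketch of the corresponding cleanup for method 2 and for alex's pipeline, reusing the engine and variable names from the code above; run only the part that matches what you executed.

```dolphindb
// Cleanup after method 2: only trades is subscribed
unsubscribeTable(tableName="trades", actionName="timeseries")
dropStreamEngine("timeseries")
dropStreamEngine("reactive")
undef(`trades, SHARED)
undef(`kline, SHARED)
undef(`outputTable)

// Cleanup after alex's pipeline: no subscriptions, only the two engines
dropStreamEngine("alpha1")
dropStreamEngine("alpha1CCS")
```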
Asked by ddbuserex on 2022-02-08 21:58
