可以用equaljoinengine,对同一张表根据标识列做过滤订阅,分别把不同标识的行注入引擎左输入和右输入,
关联列是股票id和时间的等值连接。参考案例:
下面这段代码将tmp1和tmp2两个表中相同时间同一股票的数据进行拼接计算:
share streamTable(1:0, `time`sym`price, [SECOND, SYMBOL, DOUBLE]) as leftTable share streamTable(1:0, `time`sym`val, [SECOND, SYMBOL, DOUBLE]) as rightTable output=table(100:0, `time`sym`price`val`total, [SECOND, SYMBOL, DOUBLE, DOUBLE, DOUBLE]) ejEngine=createEquiJoinEngine("test1", leftTable, rightTable, output, [<price>, <val>, <price*val>], `sym, `time) subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{ejEngine, true}, msgAsTable=true) subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{ejEngine, false}, msgAsTable=true) tmp1=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(1..20) as price) leftTable.append!(tmp1) tmp2=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(50..31) as val) rightTable.append!(tmp2) select count(*) from output