基本解决思路:
(1)通过setStreamTableFilterColumn函数设置流数据表的过滤列
(2)创建n个相同处理动作的handler(metrics相同的流数据引擎)
(3)创建n个订阅,订阅目标是上述设置过滤列的streamTable,每隔订阅通过hash过滤订阅部分数据,同时指定到不同的线程处理
demo例子如下:
def sum_diff(x, y){ return (x-y)/(x+y) } factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) - ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)> share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream setStreamTableFilterColumn(tickStream, `sym) share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream for(i in 0..3){ rse = createReactiveStateEngine(name="reactiveDemo"+string(i), metrics =factor1, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym") subscribeTable(tableName=`tickStream, actionName="sub"+string(i), handler=tableInsert{rse}, msgAsTable = true, hash = i, filter = (4,i)) } n=2000000 tmp = table(take("A"+string(1..4000), n) as sym, rand(10.0, n) as price) tickStream.append!(tmp)