可以自定义一个流数据表，并通过 share 语句将其共享为一个变量；在因子函数中使用时，直接从这个共享流表中取出数据即可。
// Normalized difference of two series: (x - y) / (x + y).
// The result is undefined/NULL wherever x + y is 0.
def sum_diff(x, y){
    return (x - y) / (x + y)
}

// tickStream is the input stream the engine subscribes to.
// tickStream1 is a second shared stream table that factor2 queries for auxiliary data.
// NOTE(review): nothing in this snippet appends to tickStream1; while it is empty,
// max(price) is NULL and so is the resulting factor. Populate it elsewhere.
share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream
share streamTable(1:0, `sym`time`price, [STRING,DATETIME,DOUBLE]) as tickStream1

// Output table that receives (sym, time, factor1) rows from the engine.
result = table(1000:0, `sym`time`factor1, [STRING,DATETIME,DOUBLE])

// Factor combining an EMA-spread signal on `price` with the current maximum
// price stored in the shared table `tickStream1`.
//   price       - price series supplied by the engine
//   tickStream1 - shared stream table to read the auxiliary max(price) from
// Returns ema(spread, 10) - ema(spread, 20) + max(price in tickStream1),
// where spread = 1000 * sum_diff(ema(price, 20), ema(price, 40)).
// NOTE(review): functions that use state functions such as ema inside a reactive
// state engine normally need the @state declaration - confirm against the
// createReactiveStateEngine documentation for the server version in use.
def factor2(price, mutable tickStream1){
    // Compute the scaled EMA spread once; the original evaluated it twice.
    spread = 1000 * sum_diff(ema(price, 20), ema(price, 40))
    factor1 = ema(spread, 10) - ema(spread, 20)
    // NOTE(review): inside the query, `price` is expected to resolve to the
    // tickStream1 column rather than the function parameter - confirm.
    factor1 = factor1 + exec max(price) from tickStream1
    return factor1
}

// Reactive state engine keyed by sym; only the two listed symbols are output.
rse = createReactiveStateEngine(name="reactiveDemo", metrics=[<time>, <factor2(price, tickStream1) as `factor1>], dummyTable=tickStream, outputTable=result, keyColumn="sym", filter=<sym in ["000001.SH", "000002.SH"]>)

// Feed every message published to tickStream into the engine.
subscribeTable(tableName=`tickStream, actionName="factors", handler=tableInsert{rse})