可以使用响应式状态引擎(createReactiveStateEngine)配合 deltas 函数来实现,示例如下:
// Example: compute the per-group price change with a reactive state engine.
// For every incoming trade the engine evaluates deltas(price), i.e. the
// difference between the current price and the previous price seen for the
// same (date, sym) key, and writes one row per input row to outputTable.

// Shared stream table that receives the raw trades; it also serves as the
// schema template (dummyTable) for the engine.
share streamTable(1:0, `date`time`sym`market`price`qty, [DATE, TIME, SYMBOL, CHAR, DOUBLE, INT]) as trades

// Result table: the key columns (date, sym) followed by the metric column.
outputTable = table(100:0, `date`sym`factor1, [DATE, STRING, DOUBLE])

// keepOrder=true asks the engine to emit results in the order the input
// records arrived.
engine = createReactiveStateEngine(name="test", metrics=<deltas(price)>, dummyTable=trades, outputTable=outputTable, keyColumn=["date","sym"], keepOrder=true)

// Feed every batch appended to `trades` into the engine.
subscribeTable(tableName=`trades, actionName="test", msgAsTable=true, handler=tableInsert{engine})

// Generate n random trades and publish them.
n = 100
tmp = table(rand(2012.01.01..2012.01.10, n) as date, rand(09:00:00.000..15:59:59.999, n) as time, rand("A"+string(1..3), n) as sym, rand(['B', 'S'], n) as market, rand(100.0, n) as price, rand(1000..2000, n) as qty)
trades.append!(tmp)

// The subscription handler runs asynchronously, so querying immediately after
// append! may return an empty or partial result. Wait (bounded to ~5 s) until
// the engine has produced one output row per input row.
for (attempt in 1..50) {
    if (size(outputTable) >= n) {
        break
    }
    sleep(100)
}

select * from outputTable