用流数据订阅的自定义handler可以处理,没用上状态引擎,参考代码:
share(streamTable(10:0, `rowNum`price`side`v, [INT, INT, INT, INT]), `originStream) result = table(10:0, `f1`f2, [INT[], INT[]]) msgCache = table(6:0, `rowNum`price`side`v, [INT, INT, INT, INT]) def updateMsgCache(mutable msgCache, row) { // maintain a window which length is 3 insert into msgCache values(row.rowNum, row.price, row.side, row.v) msgCache = select top 3 * from msgCache order by rowNum desc } def caculateF1(mutable msgCache) { minPrice = 2147483647 maxPrice = 0 for (msg in msgCache) { if (msg.price < minPrice) { minPrice = msg.price } if (msg.price > maxPrice) { maxPrice = msg.price } } return minPrice..maxPrice } def caculateF2(mutable f1, mutable msgCache) { f1Size = size(f1) f2 = array(INT, f1Size, f1Size, 0) for (msg in msgCache) { if (msg.side == 1) { index = at(f1 == msg.price) f2[index[0]] += msg.v } } return f2 } def subscribeHandler(mutable result, mutable msgCache, msg) { for (row in msg) { // update msgCache updateMsgCache(msgCache, row) // caculate f1 and f2 via msgCache f1 = caculateF1(msgCache) f2 = caculateF2(f1, msgCache) // store f1 and f2 to result insert into result values([f1], [f2]) } } subscribeTable(tableName=`originStream, actionName=`caculateFactors, handler=subscribeHandler{result, msgCache}, msgAsTable=true) //insert into originStream values(1, 9, 1, 1) //insert into originStream values(2, 13, 1, 2) //insert into originStream values(3, 11, -1, 3) //insert into originStream values(4, 10, 1, 4) //insert into originStream values(5, 10, 1, 5) //select string(f1), string(f2) from result //unsubscribeTable(tableName="originStream", actionName="caculateFactors") //undef("originStream", SHARED) //undef all