I tried to solve this problem by using user-defined functions.
// Source stream that the handler subscribes to: one row per trade, with an `inc` flag.
share streamTable(100:0, `time`sym`volume`inc, [TIMESTAMP, SYMBOL, INT, INT]) as trades
// Result table: one row per processed trade, with the computed moving sum and the
// wall-clock time (`upd`) at which the handler wrote it.
share table(100:0, `sym`time`msum_volume`inc`upd, [SYMBOL, TIMESTAMP, INT, INT, TIMESTAMP]) as output
// History table: holds only the trades whose inc flag equals 1 (filled by myFunc).
share streamTable(100:0, `time`sym`volume`inc, [TIMESTAMP, SYMBOL, INT, INT]) as tradesInc1

/*
 * Subscription handler: for every incoming row, compute a moving sum of `volume`
 * over the current row plus the most recent (window-1) rows of the same symbol
 * that were stored in tradesInc1 (i.e. rows with inc == 1).
 *
 * msg: the batch of new rows from `trades`, delivered as a table (msgAsTable=true).
 *
 * Side effects:
 *   - appends one row per input row to `output`;
 *   - appends the input row to `tradesInc1` when its inc flag is 1.
 */
def myFunc(msg){
    // Window length in rows, counting the current row.
    window = 3
    for (row in msg){
        // Previously stored inc == 1 rows for this symbol.
        tempData = select * from tradesInc1 where sym=row[`sym]
        // First history row that falls inside the window; negative when there
        // are fewer than (window-1) stored rows for this symbol.
        startIdx = tempData.size()-(window-1)
        if(startIdx>=0){
            msum_volume = row[`volume] + exec sum(volume) from tempData[startIdx:]
        }else{
            // Not enough history yet: emit 0 rather than a partial sum.
            msum_volume = 0
        }
        insert into output values(row[`sym], row[`time], msum_volume, row[`inc], now())
        // Store the row only after it has been processed, so it never
        // contributes to its own history lookup.
        if(row[`inc]==1){
            tradesInc1.tableInsert(row)
        }
    }
}

// Replay from the first row of `trades` (offset=0) and hand each batch to myFunc as a table.
subscribeTable(tableName="trades", actionName="rs", offset=0, handler=myFunc, msgAsTable=true)

// Sample data. The pause after each insert presumably lets the handler process one
// row before the next arrives -- NOTE(review): confirm this is the intent.
tableInsert(trades, 2022.01.26T17:00:00.000, `A, 15, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:01:00.000, `A, 25, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:02:00.000, `A, 75, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:03:00.000, `A, 65, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:03:45.000, `A, 95, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:00.000, `A, 15, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:04.000, `A, 55, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:44.000, `A, 45, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:05:00.000, `A, 85, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:06:00.000, `A, 35, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:07:00.000, `A, 85, 1)
The result: