Reactive State Engine calculation problem

In a reactive state engine, I flag each row with an inc column: 1 for rows that should stay in the calculation state, and 0 for rows that should be used for a single calculation and then discarded. I then set keyPurgeFilter=<inc==0> and keyPurgeFreqInSecond=1, but the results are not what I expect.

In this picture I show what the correct output should be, which values to keep in the calculation, and which values to remove.

attachments-2022-01-hXdSChTu61f154ef8f760.png

Here is the code. (msum is used only as an example; in reality I need to apply many functions.)


share streamTable(100:0, `time`sym`volume`inc, [TIMESTAMP, SYMBOL, INT, INT]) as trades;
output=table(100:0, `sym`time`msum_volume`inc`upd, [SYMBOL, TIMESTAMP, INT, INT, TIMESTAMP]);
rse=createReactiveStateEngine(name="rse",metrics=<[time, msum(volume,3), inc, now()]>,dummyTable=trades,outputTable=output,keyColumn=`sym, keyPurgeFilter=<inc==0>, keyPurgeFreqInSecond=1);
subscribeTable(tableName="trades", actionName="rs", offset=0, handler=append!{rse}, msgAsTable=true);
tableInsert(trades, 2022.01.26T17:00:00.000, `A, 15, 1);
sleep(1001)
tableInsert(trades, 2022.01.26T17:01:00.000, `A, 25, 1);
sleep(1001)
tableInsert(trades, 2022.01.26T17:02:00.000, `A, 75, 1);
sleep(1001)
tableInsert(trades, 2022.01.26T17:03:00.000, `A, 65, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:03:45.000, `A, 95, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:00.000, `A, 15, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:04.000, `A, 55, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:44.000, `A, 45, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:05:00.000, `A, 85, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:06:00.000, `A, 35, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:07:00.000, `A, 85, 1)

Note: Please do not suggest using a filter in the reactive state engine, because it does not solve the problem. I also have to use the reactive state engine, because the time-series engine and the other engines do not solve my problem.


1 Answer

Yating Xie

I tried to solve this problem with a user-defined function used as the subscription handler.

share streamTable(100:0, `time`sym`volume`inc, [TIMESTAMP, SYMBOL, INT, INT]) as trades;
share table(100:0, `sym`time`msum_volume`inc`upd, [SYMBOL, TIMESTAMP, INT, INT, TIMESTAMP]) as output;

share streamTable(100:0, `time`sym`volume`inc, [TIMESTAMP, SYMBOL, INT, INT]) as tradesInc1;
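def myFunc(msg){
	// size of the moving window in rows, current row included
	window = 3
	for (row in msg){
		// rows retained so far (inc==1) for this symbol
		tempData = select * from tradesInc1 where sym=row[`sym]
		// start position of the last (window-1) retained rows
		index = tempData.size()-(window-1)
		// enough history: current volume plus the last (window-1) retained volumes; otherwise output 0
		if(index>=0)  msum_volume = row[`volume] + exec sum(volume) from tempData[index:]
		else msum_volume = 0
		insert into output values(row[`sym] , row[`time] ,msum_volume, row[`inc], now() )
		// only rows flagged inc==1 are kept for later calculations
		if(row[`inc]==1)  tradesInc1.tableInsert(row)
	}
}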
subscribeTable(tableName="trades", actionName="rs", offset=0, handler=myFunc, msgAsTable=true);

tableInsert(trades, 2022.01.26T17:00:00.000, `A, 15, 1);
sleep(1001)
tableInsert(trades, 2022.01.26T17:01:00.000, `A, 25, 1);
sleep(1001)
tableInsert(trades, 2022.01.26T17:02:00.000, `A, 75, 1);
sleep(1001)
tableInsert(trades, 2022.01.26T17:03:00.000, `A, 65, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:03:45.000, `A, 95, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:00.000, `A, 15, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:04.000, `A, 55, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:04:44.000, `A, 45, 0)
sleep(1001)
tableInsert(trades, 2022.01.26T17:05:00.000, `A, 85, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:06:00.000, `A, 35, 1)
sleep(1001)
tableInsert(trades, 2022.01.26T17:07:00.000, `A, 85, 1)

The result:

attachments-2022-01-J3GHGNyG61f36fba7fc22.png
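
For readers who cannot open the screenshot, the handler's logic can be checked outside DolphinDB. The following is a minimal Python sketch, not DolphinDB code: `model_handler`, `ROWS`, and the `agg` parameter are hypothetical names introduced here for illustration, and the sketch assumes the window is counted in rows (the last window-1 rows flagged inc==1 plus the current row), as in myFunc above. Timestamps are left out because they do not affect the calculation.

```python
from collections import defaultdict, deque

# Sample rows from the post as (sym, volume, inc); timestamps omitted
# because the window is counted in rows, not in time.
ROWS = [
    ("A", 15, 1), ("A", 25, 1), ("A", 75, 1), ("A", 65, 1),
    ("A", 95, 0), ("A", 15, 1), ("A", 55, 0), ("A", 45, 0),
    ("A", 85, 1), ("A", 35, 1), ("A", 85, 1),
]


def model_handler(rows, window=3, agg=sum):
    """Hypothetical Python model of myFunc (an assumption, not DolphinDB)."""
    # Per symbol, remember only the last (window - 1) volumes flagged inc == 1.
    kept = defaultdict(lambda: deque(maxlen=window - 1))
    out = []
    for sym, volume, inc in rows:
        history = kept[sym]
        if len(history) == window - 1:
            # Window = retained inc == 1 volumes plus the current volume.
            value = agg(list(history) + [volume])
        else:
            # Not enough retained rows yet; myFunc writes 0 in this case.
            value = 0
        out.append((sym, volume, inc, value))
        if inc == 1:
            # Only inc == 1 rows enter the state; inc == 0 rows are used once.
            history.append(volume)
    return out


results = model_handler(ROWS)
for sym, volume, inc, value in results:
    print(sym, volume, inc, value)
```

Under this model the computed column for the eleven sample rows is 0, 0, 115, 165, 235, 155, 135, 125, 165, 135, 205. Passing another aggregate, for example `agg=max`, models a different moving function over the same retained state, which is one way to think about extending the handler to more than msum.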

Asked by Vishvesh Upadhyay on 2022-01-26 21:20
