Problem combining the reactive state engine with a time-series engine (using updateTime)

I am calculating 5-min OHLC bars from 1-min OHLC bars using updateTime=60000, so the 5-min OHLC result is emitted every minute. When I feed the 5-min OHLC output into a ReactiveStateEngine, it uses all the previous duplicate entries to calculate the result. How can I solve this?
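To make the symptom concrete, here is a minimal model in plain Python (not DolphinDB code). It assumes each 1-minute bar triggers exactly one update of the current 5-minute window. It uses a 4-bar SMA as a stand-in for the downstream metric; 4 is the window length the second answer uses in `mavg(close, 4)`. The function names are hypothetical. When every update row is counted as a bar, the SMA reflects the latest few minutes instead of the last four finished 5-minute bars.

```python
# Hypothetical model (plain Python, not DolphinDB): a 5-minute bar whose
# current window is re-emitted after every 1-minute bar, as with updateTime=60000.

def emit_5min_updates(closes_1m):
    """For each 1-minute close, emit (window_start, running_close) for the
    5-minute window it falls into. The running close of the 5-minute bar is
    simply the latest 1-minute close seen so far in that window."""
    out = []
    for i, c in enumerate(closes_1m):
        window_start = (i // 5) * 5  # index of the first minute in the window
        out.append((window_start, c))
    return out

def sma(values, n):
    """Simple moving average; None until n values are available."""
    return [None if i + 1 < n else sum(values[i + 1 - n:i + 1]) / n
            for i in range(len(values))]

closes_1m = [float(x) for x in range(1, 21)]  # 20 one-minute closes: 1.0 .. 20.0
updates = emit_5min_updates(closes_1m)

# Naive: every update row is treated as a separate 5-minute bar.
naive = sma([c for _, c in updates], 4)

# Intended: only the final update of each window is a finished 5-minute bar.
final = {}
for start, c in updates:
    final[start] = c  # later updates of the same window overwrite earlier ones
correct = sma(list(final.values()), 4)

print(naive[-1])    # SMA over the last 4 update rows (duplicates included)
print(correct[-1])  # SMA over the last 4 finished 5-minute bars
```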


Please see the photo for detailed information.

Please reply as soon as possible.




Edit 1:

For the detailed code and further discussion, please visit https://ask.dolphindb.net/question/949



2 answers

YcHan韩迎春
Add the parameter "filter=<ts != move(ts, 5)>" to createReactiveStateEngine.
blliu

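When computing the SMA, you only want to use the last record of each 5-minute bar. However, the second time-series engine computes a result every minute and outputs it downstream, so when the SMA is computed each group contains four useless records. Add another reactive state engine between the 5-minute bar computation and the SMA computation to filter the 5-minute bar results, keeping only the final update of each bar. Then change the input of the reactive state engine that computes the SMA.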
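The filtering step can be modeled in a few lines of Python. This is a sketch of the logic, not DolphinDB code. It assumes every 5-minute window receives exactly five updates per symbol, and the helper name `filter_final_updates` is hypothetical. A running count per (ts, symbol) key plays the role of `cumcount(symbol)` inside a reactive state engine keyed by `["ts", "symbol"]`. Only the fifth (final) update of each key is forwarded.

```python
from collections import defaultdict

def filter_final_updates(rows, updates_per_window=5):
    """Forward a row only when it is the Nth update for its (ts, symbol) key,
    modeling filter=<cumcount(symbol)==5> with keyColumn=["ts", "symbol"].
    Assumes every window receives exactly `updates_per_window` updates."""
    counts = defaultdict(int)
    kept = []
    for row in rows:
        key = (row["ts"], row["symbol"])
        counts[key] += 1  # per-key running count, like cumcount within a group
        if counts[key] == updates_per_window:
            kept.append(row)
    return kept

# Hypothetical stream: two symbols, two 5-minute windows, one update per minute.
rows = []
for ts in (0, 300000):
    for minute in range(5):
        for sym in ("A", "B"):
            rows.append({"ts": ts, "symbol": sym, "close": ts / 1000 + minute})

kept = filter_final_updates(rows)
for r in kept:
    print(r)
```

Under this model, a window that receives fewer than five updates is never forwarded. This happens, for example, when a symbol has no 1-minute bar in some minute. The count-based filter therefore relies on regular, gap-free 1-minute data.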

The key code changes are as follows:

// Add an intermediate reactive state engine for filtering
share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm5_t);
rse5m_t = createReactiveStateEngine(name="prepare_data", metrics = <[open, high, low, close, volume, upd_1m, upd_5m]>, dummyTable=sm5, outputTable=sm5_t, keyColumn= ["ts", "symbol"],filter=<cumcount(symbol)==5>);
subscribeTable(tableName="sm5", actionName="rse5m", offset=0, handler=append!{rse5m_t}, msgAsTable=true, hash=1);
// Change the input of the reactive state engine that computes the SMA
subscribeTable(tableName="sm5_t", actionName="rse5m", offset=0, handler=append!{rse5m}, msgAsTable=true, hash=1);

For this scenario, the DolphinDB solution has a computation latency of only 1.6 microseconds.

You can verify this with the complete code below:

// 1. Simulate trade data
tmp_table = table(
    concatDateTime(take(2023.12.12, 4802000), (rand((09:30:00.000+0..2400*3*1000) join (13:00:00.000+0..2400*3*1000), 4802000).sort())) as dt,
    rand(string(1001..1200), 4802000) as symbol, 
    rand(100.0, 4802000) as price, 
    rand(100, 4802000) as vol);

// 2. Streaming SMA computation
// Define the input stream table for trades
colName = ["ts", "symbol", "price", "vol", "import_time"];
colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, TIMESTAMP];
// dropStreamTable(`tmp_1)
share(streamTable(25000:0,colName,colType), `tmp_1);
go
// Define the output stream table for 1-minute bars
ohlc_colNames = ["ts", "symbol", "time_delay", "open", "high", "low", "close", "volume", "upd_1m"];
ohlc_colTypes = [TIMESTAMP, SYMBOL, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, TIMESTAMP];
// dropStreamTable(`sm1)
share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm1);
go
// Define time-series engine 1 to compute 1-minute bars in real time
OHLC_sm1 = createTimeSeriesEngine(name="OHLC_sm1", windowSize=60000, step=60000, metrics=<[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(vol) as volume, now() as upd_1m]>, dummyTable=tmp_1, outputTable=sm1, timeColumn=`ts, useSystemTime=false, updateTime=60000, keyColumn=`symbol, useWindowStartTime=false, outputElapsedMicroseconds = true);

// Subscribe to the trade input stream table and feed the data into time-series engine 1
subscribeTable(tableName="tmp_1", actionName="OHLC_sm1", offset=0, handler=append!{OHLC_sm1}, msgAsTable=true, hash=1);

// Define the output stream table for 5-minute bars
ohlc_colNames = ["ts", "symbol", "time_delay", "open", "high", "low", "close", "volume", "upd_1m", "upd_5m"];
ohlc_colTypes = [TIMESTAMP, SYMBOL, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, TIMESTAMP, TIMESTAMP];
// dropStreamTable(`sm5)
share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm5);
go
// Define the metacode that aggregates 1-minute bars into 5-minute bars
ohlc=<[first(open) as open, max(high) as high, min(low) as low, last(close) as close, sum(volume) as volume, last(upd_1m) as last_tick, now() as upd_5m]>;
// Define time-series engine 2 to compute 5-minute bars in real time (recomputed every minute)
OHLC_sm5 = createTimeSeriesEngine(name="OHLC_sm5", windowSize=300000, step=300000, metrics=ohlc, dummyTable=sm1, outputTable=sm5, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol, updateTime=60000, useWindowStartTime=true, outputElapsedMicroseconds = true);

// Subscribe to the 1-minute bar stream table and feed the data into time-series engine 2
subscribeTable(tableName="sm1", actionName="OHLC_sm5", offset=0, handler=append!{OHLC_sm5}, msgAsTable=true, hash=1);

// Define the table downstream of the 5-minute bar stream table (stores the filtered 5-minute bars)
ohlc_colNames = ["ts", "symbol", "delay_time", "count_", "open", "high", "low", "close", "volume", "upd_1m", "upd_5m"];
ohlc_colTypes = [TIMESTAMP, SYMBOL, LONG, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, TIMESTAMP, TIMESTAMP];
// dropStreamTable(`sm5_t)
share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm5_t);
go
// Define reactive state engine 1 to keep only the last record of each (ts, symbol) group of 5-minute bar results
rse5m_t = createReactiveStateEngine(name="prepare_data", metrics = <[open, high, low, close, volume, upd_1m, upd_5m]>, dummyTable=sm5, outputTable=sm5_t, keyColumn= ["ts", "symbol"],filter=<cumcount(symbol)==5>, outputElapsedMicroseconds = true);

// Subscribe to the 5-minute bar stream table and feed the data into reactive state engine 1
subscribeTable(tableName="sm5", actionName="rse5m", offset=0, handler=append!{rse5m_t}, msgAsTable=true, hash=1);

// Define the output stream table for the SMA computed on 5-minute bars
rsesm5_colNames = ["symbol","delay_time", "count_", "ts", "close", "sma", "updt"];
rsesm5_colTypes = [SYMBOL, LONG, INT, TIMESTAMP, DOUBLE, DOUBLE, TIMESTAMP];
// dropStreamTable(`rsesm5)
share(streamTable(25000:0,rsesm5_colNames,rsesm5_colTypes), `rsesm5);
go
// Define reactive state engine 2 to compute the SMA of the 5-minute bars
rse5m = createReactiveStateEngine(name="rse5m", metrics =<[ts,close,mavg(close,4),now()]>, dummyTable=sm5_t, outputTable=rsesm5, keyColumn="symbol", outputElapsedMicroseconds = true);

// Subscribe to the filtered downstream table and feed the data into reactive state engine 2
subscribeTable(tableName="sm5_t", actionName="rse5m", offset=0, handler=append!{rse5m}, msgAsTable=true, hash=1);
// Replay the data, then check the results and the computation latency
timer tmp_1.append!(select *, now() from tmp_table);

res = select * from rsesm5;
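To sanity-check the values in `res`, the expected per-symbol output can be reproduced offline. The Python sketch below is not DolphinDB code, and its bar values are hypothetical. It mirrors the `ohlc` metacode of time-series engine 2 (first open, max high, min low, last close, summed volume) over finished windows only. It then applies a 4-period moving average like `mavg(close, 4)`. It assumes `mavg` yields NULL for the first three bars, i.e. `minPeriods` left at its default equal to the window; this is modeled as `None`.

```python
def aggregate_5min(bars_1m):
    """Combine 1-minute bars into one 5-minute bar, mirroring the ohlc
    metacode: first open, max high, min low, last close, summed volume."""
    return {
        "open": bars_1m[0]["open"],
        "high": max(b["high"] for b in bars_1m),
        "low": min(b["low"] for b in bars_1m),
        "close": bars_1m[-1]["close"],
        "volume": sum(b["volume"] for b in bars_1m),
    }

def mavg(values, window):
    """Moving average that yields None until `window` values exist
    (assumed to match mavg with its default minPeriods)."""
    return [None if i + 1 < window else sum(values[i + 1 - window:i + 1]) / window
            for i in range(len(values))]

# Hypothetical 1-minute bars for one symbol: 30 minutes -> six finished 5-minute bars.
bars_1m = [{"open": m, "high": m + 0.5, "low": m - 0.5, "close": m + 0.25, "volume": 10}
           for m in range(30)]

# Only finished windows are aggregated, i.e. what the filtering engine forwards.
five_min = [aggregate_5min(bars_1m[i:i + 5]) for i in range(0, len(bars_1m), 5)]
sma = mavg([b["close"] for b in five_min], 4)
print(sma)
```

Once the duplicates are filtered out, each symbol's rows in `rsesm5` should follow this pattern: one SMA value per finished 5-minute bar, with no extra rows from the intermediate per-minute updates.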
  • Asked by Vishvesh Upadhyay on 2021-10-10 18:42
