在计算SMA的时候,您只想要5分钟K线的最后一条数据来计算,但是第二个时序聚合引擎会每分钟计算一次结果且输出到下游,那么在SMA的时候每组都会有四条无用的数据。您在计算完5分钟K线之后与计算SMA之间再加一个响应式状态引擎,对5分钟K线的结果进行一个过滤处理。并且修改计算SMA的响应式状态引擎的输入即可。
重点修改代码如下:
// 添加中间过滤的响应式状态引擎 share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm5_t); rse5m_t = createReactiveStateEngine(name="prepare_data", metrics = <[open, high, low, close, volume, upd_1m, upd_5m]>, dummyTable=sm5, outputTable=sm5_t, keyColumn= ["ts", "symbol"],filter=<cumcount(symbol)==5>); subscribeTable(tableName="sm5", actionName="rse5m", offset=0, handler=append!{rse5m_t}, msgAsTable=true, hash=1); // 修改计算SMA的响应式状态引擎的输入 subscribeTable(tableName="sm5_t", actionName="rse5m", offset=0, handler=append!{rse5m}, msgAsTable=true, hash=1)
对于该业务场景,使用DolphinDB来解决,计算时延仅为1.6微秒。
您可以使用如下的完整代码验证
// 1、模拟数据 tmp_table = table( concatDateTime(take(2023.12.12, 4802000), (rand((09:30:00.000+0..2400*3*1000) join (13:00:00.000+0..2400*3*1000), 4802000).sort())) as dt, rand(string(1001..1200), 4802000) as symbol, rand(100.0, 4802000) as price, rand(100, 4802000) as vol); // 2、流式计算SMA代码实现 // 定义trade的输入流表 colName = ["ts", "symbol", "price", "vol", "import_time"]; colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, TIMESTAMP]; // dropStreamTable(`tmp_1) share(streamTable(25000:0,colName,colType), `tmp_1); go // 定义一分钟K线的结果流表 ohlc_colNames = ["ts", "symbol", "time_delay", "open", "high", "low", "close", "volume", "upd_1m"]; ohlc_colTypes = [TIMESTAMP, SYMBOL, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, TIMESTAMP]; // dropStreamTable(`sm1) share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm1); go // 定义时序聚合引擎一用于实时计算一分钟K线 OHLC_sm1 = createTimeSeriesEngine(name="OHLC_sm1", windowSize=60000, step=60000, metrics=<[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(vol) as volume, now() as upd_1m]>, dummyTable=tmp_1, outputTable=sm1, timeColumn=`ts, useSystemTime=false, updateTime=60000, keyColumn=`symbol, useWindowStartTime=false, outputElapsedMicroseconds = true); // 订阅trade的输入流表然后把数据传递到时序聚合引擎一 subscribeTable(tableName="tmp_1", actionName="OHLC_sm1", offset=0, handler=append!{OHLC_sm1}, msgAsTable=true, hash=1); // 定义五分钟K线的结果流表 ohlc_colNames = ["ts", "symbol", "time_delay", "open", "high", "low", "close", "volume", "upd_1m", "upd_5m"]; ohlc_colTypes = [TIMESTAMP, SYMBOL, LONG, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, TIMESTAMP, TIMESTAMP]; // dropStreamTable(`sm5) share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm5); go // 定义元代码 ohlc=<[first(open) as open, max(high) as high, min(low) as low, last(close) as close, sum(volume) as volume, last(upd_1m) as last_tick, now() as upd_5m]>; // 定义时序聚合引擎二用于实时计算五分钟K线(每一分钟计算一次) OHLC_sm5 = createTimeSeriesEngine(name="OHLC_sm5", windowSize=300000, step=300000, metrics=ohlc, dummyTable=sm1, outputTable=sm5, timeColumn=`ts, useSystemTime=false, keyColumn=`symbol, updateTime=60000, useWindowStartTime=true, outputElapsedMicroseconds = true); // 订阅一分钟K线的结果流表然后把数据传递到时序聚合引擎二 subscribeTable(tableName="sm1", actionName="OHLC_sm5", offset=0, handler=append!{OHLC_sm5}, msgAsTable=true, hash=1); // 定义五分钟K线的结果流表的下游表(存储五分钟K线的结果流表的过滤结果) ohlc_colNames = ["ts", "symbol", "delay_time", "count_", "open", "high", "low", "close", "volume", "upd_1m", "upd_5m"]; ohlc_colTypes = [TIMESTAMP, SYMBOL, LONG, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, TIMESTAMP, TIMESTAMP]; // dropStreamTable(`sm5_t) share(streamTable(25000:0,ohlc_colNames,ohlc_colTypes), `sm5_t); go // 定义响应式状态引擎一用于过滤出五分钟K线结果的每组的最后一条 rse5m_t = createReactiveStateEngine(name="prepare_data", metrics = <[open, high, low, close, volume, upd_1m, upd_5m]>, dummyTable=sm5, outputTable=sm5_t, keyColumn= ["ts", "symbol"],filter=<cumcount(symbol)==5>, outputElapsedMicroseconds = true); // 订阅五分钟K线的结果流表然后把数据传递到响应式状态引擎一 subscribeTable(tableName="sm5", actionName="rse5m", offset=0, handler=append!{rse5m_t}, msgAsTable=true, hash=1); // 定义对五分钟计算SMA的结果流表 rsesm5_colNames = ["symbol","delay_time", "count_", "ts", "close", "sma", "updt"]; rsesm5_colTypes = [SYMBOL, LONG, INT, TIMESTAMP, DOUBLE, DOUBLE, TIMESTAMP]; // dropStreamTable(`rsesm5) share(streamTable(25000:0,rsesm5_colNames,rsesm5_colTypes), `rsesm5); go // 定义响应式状态引擎二用于计算五分钟K线的SMA rse5m = createReactiveStateEngine(name="rse5m", metrics =<[ts,close,mavg(close,4),now()]>, dummyTable=sm5_t, outputTable=rsesm5, keyColumn="symbol", outputElapsedMicroseconds = true); // 订阅五分钟K线的结果流表的下游表然后把数据传递到响应式状态引擎二 subscribeTable(tableName="sm5_t", actionName="rse5m", offset=0, handler=append!{rse5m}, msgAsTable=true, hash=1); // 回放数据以及查看结果和计算时延 timer tmp_1.append(select *, now() from tmp_table); res = select * from rsesm5;