step1, 先使用createDailyTimeSeriesEngine,生成5分钟(或者10分钟)的k线,必须要设置fill参数(根据场景改变),确保每一个时间点都有数据。
step2, 建立一个reactiveStateEngine去做计算,metrics中可以使用优化后的状态函数,窗口大小设置成6就是30分钟K线,12就是60分钟K线。
下面代码可供参考:
login("admin","123456")
n =750+600+900+1200
time = (09:00:00.000+ rand(75*60*1000,750)).sort!() join (10:30:00.000+ rand(60*60*1000,600)).sort!() join (13:30:00.000+ rand(90*60*1000,900)).sort!() join (21:00:00.000+ rand(120*60*1000,1200)).sort!()
price = 100 + cumsum(rand(0.02,n)-0.01)
volume = rand(1000,n)
symbol = rand(`600001`600005`600018`600008, n)
trade = table(symbol, time, price, volume).sortBy!(`symbol`time)
dropStreamEngine(`min5)
dropStreamEngine(`min30)
unsubscribeTable(tableName=`Ticks, actionName="min5")
unsubscribeTable(tableName=`Ticks_5min, actionName="min30")
unsubscribeTable(tableName=`trades, actionName="tradesStats")
share streamTable(10000:0, `symbol`time`price`volume, `SYMBOL`TIME`DOUBLE`INT) as Ticks
share streamTable(10000:0, `time`symbol`avg, `TIME`SYMBOL`DOUBLE) as Ticks_5min
min5 = createDailyTimeSeriesEngine(name="min5", windowSize=5*1000*60, step=5*1000*60, metrics=<avg(price)>, dummyTable=Ticks, outputTable=Ticks_5min, timeColumn=`time, keyColumn=`symbol ,useWindowStartTime=false, sessionBegin=09:00:00.000 10:30:00.000 13:30:00.000 21:00:00.000 , sessionEnd=10:15:00.000 11:30:00.000 15:00:00.000 23:00:00.000 ,mergeSessionEnd=true,fill="null")
subscribeTable(tableName =`Ticks, actionName = "min5", offset = 0 , handler = tableInsert{min5} ,msgAsTable=true)
Ticks_30min = table(10000:0, `symbol`time`avg, `SYMBOL`TIME`DOUBLE)
min30 = createReactiveStateEngine(name = "min30", metrics = [<time>,<mavg(avg,6)>], dummyTable= Ticks_5min, outputTable = Ticks_30min, keyColumn=`symbol)
subscribeTable(tableName =`Ticks_5min, actionName = "min30", offset = 0 , handler = tableInsert{min30} ,msgAsTable=true)
Ticks.append!(trades)