How to get VWAP using TimeSeriesEngine or ReactiveStateEngine

Hi,


I am getting live tick data consisting of Time, Symbol Name, Last Traded Price, Cumulative Volume (Daily).


Now how to get VWAP using 1) Custom function 2) TimeSeriesEngine 3) ReactiveStateEngine? Please Help me. Necessary code is as under.


This is stream table for getting ticks from python

t_colNames=`ts`symbol`price`vol`upd_tick
t_colTypes=`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`TIMESTAMP


This is stream table to store 1 min OHLC data

ohlc_colNames=`ts`symbol`open`high`low`close`volume`tp`last_tick`upd_1m
ohlc_colTypes=`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`TIMESTAMP`TIMESTAMP


This is 1 min OHLC TimeSeriesEngine

OHLC_sm1 = createTimeSeriesEngine(name="OHLC_sm1", windowSize=60000, step=60000, metrics=<[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(vol) as volume, (max(price)+min(price)+last(price))/3 as tp, last(upd_tick) as last_tick, now() as upd_1m]>, dummyTable=tmp, outputTable=sm1 , timeColumn=`ts, useSystemTime=true, keyColumn=`symbol, updateTime=60000, useWindowStartTime=false);


This is the function to convert cumulative volume to volume

def calcVolume(mutable dictVolume, mutable tsAggrOHLC, msg){
t = select ts,symbol,price,vol,upd_tick from msg context by symbol limit -1
update t set prevVolume = dictVolume[symbol]
dictVolume[t.symbol] = t.vol
tsAggrOHLC.append!(t.update!("vol", <vol-prevVolume>))
}
dictVol = dict(STRING, DOUBLE)
subscribeTable(tableName="t", actionName="OHLC_sm1", offset=0, handler=calcVolume{dictVol,OHLC_sm1}, msgAsTable=true, hash=1)

请先 登录 后评论

1 个回答

Shena Mao

I recommend you here using ReactiveStateEngine to convert cumulative volume to volume and then connecting two engines in series. Here is an example:

tradesData = your_tick_data
//define Trade Table
x=tradesData.schema().colDefs
share streamTable(100:0, x.name, x.typeString) as Trade
//define OHLC outputTable
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume`updatetime,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,TIMESTAMP]) as OHLC
//1 min OHLC TimeSeriesEngine
tsAggrOHLC = createTimeSeriesAggregator(name="aggr_ohlc", windowSize=60000, step=60000, metrics=<[first(Price),max(Price),min(Price),last(Price),wavg(Price,Volume),now()]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)
//ReactiveStateEngine:convert cumulative volume to volume
rsAggrOHLC = createReactiveStateEngine(name="calc_vol", metrics=<[Datetime, Price, deltas(Volume) as Volume]>, dummyTable=Trade, outputTable=tsAggrOHLC, keyColumn=`Symbol)
//subscribe table and insert data into engines
subscribeTable(tableName="Trade", actionName="minuteOHLC2", offset=0, handler=append!{rsAggrOHLC}, msgAsTable=true)
replay(inputTables=tradesData, outputTables=Trade, dateColumn=`Datetime)


You can use your user defined function in any of the engine's matrics.

请先 登录 后评论
  • 1 关注
  • 0 收藏,1054 浏览
  • Vishvesh Upadhyay 提出于 2021-12-28 20:11

相似问题