在流计算引擎中如何将累计量转化成当期量

请教一下,如果我们订阅的level2行情流表中的成交量和成交额都是累计值,用时间序列聚合引擎如何得到每分钟内的成交额和成交量呢?有什么比较方便的方法吗?
比如我输入下面数据:

sym     timestamp                cumVolume    cumAmount
IBM     2020.06.10T09:30:00.000    100        1,000
MSFT    2020.06.10T09:30:00.000    120        1,200
GOOG    2020.06.10T09:30:00.000    100        1,000
IBM     2020.06.10T09:30:03.000    150        1,600
MSFT    2020.06.10T09:30:03.000    220        2,200
GOOG    2020.06.10T09:30:03.000    200        1,900
IBM     2020.06.10T09:30:06.000    200        2,100
MSFT    2020.06.10T09:30:06.000    300        3,100
GOOG    2020.06.10T09:30:06.000    250        2,500
IBM     2020.06.10T09:30:09.000    240        2,500
MSFT    2020.06.10T09:30:09.000    370        3,700
GOOG    2020.06.10T09:30:09.000    310        3,200

得到下面结果:

sym     timestamp                volume     amount
GOOG    2020.06.10T09:30:00.000        
IBM     2020.06.10T09:30:00.000        
MSFT    2020.06.10T09:30:00.000        
GOOG    2020.06.10T09:30:03.000    100        900
IBM     2020.06.10T09:30:03.000    50         600
MSFT    2020.06.10T09:30:03.000    100        1,000
GOOG    2020.06.10T09:30:06.000    50         600
GOOG    2020.06.10T09:30:09.000    60         700
IBM     2020.06.10T09:30:06.000    50         500
IBM     2020.06.10T09:30:09.000    40         400
MSFT    2020.06.10T09:30:06.000    80         900
MSFT    2020.06.10T09:30:09.000    70         600
请先 登录 后评论

1 个回答

logger


//步骤一
//累积量实时预处理,此函数定义放到createSubStreamingEnv函数的前面
def calcTradeVolumeAndAmount(mutable dictVolume, mutable dictAmount, mutable tsAggrKline, msg){


t = select sym, timestamp, prev(cumVolume) as prevCumVolume, prev(cumAmount) as prevCumAmount from msg context by sym
update t set prevCumVolume = dictVolume[sym], prevCumAmount = dictAmount[sym] where isNull(prevCumVolume)
//update dictVolume and dictAmount with most recent values
snapshot = select sym, cumVolume, cumAmount from msg context by sym limit -1
dictVolume[snapshot.sym] = snapshot.cumVolume
dictAmount[snapshot.sym] = snapshot.cumAmount
//append message to tsAggrKline
tsAggrKline.append!(select sym, timestamp, cumVolume - prevCumVolume as volume, cumAmount - prevCumAmount as amount from t)


}


//步骤二
//定义一个字典用于保存每一个symbol前一次的聚合值,在预处理函数中用到
dictVol = dict(STRING, DOUBLE)
dictAmount = dict(STRING, DOUBLE)


//步骤三
//在订阅写入聚合引擎部分,调用预处理函数写入聚合引擎,增加一个参数传入字典
def createSubStreamingEnv(dbPath,tbName,userName,userPass,mutable dictVol){


//创建时序数据聚合引擎tsAggrKline
......
//订阅流数据写入聚合引擎
subscribeTable(,"Trade", "MinuteK", 0, calcTradeVolumeAndAmount{dictVol, dictAmount, tsAggrKline}, true)
......


}
//步骤四
//在建立订阅环境的入口函数最后增加一个参数传入字典
createSubStreamingEnv(dbPath, tbName, userName, userPass, dictVol, dictAmount)


请先 登录 后评论