//步骤一
//累积量实时预处理,此函数定义放到createSubStreamingEnv函数的前面
def calcTradeVolumeAndAmount(mutable dictVolume, mutable dictAmount, mutable tsAggrKline, msg){
t = select sym, timestamp, prev(cumVolume) as prevCumVolume, prev(cumAmount) as prevCumAmount from msg context by sym update t set prevCumVolume = dictVolume[sym], prevCumAmount = dictAmount[sym] where isNull(prevCumVolume) //update dictVolume and dictAmount with most recent values snapshot = select sym, cumVolume, cumAmount from msg context by sym limit -1 dictVolume[snapshot.sym] = snapshot.cumVolume dictAmount[snapshot.sym] = snapshot.cumAmount //append message to tsAggrKline tsAggrKline.append!(select sym, timestamp, cumVolume - prevCumVolume as volume, cumAmount - prevCumAmount as amount from t)
}
//步骤二
//定义一个字典用于保存每一个symbol前一次的聚合值,在预处理函数中用到
dictVol = dict(STRING, DOUBLE)
dictAmount = dict(STRING, DOUBLE)
//步骤三
//在订阅写入聚合引擎部分,调用预处理函数写入聚合引擎,增加一个参数传入字典
def createSubStreamingEnv(dbPath,tbName,userName,userPass,mutable dictVol){
//创建时序数据聚合引擎tsAggrKline ...... //订阅流数据写入聚合引擎 subscribeTable(,"Trade", "MinuteK", 0, calcTradeVolumeAndAmount{dictVol, dictAmount, tsAggrKline}, true) ......
}
//步骤四
//在建立订阅环境的入口函数最后增加一个参数传入字典
createSubStreamingEnv(dbPath, tbName, userName, userPass, dictVol, dictAmount)