可以再创建一个共享流表(table2):通过 subscribeTable 订阅 table1,并在 handler 中对订阅到的数据进行特征计算,计算结果的表结构与 table2 一致;再将 createTimeSeriesEngine 的 dummyTable 设置为 table2,即可实现。
// table1: shared stream table that receives the raw input (time, a); it is the table subscribed to below.
share streamTable(1000:0, `time`a, [TIMESTAMP, DOUBLE]) as table1
// table2: shared stream table with the post-feature schema (time, a, b); it is passed as the engine's dummyTable.
share streamTable(1000:0, `time`a`b, [TIMESTAMP, DOUBLE, DOUBLE]) as table2
// outputTable: in-memory table passed as the engine's outputTable (interval_time plus one column per metric).
outputTable = table(10000:0, `interval_time`a`b, [TIMESTAMP, DOUBLE, DOUBLE])
// Subscription handler: derives the feature column b = a + 100 from each
// incoming batch and inserts the resulting (time, a, b) rows into inputTable.
// Despite the name, no rows are filtered out.
//   inputTable - insert target; bound to the time-series engine handle via
//                partial application when the subscription is created
//   msg        - the batch delivered by the subscription, with columns time and a
def append_after_filtering(mutable inputTable, msg){
	// Project the two source columns and compute the derived one in a single query.
	enriched = select time, a, a + 100 as b from msg
	// Column-wise insert in the order time, a, b.
	insert into inputTable values(enriched.time, enriched.a, enriched.b)
}
// Create the time-series engine that computes avg(a) and avg(b) over windows keyed on the time column.
// Its input schema comes from table2 and its results go to outputTable.
// NOTE(review): windowSize=6 / step=3 are presumably in the time column's unit (milliseconds for TIMESTAMP) - confirm against the docs.
Aggregator = createTimeSeriesEngine(name="Aggregator", windowSize=6, step=3, metrics=<[avg(a), avg(b)]>, dummyTable=table2, outputTable=outputTable, timeColumn=`time)
// Subscribe to table1; each batch arrives as a table (msgAsTable=true) and is passed to the handler,
// whose first argument is fixed to the engine handle by partial application ({Aggregator}).
// NOTE(review): offset=0 presumably starts from the first available row of table1 - confirm.
subscribeTable(tableName="table1", actionName="test", offset=0, handler=append_after_filtering{Aggregator}, msgAsTable=true)