subscribeTable和createTimeSeriesEngine一起使用的时候如何实现订阅数据后时序聚合前进行特征计算(增加列)

https://gitee.com/dolphindb/Tutorials_CN/blob/master/stream_aggregator.md#2-%E8%BF%87%E6%BB%A4%E6%B5%81%E6%95%B0%E6%8D%AE

在该教程中介绍了如何在监听表的同时过滤数据,如果想实现过滤的同时进行特征的计算,表结构发生了变化该如何处理?

share streamTable(1000:0, `time`a, [TIMESTAMP, DOUBLE]) as table

outputTable = table(10000:0, `time`a, [TIMESTAMP, DOUBLE, DOUBLE])

def append_after_filtering(mutable inputTable, msg){

t = select * from msg

insert into inputTable values(t.time,t.a)

}

Aggregator = createTimeSeriesEngine(name="Aggregator", windowSize=6, step=3, metrics=<[avg(a)]>, dummyTable=table, outputTable=outputTable, timeColumn=`time)

subscribeTable(tableName="table", actionName="test", offset=0, handler=append_after_filtering{Aggregator}, msgAsTable=true)

我想在append_after_filtering增加一列计算a+100作为b列

请先 登录 后评论

1 个回答

banxianer

可以再创建一个共享流表(table2),利用handler将subscribeTalbe订阅(table1)的数据进行特征计算,计算后的表结构为table2,createTimeSeriesEngine的dummyTable设置为table2,即可实现。

share streamTable(1000:0, `time`a, [TIMESTAMP, DOUBLE]) as table1

share streamTable(1000:0, `time`a`b, [TIMESTAMP, DOUBLE, DOUBLE]) as table2

outputTable = table(10000:0, `interval_time`a`b, [TIMESTAMP, DOUBLE, DOUBLE])


def append_after_filtering(mutable inputTable, msg){

t = select * from msg

t[`b] = t.a+100

insert into inputTable values(t.time,t.a,t.b)

}

Aggregator = createTimeSeriesEngine(name="Aggregator", windowSize=6, step=3, metrics=<[avg(a), avg(b)]>, dummyTable=table2, outputTable=outputTable, timeColumn=`time)

subscribeTable(tableName="table1", actionName="test", offset=0, handler=append_after_filtering{Aggregator}, msgAsTable=true)

请先 登录 后评论