在使用DolphinDB流数据横截面聚合引擎时用interval模式触发计算,为什么结果表一直有数据

在使用DolphinDB流数据横截面聚合引擎时用interval模式触发计算,我的代码如下:

//清理环境
def clearEnv(){    
    unsubscribeTable(server="", tableName="trades", actionName="tradesCrossAggregator") //取消订阅
    dropAggregator("CrossSectionalDemo") //取消聚合引擎调用
    undef("trades", SHARED) //删除共享表
    undef("outputTable", SHARED) 
}

//创建横截面引擎并订阅
def createSub(){
    share(streamTable(10:0, `time`sym`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT]), "trades") //创建共享的流数据表
    share(table(1:0, `updateTime`maxQty`maxDollarVolume`sumDollarVolume`count, [TIMESTAMP, INT, DOUBLE, DOUBLE, INT]), "outputTable") //输出表,保存计算结果
    tradesCrossAggregator = createCrossSectionalAggregator(name="CrossSectionalDemo", metrics=<[max(qty), max(price*qty), sum(price*qty), count(price)]>, dummyTable=objByName("trades"), outputTable=objByName("outputTable"), keyColumn="sym", triggeringPattern="interval", triggeringInterval=1000, useSystemTime=true) //创建横截面引擎
    subscribeTable(server="", tableName="trades", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)    
}

//定时向表trades写入数据,总记录为3*n
def writeData(n, trades){
    for (i in 0:n) {
        timev = take(now(), 3)
        symv = rand(`A`B`C, 3)
           pricev = rand(10.0, 3)
           qtyv = rand(10 20 30 , 3)
        insert into trades values(timev, symv, pricev, qtyv)
        sleep(1000)
    }
}

login("admin", "123456")
//clearEnv()
//undef all
createSub()
submitJob("jobId", "writeDataTrades", writeData{20, objByName("trades")})

select * from outputTable

为什么已经停止向trades表写入数据,outputTable表还会不断有新的计算结果进来?

请先 登录 后评论

1 个回答

Jax Wu

在"interval"模式下,无论是否有新的数据写入,计算均会定时触发。

可以通过取消聚合引擎调用停止触发计算:

dropAggregator("CrossSectionalDemo")
请先 登录 后评论