如何使用DolphinDB中的横截面表定时刷新某只股票的最新交易价格

我看到DolphinDB的流数据横截面引擎中的横截面表可以作为最终结果,定时刷新某只股票的最新交易价格,请问应该如何实现?能否提供一个案例。

请先 登录 后评论

1 个回答

Jax Wu

DolphinDB中的横截面表可以为聚合计算提供的一个中间数据表,但横截面表亦可为最终结果。比如我们需要定时刷新某只股票的最新交易价格,按照常规思路是从实时交易表中按代码筛选股票并取出最后一条记录,而交易表的数据量是随着时间快速增长的,如果频繁做这样的查询,无论从系统的资源消耗还是从查询的效能来看都不是最优的做法。而横截面表永远只保存所有股票的最近一次交易数据,数据量是稳定的,对于这种定时轮询的场景非常合适。

要将横截面表作为最终结果,需要在创建横截面时,对metrics与outputTable这两个参数置空。

示例代码如下:

//清理环境
def clearEnv(){    
    unsubscribeTable(server="", tableName="trades", actionName="tradesCrossAggregator") //取消订阅
    dropAggregator("CrossSectionalDemo") //取消聚合引擎调用
    undef("trades", SHARED) //删除共享表
    undef("outputTable", SHARED) 
}

//定时向表trades写入数据,总记录为3*n
def writeData(n, trades){
    for (i in 0:n) {
        timev = take(now(), 3)
        symv   = rand(`A`B`C, 3)
           pricev = rand(10.0, 3)
           qtyv   = rand(10 20 30 , 3)
        insert into trades values(timev, symv, pricev,qtyv)
        sleep(1000)
    }
}

login("admin", "123456")
//clearEnv()
//undef all

//创建横截面引擎
share(streamTable(10:0, `time`sym`price`qty, [TIMESTAMP, SYMBOL, DOUBLE, INT]), "trades") //创建共享的流数据表
share(table(1:0, `updateTime`maxQty`maxDollarVolume`sumDollarVolume`count, [TIMESTAMP, INT, DOUBLE, DOUBLE, INT]), "outputTable") //输出表,保存计算结果
tradesCrossAggregator=createCrossSectionalAggregator(name="CrossSectionalDemo", dummyTable=objByName("trades"), keyColumn="sym", triggeringPattern="perRow")//创建横截面引擎
subscribeTable(server="", tableName="trades", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true)

submitJob("jobId","writeDataTrades",writeData{20, objByName("trades")})

select * from tradesCrossAggregator
请先 登录 后评论