Computing complex factors in DolphinDB

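Is there a streaming example for a somewhat more complex factor? For instance: (the correlation, over the past 5 days, between a stock's daily return and the average daily return of all stocks in its sector) minus (the same correlation over the past 10 days).

The factor is computed once per minute. A single stock's daily return is its price divided by its price at the same time yesterday. The data is minute-level, one record per minute. The average return should be the equal-weighted average across all stocks.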
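
Written out, one reading of the factor described above is the following. The symbols are introduced here for illustration and are not part of the original question: P_{i,d,t} is the price of stock i on day d at minute t, N is the number of stocks in the sector, and corr_k is the Pearson correlation taken over the k most recent days at the same minute t.

```latex
r_{i,d,t} = \frac{P_{i,d,t}}{P_{i,d-1,t}}, \qquad
\bar{r}_{d,t} = \frac{1}{N}\sum_{j=1}^{N} r_{j,d,t}, \qquad
F_{i,d,t} = \operatorname{corr}_{5}\!\left(r_{i,\cdot,t},\, \bar{r}_{\cdot,t}\right)
          - \operatorname{corr}_{10}\!\left(r_{i,\cdot,t},\, \bar{r}_{\cdot,t}\right)
```

Subtracting 1 from every return shifts both series by a constant, so it leaves the correlations, and therefore F, unchanged.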


1 answer

Qing Li

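/*---------------------------------------------Requirements---------------------------------------------------

  • Correlation, over the past 5 days, between a stock's daily return and the average daily return of all stocks in its sector

  • The same correlation over the past 10 days

  • Average daily return of all stocks: equal-weighted average of the individual returns

  • Daily return of a single stock = today's price / price at the same time yesterday - 1

  • tick: minute-level data, one record per minute

  • The reactive state engine supports multiple key columns

  • version: 1.30.7 2021.04.21
    */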

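/*-------------------------------------------Clean up the streaming environment------------------------------------------------
clearEnv(): cleans up the streaming environment
unsubscribeTable(): cancels a subscription
dropStreamEngine(): releases the definition of the named streaming engine
undef(): releases a variable from memory
*/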
def clearEnv(){
try{unsubscribeTable(tableName="tickStream", actionName="subTick")} catch(ex) {print ex}
try{dropStreamEngine("dailyIncrease")} catch(ex) {print ex}
try{dropStreamEngine("tickCrossAggregator")} catch(ex) {print ex}
try{dropStreamEngine("correlation")} catch(ex) {print ex}
try{undef("tickStream", SHARED)} catch(ex) {print ex}
try{undef("tickCrossIn", SHARED)} catch(ex) {print ex}
try{undef("tickCrossOut", SHARED)} catch(ex) {print ex}
try{undef("correlation", SHARED)} catch(ex) {print ex}
}

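/*-------------------------------------------Create the streaming engines and subscribe to the published stream table------------------------------------------------
prepareSub(numOfTick): creates the streaming engines and subscribes to the published stream table; numOfTick is the number of stocks
tickStream: shared stream table that publishes the data; once subscribed to, it can be consumed
(1) Reactive state engine dailyIncrease: computes the daily return of each stock
(2) Cross-sectional engine tickCrossAggregator: computes the average daily return of all stocks in the sector
(3) Reactive state engine correlation: computes the correlations
Shared table tickCrossIn: schema of the input to tickCrossAggregator (that is, the output schema of dailyIncrease)
Shared table tickCrossOut: schema of the input to correlation (that is, the output schema of tickCrossAggregator)
Shared table correlation: stores the results computed by the reactive state engine correlation
*/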
def prepareSub(numOfTick){
share(streamTable(1000:0, `SecurityID`TradeTime`TradeDate`Price, [SYMBOL, TIME, DATE, DOUBLE]), "tickStream")
share(table(1000:0, `SecurityID`TradeTime`TradeDate`DateTime`Ratio, [SYMBOL, TIME, DATE, DATETIME, DOUBLE]), "tickCrossIn")
share(table(1:0, `updateTime`SecurityID`TradeTime`TradeDate`DateTime`Ratio`avgRatio, [TIMESTAMP, SYMBOL, TIME, DATE, DATETIME, DOUBLE, DOUBLE]), "tickCrossOut")
share(table(1:0, `SecurityID`TradeTime`updateTime`TradeDate`DateTime`Ratio`avgRatio`mcorr5`mcorr10, [SYMBOL, TIME, TIMESTAMP, DATE, DATETIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]), "correlation")
// reactive state engine: rolling correlations, keyed by stock and minute of day
correlation = createReactiveStateEngine(name="correlation", metrics=[<updateTime>, <TradeDate>, <DateTime>, <Ratio>, <avgRatio>, <mcorr(Ratio, avgRatio, 5)>, <mcorr(Ratio, avgRatio, 10)>], dummyTable=objByName("tickCrossOut"), outputTable=objByName("correlation"), keyColumn=`SecurityID`TradeTime)
// cross-sectional engine: equal-weighted average return across all stocks
tickCrossAggregator = createCrossSectionalEngine(name="tickCrossAggregator", metrics=<[SecurityID, TradeTime, TradeDate, DateTime, Ratio, avg(Ratio)]>, dummyTable=objByName("tickCrossIn"), outputTable=correlation, keyColumn="SecurityID", triggeringPattern='keyCount', triggeringInterval=numOfTick, timeColumn=`DateTime, lastBatchOnly=true)
// reactive state engine: daily return of each stock relative to the same minute on the previous day
dailyIncrease = createReactiveStateEngine(name="dailyIncrease", metrics=[<TradeDate>, <datetime(concatDateTime(TradeDate, TradeTime))>, <ratios(Price)-1>], dummyTable=objByName("tickStream"), outputTable=tickCrossAggregator, keyColumn=`SecurityID`TradeTime)
subscribeTable(tableName="tickStream", actionName="subTick", handler=tableInsert{dailyIncrease})
}

/*-------------------------------------------Generate simulated data------------------------------------------------
prepareData(): simulates daily minute-level data for three stocks
Example call: prepareData(2021.01.01..2021.01.15, 4)
*/
def prepareData(day, numOfPertick){
IDDistinct = take("A"+string(1..3), 3)
t1 = table(1:0, `SecurityID`TradeTime`TradeDate`Price, [SYMBOL, TIME, DATE, DOUBLE])
for(i in day){
SecurityID = flatten([take(IDDistinct[0], numOfPertick), take(IDDistinct[1], numOfPertick), take(IDDistinct[2], numOfPertick)])
TradeTime = take(temporalAdd(09:00:00.000, 1..numOfPertick, "m"), numOfPertick*3)
TradeDate = take(i, numOfPertick*3)
Price = rand(10.0, numOfPertick*3)
t2 = table(SecurityID, TradeTime, TradeDate, Price)
t1.append!(t2)
}
return select * from t1 order by TradeDate, TradeTime
}

clearEnv()
prepareSub(3)
data = prepareData(2021.01.01..2021.01.15, 4)
objByName("tickStream").append!(data)

/*-------------------------------------------------------Query the results------------------------------------------------------
The shared table correlation holds all computed results
select * from correlation
*/

/*-------------------------------------------------------Functions for inspecting publish/subscribe status------------------------------------------------------
getStreamingStat().subWorkers // subscription worker status
getStreamingStat().pubConns // publisher connections
getStreamingStat().pubTables // published tables
getStreamingStat().subConns // subscriber connections
*/

/*----------------------Debugging out-of-order output timestamps when the ReactiveStateEngine has multiple key columns-------------------
a = exec top 1100 DateTime from correlation
isMonotonicIncreasing(a)
*/

/*----------------------Debugging with data from loadTable("dfs://CN_STOCK", "tick")-------------------
tick = loadTable("dfs://CN_STOCK", "tick")
t = select SecurityID, TradeTime, TradeDate, Price from
(select avg(LastPrice) as Price from tick where TradeDate between 2019.09.06 : 2019.09.30 and SecurityID in `000019`000050 group by TradeDate, time(minute(UpdateTime)) as TradeTime, SecurityID)
objByName("tickStream").append!(t)
*/
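
To sanity-check the streaming output offline, the same quantity can be recomputed in batch for one fixed minute of the day. The sketch below is plain Python rather than DolphinDB script, so it runs on its own. The names `pearson` and `factor` are hypothetical helpers introduced here, and the input layout (one price per stock per consecutive day, all at the same minute) is an assumption made for illustration.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences; None if undefined."""
    n = len(xs)
    if n < 2:
        return None
    mx, my = sum(xs) / n, sum(ys) / n
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    if sxx == 0 or syy == 0:
        return None  # a constant series has no defined correlation
    return sxy / sqrt(sxx * syy)


def factor(prices, short=5, long=10):
    """prices: {stock: [price at one fixed minute on consecutive days]}.

    Returns {stock: [corr_short - corr_long for each return day, or None
    while fewer than `long` returns are available]}.
    """
    stocks = sorted(prices)
    days = len(prices[stocks[0]])
    # Daily return relative to the same minute on the previous day.
    ret = {s: [prices[s][d] / prices[s][d - 1] - 1 for d in range(1, days)]
           for s in stocks}
    # Equal-weighted average return across all stocks, per day.
    avg = [sum(ret[s][d] for s in stocks) / len(stocks)
           for d in range(days - 1)]
    out = {}
    for s in stocks:
        vals = []
        for d in range(days - 1):
            if d + 1 < long:
                vals.append(None)  # not enough history for the long window
                continue
            c_short = pearson(ret[s][d + 1 - short:d + 1], avg[d + 1 - short:d + 1])
            c_long = pearson(ret[s][d + 1 - long:d + 1], avg[d + 1 - long:d + 1])
            vals.append(None if c_short is None or c_long is None
                        else c_short - c_long)
        out[s] = vals
    return out
```

For a given stock and TradeTime, the values returned by `factor` can be compared with mcorr5 - mcorr10 computed from the correlation table, after exporting that stock's prices for the same minute on consecutive days.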
