The Map-Reduce function is the core of DolphinDB's general-purpose distributed computing framework. Its signature is mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true]): it accepts a list of data sources and a mapFunc. mr dispatches the computation to the node where each data source resides and applies mapFunc to the data in each source. The optional reduceFunc combines the mapFunc results pairwise and cumulatively: the first two map results are combined, the intermediate result is then combined with the third map result, and so on until all map results have been folded together. With M map calls, reduceFunc is invoked M-1 times. The optional finalFunc further processes the value returned by reduceFunc.
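The fold behavior described above can be modeled in a few lines of Python. This is only an illustrative sketch of mr's call pattern (M map calls, M-1 pairwise reduce calls, an optional final step), not DolphinDB's actual implementation; the function name mr_sketch and the toy data are invented for the example.

```python
from functools import reduce

def mr_sketch(data_sources, map_func, reduce_func=None, final_func=None):
    """Illustrative model of DolphinDB mr(): map each data source,
    then left-fold the map results pairwise with reduce_func."""
    map_results = [map_func(ds) for ds in data_sources]   # M map calls
    if reduce_func is None:
        result = map_results                              # no reduce: return all map results
    else:
        # M map results are folded with M-1 pairwise reduce calls
        result = reduce(reduce_func, map_results)
    return final_func(result) if final_func else result

# Example: per-source maxima, combined pairwise with max, then labeled
sources = [[3, 7, 2], [9, 1], [5, 6]]
print(mr_sketch(sources, max, lambda a, b: max(a, b), lambda r: ("max", r)))
# -> ('max', 9)
```

With three data sources, max runs three times and the lambda reduce runs twice, mirroring the M-1 rule stated above.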
Case 1: simulate 14 years (2008.01.01 to 2022.04.01) of 1-minute bars for the entire stock market, stored in a DolphinDB database with the TSDB engine (92.8 GB in total). The task is a full-history computation of each stock's daily maximum high price within a specified time window, saved as a factor named m_h_0931_1451. Using the mr function, the computation takes 1 minute 54 seconds. The script for creating the database and table and generating the simulated data is given below in Case 1.
dbPath="dfs://min_data"
tableName='stockdata'
tb=loadTable(dbPath, tableName)

// define the map function
def Mapfun(t) {
    x = select "m_h_0931_1451" as factorname, max(high) as factorvalue from t group by code, date(datetime) as date
    return x
}

// create the data sources
ds = sqlDS(<select datetime, code, high from tb where date(datetime) > 2008.01.01 and date(datetime) < 2022.04.01 and second(datetime) between 09:31:00 : 14:52:00>)

// run the computation
timer res = mr(ds, Mapfun, , unionAll)
// Time elapsed: 114493.127 ms (1m 54s)
dbPath="dfs://min_data"
if (existsDatabase(dbPath)) { dropDatabase(dbPath) }
db1=database("", VALUE, 2007.01M..2030.12M)
db2=database("", HASH, [SYMBOL,10])
db=database(dbPath, COMPO, [db1,db2], engine="TSDB")
name=['datetime', 'code', 'open', 'high', 'low', 'close', 'volume', 'amount', 'xr_factor']
type=[DATETIME,SYMBOL,FLOAT,FLOAT,FLOAT,FLOAT,INT,DOUBLE,FLOAT]
tb=table(100:0, name, type)
tableName='stockdata'
min=db.createPartitionedTable(tb, tableName, ['datetime','code'], compressMethods={datetime:"delta"}, sortColumns=`code`datetime, keepDuplicates=LAST)

tradedate=loadText("/home/mhxiang/data/tradedate/tradedate.csv")
dates=exec cal_date from tradedate where is_open==1 and cal_date<2022.04.01
second=09:31m..15:00m
second=second(second[second<11:31m || second>13:00m])
datetime=eachLeft(concatDateTime, dates, second).reshape()
n=size(datetime)
tb=loadTable("dfs://min_data", 'stockdata')
symbol="sh"+string(600000..604699)
for (isym in symbol){
    print(isym)
    data=table(datetime, take(isym,n) as code, rand(100.0,n) as open, rand(100.0,n) as high, rand(100.0,n) as low, rand(100.0,n) as close, rand(100000,n) as volume, rand(100000,n) as amount, take(1,n) as xr_factor)
    tb.append!(data)
}
Case 2: compute the correlation coefficients between factors on tick-level data. For each trading day, compute the correlation matrix of all factors for each stock, then average these matrices over all stocks. The simulation generates 5 days of tick-level data for 300 factors, with about 54.4 GB of factor data per day. Using the mr function, the computation takes 4 minutes 20 seconds. The script for creating the database and table and generating the simulated data is given below in Case 2.
def sampleMap(t) {
    codes = exec distinct(code) from t
    flag_stt = 0
    for (icode in codes){
        stk_ftb = select * from t where code==icode
        stk_selfcorr = corrMatrix(matrix(stk_ftb[:,2:]))
        stk_selfcorr.nullFill!(0)
        if (flag_stt == 0){
            total_selfcorr = stk_selfcorr
            flag_stt = 1
        } else {
            total_selfcorr = total_selfcorr + stk_selfcorr
        }
    }
    return total_selfcorr
}

// create the data source and run the computation
tb=loadTable("dfs://factor", 'factordata')
timer{
    ds = sqlDS(<select * from tb where date(datetime)==2022.03.31>)
    selfcorr = mr(ds, sampleMap, +)
    ncodes = exec distinct(code) from tb
    selfcorr = selfcorr \ ncodes.size()
} // Time elapsed: 260297.578 ms (4m 20s 343ms)
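The averaging trick in this case can be illustrated outside DolphinDB: each map call sums the per-stock correlation matrices within its partition, the reduce step adds the partition sums elementwise (the `+` passed to mr), and the final division by the stock count yields the average. Below is a minimal NumPy sketch of that logic; corr_sum, the partition layout, and the random data are all invented for the illustration, and np.nan_to_num stands in for nullFill!(0).

```python
import numpy as np

def corr_sum(factor_blocks):
    """Map-step sketch: sum the per-stock factor correlation matrices
    within one partition, filling NaNs with 0 (as nullFill!(0) does)."""
    total = None
    for block in factor_blocks:  # block: rows = ticks, cols = factors
        c = np.nan_to_num(np.corrcoef(block, rowvar=False))
        total = c if total is None else total + c
    return total

rng = np.random.default_rng(0)
partitions = [[rng.random((50, 3)) for _ in range(2)],   # partition 1: 2 stocks
              [rng.random((50, 3)) for _ in range(3)]]   # partition 2: 3 stocks
summed = sum(corr_sum(p) for p in partitions)  # reduce step: elementwise +
avg_corr = summed / 5                          # final step: divide by stock count
print(avg_corr.shape)  # -> (3, 3)
```

Because addition is associative, summing within partitions first and then across partitions gives the same result as summing all stocks in one pass, which is why a plain `+` suffices as the reduce function.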
dbPath="dfs://factor"
if (existsDatabase(dbPath)) { dropDatabase(dbPath) }
db1=database("", VALUE, 2021.01.02..2022.12.31)
db2=database("", HASH, [SYMBOL,20])
db=database(dbPath, COMPO, [db1,db2])
name=['datetime', 'code']
type=[DATETIME,SYMBOL]
factorname="factor"+string(1..300)
name.append!(factorname)
type.append!(take(DOUBLE, size(factorname)))
tb=table(100:0, name, type)
tableName='factordata'
min=db.createPartitionedTable(tb, tableName, ['datetime','code'])

// simulate tick-level factor data
tradedate=loadText("/home/mhxiang/data/tradedate/tradedate.csv")
dates=exec cal_date from tradedate where is_open==1 and cal_date<2022.04.01 and cal_date>=2022.03.25
second=distinct(bar(09:15:03..09:25:00,3s)).append!(distinct(bar(09:30:03..11:30:00,3s))).append!(distinct(bar(13:00:03..15:00:00,3s)))
datetime=eachLeft(concatDateTime, dates, second).reshape()
n=size(datetime)
login("admin","123456")
tb=loadTable("dfs://factor", 'factordata')
symbol="sh"+string(600000..604699)
for (isym in symbol){
    print(isym)
    t1=table(datetime, take(isym,n) as code)
    t2=table(rand(10.0, n:size(factorname)).rename!(factorname))
    t2=join(t1, t2)
    tb.append!(t2)
}