Computing Stock Factors with DolphinDB Distributed Computing: Map-Reduce Function Examples


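The Map-Reduce function is the core of DolphinDB's general-purpose distributed computing framework. Its syntax is mr(ds, mapFunc, [reduceFunc], [finalFunc], [parallel=true]). It takes a set of data sources and a mapFunc function as arguments. It dispatches the computation to the node that holds each data source, and mapFunc processes the data in that source. The optional reduceFunc aggregates the mapFunc results cumulatively. It first combines the results of the first two map calls, then combines that output with the result of the third call, and so on. With M map calls, reduceFunc is therefore called M-1 times. The optional finalFunc further processes the return value of reduceFunc. The optional parallel flag (default true) controls whether the map calls run in parallel.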
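To make the call order concrete, here is a minimal sequential model of the mr flow in plain Python. It is not the DolphinDB API. `mr_model` is a hypothetical name, and the model runs every map call locally in a loop instead of on the nodes that hold the data. It shows the left fold that gives M-1 reduce calls for M data sources. One behavior is an assumption, suggested by the Case 1 call `mr(ds, Mapfun, , unionAll)`: when reduceFunc is omitted, finalFunc receives the whole collection of map results.

```python
from functools import reduce


def mr_model(data_sources, map_func, reduce_func=None, final_func=None):
    """Sequential, single-process model of mr(ds, mapFunc, [reduceFunc], [finalFunc]).

    DolphinDB runs the map calls on the nodes that hold each data source;
    this model simply loops over the sources in order.
    """
    mapped = [map_func(ds) for ds in data_sources]  # one map call per data source
    result = mapped
    if reduce_func is not None:
        # Left fold: f(f(f(m1, m2), m3), m4) ... -> M-1 calls for M map results
        result = reduce(reduce_func, mapped)
    if final_func is not None:
        # Assumption: without reduce_func, final_func receives the list of all map results
        result = final_func(result)
    return result


# Usage: record every call to the reduce function.
calls = []


def add(a, b):
    calls.append((a, b))
    return a + b


sources = [[1, 2], [3], [4, 5], [6]]  # M = 4 hypothetical "data sources"
total = mr_model(sources, sum, add)   # map = sum of each source, reduce = +
print(total, len(calls))              # 21 3
```

With four data sources the reduce function runs three times. This matches the M-1 rule stated above.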

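Case 1: We simulated one-minute bar data for all stocks in the market over about 14 years, from 2008.01.01 to 2022.04.01. The data is stored in a DolphinDB database using the TSDB engine and occupies 92.8 GB. The task is a full computation of one factor, m_h_0931_1451: the highest price of each stock on each day within a specified intraday time window. With the mr function the computation took 1 min 54 s. The database/table creation and data simulation script for Case 1 is given below.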

dbPath="dfs://min_data"
tableName='stockdata'
tb=loadTable(dbPath,tableName)
// Define the map function: maximum high per stock per day
def Mapfun(t) {
    x=select "m_h_0931_1451" as factorname, max(high) as factorvalue from t group by code, date(datetime) as date;
    return x
}
// Create the data sources (one per partition), restricted to the 09:31-14:52 window
ds = sqlDS(<select datetime,code,high from tb where date(datetime)>2008.01.01 and date(datetime)<2022.04.01 and second(datetime) between 09:31:00 : 14:52:00>)
// Run the computation: no reduceFunc; unionAll as finalFunc merges the per-partition results
timer res=mr(ds, Mapfun, , unionAll)
// Time elapsed: 114493.127 ms (1 min 54 s)
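The Case 1 job can skip a reduce step and only concatenate the map outputs. The reason is the partitioning: the database is partitioned by month (VALUE) and by a hash of code. So all bars of one stock on one day sit in a single partition, and no (code, date) group spans two data sources. The plain-Python sketch below models `Mapfun` plus `unionAll` under that assumption. The names `map_partition` and `union_all` and the sample rows are hypothetical. For self-containment the time-window filter is applied inside the map function here, whereas the DolphinDB script applies it in `sqlDS`.

```python
from datetime import datetime, time

WINDOW_START = time(9, 31)
WINDOW_END = time(14, 52)  # inclusive bounds, mirroring `between` in the sqlDS filter


def map_partition(rows):
    """Model of Mapfun for one data source.

    rows: iterable of (timestamp, code, high) tuples.
    Returns (factorname, code, date, factorvalue) tuples: max(high) per (code, date).
    """
    best = {}
    for ts, code, high in rows:
        if not (WINDOW_START <= ts.time() <= WINDOW_END):
            continue  # outside the intraday window
        key = (code, ts.date())
        if key not in best or high > best[key]:
            best[key] = high
    return [("m_h_0931_1451", code, d, v) for (code, d), v in best.items()]


def union_all(parts):
    """Model of unionAll used as finalFunc: concatenate the per-source results."""
    merged = []
    for part in parts:
        merged.extend(part)
    return merged


# Usage with two hypothetical partitions holding different stocks.
p1 = [(datetime(2022, 3, 31, 9, 31), "sh600000", 10.0),
      (datetime(2022, 3, 31, 10, 0), "sh600000", 12.5),
      (datetime(2022, 3, 31, 15, 0), "sh600000", 99.0)]  # after 14:52, ignored
p2 = [(datetime(2022, 3, 31, 9, 45), "sh600001", 7.0)]
factors = union_all([map_partition(p1), map_partition(p2)])
print(factors)
```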


Case 1: database and table creation script

// Composite partitioning: VALUE by month x HASH on code (10 buckets), TSDB engine
dbPath="dfs://min_data"
if (existsDatabase(dbPath)) {
    dropDatabase(dbPath)
}
db1=database("", VALUE, 2007.01M..2030.12M)
db2=database("", HASH, [SYMBOL,10])
db=database(dbPath, COMPO, [db1,db2], engine="TSDB")
name=['datetime', 'code', 'open', 'high', 'low', 'close', 'volume', 'amount', 'xr_factor']
type=[DATETIME, SYMBOL, FLOAT, FLOAT, FLOAT, FLOAT, INT, DOUBLE, FLOAT]
tb=table(100:0, name, type)
tableName='stockdata'
pt=db.createPartitionedTable(tb, tableName, ['datetime','code'], compressMethods={datetime:"delta"}, sortColumns=`code`datetime, keepDuplicates=LAST)

// Trading calendar (tradedate.csv): open days before 2022.04.01
tradedate=loadText("/home/mhxiang/data/tradedate/tradedate.csv")
dates=exec cal_date from tradedate where is_open==1 and cal_date<2022.04.01
// Minute bars 09:31-11:30 and 13:01-15:00 (240 per day)
mins=09:31m..15:00m
secs=second(mins[mins<11:31m || mins>13:00m])
datetime=eachLeft(concatDateTime, dates, secs).reshape()
n=size(datetime)
tb=loadTable("dfs://min_data", 'stockdata')
// 4700 simulated symbols with random prices, appended one symbol at a time
symbol="sh"+string(600000..604699)
for (isym in symbol) {
    print(isym)
    data=table(datetime, take(isym,n) as code, rand(100.0,n) as open, rand(100.0,n) as high, rand(100.0,n) as low, rand(100.0,n) as close, rand(100000,n) as volume, rand(100000,n) as amount, take(1,n) as xr_factor)
    tb.append!(data)
}
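The timestamp grid in this script keeps minutes 09:31 to 15:00 and drops 11:31 to 13:00 inclusive. That leaves 240 bars per trading day, so each of the 4700 symbols gets 240 rows per day. The Python sketch below reproduces that filter and the (day, minute) cross product as a quick check. `session_minutes`, `build_grid` and the two example dates are illustrative and not part of the original script. The real dates come from tradedate.csv.

```python
from datetime import date, datetime, time


def session_minutes():
    """Minute bars kept by the script: 09:31..15:00 minus 11:31..13:00 inclusive."""
    kept = []
    for m in range(9 * 60 + 31, 15 * 60 + 1):      # 09:31m..15:00m
        if m < 11 * 60 + 31 or m > 13 * 60:        # mins<11:31m || mins>13:00m
            kept.append(time(m // 60, m % 60))
    return kept


def build_grid(trading_days, minutes):
    """One timestamp per (trading day, minute), like concatDateTime over every pair."""
    return [datetime.combine(d, t) for d in trading_days for t in minutes]


minutes = session_minutes()
grid = build_grid([date(2022, 3, 30), date(2022, 3, 31)], minutes)  # two example days
print(len(minutes), len(grid))  # 240 480
```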

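Case 2: Factor correlation on tick-level data. For each day, compute the correlation matrix of all factors for each stock, then average these matrices across all stocks. We simulated 5 days of tick-level factor data with 300 factors, about 54.4 GB of factor data per day. Computing one day with the mr function took 4 min 20 s. The database/table creation and data simulation script for Case 2 is given below.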

// Map function: for each stock in the partition, compute the correlation matrix
// of its factor columns, then sum these matrices
def sampleMap(t) {
    codes = exec distinct(code) from t
    flag_stt = 0
    for (icode in codes) {
        stk_ftb = select * from t where code==icode
        // columns from index 2 onward are the factor columns
        stk_selfcorr = corrMatrix(matrix(stk_ftb[:,2:]))
        stk_selfcorr.nullFill!(0)
        if (flag_stt == 0) {
            total_selfcorr = stk_selfcorr
            flag_stt = 1
        } else {
            total_selfcorr = total_selfcorr + stk_selfcorr
        }
    }
    return total_selfcorr
}
// Create the data sources for one day
tb=loadTable("dfs://factor", 'factordata')
timer {
    ds = sqlDS(<select * from tb where date(datetime)==2022.03.31>)
    // reduceFunc "+" adds the per-partition sums; dividing by the stock count gives the mean
    selfcorr = mr(ds, sampleMap, +)
    ncodes = exec distinct(code) from tb
    selfcorr = selfcorr\ncodes.size()
} // Time elapsed: 260297.578 ms (4 min 20 s 343 ms)
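Summing in the map step, adding with `+` in the reduce step and dividing once at the end gives the mean only if each map call sees a stock's complete series for the day. In Case 2 the partitions are one day (VALUE) times a hash bucket of code, so every stock's ticks for a day fall into exactly one partition. Dividing by the number of distinct codes in the whole table also assumes every stock has data on that day, which holds for the simulated data. The plain-Python sketch below models this flow with hypothetical names (`sample_map`, `mean_corr`) and toy data. It returns 0.0 for undefined correlations, as a stand-in for `corrMatrix` followed by `nullFill!(0)`.

```python
import math
from functools import reduce


def pearson(x, y):
    """Pearson correlation; 0.0 when undefined (constant input), like nullFill!(0)."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    syy = sum((b - my) ** 2 for b in y)
    if sxx == 0 or syy == 0:
        return 0.0
    return sxy / math.sqrt(sxx * syy)


def corr_matrix(columns):
    """Correlation matrix where each entry of `columns` is one factor's series."""
    k = len(columns)
    return [[pearson(columns[i], columns[j]) for j in range(k)] for i in range(k)]


def add_matrices(a, b):
    return [[x + y for x, y in zip(ra, rb)] for ra, rb in zip(a, b)]


def sample_map(partition):
    """partition: {code: [factor column, ...]}. Sum of per-stock correlation matrices."""
    total = None
    for columns in partition.values():
        c = corr_matrix(columns)
        total = c if total is None else add_matrices(total, c)
    return total


def mean_corr(partitions):
    sums = [sample_map(p) for p in partitions]                         # map
    total = reduce(add_matrices, [s for s in sums if s is not None])   # reduce with "+"
    n_codes = sum(len(p) for p in partitions)  # each stock sits in exactly one partition
    return [[v / n_codes for v in row] for row in total]


# Two hypothetical partitions, one stock each, two factors over three ticks.
part_a = {"sh600000": [[1.0, 2.0, 3.0], [2.0, 4.0, 6.0]]}  # factors move together
part_b = {"sh600001": [[1.0, 2.0, 3.0], [3.0, 2.0, 1.0]]}  # factors move oppositely
print(mean_corr([part_a, part_b]))  # [[1.0, 0.0], [0.0, 1.0]]
```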

Case 2: database and table creation script

dbPath="dfs://factor"
if (existsDatabase(dbPath)) {
    dropDatabase(dbPath)
}
// Composite partitioning: VALUE by day x HASH on code (20 buckets)
db1=database("", VALUE, 2021.01.02..2022.12.31)
db2=database("", HASH, [SYMBOL,20])
db=database(dbPath, COMPO, [db1,db2])
// Schema: datetime, code, then factor1..factor300 as DOUBLE
name=['datetime', 'code']
type=[DATETIME, SYMBOL]
factorname="factor"+string(1..300)
name.append!(factorname)
type.append!(take(DOUBLE, size(factorname)))
tb=table(100:0, name, type)
tableName='factordata'
pt=db.createPartitionedTable(tb, tableName, ['datetime','code'])
// Simulate tick-level factor data: one row every 3 seconds over 5 trading days
tradedate=loadText("/home/mhxiang/data/tradedate/tradedate.csv")
dates=exec cal_date from tradedate where is_open==1 and cal_date<2022.04.01 and cal_date>=2022.03.25
// 09:15-09:25 call auction, 09:30-11:30 and 13:00-15:00 continuous trading
secs=distinct(bar(09:15:03..09:25:00,3s)).append!(distinct(bar(09:30:03..11:30:00,3s))).append!(distinct(bar(13:00:03..15:00:00,3s)))
datetime=eachLeft(concatDateTime, dates, secs).reshape()
n=size(datetime)

login("admin","123456")
tb=loadTable("dfs://factor", 'factordata')
symbol="sh"+string(600000..604699)
for (isym in symbol) {
    print(isym)
    t1=table(datetime, take(isym,n) as code)
    // n x 300 matrix of random factor values, columns named factor1..factor300
    t2=table(rand(10.0, n:size(factorname)).rename!(factorname))
    t2=join(t1, t2)
    tb.append!(t2)
}
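As a rough check on the data volume quoted for Case 2, the Python arithmetic below counts the 3-second ticks the script generates per day. It assumes `bar` rounds each second down to a multiple of 3 s, so the distinct values are the multiples of 3 in each range. It then estimates the uncompressed size of one day. The column widths are assumptions: 4 bytes for DATETIME, 4 bytes for a SYMBOL dictionary index and 8 bytes for DOUBLE. The result, about 52.7 GiB (56.6 GB), is in the same range as the 54.4 GB reported above. Actual on-disk size depends on storage overhead and compression.

```python
def hms(h, m, s):
    """Seconds since midnight."""
    return h * 3600 + m * 60 + s


def bar_count(start, end, step=3):
    """Distinct values of bar(start..end, step): multiples of step within the range."""
    return end // step - start // step + 1


TICKS_PER_DAY = (bar_count(hms(9, 15, 3), hms(9, 25, 0))      # call auction
                 + bar_count(hms(9, 30, 3), hms(11, 30, 0))   # morning session
                 + bar_count(hms(13, 0, 3), hms(15, 0, 0)))   # afternoon session
N_STOCKS = len(range(600000, 604700))  # "sh600000".."sh604699"
N_FACTORS = 300
ROWS_PER_DAY = N_STOCKS * TICKS_PER_DAY

# Assumed uncompressed widths: DATETIME 4 B, SYMBOL 4 B (dictionary index), DOUBLE 8 B
BYTES_PER_ROW = 4 + 4 + 8 * N_FACTORS
RAW_BYTES_PER_DAY = ROWS_PER_DAY * BYTES_PER_ROW

print(TICKS_PER_DAY, ROWS_PER_DAY)          # 5000 23500000
print(round(RAW_BYTES_PER_DAY / 2**30, 1))  # 52.7 (GiB)
```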

Attachment: tradedate.csv (the trading calendar loaded by both data-generation scripts)




Author: mhxiang
