repartitionDS改成sqlDS试试,sqlDS可以按原分区加载数据。
v = 2021.06.01..2021.06.30
def loadData(m){
shdata= select * from loadTable("dfs://hash", "ss10") where TradingDay=m,left(InstrumentID,1)=`6
//szdata= select * from loadTable("dfs://hash", "ss10") where TradingDay=m,ExchangeID=`SZE, (left(InstrumentID,1)=`3 or left(InstrumentID,1)=`0)
return shdata
}
def saveData(tb){
loadTable("dfs://test1",`ss10).append!(tb)
date=exec top 1 TradingDay from tb
print(date)}
pipeline(each(partial{loadData}, v),saveData)
一开始使用这个脚本迁移了沪市的行情数据能成功(读进内存大概是每天4个G不到),后面换成深市(读进内存每天4个G多一些)就发现报OOM的错。开始尝试教程中的下面写法
dates= 2021.06.02..2021.06.30
def writeDataTo(dbPath, tbName, mutable tbdata){
loadTable(dbPath,tbName).append!(tbdata)
}
datasrc=repartitionDS(<select * from loadTable("dfs://hash","ss10") where ExchangeID=`SZE, (left(InstrumentID,1)=`3 or left(InstrumentID,1)=`0)>,`TradingDay,VALUE,dates)
mr(ds=datasrc, mapFunc=writeDataTo{"dfs://test1","ss10"}, parallel=true)
发现这个方法执行下来也会报OOM的错。
想请问下这里有什么问题? 如果我遍历二级分区,一点点导入,如何定位(value,hash)组合分区的二级分区?