数据重分区出错

v = 2021.06.01..2021.06.30
def loadData(m){
shdata= select * from loadTable("dfs://hash", "ss10") where TradingDay=m,left(InstrumentID,1)=`6
//szdata= select * from loadTable("dfs://hash", "ss10") where TradingDay=m,ExchangeID=`SZE, (left(InstrumentID,1)=`3 or left(InstrumentID,1)=`0)
return shdata
}
def saveData(tb){
loadTable("dfs://test1",`ss10).append!(tb)
date=exec top 1 TradingDay from tb
print(date)}
pipeline(each(partial{loadData}, v),saveData)

一开始使用这个脚本迁移了沪市的行情数据能成功(读进内存大概是每天4个G不到),后面换成深市(读进内存每天4个G多一些)就发现报OOM的错。开始尝试教程中的下面写法

dates= 2021.06.02..2021.06.30
def writeDataTo(dbPath, tbName, mutable tbdata){
    loadTable(dbPath,tbName).append!(tbdata)
}
datasrc=repartitionDS(<select * from loadTable("dfs://hash","ss10") where ExchangeID=`SZE, (left(InstrumentID,1)=`3 or left(InstrumentID,1)=`0)>,`TradingDay,VALUE,dates)
mr(ds=datasrc, mapFunc=writeDataTo{"dfs://test1","ss10"}, parallel=true)

发现这个方法执行下来也会报OOM的错。
想请问下这里有什么问题? 如果我遍历二级分区,一点点导入,如何定位(value,hash)组合分区的二级分区?

请先 登录 后评论

1 个回答

wale

repartitionDS改成sqlDS试试,sqlDS可以按原分区加载数据。

请先 登录 后评论
  • 1 关注
  • 0 收藏,1034 浏览
  • Coco 提出于 2021-07-06 17:26

相似问题