step1: choose a sensible partition scheme based on the data volume, and create the database
Demo: the data volume is roughly 10 million records per month. Following the rule of about 1 million records per smallest partition (10 million per month ÷ 10 hash buckets ≈ 1 million), we design a two-level composite partition: the first level is a VALUE partition by month, and the second level is a HASH partition that spreads the data evenly across 10 buckets.
Code:
//undef all
login("admin", "123456")

dbPath = "dfs://min_k"
tbName = "min_table"
filePath = "G:/Data/2020-1-good/000001.SZ.csv"

// infer the column names and types from a sample csv file
schemaTB = extractTextSchema(filePath)
col1 = exec name from schemaTB
col2 = exec type from schemaTB
t = table(10:0, col1, col2)

// drop any existing database, then create the two-level COMPO partition:
// level 1 is a VALUE partition by month, level 2 a HASH partition with 10 buckets
if(existsDatabase(dbPath)){
    dropDB(dbPath)
}
dbMonth = database("", VALUE, 2021.01M..2021.02M)
dbSymbol = database("", HASH, [SYMBOL, 10])
db = database(dbPath, COMPO, [dbMonth, dbSymbol])
createPartitionedTable(db, t, tbName, `datetime`symbol)
min_table = loadTable(dbPath, tbName)
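Before loading any data, it is worth sanity-checking what was just created. A minimal sketch, assuming step 1 ran successfully; schema is a standard DolphinDB function, though the exact layout of its output may vary by version:

// reopen the database and inspect the two-level partition scheme
db = database("dfs://min_k")
print db.schema()
// inspect the column names and types the table inherited from the csv schema
pt = loadTable("dfs://min_k", "min_table")
print pt.schema().colDefs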
step2:
First read the csv files in batches (for example, 100 files per batch, matching the cut(100) call in the code below), load each file into an in-memory table with the multi-threaded ploadText function, and then append the in-memory table to the DFS database table.
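The batching relies on the built-in cut function, which splits a vector into a tuple of sub-vectors. A small, hypothetical illustration (the values are made up); note that cut throws when the batch size is not strictly between 1 and the vector length, which matters for the code below:

x = 1..250
fs = x.cut(100)      // tuple of 3 sub-vectors: 1..100, 101..200, 201..250
print fs.size()      // 3
// x.cut(250) or x.cut(300) would throw:
// "The cut size must be greater than one and less than the vector size"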
Code:
//undef all

// read a single csv into an in-memory table with the multi-threaded ploadText
def loadOrderBook(path, filename){
    t = ploadText(path + "/" + filename)
    return t
}

// load every csv under one directory in batches and append them to the DFS table
def loadOrderDir(mutable tb, path){
    fileList = exec filename from files(path, "%.csv")
    // fix: cut requires 1 < batch size < vector length, so treat small
    // directories (100 files or fewer) as a single batch instead
    if(fileList.size() > 100)
        fs = fileList.cut(100)
    else
        fs = [fileList]
    for(i in 0:fs.size()){
        t = table(500000:0, tb.schema().colDefs['name'], tb.schema().colDefs['typeString'])
        for(f in fs[i]){
            t.append!(loadOrderBook(path, f))
        }
        tb.append!(t)
    }
}

// walk the subdirectories of dir and submit one background write job per directory
def loopLoadOrderBook(dir, mutable tb){
    dirs = exec filename from files(dir) where isDir = true
    for(path in dirs){
        path = dir + "/" + path
        print path
        submitJob("writing", "loadOrderDir" + path, loadOrderDir{tb, path})
    }
}

login("admin", "123456")
dbPath = "dfs://min_k"
tbName = "min_table"
dir = "G:/Data"
orderbooktb = loadTable(dbPath, tbName)
loopLoadOrderBook(dir, orderbooktb)
// check submitted jobs: getRecentJobs()
// cancel a submitted job: cancelJob(`sdn_3101202103020002)
// select count(*) from orderbooktb

Note: without the size guard in loadOrderDir, any directory holding 100 or fewer csv files makes the job fail with:
loadOrderDir: fs = ::cut(fileList, 100) => The cut size must be greater than one and less than the vector size
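The submitted jobs run in the background, so it helps to watch their status and surface any errors from the same session. A minimal sketch using the standard getRecentJobs and getJobReturn functions (the jobId filter is based on the "writing" prefix passed to submitJob above; the commented-out id is a placeholder):

// list recent background jobs; errorMsg is empty for jobs that succeeded
jobs = getRecentJobs()
select jobId, jobDesc, startTime, endTime, errorMsg from jobs where jobId like "writing%"
// fetch the return value of one finished job, or re-raise its error;
// pass a jobId reported by submitJob or getRecentJobs
// getJobReturn("your_job_id_here")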