10 TSDB engine 导入tick数据Out of memory问题

社区版 8G内存


login("admin","123456")

def createDB(dbName,tableName){
if(existsDatabase(dbName))
dropDatabase(dbName)
dbDate = database("",VALUE,2021.01.01..2022.12.31)
dbSymbol=database("",HASH, [SYMBOL,4])
// db = database(dbName, COMPO, [dbDate, dbSymbol])
db=database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate, dbSymbol], engine="TSDB")

// columns = `symbol`time`last`volume`ask_0_p`ask_0_v`ask_1_p`ask_1_v`ask_2_p`ask_2_v`ask_3_p`ask_3_v`ask_4_p`ask_4_v`ask_5_p`ask_5_v`ask_6_p`ask_6_v`ask_7_p`ask_7_v`ask_8_p`ask_8_v`ask_9_p`ask_9_v`ask_10_p`ask_10_v`ask_11_p`ask_11_v`ask_12_p`ask_12_v`ask_13_p`ask_13_v`ask_14_p`ask_14_v`ask_15_p`ask_15_v`ask_16_p`ask_16_v`ask_17_p`ask_17_v`ask_18_p`ask_18_v`ask_19_p`ask_19_v`bid_0_p`bid_0_v`bid_1_p`bid_1_v`bid_2_p`bid_2_v`bid_3_p`bid_3_v`bid_4_p`bid_4_v`bid_5_p`bid_5_v`bid_6_p`bid_6_v`bid_7_p`bid_7_v`bid_8_p`bid_8_v`bid_9_p`bid_9_v`bid_10_p`bid_10_v`bid_11_p`bid_11_v`bid_12_p`bid_12_v`bid_13_p`bid_13_v`bid_14_p`bid_14_v`bid_15_p`bid_15_v`bid_16_p`bid_16_v`bid_17_p`bid_17_v`bid_18_p`bid_18_v`bid_19_p`bid_19_v`timestamp`intra_time`ask_qty`bid_qty`bid`ask`wpr`next_bid`next_ask`wpr_ret`ret`good`min_1024`max_1024`min_2048`max_2048`min_4096`max_4096`datetime
// type=[SYMBOL,SYMBOL,DATETIME,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE]

// type=[SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,TIMESTAMP,STRING,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,BOOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DATETIME]

columns = `symbol`time`last`volume`ask_0_p`ask_0_v`ask_1_p`ask_1_v`ask_2_p`ask_2_v`ask_3_p`ask_3_v`ask_4_p`ask_4_v`ask_5_p`ask_5_v`ask_6_p`ask_6_v`ask_7_p`ask_7_v`ask_8_p`ask_8_v`ask_9_p`ask_9_v`ask_10_p`ask_10_v`ask_11_p`ask_11_v`ask_12_p`ask_12_v`ask_13_p`ask_13_v`ask_14_p`ask_14_v`ask_15_p`ask_15_v`ask_16_p`ask_16_v`ask_17_p`ask_17_v`ask_18_p`ask_18_v`ask_19_p`ask_19_v`bid_0_p`bid_0_v`bid_1_p`bid_1_v`bid_2_p`bid_2_v`bid_3_p`bid_3_v`bid_4_p`bid_4_v`bid_5_p`bid_5_v`bid_6_p`bid_6_v`bid_7_p`bid_7_v`bid_8_p`bid_8_v`bid_9_p`bid_9_v`bid_10_p`bid_10_v`bid_11_p`bid_11_v`bid_12_p`bid_12_v`bid_13_p`bid_13_v`bid_14_p`bid_14_v`bid_15_p`bid_15_v`bid_16_p`bid_16_v`bid_17_p`bid_17_v`bid_18_p`bid_18_v`bid_19_p`bid_19_v`timestamp`datetime
type = [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,TIMESTAMP,DATE]
orderData = table(1:0, columns,type)
db.createPartitionedTable(orderData, tableName,`datetime`symbol,sortColumns=`timestamp)
// db.createPartitionedTable(orderData, tableName,`datetime`symbol)

}

dbName = "dfs://tick_data"
tableName = "tickData"
createDB(dbName,tableName)

tickData=loadTable("dfs://tick_data",`tickData)
select top 10 * from tickData
select count(*) from loadTable("dfs://tick_data",`tickData)

用这个脚本创建数据库,导入csv格式的tick数据。


导入到一半提示Out of memory

请问有什么办法解决吗 olap 导入一切正常



我的导入脚本如下



def filePathGetSymbol(filepath){
startPos = regexFind(filepath,"xxx-",0)+8
endPos = regexFind(filepath,".csv",0)
return strReplace(substr(filepath,startPos,endPos - startPos),".","");
}

def loadOneDayData(filepath){
schema=extractTextSchema(filepath);
update schema set type = `DATETIME where name = `time;
temp = loadText(filepath,schema=schema)
timestamp_tmp = timestamp((select timestamp from temp).values())
dropColumns!(temp,`timestamp)
update temp set timestamp = timestamp_tmp[0]
update temp set symbol = filePathGetSymbol(filepath)
datetime = date((select time from temp).values())[0]
update temp set datetime = datetime[0]
temp.reorderColumns!(`symbol`time`last`volume`ask_0_p`ask_0_v`ask_1_p`ask_1_v`ask_2_p`ask_2_v`ask_3_p`ask_3_v`ask_4_p`ask_4_v`ask_5_p`ask_5_v`ask_6_p`ask_6_v`ask_7_p`ask_7_v`ask_8_p`ask_8_v`ask_9_p`ask_9_v`ask_10_p`ask_10_v`ask_11_p`ask_11_v`ask_12_p`ask_12_v`ask_13_p`ask_13_v`ask_14_p`ask_14_v`ask_15_p`ask_15_v`ask_16_p`ask_16_v`ask_17_p`ask_17_v`ask_18_p`ask_18_v`ask_19_p`ask_19_v`bid_0_p`bid_0_v`bid_1_p`bid_1_v`bid_2_p`bid_2_v`bid_3_p`bid_3_v`bid_4_p`bid_4_v`bid_5_p`bid_5_v`bid_6_p`bid_6_v`bid_7_p`bid_7_v`bid_8_p`bid_8_v`bid_9_p`bid_9_v`bid_10_p`bid_10_v`bid_11_p`bid_11_v`bid_12_p`bid_12_v`bid_13_p`bid_13_v`bid_14_p`bid_14_v`bid_15_p`bid_15_v`bid_16_p`bid_16_v`bid_17_p`bid_17_v`bid_18_p`bid_18_v`bid_19_p`bid_19_v`timestamp`datetime)
return temp
}

def importData(filepath){
dbName = "dfs://tick_data"
tableName = "tickData"
tb = loadTable(dbName,tableName)
importData = loadOneDayData(filepath)
tb.append!(importData)
}

def importDataByCsv(path){
fileList = exec filename from files(path,"%.csv")

i = 0
for(file in fileList){
tmp_path = path+file
i += 1
submitJob("import_job"+ i,'import csv'+tmp_path,importData{tmp_path})
// importData(tmp_path)
}
}

请先 登录 后评论

1 个回答

陈枢之 - 工程师
建议使用textChunkDS 函数来导入,这样大文件会被切分为数据源,不会单次事务占用过大内存

例子:

def writeData(files, dirPath, dbName, tbName){
login("admin", "123456")
pt = loadTable(dbName, tbName)
for(file in files ){
filePath = dirPath+"/"+file
print(filePath)
schemaTB = table(pt.schema().colDefs.name as name, pt.schema().colDefs.typeString as type)
try{ds = textChunkDS(filePath,512,",",schemaTB)
mr(ds, append!{pt},,,false)
}
catch(ex){print(ex)}
}
}


请先 登录 后评论