tableInsert(pt, insert_table) => Invalid symbol file: /home/lyj/server/DolphinDB/server/local8848/storage/CHUNKS/common_day_hash/20120820_20120821/Key4/2Uu/chunk.dict

Server version 2.12, Ubuntu

Create the database


if (existsDatabase("dfs://common_day_hash"))
    dropDatabase("dfs://common_day_hash")
create database "dfs://common_day_hash"
partitioned by RANGE(2000.01.01..2040.01.01), HASH([SYMBOL, 5])
engine='TSDB'

Create the table

db_path = "dfs://common_day_hash"
db = database(db_path)
// table name
tbName = "one_min_kline"
// drop any existing table before recreating it
if(existsTable(db_path, tbName))
    dropTable(db, tbName)
colName = `order_book_id`trade_time`open`close`high`low`volume`total_turnover
colType = `SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE
partCol = `trade_time`order_book_id
sortCol = `order_book_id`trade_time
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema, tableName=tbName,
    partitionColumns=partCol,
    compressMethods={trade_time:"delta"},
    sortColumns=sortCol, keepDuplicates=LAST,
    sortKeyMappingFunction=[hashBucket{,399}])

Import script

db_path ="dfs://common_day_hash"
table_name = "one_min_kline"
record_table_name = "insert_table"
db = database(db_path)
pt = loadTable(db,table_name)
record = loadTable(db,record_table_name)
file_dic = "/home/lyj/win_share/share_c/minbar/equities/"
file_list = files(file_dic)
min_kline_code_list = select order_book_id from record context by order_book_id limit -1
// strip the ".h5" extension to get the security code
insert_code_list = select filename from file_list where substr(filename, 0, strlen(filename) - 3) not in min_kline_code_list.order_book_id

for(row in insert_code_list)
{
    time_now =now()
   
    print(time_now + "  |||  file: " +row.filename)
    file_path = file_dic + row.filename
    code = substr(row.filename,0,strlen(row.filename) - 3)
    data_table = hdf5::loadHDF5(file_path,"data")
    // tag each row with the code and build trade_time from the file's datetime column
    update data_table set order_book_id = code, trade_time = temporalParse(datetime.format("000000"), "yyyyMMddHHmmss")
    insert_table = select order_book_id,trade_time,open,close,high,low,int(volume) as volume,total_turnover from data_table
    pt.tableInsert(insert_table)
    // record that this file has been imported
    time_now = now()
    record_table = table(code as order_book_id, time_now as update_time, table_name as table_name)
    record.append!(record_table)
   
    // flush the TSDB cache engine once it exceeds 30 MB to cap memory use
    size = (select memSize from getSessionMemoryStat() where userId = `__TSDBCacheEngine__).memSize[0]
    if(size > 30 * 1024 * 1024)
    {
        flushTSDBCache()
    }
}


Importing 1-minute stock bar data into the database.

1. Problem 1: importing from the .h5 files blows up memory very easily.

2. Problem 2: with fewer hash partitions, system memory gets completely consumed and the machine becomes totally unresponsive.

3. Problem 3: tableInsert(pt, insert_table) => Invalid symbol file: /home/lyj/server/DolphinDB/server/local8848/storage/CHUNKS/common_day_hash/20120820_20120821/Key4/2Uu/chunk.dict

    Perhaps a forced shutdown corrupted the file, or something else; either way the server can no longer process this hash chunk.dict file.




1 Answer

Polly

For the partitioning scheme, see https://docs.dolphindb.cn/zh/tutorials/best_practices_for_partitioned_storage.html. For the memory blow-ups, check how much data a single write carries: the TSDB engine does internal sorting, copying, and index building, so memory can grow to 2-3x the size of the data actually written. For importing into a distributed table, we recommend writing directly with hdf5::loadHDF5Ex; that interface splits the data internally and flushes roughly every 512 MB.
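A minimal sketch of the loadHDF5Ex approach for one file, assuming the same database, dataset name, and per-file cleanup as the loop in the question; the transform function name, the example file path, and the plugin path are illustrative assumptions, not part of the original thread:

```
// Sketch: import one HDF5 file directly into the DFS table with
// hdf5::loadHDF5Ex, which splits the data internally (~512 MB per write)
// instead of materializing the whole file in memory first.
loadPlugin("plugins/hdf5/PluginHdf5.txt")

// mirrors the per-file cleanup done in the question's import loop
def cleanUp(mutable t, code) {
    update t set order_book_id = code, trade_time = temporalParse(datetime.format("000000"), "yyyyMMddHHmmss")
    return select order_book_id, trade_time, open, close, high, low, int(volume) as volume, total_turnover from t
}

db = database("dfs://common_day_hash")
file_path = "/home/lyj/win_share/share_c/minbar/equities/000001.h5"  // example file
code = "000001"
// the transform argument must be unary, so bind `code` via partial application
hdf5::loadHDF5Ex(db, "one_min_kline", `trade_time`order_book_id, file_path, "data", , , , cleanUp{, code})
```

Looping this over insert_code_list replaces the manual tableInsert/flushTSDBCache bookkeeping, since the plugin controls batch size itself.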

Also, regarding the lost chunk.dict: if this is a cluster with two replicas, you can copy the file over from the other replica.

Asked by worm on 2024-04-15 13:23
