怎么把本地的大csv文件导入到远程节点

我的数据文件是本地存储的,每个大约2GB,目前想通过下面方法把本地存储的数据文件上传到server端:

1.在本地部署一个最简单的单节点server
2.将通过dolphindb GUI把本地数据文件加载到内存表中;
3.在本地的dolphindb GUI中通过xdb连接到远程节点,将内存表存入远程节点的数据库中。

但调试过程发现每批写入的数据量太大,导致服务端节点日志出现Warning:

0ut of memory error occured during asynchronous sorting in TSDB cache engine


我的代码如下:

for(dateIdx in dateFiles[:10])
{
    oneDayFullPath  = allFileContents + "\\" + dateIdx;
    t=ploadText(oneDayFullPath,delimiter='\t',schema = schemaTB)//加载到本地内存表
    re_t = select  *   from t;
    t = 0;
    clearAllCache();
    remoteRun(conn, "append!{MyTable}",re_t);
    re_t = 0;
    clearAllCache();
    print oneDayFullPath;

}


t加载文本后的大小约2GB,我怎么对t做分割呢?可以使用loadtextEx 来自动分割成小块吗?




请先 登录 后评论

1 个回答

wale

可以用textChunkDS函数分段导入,下面示例供参考:

实验环境:

在windows本地分别部署2个单节点,端口分别是8848和8888,假设8848端口的节点是本地节点,8888端口的节点是远程节点。

步骤:

1.连接本地节点,用下面语句创建一个1000万行的csv文件:

login("admin","123456")
n=10000000
csvFile=`chunkText.csv
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

2.连接远程节点,用下面语句创建库表:

n=1
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
login("admin","123456")
db=database("dfs://testdb",VALUE,2000.01.01..2000.06.30)
db.createPartitionedTable(trades,`trades,`date);


3. 连接本地节点,执行下面3行语句:

conn=xdb("127.0.0.1",8888,"admin","123456")
ds=textChunkDS(csvFile, 10);
mr(ds, mapFunc=remoteRun{conn, "append!{loadTable('dfs://testdb','trades')}"}, parallel=false)

其中第1行语句的作用是用xdb连接远程节点;第2行是 textChunkDS 函数将文本文件划分为多个小文件数据源,这里10表示每个小数据源的大小是10MB;第3行语句是用mr 函数写入到数据库中,这里要注意 每个小文件数据源可能包含相同分区的数据。DolphinDB 不允许多个线程同时对相同分区进行写入,因此要将 mr 函数 parallel 参数设置为 false,否则会抛出异常。

4. 连接远程节点,用下面语句检查数据是否都成功写入:

select count(*) from loadTable("dfs://testdb","trades")

如下图所示,返回结果10000000,成功!

attachments-2024-03-Ir50MiMe660664641e90b.png


请先 登录 后评论