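You can import the file in chunks with the textChunkDS function. The example below is for reference.
Test environment:
Two single-node DolphinDB instances are deployed locally on Windows, on ports 8848 and 8888. Assume the node on port 8848 is the local node and the node on port 8888 is the remote node.
Steps:
1. Connect to the local node and run the following statements to create a CSV file with 10 million rows: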
login("admin","123456")
n=10000000
csvFile=`chunkText.csv
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
// save to the path held in csvFile (the original referenced an undefined variable, dataFilePath)
trades.saveText(csvFile);
2. Connect to the remote node and run the following statements to create the database and the partitioned table:
n=1
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
login("admin","123456")
db=database("dfs://testdb",VALUE,2000.01.01..2000.06.30)
db.createPartitionedTable(trades,`trades,`date);
3. Connect to the local node and run the following three statements:
conn=xdb("127.0.0.1",8888,"admin","123456")
ds=textChunkDS(csvFile, 10);
mr(ds, mapFunc=remoteRun{conn, "append!{loadTable('dfs://testdb','trades')}"}, parallel=false)
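Line 1 uses xdb to open a connection to the remote node. Line 2 uses textChunkDS to split the text file into multiple small data sources; the argument 10 sets the size of each data source to 10 MB. Line 3 uses mr to write the data into the database. Note that different data sources may contain rows that belong to the same partition. DolphinDB does not allow multiple threads to write to the same partition at the same time, so the parallel parameter of mr must be set to false; otherwise an exception is thrown.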
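Before running mr, it can help to see how many data sources the chosen chunk size produces. The lines below are a minimal sketch, assuming they run on the local node in the same session as step 1 (so csvFile is still defined) and that textChunkDS returns a tuple of data sources whose length size can report. The count depends on the size of the generated file, so no expected value is given.

```dolphindb
// Assumption: csvFile was defined in step 1 of this session.
ds = textChunkDS(csvFile, 10)   // 10 = chunk size in MB

// Number of data sources that mr will process one after another
// when parallel=false.
print(size(ds))
```

A larger chunk size yields fewer data sources and fewer remote append calls, at the cost of more memory per call.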
4. Connect to the remote node and run the following statement to check that all the data was written successfully:
select count(*) from loadTable("dfs://testdb","trades")
As shown in the figure below, the query returns 10000000, so the import succeeded.
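The total row count can be complemented by a per-partition count, which shows whether every date partition received data. This is a sketch under the same assumptions as step 4 (run on the remote node, after logging in); the column alias cnt is a name chosen here for illustration.

```dolphindb
// Run on the remote node (port 8888).
t = loadTable("dfs://testdb", "trades")

// One row per date partition with the number of rows it holds.
select count(*) as cnt from t group by date
```

The cnt values should sum to the total returned by the query in step 4.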