如何用 C++ API 的 MultithreadedTableWriter 接口把 CSV 文件导入分区表?

CSV 文件前 10 行数据如下所示:

time	device_id	battery_level	battery_status	battery_temperature	bssid	cpu_avg_1min	cpu_avg_5min	cpu_avg_15min	mem_free	mem_used	rssi	ssid
2016.11.15T07:00:00	demo000029	48	discharging	90.3	01:02:03:04:05:06	7.42	7.004	7.1213	460,781,455	539,218,545	(58)	demo-net
2016.11.15T07:00:00	demo000034	31	discharging	88.9	01:02:03:04:05:06	5.36	7.152	7.6373	649,945,465	350,054,535	(58)	demo-net
2016.11.15T07:00:00	demo000037	75	discharging	90.7	A0:B1:C5:D2:E0:F3	7.26	6.572	6.644	420,956,932	579,043,068	(49)	stealth-net
2016.11.15T07:00:00	demo000059	78	discharging	89.9	A0:B1:C5:D2:E0:F3	24.35	9.91	7.69	559,660,292	440,339,708	(48)	stealth-net
2016.11.15T07:00:00	demo000063	47	discharging	89.8	22:32:A2:B3:05:98	30.23	13.166	10.5087	730,143,821	269,856,179	(41)	demo-5ghz
2016.11.15T07:00:00	demo000084	56	discharging	92.4	A0:B1:C5:D2:E0:F3	6.97	8.354	8.7713	490,555,023	509,444,977	(44)	stealth-net
2016.11.15T07:00:00	demo000087	85	discharging	88.5	22:32:A2:B3:05:98	28.13	10.186	7.382	669,735,198	330,264,802	(47)	demo-5ghz
2016.11.15T07:00:00	demo000096	92	discharging	93	A0:B1:C5:D2:E0:F3	28.28	11.416	8.792	569,078,358	430,921,642	(66)	stealth-net
2016.11.15T07:00:00	demo000099	53	discharging	87.9	22:32:A2:B3:05:98	8.98	8.196	8.252	449,422,871	550,577,129	(57)	demo-5ghz
2016.11.15T07:00:00	demo000101	72	discharging	92.3	A0:B1:C5:D2:E0:F3	96.74	26.228	14.6627	619,974,710	380,025,290	(57)	stealth-net

建库建表如下:

// Log in as admin, then drop dfs://iot if it already exists so the script can be re-run.
login(`admin, `123456)
if (exists('dfs://iot') ) dropDatabase('dfs://iot')

// Composite (COMPO) database with two partition levels:
//   level 1 (db1): VALUE partitions over the dates 2016.11.15..2016.11.18
//   level 2 (db2): HASH partitions on a SYMBOL column, 10 buckets
db1 = database('',VALUE,2016.11.15..2016.11.18)
db2 = database('',HASH,[SYMBOL,10])
db = database('dfs://iot',COMPO,[db1,db2])

// Empty table used only as the schema: 13 columns with their types.
// The column order here must match the order in which the C++ client passes
// values to MultithreadedTableWriter::insert.
schema=table(1:0,`time`device_id`battery_level`battery_status`battery_temperature`bssid`cpu_avg_1min`cpu_avg_5min`cpu_avg_15min`mem_free`mem_used`rssi`ssid,
 [DATETIME,SYMBOL,INT,SYMBOL,DOUBLE,SYMBOL,DOUBLE,DOUBLE,DOUBLE,LONG,LONG,SHORT,SYMBOL])
 // Create table `readings`; `time feeds the first (VALUE) level, `device_id the second (HASH) level.
 db.createPartitionedTable(schema,`readings,`time`device_id)

现在想用MultithreadedTableWriter把数据写入分区表,请问代码怎么写?

请先登录后评论

1 个回答

wale

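可以利用开源库 rapidcsv 读入 CSV 文件,然后用 MultithreadedTableWriter 接口写入数据。注意:示例代码中 rapidcsv::LabelParams(-1, -1) 表示文件没有表头行,且 rapidcsv 默认以逗号分隔;如果文件像上面示例那样带表头或以制表符分隔,需要相应调整 LabelParams 和 SeparatorParams。示例代码如下: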

#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include "MultithreadedTableWriter.h"
#include "DolphinDB.h"
#include "Util.h"
#include "rapidcsv.h"

using namespace std;
using namespace dolphindb;

// Builds a DolphinDB DATETIME scalar from a string of the form
// "yyyy.MM.ddTHH:mm:ss", e.g. "2016.11.15T07:00:00".
//
// Only the fixed digit positions are read (0-3 year, 5-6 month, 8-9 day,
// 11-12 hour, 14-15 minute, 17-18 second); the separator characters are not
// checked, and non-numeric digits parse as 0 (atoi semantics).
//
// Throws std::invalid_argument if `str` is too short to contain every field.
// Returns the raw pointer produced by Util::createDateTime.
// NOTE(review): ownership presumably passes to the caller (e.g. wrapped in a
// ConstantSP by MultithreadedTableWriter::insert) — confirm with the API docs.
Constant *createDateTime(const string &str) {
	const size_t kExpectedLength = 19;  // length of "yyyy.MM.ddTHH:mm:ss"
	if (str.size() < kExpectedLength) {
		throw std::invalid_argument("invalid DATETIME string: '" + str + "'");
	}
	// Parses the `len`-character numeric field starting at `pos`.
	auto field = [&str](size_t pos, size_t len) {
		return atoi(str.substr(pos, len).c_str());
	};
	return Util::createDateTime(field(0, 4), field(5, 2), field(8, 2),
	                            field(11, 2), field(14, 2), field(17, 2));
}
int main(int argc, char *argv[]) {
	DBConnection::initialize();
	DBConnection conn;
	string host = "127.0.0.1";
	int port = 8848;
	string userId = "admin";
	string password = "123456";

	string path = "d:/data/devices_big/devices_big_readings.csv";

	try {
		vector<COMPRESS_METHOD> compress;
		compress.push_back(COMPRESS_DELTA);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_DELTA);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_DELTA);
		compress.push_back(COMPRESS_DELTA);
		compress.push_back(COMPRESS_LZ4);
		compress.push_back(COMPRESS_LZ4);

		MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://iot", "readings",
			false, false, NULL, 10000, 1, 10, "device_id", &compress);

		rapidcsv::Document doc(path, rapidcsv::LabelParams(-1, -1));
		std::vector<string> time = doc.GetColumn<string>(0);
		std::vector<string> device_id = doc.GetColumn<string>(1);
		std::vector<int> battery_level = doc.GetColumn<int>(2);
		std::vector<string> battery_status = doc.GetColumn<string>(3);
		std::vector<double> battery_temperature = doc.GetColumn<double>(4);
		std::vector<string> bssid = doc.GetColumn<string>(5);
		std::vector<double> cpu_avg_1min = doc.GetColumn<double>(6);
		std::vector<double> cpu_avg_5min = doc.GetColumn<double>(7);
		std::vector<double> cpu_avg_15min = doc.GetColumn<double>(8);
		std::vector<long long> mem_free = doc.GetColumn<long long>(9);
		std::vector<long long> mem_used = doc.GetColumn<long long>(10);
		std::vector<int> rssi = doc.GetColumn<int>(11);
		std::vector<string> ssid = doc.GetColumn<string>(12);

		int rowNum = time.size();

		ErrorCodeInfo errorInfo;

		long long startTime = Util::getEpochTime();
		
		for (int i = 0; i < rowNum; i++) {
			if (writer.insert(errorInfo, 
				createDateTime(time[i]),
				device_id[i],
				battery_level[i],
				battery_status[i],
				battery_temperature[i],
				bssid[i],
				cpu_avg_1min[i],
				cpu_avg_5min[i],
				cpu_avg_15min[i],
				mem_free[i],
				mem_used[i],
				(short)rssi[i],
				ssid[i]
				) == false) {
				//此处不会执行到
				cout << "insert failed: " << errorInfo.errorInfo << endl;
				break;
			}
				
		}
		//检查目前MTW的状态
		MultithreadedTableWriter::Status status;
		writer.getStatus(status);
		if (status.hasError()) {
			cout << "error in writing: " << status.errorInfo << endl;
		}
		writer.waitForThreadCompletion();
		//再次检查完成后的MTW状态
		writer.getStatus(status);
		if (status.hasError()) {
			cout << "error after write complete: " << status.errorInfo << endl;
			//获取未写入的数据
			std::vector<std::vector<ConstantSP>*> unwrittenData;
			writer.getUnwrittenData(unwrittenData);
			cout << "unwriterdata length " << unwrittenData.size() << endl;
		}
			   		 
		cout << "Insert Time " << Util::getEpochTime() - startTime << " ms" << endl;
		//检查最后写入结果
		cout << conn.run("select count(*) from pt")->getString() << endl;


	}
	catch (std::exception &e) {
		cerr << "Failed to insert table, with exception: " << e.what() << endl;
	}



}


请先登录后评论