回答:
python中有各种时间类型表达方式,而导入时需要转换成DolphinDB的标准格式。如图1所示:
图2中的数据用Python读取时,以pandas的read_csv函数为例,前4列识别为字符串类型,后两列识别为int类型,把结果直接写入DolphinDB的表时报如下错误:
column 4, expect category TEMPORAL, got category INTEGRAL或者column 1, expect category TEMPORAL, got category LITERAL。
解决这种错误的方法很多,本文给出一种推荐的处理方式。其原则是:先把python端的所有时间日期类型都转为datetime64类型,然后根据场景调用Python API的如下两种对象进行数据写入:
批量数据写入调用tableAppender对象
逐条流式写入数据调用MultithreadedTableWriter对象
使用这两种方式,不论是分布式表,还是内存表都内置了时间类型转换的功能,可以把datetime64类型自动转成DolphinDB数据表定义的时间日期格式。
1、批量写入
要做批量csv导入的情况下,可以适用如下方案:
如下的CSV文件构建了日期时间类型:
文件内容和图2一样。
我们以这个CSV文件为例,介绍一下Python api 如何向DolphinDB写入合适的格式。
这个CSV文件共6列,从左到右DolphinDB合适的数据类型分别为:MINUTE,TIME,DATETIME,TIMESTAMP,DATE,DATETIME。
Python api读取原始数据后,包含时间日期的列全部都转成datetime64类型,读取及转化的代码如下:
import pandas as pd import numpy as np import dolphindb as ddb # 读取CSV文件,并把所有的日期时间列都转成datetime64类型 df = pd.read_csv("D:/pythontm/datetime.csv") df['minuteCol'] = pd.to_datetime(df['minuteCol']) df['timeCol'] = pd.to_datetime(df['timeCol']) df['datetimeCol'] = pd.to_datetime(df['datetimeCol']) df['timestampCol'] = pd.to_datetime(df['timestampCol']) df['allDigitalDate'] = df['allDigitalDate'].astype('str') df['allDigitalDate'] = pd.to_datetime(df['allDigitalDate']) df['allDigitaldateTime'] = df['allDigitaldateTime'].astype('str') df['allDigitaldateTime'] = pd.to_datetime(df['allDigitaldateTime'])
1、先演示往分布式表中写数据。建立一个分布式数据库,创建数据表,表的数据类型分别是MINUTE,TIME,DATETIME,TIMESTAMP,DATE,DATETIME,代码如下:
# 连接DolphinDB服务器 s = ddb.session() s.connect("192.168.100.3",20030,"admin","123456") # 创建分布式表 script_dfs = """ if(existsDatabase("dfs://dtDfs")) { dropDatabase("dfs://dtDfs") } db = database("dfs://dtDfs",VALUE,2021.02.21..2021.02.22) sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) db.createPartitionedTable(sch,`dt,`datetimeCol) """ s.run(script_dfs)
创建一个tableAppender对象,并把df的数据写入分布式数据库,代码如下:
appender_dfs = ddb.tableAppender("dfs://dtDfs","dt", s) appender_dfs.append(df)
使用vscode插件或gui查询这个数据库中的分布式表:
login(`admin,`123456) select * from loadTable("dfs://dtDfs",`dt)
结果如图3所示:
从图3可以看到,数据类型正确转换了。
2、演示往内存表中写数据,建立一个内存表,代码如下:
# 创建内存表 script = """ share table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) as dtTable """ s.run(script)
创建一个tableAppender对象,并把df的数据写入内存表,代码如下:
# 对内存表创建tableAppender对象,并上传导入数据 appender = ddb.tableAppender(tableName="dtTable", ddbSession=s) appender.append(df)
使用vscode插件或gui查询这个内存表,代码如下:
select * from dtTable
结果如图4所示:
从图4可以看出,内存表的数据类型正确转换了。
完整的代码如下:
import pandas as pd import numpy as np import dolphindb as ddb # 读取CSV文件,并把所有的日期时间列都转成datetime64类型 df = pd.read_csv("D:/pythontm/datetime.csv") print(type(df["allDigitaldateTime"][0])) df['minuteCol'] = pd.to_datetime(df['minuteCol']) df['timeCol'] = pd.to_datetime(df['timeCol']) df['datetimeCol'] = pd.to_datetime(df['datetimeCol']) df['timestampCol'] = pd.to_datetime(df['timestampCol']) df['allDigitalDate'] = df['allDigitalDate'].astype('str') df['allDigitalDate'] = pd.to_datetime(df['allDigitalDate']) df['allDigitaldateTime'] = df['allDigitaldateTime'].astype('str') df['allDigitaldateTime'] = pd.to_datetime(df['allDigitaldateTime']) # 连接DolphinDB服务器 s = ddb.session() s.connect("192.168.100.3",20030,"admin","123456") # 创建分布式表 script_dfs = """ if(existsDatabase("dfs://dtDfs")) { dropDatabase("dfs://dtDfs") } db = database("dfs://dtDfs",VALUE,2021.02.21..2021.02.22) sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) db.createPartitionedTable(sch,`dt,`datetimeCol) """ s.run(script_dfs) # 对分布式表创建tableAppender对象,并上传导入数据 appender_dfs = ddb.tableAppender("dfs://dtDfs","dt", s) appender_dfs.append(df) # 创建内存表 script = """ share table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) as dtTable """ s.run(script) # 对内存表创建tableAppender对象,并上传导入数据 appender = ddb.tableAppender(tableName="dtTable", ddbSession=s) appender.append(df)
2、流式写入
把数据导入分布式表时,写硬盘是一个比较耗时的操作,写一次硬盘,导入1条数据和1000条数据耗时几乎是一样的。当数据以流式的方式逐条到来时,如果每到一条数据都调用一次tableAppender,那么会不断触发硬盘的写操作。在流量很大时,频繁写硬盘就会是瓶颈,导致数据堆积。
因此,流式的单条导入推荐使用MultithreadedTableWriter对象,它自动做异步批量写入,并且可以把datetime64时间日期格式自动转换成DolphinDB的分布式表定义的类型。
先建立的分布式表,数据类型从左到右分别为:MINUTE,TIME,DATETIME,TIMESTAMP,DATE,DATETIME。代码如下:
import numpy as np import dolphindb as ddb import time import datetime # 连接DolphinDB服务器 s = ddb.session() s.connect("192.168.100.3",20030,"admin","123456") # 创建接收mtw写入的分布式表 script_dfs = """ if(existsDatabase("dfs://dtDfs_mtw")) { dropDatabase("dfs://dtDfs_mtw") } db = database("dfs://dtDfs_mtw",VALUE,2021.02.21..2021.02.22) sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) db.createPartitionedTable(sch,`dt,`datetimeCol) """ s.run(script_dfs)
创建一个MultithreadedTableWriter对象,代码如下:
# 创建MTW对象,写入数据 writer1 = ddb.MultithreadedTableWriter("192.168.100.3",20030,"admin","123456","dfs://dtDfs_mtw","dt",False,False,[],10000,1,5,"datetimeCol")
用一个for循环读取1000次当前时间,并把类型转为datetime64,和分布式表列数保持一致,进行数据写入。代码如下:
try: for i in range(1000): writer1.insert(np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),\ np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now())) time.sleep(0.01) except Exception as ex: # MTW 抛出异常 print("MTW exit with exception %s" % ex)
提交后等待写入完成,代码如下:
# 等待 MTW 插入完成 writer1.waitForThreadCompletion() writeStatus=writer1.getStatus() if writeStatus.hasError(): print("Error in writing:") print(writeStatus)
完成后,打印出写入状态,如图5所示,其中的errorCode为None表示写入成功。
使用vscode插件或gui查询这个分布式表,代码如下:
login(`admin,`123456) select * from loadTable("dfs://dtDfs_mtw",`dt)
得到结果如图6所示
从图6可以看出,所有的类型都正确转化为了分布式表指定的类型。
完整的导入代码如下:
import numpy as np import dolphindb as ddb import time import datetime # 连接DolphinDB服务器 s = ddb.session() s.connect("192.168.100.3",20030,"admin","123456") # 创建接收mtw写入的分布式表 script_dfs = """ if(existsDatabase("dfs://dtDfs_mtw")) { dropDatabase("dfs://dtDfs_mtw") } db = database("dfs://dtDfs_mtw",VALUE,2021.02.21..2021.02.22) sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) db.createPartitionedTable(sch,`dt,`datetimeCol) """ s.run(script_dfs) # 创建MTW对象,写入数据 writer1 = ddb.MultithreadedTableWriter("192.168.100.3",20030,"admin","123456","dfs://dtDfs_mtw","dt",False,False,[],10000,1,5,"datetimeCol") try: for i in range(1000): writer1.insert(np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),\ np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now())) time.sleep(0.01) except Exception as ex: # MTW 抛出异常 print("MTW exit with exception %s" % ex) # 等待 MTW 插入完成 writer1.waitForThreadCompletion() writeStatus=writer1.getStatus() if writeStatus.hasError(): print("Error in writing:") print(writeStatus)
总结
使用Python导入包含时间日期类型的数据时,首先在Python api端把时间日期类型的数据转成datetime64类型,然后:
批量数据写入调用tableAppender对象
逐条流式写入数据调用MultithreadedTableWriter对象
日期时间即可自动转为DolphinDB建表时指定的类型。