请问python api 导入数据时如何处理各种时间类型

类似如下需求在Python API中应该如何处理

9:15 保存成MINUTE

13:30:10.008 保存成time

2012.06.13 13:30:10 保存成DATETIME

2012.06.13 13:30:10.008 保存TIMESTAMP

请先 登录 后评论

1 个回答

YcHan韩迎春

回答:

python中有各种时间类型表达方式,而导入时需要转换成DolphinDB的标准格式。如图1所示:

图1

图2中的数据用Python读取时,以pandas的read_csv函数为例,前4列识别为字符串类型,后两列识别为int类型,把结果直接写入DolphinDB的表时报如下错误:

column 4, expect category TEMPORAL, got category INTEGRAL或者column 1, expect category TEMPORAL, got category LITERAL。

图2

解决这种错误的方法很多,本文给出一种推荐的处理方式。其原则是:先把python端的所有时间日期类型都转为datetime64类型,然后根据场景调用Python API的如下两种对象进行数据写入:

使用这两种方式,不论是分布式表,还是内存表都内置了时间类型转换的功能,可以把datetime64类型自动转成DolphinDB数据表定义的时间日期格式。

1、批量写入

要做批量csv导入的情况下,可以适用如下方案:

如下的CSV文件构建了日期时间类型:

文件内容和图2一样。

我们以这个CSV文件为例,介绍一下Python api 如何向DolphinDB写入合适的格式。

这个CSV文件共6列,从左到右DolphinDB合适的数据类型分别为:MINUTE,TIME,DATETIME,TIMESTAMP,DATE,DATETIME。

Python api读取原始数据后,包含时间日期的列全部都转成datetime64类型,读取及转化的代码如下:

import pandas as pd
import numpy as np
import dolphindb as ddb

# 读取CSV文件,并把所有的日期时间列都转成datetime64类型
df = pd.read_csv("D:/pythontm/datetime.csv")

df['minuteCol'] = pd.to_datetime(df['minuteCol'])
df['timeCol'] = pd.to_datetime(df['timeCol'])
df['datetimeCol'] = pd.to_datetime(df['datetimeCol'])
df['timestampCol'] = pd.to_datetime(df['timestampCol'])
df['allDigitalDate'] = df['allDigitalDate'].astype('str')
df['allDigitalDate'] = pd.to_datetime(df['allDigitalDate'])
df['allDigitaldateTime'] = df['allDigitaldateTime'].astype('str')
df['allDigitaldateTime'] = pd.to_datetime(df['allDigitaldateTime'])

1、先演示往分布式表中写数据。建立一个分布式数据库,创建数据表,表的数据类型分别是MINUTE,TIME,DATETIME,TIMESTAMP,DATE,DATETIME,代码如下:

# 连接DolphinDB服务器
s = ddb.session()
s.connect("192.168.100.3",20030,"admin","123456")

# 创建分布式表
script_dfs = """
    if(existsDatabase("dfs://dtDfs"))
    {
        dropDatabase("dfs://dtDfs")
    }
    db = database("dfs://dtDfs",VALUE,2021.02.21..2021.02.22)
    sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME)
    db.createPartitionedTable(sch,`dt,`datetimeCol)    
"""
s.run(script_dfs)

创建一个tableAppender对象,并把df的数据写入分布式数据库,代码如下:

appender_dfs = ddb.tableAppender("dfs://dtDfs","dt", s)
appender_dfs.append(df)

使用vscode插件或gui查询这个数据库中的分布式表:

login(`admin,`123456)
select * from loadTable("dfs://dtDfs",`dt)

结果如图3所示:

图3

从图3可以看到,数据类型正确转换了。

2、演示往内存表中写数据,建立一个内存表,代码如下:

# 创建内存表
script = """
    share table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) as dtTable
"""
s.run(script)

创建一个tableAppender对象,并把df的数据写入内存表,代码如下:

# 对内存表创建tableAppender对象,并上传导入数据
appender = ddb.tableAppender(tableName="dtTable", ddbSession=s)
appender.append(df)

使用vscode插件或gui查询这个内存表,代码如下:

select * from dtTable

结果如图4所示:

图4

从图4可以看出,内存表的数据类型正确转换了。

完整的代码如下:

import pandas as pd
import numpy as np
import dolphindb as ddb

# 读取CSV文件,并把所有的日期时间列都转成datetime64类型
df = pd.read_csv("D:/pythontm/datetime.csv")

print(type(df["allDigitaldateTime"][0]))
df['minuteCol'] = pd.to_datetime(df['minuteCol'])
df['timeCol'] = pd.to_datetime(df['timeCol'])
df['datetimeCol'] = pd.to_datetime(df['datetimeCol'])
df['timestampCol'] = pd.to_datetime(df['timestampCol'])
df['allDigitalDate'] = df['allDigitalDate'].astype('str')
df['allDigitalDate'] = pd.to_datetime(df['allDigitalDate'])
df['allDigitaldateTime'] = df['allDigitaldateTime'].astype('str')
df['allDigitaldateTime'] = pd.to_datetime(df['allDigitaldateTime'])

# 连接DolphinDB服务器
s = ddb.session()
s.connect("192.168.100.3",20030,"admin","123456")

# 创建分布式表
script_dfs = """
    if(existsDatabase("dfs://dtDfs"))
    {
        dropDatabase("dfs://dtDfs")
    }
    db = database("dfs://dtDfs",VALUE,2021.02.21..2021.02.22)
    sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME)
    db.createPartitionedTable(sch,`dt,`datetimeCol)    
"""
s.run(script_dfs)

# 对分布式表创建tableAppender对象,并上传导入数据
appender_dfs = ddb.tableAppender("dfs://dtDfs","dt", s)
appender_dfs.append(df)

# 创建内存表
script = """
    share table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME) as dtTable
"""
s.run(script)

# 对内存表创建tableAppender对象,并上传导入数据
appender = ddb.tableAppender(tableName="dtTable", ddbSession=s)
appender.append(df)

2、流式写入

把数据导入分布式表时,写硬盘是一个比较耗时的操作,写一次硬盘,导入1条数据和1000条数据耗时几乎是一样的。当数据以流式的方式逐条到来时,如果每到一条数据都调用一次tableAppender,那么会不断触发硬盘的写操作。在流量很大时,频繁写硬盘就会是瓶颈,导致数据堆积。

因此,流式的单条导入推荐使用MultithreadedTableWriter对象,它自动做异步批量写入,并且可以把datetime64时间日期格式自动转换成DolphinDB的分布式表定义的类型。

先建立的分布式表,数据类型从左到右分别为:MINUTE,TIME,DATETIME,TIMESTAMP,DATE,DATETIME。代码如下:

import numpy as np
import dolphindb as ddb
import time
import datetime

# 连接DolphinDB服务器
s = ddb.session()
s.connect("192.168.100.3",20030,"admin","123456")

# 创建接收mtw写入的分布式表
script_dfs = """
    if(existsDatabase("dfs://dtDfs_mtw"))
    {
        dropDatabase("dfs://dtDfs_mtw")
    }
    db = database("dfs://dtDfs_mtw",VALUE,2021.02.21..2021.02.22)
    sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME)
    db.createPartitionedTable(sch,`dt,`datetimeCol)    
"""
s.run(script_dfs)

创建一个MultithreadedTableWriter对象,代码如下:

# 创建MTW对象,写入数据
writer1 = ddb.MultithreadedTableWriter("192.168.100.3",20030,"admin","123456","dfs://dtDfs_mtw","dt",False,False,[],10000,1,5,"datetimeCol")

用一个for循环读取1000次当前时间,并把类型转为datetime64,和分布式表列数保持一致,进行数据写入。代码如下:

try:
    for i in range(1000):
        writer1.insert(np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),\
            np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()))
        time.sleep(0.01)
except Exception as ex:
    # MTW 抛出异常
    print("MTW exit with exception %s" % ex)

提交后等待写入完成,代码如下:

# 等待 MTW 插入完成
writer1.waitForThreadCompletion()
writeStatus=writer1.getStatus()
if writeStatus.hasError():
    print("Error in writing:")
print(writeStatus)

完成后,打印出写入状态,如图5所示,其中的errorCode为None表示写入成功。

图5

使用vscode插件或gui查询这个分布式表,代码如下:

login(`admin,`123456)
select * from loadTable("dfs://dtDfs_mtw",`dt)

得到结果如图6所示

图6

从图6可以看出,所有的类型都正确转化为了分布式表指定的类型。

完整的导入代码如下:

import numpy as np
import dolphindb as ddb
import time
import datetime

# 连接DolphinDB服务器
s = ddb.session()
s.connect("192.168.100.3",20030,"admin","123456")

# 创建接收mtw写入的分布式表
script_dfs = """
    if(existsDatabase("dfs://dtDfs_mtw"))
    {
        dropDatabase("dfs://dtDfs_mtw")
    }
    db = database("dfs://dtDfs_mtw",VALUE,2021.02.21..2021.02.22)
    sch = table(1:0,`minuteCol`timeCol`datetimeCol`timestampCol`digitalDate`DigitalDatetime,`MINUTE`TIME`DATETIME`TIMESTAMP`DATE`DATETIME)
    db.createPartitionedTable(sch,`dt,`datetimeCol)    
"""
s.run(script_dfs)

# 创建MTW对象,写入数据
writer1 = ddb.MultithreadedTableWriter("192.168.100.3",20030,"admin","123456","dfs://dtDfs_mtw","dt",False,False,[],10000,1,5,"datetimeCol")
try:
    for i in range(1000):
        writer1.insert(np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),\
            np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()),np.datetime64(datetime.datetime.now()))
        time.sleep(0.01)
except Exception as ex:
    # MTW 抛出异常
    print("MTW exit with exception %s" % ex)

# 等待 MTW 插入完成
writer1.waitForThreadCompletion()
writeStatus=writer1.getStatus()
if writeStatus.hasError():
    print("Error in writing:")
print(writeStatus)

总结

使用Python导入包含时间日期类型的数据时,首先在Python api端把时间日期类型的数据转成datetime64类型,然后:

日期时间即可自动转为DolphinDB建表时指定的类型。






请先 登录 后评论