"""
DolphinDB script: creates the database and the bar table in DolphinDB and writes sample bar data.
"""
import json
import time
import pandas as pd
import dolphindb as ddb
from datetime import datetime
# Connection settings for the DolphinDB server. The credentials and host are
# blank in this file and have to be filled in before the script can connect.
DB_PATH = "dfs://history_data_test"
user: str = ""
password: str = ""
host: str = ""
# The original line had no value at all (a SyntaxError). 8848 is DolphinDB's
# default server port; change it to match the actual deployment.
port: int = 8848
db_path: str = DB_PATH
# DolphinDB script that creates the database at DB_PATH: VALUE-partitioned
# over the months 2000.01M..2030.12M, using the TSDB storage engine.
CREATE_DATABASE_SCRIPT = "\n".join(
    (
        "",
        f'dataPath = "{DB_PATH}"',
        "db = database(dataPath, VALUE, 2000.01M..2030.12M, engine=`TSDB)",
        "",
    )
)
# DolphinDB script that creates the partitioned "bar" table.
# The schema is declared once as (column name, DolphinDB type) pairs so the
# name list and the type list in the generated script always line up.
_BAR_SCHEMA = (
    ("vt_symbol", "SYMBOL"),
    ("interval", "STRING"),
    ("datetime", "NANOTIMESTAMP"),
    ("trading_day", "STRING"),
    ("real_symbol", "SYMBOL"),
    ("open_price", "DOUBLE"),
    ("high_price", "DOUBLE"),
    ("low_price", "DOUBLE"),
    ("close_price", "DOUBLE"),
    ("volume", "DOUBLE"),
    ("turnover", "DOUBLE"),
    ("open_interest", "DOUBLE"),
    ("limit_up", "DOUBLE"),
    ("limit_down", "DOUBLE"),
    ("settlement_price", "DOUBLE"),
    ("pre_settlement_price", "DOUBLE"),
    ("adjust_factor", "DOUBLE"),
    ("extra", "STRING"),
)
# One entry per line, comma-terminated, exactly as the script lays them out.
_bar_column_list = ",\n".join(f'"{name}"' for name, _ in _BAR_SCHEMA)
_bar_type_list = ",\n".join(dtype for _, dtype in _BAR_SCHEMA)

CREATE_BAR_TABLE_SCRIPT = "\n".join(
    (
        "",
        f'dataPath = "{DB_PATH}"',
        "db = database(dataPath)",
        f"bar_columns = [{_bar_column_list}]",
        # The closing bracket of the type list sits on its own line.
        f"bar_type = [{_bar_type_list}",
        "]",
        "bar = table(1:0, bar_columns, bar_type)",
        "db.createPartitionedTable(",
        "bar,",
        '"bar",',
        'partitionColumns=["datetime"],',
        'sortColumns=["vt_symbol","real_symbol", "interval","trading_day", "datetime"],',
        "keepDuplicates=LAST)",
        "",
    )
)
# Connect to the database: open a session against the configured server.
# NOTE: this runs at import time, so importing the module needs a reachable
# server and valid host/port/user/password values.
session: ddb.session = ddb.session()
session.connect(host, port, user, password)
# Create the connection pool (used for writing data). The third positional
# argument, 1, is presumably the number of pooled connections — confirm
# against the DBConnectionPool signature.
pool: ddb.DBConnectionPool = ddb.DBConnectionPool(host, port, 1, user, password)
def create_database():
    """Create the database and its "bar" table if the database is missing.

    Does nothing when a database already exists at ``db_path``; otherwise
    runs the database-creation script followed by the bar-table script on
    the module-level session.
    """
    if session.existsDatabase(db_path):
        return

    session.run(CREATE_DATABASE_SCRIPT)
    session.run(CREATE_BAR_TABLE_SCRIPT)
def drop_database():
    """Drop the database at ``DB_PATH`` via the module-level session."""
    drop_script = f"dropDatabase('{DB_PATH}')"
    session.run(drop_script)
def save_bar_data():
    """Save bar (k-line) data: append two hard-coded sample rows to "bar".

    The rows are built as dictionaries, converted to a pandas DataFrame
    (which is printed), and appended through a ``PartitionedTableAppender``
    that uses the module-level connection pool and "datetime" as the
    partition column.
    """
    # The two sample rows differ only in the seconds field of the timestamp
    # and in the value shared by open/high/low: (second, open/high/low).
    samples = ((0, 2.0), (1, 1.0))

    # Key order matters: it becomes the DataFrame column order.
    records: list[dict] = [
        {
            "vt_symbol": "rb2310.SHFE",
            "interval": "1m",
            "datetime": datetime(2023, 5, 22, 9, 30, second, 0),
            "trading_day": "20230522",
            "real_symbol": "rb2310",
            "open_price": ohl_price,
            "high_price": ohl_price,
            "low_price": ohl_price,
            "close_price": 1.0,
            "volume": 1.0,
            "turnover": 1.0,
            "open_interest": 1.0,
            "limit_up": 1.0,
            "limit_down": 1.0,
            "settlement_price": 1.0,
            "pre_settlement_price": 1.0,
            "adjust_factor": 1.0,
            # Extra fields are stored as a JSON string.
            "extra": json.dumps({"a": 2.0, "b": 2.0}),
        }
        for second, ohl_price in samples
    ]

    # Convert to a DataFrame before writing to the database.
    frame: pd.DataFrame = pd.DataFrame.from_records(records)
    print(frame)

    writer: ddb.PartitionedTableAppender = ddb.PartitionedTableAppender(
        db_path, "bar", "datetime", pool
    )
    writer.append(frame)
if __name__ == "__main__":
    # Uncomment on the first run to create the database and the "bar" table.
    # create_database()
    try:
        save_bar_data()
        # Short pause kept from the original script; presumably gives the
        # server a moment before the connections are torn down.
        time.sleep(1)
    finally:
        # Release server-side resources even when the write fails: shut the
        # writer pool down and close the session if it is still open.
        pool.shutDown()
        if not session.isClosed():
            session.close()