Environment
DolphinDB version:
DolphinDB Systems 1.20.24 64 bit Copyright (c) 2011~2022 DolphinDB, Inc. Licensed to Trial Users. Expires on 2042.01.01 (Build:2022.02.16)
OS:
Linux VM-20-16-centos 3.10.0-1160.45.1.el7.x86_64 #1 SMP Wed Oct 13 17:20:51 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
Following the Gitee documentation, I implemented a streaming subscription in Python, but the output comes out jumbled. What am I doing wrong?
https://gitee.com/dolphindb/api_python3#10-python-streaming-api
Screenshot of the problem:
Steps to reproduce:
1. Start a single-node DolphinDB server
./dolphindb
2. Run the following four statements in a DolphinDB notebook
share streamTable(100:0, `Symbol`Datetime`Price`Volume,[SYMBOL,DATETIME,DOUBLE,INT]) as Trade;
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME, SYMBOL, DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC;
tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol);
subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true);
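For readers unfamiliar with the aggregator, here is a rough sketch in plain Python of what a sliding window with windowSize=300 and step=60 (both in seconds) computes for each symbol. It is not DolphinDB's implementation. The function name `sliding_ohlc` is hypothetical, and the window boundaries (left-closed, right-open, ending on multiples of the step, labelled by the window end) are assumptions; the real engine may align and emit windows differently.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def sliding_ohlc(trades, window_size=300, step=60):
    """Aggregate (symbol, time, price, volume) tuples into OHLC rows.

    Assumption: each window is [end - window_size, end), window ends fall
    on multiples of `step` seconds, and a row is labelled with its end.
    """
    by_symbol = {}
    for symbol, ts, price, volume in trades:
        by_symbol.setdefault(symbol, []).append((ts, price, volume))

    rows = []
    for symbol in sorted(by_symbol):
        items = sorted(by_symbol[symbol], key=lambda item: item[0])
        first = int((items[0][0] - EPOCH).total_seconds())
        last = int((items[-1][0] - EPOCH).total_seconds())
        end = (first // step + 1) * step        # first boundary after the first trade
        final_end = (last // step + 1) * step   # first boundary after the last trade
        while end <= final_end:
            lo = EPOCH + timedelta(seconds=end - window_size)
            hi = EPOCH + timedelta(seconds=end)
            window = [(p, v) for ts, p, v in items if lo <= ts < hi]
            if window:
                prices = [p for p, _ in window]
                rows.append((hi, symbol, prices[0], max(prices), min(prices),
                             prices[-1], sum(v for _, v in window)))
            end += step
    return rows

# Made-up trades, for illustration only
trades = [
    ("A", datetime(2022, 1, 4, 9, 30, 0), 10.0, 100),
    ("A", datetime(2022, 1, 4, 9, 30, 30), 12.0, 50),
    ("A", datetime(2022, 1, 4, 9, 31, 10), 11.0, 20),
]
for row in sliding_ohlc(trades):
    print(row)
```

Each output row has the same column order as the OHLC table: datetime, symbol, open, high, low, close, volume.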
3. On the server hosting DolphinDB, insert data with Python
import dolphindb as ddb
import pandas as pd
import numpy as np
csv_file = "trades.csv"
csv_data = pd.read_csv(csv_file,parse_dates=['Datetime'], dtype={'Symbol':str})
csv_df = pd.DataFrame(csv_data)
s = ddb.session()
s.connect("localhost", 8888)
print(csv_df)
print(s.getSessionId())
# Upload the DataFrame to DolphinDB and convert the Datetime column's type
s.upload({"tmpData":csv_df})
s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData;tableInsert(Trade,data)")
print("success...")
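The script above reads trades.csv, which is not included in this post. To reproduce without the original file, a stand-in can be generated along these lines. The column names are taken from the select statement above; the helper names `make_trades` and `write_trades_csv`, the symbols, the prices, and the volumes are all made up for illustration.

```python
import csv
import random
from datetime import datetime, timedelta

FIELDS = ["Symbol", "Datetime", "Price", "Volume"]

def make_trades(n_rows=20, symbols=("000001", "000002"), seed=0):
    """Build n_rows fake trades, 15 seconds apart, alternating symbols."""
    rng = random.Random(seed)  # fixed seed so reruns give the same file
    start = datetime(2022, 1, 4, 9, 30, 0)
    rows = []
    for i in range(n_rows):
        rows.append({
            "Symbol": symbols[i % len(symbols)],
            "Datetime": (start + timedelta(seconds=15 * i)).strftime("%Y-%m-%d %H:%M:%S"),
            "Price": round(10 + rng.uniform(-0.5, 0.5), 2),
            "Volume": rng.randint(1, 10) * 100,
        })
    return rows

def write_trades_csv(path, rows):
    """Write the rows with a header line matching the Trade table columns."""
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(rows)
```

Calling `write_trades_csv("trades.csv", make_trades())` before running the insert script should give it something to load. The symbols look numeric, which is why the original script passes dtype={'Symbol':str} to read_csv.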
4. In the notebook, check that the data arrived
select * from Trade;
select * from OHLC;
Both tables contain data.
5. Subscribe from Python
from threading import Event
import dolphindb as ddb
import pandas as pd
import numpy as np
s=ddb.session()
# Use local port 9999 to receive streaming data
s.enableStreaming(9999, 4)
count=0
def handler(lst):
    global count  # declare count as global
    count+=1
    print("handler.......", count)
    print(lst)
# Subscribe to the OHLC stream table on DolphinDB (local host, port 8888)
s.subscribe("127.0.0.1", 8888, handler, "OHLC", offset=0, actionName= "act_tsaggr")
Event().wait()
print("success...")
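At this point the output comes out jumbled. What is going on here? Thanks.
Answering my own question:
The Python API invokes the callback from multiple threads, so the handler body must run under a lock.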
from threading import Event
import dolphindb as ddb
import pandas as pd
import numpy as np
import threading
s=ddb.session()
# Use local port 9999 to receive streaming data
s.enableStreaming(9999, 4)
count=0
lock=threading.Lock()  # one lock shared by all handler invocations
# The handler runs on multiple threads; without the lock the output is jumbled
def handler(lst):
    global count  # declare count as global
    with lock:  # acquire the lock; it is released automatically on exit
        count+=1
        t = threading.current_thread()
        print("handler.......", count, " thread id=", t.ident)
        print(lst)
# Subscribe to the OHLC stream table on DolphinDB (local host, port 8888)
s.subscribe("127.0.0.1", 8888, handler, "OHLC", offset=0, actionName= "act_tsaggr")
Event().wait()
print("success...")
That fixed it. Quite a pitfall!
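The effect of the lock can be checked without a DolphinDB server. Below is a minimal sketch that calls a handler of the same shape from four plain Python threads and records its "printed" lines in a list so they can be inspected. The names `make_handler` and `run_demo` are hypothetical, and the four threads merely stand in for the API's callback threads. That the 4 passed to enableStreaming is the number of callback threads is my assumption, not something I have confirmed.

```python
import threading

def make_handler():
    """Return a handler plus the state it mutates (stand-in for the real callback)."""
    lock = threading.Lock()
    state = {"count": 0, "lines": []}

    def handler(msg):
        # Everything inside the lock runs as one unit, so the header line
        # and the message line of a single call stay adjacent.
        with lock:
            state["count"] += 1
            state["lines"].append(f"handler....... {state['count']}")
            state["lines"].append(repr(msg))

    return handler, state

def run_demo(n_threads=4, per_thread=250):
    """Call the handler concurrently from n_threads threads and return its state."""
    handler, state = make_handler()

    def worker(thread_no):
        for i in range(per_thread):
            handler([thread_no, i])

    threads = [threading.Thread(target=worker, args=(n,)) for n in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state

state = run_demo()
print(state["count"], len(state["lines"]))
```

With the lock in place, every header line is followed directly by its own message line and the counter values appear in order. The lock only keeps each call's output together; it does not control which thread's message is handled first.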