Severe out-of-order output when subscribing to stream data from Python

Environment

DolphinDB version:

DolphinDB Systems 1.20.24 64 bit Copyright (c) 2011~2022 DolphinDB, Inc. Licensed to Trial Users. Expires on 2042.01.01 (Build:2022.02.16)

OS:

Linux VM-20-16-centos 3.10.0-1160.45.1.el7.x86_64 #1 SMP Wed Oct 13 17:20:51 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Following the gitee docs, I implemented a streaming-data subscription in Python, but the data that comes out is jumbled. What am I doing wrong?

https://gitee.com/dolphindb/api_python3#10-python-streaming-api


Screenshot of the problem:

[Screenshot of the out-of-order output]


Steps to reproduce:

1. Start a single-node DolphinDB server

./dolphindb

2. Run the following 4 statements in the DolphinDB notebook

share streamTable(100:0, `Symbol`Datetime`Price`Volume,[SYMBOL,DATETIME,DOUBLE,INT]) as Trade;

share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME, SYMBOL, DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC;

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(Volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol);

subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true);
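For readers unfamiliar with createTimeSeriesAggregator, here is a rough pure-Python sketch of what the aggregator above is configured to compute: per Symbol, first/max/min/last Price and summed Volume over 300-second windows that advance every 60 seconds. It is an approximation under stated assumptions, not DolphinDB's implementation: it assumes left-closed windows [end - 300s, end), window ends on whole multiples of the 60-second step, and the window end as the output timestamp. Unlike the streaming engine, which emits a window only after later data arrives, this batch version reports every non-empty window. The name sliding_ohlc is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def _secs(ts):
    # Whole seconds since the Unix epoch (naive datetimes assumed)
    return int((ts - EPOCH).total_seconds())

def sliding_ohlc(trades, window=300, step=60):
    """Hypothetical batch approximation of the aggregator in step 2.

    trades: iterable of (symbol, datetime, price, volume).
    Returns (window_end, symbol, open, high, low, close, volume) for every
    non-empty window [end - window, end) whose end is a multiple of `step`
    seconds (assumed alignment, may differ from DolphinDB's exact rules).
    """
    by_symbol = defaultdict(list)
    for sym, ts, price, vol in trades:
        by_symbol[sym].append((_secs(ts), price, vol))
    out = []
    for sym in sorted(by_symbol):
        # Stable sort on time only, so trades with equal timestamps keep arrival order
        rows = sorted(by_symbol[sym], key=lambda r: r[0])
        # A window contains time t iff its end lies in (t, t + window]
        first_end = (rows[0][0] // step + 1) * step
        last_end = ((rows[-1][0] + window) // step) * step
        for end in range(first_end, last_end + 1, step):
            in_win = [r for r in rows if end - window <= r[0] < end]
            if not in_win:
                continue
            prices = [p for _, p, _ in in_win]
            out.append((EPOCH + timedelta(seconds=end), sym,
                        prices[0], max(prices), min(prices), prices[-1],
                        sum(v for _, _, v in in_win)))
    return out
```

Each trade lands in several overlapping windows (300 / 60 = 5 of them), which is why the OHLC table gets many more rows than there are distinct minutes.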


3. On the server where DolphinDB runs, insert data with Python

import dolphindb as ddb
import pandas as pd
import numpy as np
csv_file = "trades.csv"
# read_csv already returns a DataFrame; parse Datetime as a timestamp and keep Symbol as a string
csv_df = pd.read_csv(csv_file, parse_dates=['Datetime'], dtype={'Symbol': str})
s = ddb.session()
s.connect("localhost", 8888)
print(csv_df)
print(s.getSessionId())
# Upload the DataFrame to DolphinDB, convert Datetime to DATETIME, then insert into Trade
s.upload({"tmpData": csv_df})
s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData; tableInsert(Trade, data)")
print("success...")
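The post does not show trades.csv. Judging from the upload script, it needs at least Symbol, Datetime, Price and Volume columns. To reproduce the issue without the original file, the hypothetical helper below generates such a CSV. The symbols, prices, volumes and 5-second spacing are made-up test data, and rows are kept in ascending time order since the aggregator windows by Datetime.

```python
import csv
import io
import random
from datetime import datetime, timedelta

def sample_trades_csv(n_rows=100, symbols=("AAA", "BBB"),
                      start=datetime(2022, 3, 1, 9, 30), seed=0):
    """Hypothetical generator: CSV text with columns Symbol,Datetime,Price,Volume.

    One trade every 5 seconds, in ascending time order; values are random test data.
    """
    rng = random.Random(seed)  # fixed seed so the file is reproducible
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["Symbol", "Datetime", "Price", "Volume"])
    for i in range(n_rows):
        ts = start + timedelta(seconds=5 * i)
        writer.writerow([rng.choice(symbols),
                         ts.strftime("%Y-%m-%d %H:%M:%S"),
                         round(rng.uniform(10.0, 20.0), 2),
                         rng.randint(1, 1000)])
    return buf.getvalue()
```

For example, writing the returned text into trades.csv (open the file with newline="") gives the upload script something to read; pd.read_csv with parse_dates=['Datetime'] parses the "%Y-%m-%d %H:%M:%S" timestamps.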


4. In the notebook, verify that the data looks correct

select * from Trade;

select * from OHLC;

Both tables now contain data.

5. Subscribe from Python code

from threading import Event
import dolphindb as ddb
import pandas as pd
import numpy as np
s = ddb.session()
# Use local port 9999 to receive streaming data
s.enableStreaming(9999, 4)
count = 0
def handler(lst):
    global count  # declare count as a global
    count += 1
    print("handler.......", count)
    print(lst)
# Subscribe to the OHLC stream table on DolphinDB (local port 8888)
s.subscribe("127.0.0.1", 8888, handler, "OHLC", offset=0, actionName="act_tsaggr")
Event().wait()  # blocks forever so the process keeps receiving callbacks
print("success...")

That is where the out-of-order output shows up. What is going on here? Thanks.


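Answering my own question:

The Python API invokes the handler callback from multiple threads, so the handler body must run under a lock; otherwise the counter updates and print output of concurrent callbacks interleave.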
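The effect of the lock can be checked without a DolphinDB server. In this sketch, a few plain threads (a hypothetical stand-in for the API's callback threads; simulate and LockedHandler are illustrative names) call the same handler concurrently, and the handler appends to a list instead of printing so the result can be inspected. Because the counter update and both output lines happen under one lock, each callback's output stays together and the count never skips or repeats. What the lock does not do: across threads, callbacks run in whatever order they acquire the lock, so it keeps output intact but does not by itself restore the order in which messages were published.

```python
import threading

class LockedHandler:
    """Mimics the locked handler from the post, but records output in a list instead of printing."""

    def __init__(self):
        self.lock = threading.Lock()  # one lock shared by every callback thread
        self.count = 0
        self.lines = []

    def __call__(self, msg):
        with self.lock:  # counter update and both "prints" happen atomically
            self.count += 1
            self.lines.append(f"handler....... {self.count}")
            self.lines.append(msg)

def simulate(n_threads=4, msgs_per_thread=250):
    """Hypothetical stand-in for the API's callback threads: each thread delivers its own messages."""
    handler = LockedHandler()

    def deliver(tid):
        for i in range(msgs_per_thread):
            handler((tid, i))

    threads = [threading.Thread(target=deliver, args=(t,)) for t in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return handler
```

After simulate() returns, every "handler......." line is immediately followed by its own message, and messages from any single thread appear in that thread's order; how messages from different threads are interleaved varies from run to run.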

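from threading import Event
import threading
import dolphindb as ddb
import pandas as pd
import numpy as np
s = ddb.session()
# Use local port 9999 to receive streaming data
s.enableStreaming(9999, 4)
count = 0
lock = threading.Lock()  # one lock shared by all handler invocations
# The handler runs on multiple threads, so it must hold the lock; otherwise the output is interleaved
def handler(lst):
    global count  # declare count as a global
    with lock:  # acquire the lock; it is released automatically, even if print raises
        count += 1
        t = threading.current_thread()
        print("handler.......", count, " thread id=", t.ident)
        print(lst)
# Subscribe to the OHLC stream table on DolphinDB (local port 8888)
s.subscribe("127.0.0.1", 8888, handler, "OHLC", offset=0, actionName="act_tsaggr")
Event().wait()  # blocks forever so the process keeps receiving callbacks
print("success...")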
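A common alternative to locking inside the handler (a swapped-in technique, not what the original post does) is to have the callback only enqueue each message and let a single worker thread do the counting and printing. queue.Queue is thread-safe, so the callbacks need no explicit lock, and all processing happens on one thread in the order messages were enqueued. The helper make_serialized_handler is hypothetical; the returned handler would be passed to s.subscribe in place of the locked one.

```python
import queue
import threading

def make_serialized_handler(process):
    """Hypothetical helper. Returns (handler, close).

    handler only enqueues; one worker thread calls process(count, msg) for each message.
    """
    q = queue.Queue()
    stop = object()  # sentinel that tells the worker to exit

    def handler(msg):
        q.put(msg)  # safe to call from any number of callback threads

    def worker():
        count = 0  # touched only by this thread, so no lock is needed
        while True:
            msg = q.get()
            if msg is stop:
                return
            count += 1
            process(count, msg)

    consumer = threading.Thread(target=worker, daemon=True)
    consumer.start()

    def close():
        q.put(stop)  # processed after everything already enqueued
        consumer.join()

    return handler, close
```

For example, handler, close = make_serialized_handler(lambda n, lst: print("handler.......", n, lst)) reproduces the post's output format. Call close() only after messages have stopped arriving, since anything enqueued after the stop marker is never processed. Like the lock, this serializes processing; it cannot reorder messages that the callback threads enqueue out of order.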


With the lock in place, the output is clean. What a pitfall!



1 answer

Boye

Kudos for digging into this yourself!
