是否在多个线程中共用了同一个连接?
使用 data2MemoryTable 将 df 插入流数据表,运行不到两分钟就显示连接断了,不知道是什么原因?
代码有些复杂,大致是:在一个 engine 中开启了行情订阅、风险管理两个子线程。其中行情订阅使用 websockets + asyncio 异步订阅数据,并将流数据插入流数据表和 latestIndexedTable 两种表;风险管理子线程轮询查询内存中的 latestIndexedTable。报错就发生在查询 latestIndexedTable 这个内存表的时候。
class Dolphin:
    """Thin wrapper around two DolphinDB sessions: one sync, one async.

    ``session`` is created with ``enableASYNC=False`` and ``asyncSession``
    with ``enableASYNC=True``; both connect to the same host and port.

    NOTE(review): the surrounding description says this object is used from
    several threads (market-data subscription and risk polling). Whether a
    single ``ddb.session`` may be shared across threads is not established
    here -- confirm against the DolphinDB Python API documentation.
    """

    def __init__(
        self,
        host=DOLPHINDB_HOST,
        port=DOLPHINDB_PORT,
        userid=DOLPHINDB_USER,
        password=DOLPHINDB_PASSWORD,
    ) -> None:
        """Open the synchronous and asynchronous sessions.

        Args:
            host: Server host name or IP.
            port: Server port.
            userid: Login user name.
            password: Login password.
        """
        self.host = host
        self.port = port
        self.userid = userid
        self.password = password

        # Synchronous session: run() blocks and returns the script result.
        self.session = ddb.session(
            userid=userid, password=password, enableASYNC=False
        )
        self.session.connect(
            host=host, port=port, keepAliveTime=TIMEOUT, reconnect=True
        )

        # Asynchronous session: used by runScript() when enableAsync=True.
        self.asyncSession = ddb.session(
            userid=userid, password=password, enableASYNC=True
        )
        self.asyncSession.connect(
            host=host, port=port, keepAliveTime=TIMEOUT, reconnect=True
        )

    def runScript(self, *script, enableAsync=False, clearMemory=False):
        """Run a script or function call on the server.

        Args:
            *script: Positional arguments forwarded unchanged to
                ``session.run`` (script text or function name, followed by
                any arguments).
            enableAsync: When true, submit through ``asyncSession`` and
                return ``None``; otherwise use ``session``.
            clearMemory: Forwarded to ``session.run``.

        Returns:
            Whatever ``session.run`` returns in synchronous mode; ``None``
            in asynchronous mode.
        """
        if enableAsync:
            self.asyncSession.run(*script, clearMemory=clearMemory)
            return None
        return self.session.run(*script, clearMemory=clearMemory)

    def data2MemoryTable(self, tableName, *data, enableAsync=False):
        """Insert DataFrame(s) into the in-memory table named ``tableName``.

        Args:
            tableName: Name of the server-side table; interpolated into a
                ``tableInsert{...}`` partial application.
            *data: Objects to insert (DataFrames per the original
                docstring). Nothing is sent when no data is given.
            enableAsync: Forwarded to ``runScript``.
        """
        script = "tableInsert{%s}" % tableName
        if data:
            self.runScript(script, *data, enableAsync=enableAsync)
日志中打印如下:
2023-08-07 20:18:15.258423 <INFO> :Created a new socket connection. Number of connections: 17
2023-08-07 20:18:15.258423 <INFO> :New connection from ip = 127.0.0.1 port = 54558
2023-08-07 20:18:15.259422 <ERROR> :Received FD_CLOSE
2023-08-07 20:18:15.259422 <INFO> :Close a connection with index=17. Number of remaining connections: 16
同样,调用 s.undefAll() 时也会报连接已关闭的错误,而 s.isClosed() 却返回 False:
In [142]: s.undefAll()
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-142-07129f4f0db4> in <module>
----> 1 s.undefAll()
~/anaconda3/lib/python3.9/site-packages/dolphindb/session.py in undefAll(self)
988 def undefAll(self) -> None:
989 """Release all objects in the Session."""
--> 990 self.run("undef all")
991
992 def clearAllCache(self, dfs: bool = False) -> None:
~/anaconda3/lib/python3.9/site-packages/dolphindb/session.py in run(self, script, *args, **kwargs)
475 if "fetchSize" in kwargs.keys():
476 return BlockReader(self.cpp.runBlock(script, **kwargs))
--> 477 return self.cpp.run(script, *args, **kwargs)
478
479 def runFile(self, filepath: str, *args, **kwargs):
RuntimeError: <Exception> in run: Couldn't send script/function to the remote host because the connection has been closed
In [143]: s.isClosed()
Out[143]: False