你给的这个 sample 只有两条数据,右表直接触发左表 join, 左表匹配不到右表时间在它前面的数据,所以是没有结果的。但是引擎定义这块是正常的,我先写一条右表 280 再写一条左表 290,然后再写一条右表 291 可以正常触发 280 和 290 的 join 输出左右表的计算结果。asof join 是左表每一条去匹配他时间点之前最近的右表的数据,这个匹配的触发是由当前左表数据到来后的下一条右表数据触发的。触发计算右表数据是不会参与join 的。
想问一下,createAsofJoinEngine 函数中的 metrics 参数,在调用右表的列的时候,获取不到数据,但是左表的数据是有的
我的metrics是这么写的 '<[LogDateTime, TRANSACTION_SR401.LogDateTime, InstrumentID, Cont, LastPrice, MatchPrice]>', 其中 Cont、MatchPrice 均为右表列名,且大小写与schema一致。
代码如下:前一段是数据库脚本,后一段是python写的测试数据
share streamTable(1:0, `InstrumentID`TradeDateTime`LimitUpPrice`LimitDownPrice`PreSettlePrice`SettlePrice`LastPrice`Volume`OpenInterest`PreOpenInterest`OpenPrice`HighPrice`LowPrice`Amount`BidPrice`BidVol`AskPrice`AskVol`LogDateTime, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,INT,TIMESTAMP]) as EXCHANGE_SR401; share streamTable(1:0, `UserNo`Sign`Cont`Direct`Offset`Hedge`OrderNo`MatchPrice`MatchQty`FeeCurrency`MatchFee`CoverProfit`MatchDateTime`AddOne`Deleted`MatchNo`UnderlyingID`LogDateTime, [INT,SYMBOL,SYMBOL,STRING,STRING,STRING,STRING,DOUBLE,INT,STRING,DOUBLE,DOUBLE,TIMESTAMP,STRING,STRING,STRING,SYMBOL,TIMESTAMP]) as TRANSACTION_SR401; share streamTable(1:0, `Timestamp_result`Join_key`timestamp_future`timestamp_option`InstrumentID_future`InstrumentID_option`price_future`price_option, [TIMESTAMP,SYMBOL,TIMESTAMP,TIMESTAMP,SYMBOL,SYMBOL,DOUBLE,DOUBLE]) as PREV_MATCH_SR401; createAsofJoinEngine( name=`aj1, leftTable=EXCHANGE_SR401, rightTable=TRANSACTION_SR401, outputTable=PREV_MATCH_SR401, metrics=<[LogDateTime,TRANSACTION_SR401.LogDateTime,InstrumentID,Cont,LastPrice,MatchPrice]>, matchingColumn=[[`InstrumentID],[`UnderlyingID]], timeColumn=`LogDateTime, useSystemTime=false ); subscribeTable( tableName="EXCHANGE_SR401", actionName="joinLeft", offset=0, handler=appendForJoin{`aj1,true}, msgAsTable=true, reconnect=true ); subscribeTable( tableName="TRANSACTION_SR401", actionName="joinRight", offset=0, handler=appendForJoin{`aj1,false}, msgAsTable=true, reconnect=true );
_exchange_record = { 'InstrumentID': 'ZCE|F|SR|401', 'TradeDateTime': pd.Timestamp(2023,11,28,21,0,0,250000), 'LimitUpPrice': 1, 'LimitDownPrice': 1, 'PreSettlePrice': 1, 'SettlePrice': 1, 'LastPrice': 1, 'Volume': 1, 'OpenInterest': 1, 'PreOpenInterest': 1, 'OpenPrice': 1, 'HighPrice': 1, 'LowPrice': 1, 'Amount': 1, 'BidPrice': 1, 'BidVol': 1, 'AskPrice': 1, 'AskVol': 1, 'LogDateTime': pd.Timestamp(2023,11,28,21,0,0,290000) } _transaction_record = { "UserNo":'000001', "Sign": '#/ntjiw/.d', "Cont": 'ZCE|O|SR|401C6700', "Direct": 'S', "Offset": 'O', "Hedge": 'T', "OrderNo": '202311282100250000', "MatchPrice": 2, "MatchQty": 2, "FeeCurrency": 'CNY' , "MatchFee": 3.1, "CoverProfit": 62, "MatchDateTime": pd.Timestamp(2023,11,28,21,0,0), "AddOne": '' , "Deleted": '' , "MatchNo": '2023112821005700', "UnderlyingID": 'ZCE|F|SR|401', 'LogDateTime': pd.Timestamp(2023,11,28,21,0,0,291000) }