您好,建议用getStreamingStat().subWorkers 查看一下是否有报错信息。
使用示例如下,在返归的结果总找到对应的topic(topic由订阅表所在节点的别名、流数据表名称tableName和订阅任务名称actionName组成),相应的lastErrMsg是具体的报错。
请帮忙看一下,哪里出了问题? 谢谢. 代码如下:
///////////////// 一些变量 nMin60=60*60000 ////////////////////////////////// 分割线 --- 以 createTimeSeriesEngine 方式创建引擎并订阅 ///////////////////////// ////////// bar 结构 barColNames=`AdjustTime`InstrumentID`ExchangeID`ProductID`Open`High`Low`Close`Volume`TradingDay`TimeTemplateID`ActionTime barColTypes=[TIMESTAMP,SYMBOL,SYMBOL,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DATE,SYMBOL,TIMESTAMP] ////////////// 创建 键值内存表 用于接收 实际的 bars 数据 ( 覆盖,同一产品同一时间只有一行记录 ) share keyedTable(`ActionTime`InstrumentID`ExchangeID`ProductID, 1000:0, barColNames, barColTypes) as bars_min60 /////////////// 使用自定义函数, 通过 AdjustTime 来调整 ActionTime . ActionTime 是目的 表的 键值时间部分 def calcActionTimeFromAjustTime(stepLen, adjTime) { adjTm = adjTime/stepLen*stepLen+stepLen; ///// 这样就 成为了 AdjustTime 的周期结束时间 actTime = iif(time(adjTm)>=00:00:00 and time(adjTm)<= 02:00:00, temporalAdd(adjTm,-3,"H"), iif(time(adjTm)>02:00:00 and time(adjTm)<= 03:15:00, temporalAdd(adjTm,7,"H"), iif(time(adjTm)>03:15:00 and time(adjTm)<= 04:15:00, temporalAdd(adjTm,425,"m"), iif(time(adjTm)>04:15:00 and time(adjTm)<= 06:15:00, temporalAdd(adjTm,555,"m"), iif(time(adjTm)>06:15:00 and time(adjTm)<= 08:00:00, concatDateTime(date(adjTm),15:00:00), NULL))))); return actTime; } def setActionTime(mutable destTable, stepLen, msg) { tt = select TradingDay,InstrumentID,ExchangeID,ProductID,LastPrice,TickVolune,TickAmount,TickPosition,FlowVolume,FlowFund,TempTemplateID,AdjustTime from msg; tt.addColumn(`ActionTime,TIMESTAMP); update tt set ActionTime = calcActionTimeFromAjustTime(stepLen, AdjustTime); //调用自定义的函数来计算 destTable.append!(tt); tt.clear!(); } /////////////// 若 sub 时使用 自定义函数,那 metrics 该如何设定? ////////应是 先调用函数, 然后才是 使用 metrics 进行聚合 metrics_tse_H1 = <[ first(LastPrice), /* Open */ max(LastPrice), /* High */ min(LastPrice), /* Low */ last(LastPrice), /* Close */ sum(TickVolume), /* Volume */ last(TradingDay), /* TradingDay 交易日*/ first(TimeTemplateID), /* TimetemplateID */ first(ActionTime) /* ActionTime */ ]> ////////////// 60分钟 tse_min_H1 = createTimeSeriesEngine(name="act_tse_min60",windowSize=nMin60,step=nMin60,metrics=metrics_tse_H1,dummyTable=Ticks,outputTable=bars_min60,timeColumn="AdjustTime",useSystemTime=false,keyColumn=`InstrumentID`ExchangeID`ProductID,updateTime=300, useWindowStartTime=false, roundTime=false,fill='null') ///////////// handler 为自定义函数 subscribeTable(tableName="Ticks",actionName="act_tse_min60",offset=0,handler=setActionTime{tse_min_H1,nMin60},msgAsTable=true)