流表订阅使用自定义函数不出结果

请帮忙看一下,哪里出了问题? 谢谢. 代码如下:

/////////////////    一些变量
nMin60=60*60000

//////////////////////////////////  分割线  ---   以 createTimeSeriesEngine 方式创建引擎并订阅    /////////////////////////
//////////  bar 结构
barColNames=`AdjustTime`InstrumentID`ExchangeID`ProductID`Open`High`Low`Close`Volume`TradingDay`TimeTemplateID`ActionTime
barColTypes=[TIMESTAMP,SYMBOL,SYMBOL,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DATE,SYMBOL,TIMESTAMP]
//////////////   创建 键值内存表 用于接收 实际的 bars 数据 ( 覆盖,同一产品同一时间只有一行记录 )
share keyedTable(`ActionTime`InstrumentID`ExchangeID`ProductID, 1000:0,  barColNames, barColTypes) as bars_min60

///////////////   使用自定义函数, 通过 AdjustTime 来调整 ActionTime .  ActionTime 是目的 表的 键值时间部分
def calcActionTimeFromAjustTime(stepLen,  adjTime)
{
	adjTm = adjTime/stepLen*stepLen+stepLen; 	///// 这样就 成为了 AdjustTime 的周期结束时间
	
	actTime = iif(time(adjTm)>=00:00:00 and time(adjTm)<= 02:00:00, temporalAdd(adjTm,-3,"H"),
			iif(time(adjTm)>02:00:00 and time(adjTm)<= 03:15:00, temporalAdd(adjTm,7,"H"),
			iif(time(adjTm)>03:15:00 and time(adjTm)<= 04:15:00, temporalAdd(adjTm,425,"m"),
			iif(time(adjTm)>04:15:00 and time(adjTm)<= 06:15:00, temporalAdd(adjTm,555,"m"),
			iif(time(adjTm)>06:15:00 and time(adjTm)<= 08:00:00, concatDateTime(date(adjTm),15:00:00), 	
			NULL)))));
			
	return actTime;
}

def setActionTime(mutable destTable, stepLen, msg)
{
	tt = select TradingDay,InstrumentID,ExchangeID,ProductID,LastPrice,TickVolune,TickAmount,TickPosition,FlowVolume,FlowFund,TempTemplateID,AdjustTime from msg;
	tt.addColumn(`ActionTime,TIMESTAMP);
	update tt set ActionTime = calcActionTimeFromAjustTime(stepLen, AdjustTime);   //调用自定义的函数来计算
	destTable.append!(tt);
	tt.clear!();
}


///////////////  若 sub 时使用 自定义函数,那 metrics 该如何设定? 	////////应是 先调用函数, 然后才是 使用 metrics 进行聚合
metrics_tse_H1 = <[
first(LastPrice), 		/* Open */
max(LastPrice), 		/* High */
min(LastPrice), 		/* Low */
last(LastPrice), 		/* Close */
sum(TickVolume), 	/* Volume */
last(TradingDay), 	/* TradingDay 交易日*/
first(TimeTemplateID), 	/* TimetemplateID */
first(ActionTime) 	/* ActionTime */ 
]>

//////////////   60分钟
tse_min_H1 = createTimeSeriesEngine(name="act_tse_min60",windowSize=nMin60,step=nMin60,metrics=metrics_tse_H1,dummyTable=Ticks,outputTable=bars_min60,timeColumn="AdjustTime",useSystemTime=false,keyColumn=`InstrumentID`ExchangeID`ProductID,updateTime=300, useWindowStartTime=false, roundTime=false,fill='null')
/////////////   handler 为自定义函数
subscribeTable(tableName="Ticks",actionName="act_tse_min60",offset=0,handler=setActionTime{tse_min_H1,nMin60},msgAsTable=true)

请先 登录 后评论

1 个回答

BMO

您好,建议用getStreamingStat().subWorkers 查看一下是否有报错信息。

使用示例如下,在返归的结果总找到对应的topic(topic由订阅表所在节点的别名、流数据表名称tableName和订阅任务名称actionName组成),相应的lastErrMsg是具体的报错。

attachments-2021-12-TpCK9kgL61b15984b3a32.png

请先 登录 后评论