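Judging from the code, the goal is to group rows by stock code (SecurityID) within each trading day, and then select rows by CLOSE inside each group. aggrTopN(func=min, funcArgs=CLOSE, sortingCol=CLOSE, top=0.20, ascending=false) does this in three steps: it sorts the group by CLOSE in descending order, keeps the top 20% of rows, and returns the minimum CLOSE among them. The distributed table is partitioned by trading date, which is the same grouping the query needs. As a result, no (SecurityID, date) group spans two partitions, and each partition can in principle be aggregated by stock on its own. This means you can use the mr function to run a custom map-reduce job: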
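def myMap(t) {
    // group by date as well, so the result is correct even if a partition holds more than one day
    result_temp = select SecurityID, TradeDate, aggrTopN(func=min, funcArgs=CLOSE, sortingCol=CLOSE, top=0.20, ascending=false) as res from t group by SecurityID, date(TradeTime) as TradeDate
    return result_temp
}
// sqlDS creates one data source per partition; read only the columns myMap needs
ds = sqlDS(<select SecurityID, TradeTime, CLOSE from loadTable('dfs://SHSZ_OHLCV_1MIN', 'SHSZ_OHLCV_1MIN')>)
// without a reduce function, mr returns a list with one result table per partition
res = mr(ds, myMap)
// the date column holds DATE values, not TIMESTAMP
result = table(1000000:0, `SecurityID`TradeDate`res, `SYMBOL`DATE`DOUBLE)
// append every partition's table (iterating res[0] would only walk the first one)
for (part in res) {
    result.append!(part)
}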
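The following minimal Python sketch (not DolphinDB code) makes the map step and the partition-alignment argument concrete. It rests on several assumptions:

- The table is partitioned by trading date.
- aggr_top_n, my_map and union_all are hypothetical stand-ins for aggrTopN, myMap and the step that concatenates mr's per-partition outputs.
- When top is fractional, the number of rows kept is rounded to the nearest count, with at least one row. This is an assumed rule, not documented DolphinDB behaviour.

The sketch checks that concatenating the per-partition results gives the same answer as one global group-by. It also checks that concatenation fails when a group is split across partitions.

```python
from collections import defaultdict
from datetime import datetime


def aggr_top_n(func, values, sort_keys, top, ascending=True):
    """Stand-in for DolphinDB's aggrTopN (semantics assumed, not official):
    sort by sort_keys, keep the first `top` rows, apply func to their values.
    A float `top` is treated as a fraction of the group size; rounding to the
    nearest row count with a minimum of 1 is an assumption."""
    order = sorted(range(len(values)), key=lambda i: sort_keys[i], reverse=not ascending)
    k = max(1, round(top * len(values))) if isinstance(top, float) else top
    return func(values[i] for i in order[:k])


def my_map(partition):
    """Map step: group one partition's rows by (SecurityID, trade date)."""
    groups = defaultdict(list)
    for security_id, trade_time, close in partition:
        groups[(security_id, trade_time.date())].append(close)
    # minimum of the highest 20% of CLOSE values in each group
    return {key: aggr_top_n(min, closes, closes, 0.20, ascending=False)
            for key, closes in groups.items()}


def union_all(partial_results):
    """Concatenate per-partition results; fail if a group appears twice,
    i.e. if some (SecurityID, date) group was split across partitions."""
    merged = {}
    for partial in partial_results:
        for key, value in partial.items():
            if key in merged:
                raise ValueError(f"group {key} spans more than one partition")
            merged[key] = value
    return merged


# Toy 1-minute bars: (SecurityID, TradeTime, CLOSE)
rows = (
    [("600000", datetime(2024, 1, 2, 9, 31 + i), c)
     for i, c in enumerate([10.0, 10.5, 10.2, 10.9, 10.1, 10.8, 10.3, 10.7, 10.4, 10.6])]
    + [("000001", datetime(2024, 1, 2, 9, 31 + i), c)
       for i, c in enumerate([5.0, 5.4, 5.2, 5.1, 5.3])]
    + [("600000", datetime(2024, 1, 3, 9, 31 + i), c)
       for i, c in enumerate([11.0, 11.2, 11.1, 11.3, 11.4])]
)

# Partition by trading date, mirroring the assumed layout of the DFS table
by_date = defaultdict(list)
for row in rows:
    by_date[row[1].date()].append(row)

# Like mr without a reduce function: one partial result per partition, then union
result = union_all(my_map(p) for p in by_date.values())
print(result)
```

The same reasoning explains why the map keyword in the next query is safe. That keyword only concatenates per-partition results, so it is correct only when no (SecurityID, date) group spans partitions.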
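Alternatively, append the map keyword to the end of the SQL statement. This forces the query to run separately in each partition and then concatenates the partial results. That is correct here because every (SecurityID, date) group lies entirely within one date partition: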
result_temp = select SecurityID, TradeDate,
    aggrTopN(func=min, funcArgs=CLOSE, sortingCol=CLOSE, top=0.20, ascending=false) as res
    from loadTable('dfs://SHSZ_OHLCV_1MIN', 'SHSZ_OHLCV_1MIN')
    group by SecurityID, date(TradeTime) as TradeDate map