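aggrTopN with group by cannot be applied to a distributed table even though the partitioning scheme matches the group by columns; the error says there is no map-reduce implementation.

I want to use aggrTopN with group by on a distributed table to run a filtered aggregation within each group (grouped by security code and trading date), but the query fails with a message that aggrTopN has no map-reduce implementation. The distributed table is partitioned by trading date, one partition per day.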

result_temp = select SecurityID,TradeDate,aggrTopN(func=min, funcArgs=CLOSE, sortingCol=CLOSE, top=0.20, ascending=false) as res from  loadTable('dfs://SHSZ_OHLCV_1MIN','SHSZ_OHLCV_1MIN')   group by SecurityID,date(TradeTime) as TradeDate

Error: The aggregate function in column aggrTopN doesn't have a map-reduce implementation and can't be applied to a partitioned or distributed table.


1 answer

Hao Jia

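Judging from the code, the goal is to group by security code within each trading date and then filter by CLOSE inside each group. Because the table's partitioning scheme matches the group by requirement (every group lies entirely within one daily partition), the grouped aggregation can in principle be computed on each partition independently. One way to do this is to run a custom map-reduce job with the mr function: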

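// Map step: runs on one partition (one trading day) and aggregates by SecurityID.
def myMap(table){
    result_temp = select SecurityID,date(first(TradeTime)),aggrTopN(func=min, funcArgs=CLOSE, sortingCol=CLOSE, top=0.20, ascending=false) as res from table group by SecurityID
    return result_temp
}
// sqlDS creates one data source per partition; mr applies myMap to each of them.
res = mr(sqlDS(<select * from loadTable('dfs://SHSZ_OHLCV_1MIN','SHSZ_OHLCV_1MIN')>), myMap)
// Collect the per-partition results into a single in-memory table.
result = table(1000000:0,`SecurityID`date`res,`SYMBOL`TIMESTAMP`DOUBLE)
for (i in res[0])
{
	result.append!(i)
}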

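Alternatively, append the map keyword to the end of the SQL statement, which forces the query to be executed separately on each partition: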

result_temp = select SecurityID,TradeDate,aggrTopN(func=min, funcArgs=CLOSE, sortingCol=CLOSE, top=0.20, ascending=false) as res from  loadTable('dfs://SHSZ_OHLCV_1MIN','SHSZ_OHLCV_1MIN')   group by SecurityID,date(TradeTime) as TradeDate map
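
To make the reasoning behind both approaches concrete, here is a small illustration in plain Python (not DolphinDB script). The helper top_fraction_min is a hypothetical stand-in for aggrTopN(func=min, ..., top=0.20, ascending=false), and the rule used to turn 20% into a row count (floor, at least one row) is an assumption made for this sketch. It shows that a top-N aggregate cannot be rebuilt from partial results when a group is split across partitions, yet per-partition execution gives the same answer as a global computation when every group sits inside one partition:

```python
import math

def top_fraction_min(closes, top=0.20):
    """Hypothetical stand-in for aggrTopN(min, ..., top=0.20, ascending=false):
    take the highest `top` fraction of values (assumed: floor, at least 1 row)
    and return their minimum."""
    k = max(1, math.floor(len(closes) * top))
    return min(sorted(closes, reverse=True)[:k])

def grouped(rows):
    """Apply top_fraction_min to each (security, day) group."""
    groups = {}
    for sec, day, close in rows:
        groups.setdefault((sec, day), []).append(close)
    return {key: top_fraction_min(vals) for key, vals in groups.items()}

# Case 1: one group split across two partitions. Merging partial results fails.
group = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
whole = top_fraction_min(group)                      # top 2 rows are 19, 18
merged = min(top_fraction_min(group[:5]),            # partial result: 14
             top_fraction_min(group[5:]))            # partial result: 19

# Case 2: partitions are whole days, so no group crosses a partition boundary.
rows = [("A", "2023-01-03", p) for p in group] + \
       [("A", "2023-01-04", p + 100) for p in group]
global_result = grouped(rows)

partitions = {}
for row in rows:
    partitions.setdefault(row[1], []).append(row)    # partition by day

per_partition = {}
for day_rows in partitions.values():
    per_partition.update(grouped(day_rows))          # "map" on each partition

print(whole, merged)                   # the two values differ
print(per_partition == global_result)  # per-partition execution matches
```

In the first case the partial results cannot be combined into the correct answer, which is presumably why no generic map-reduce implementation is offered. In the second case the partition key (the day) is part of the grouping key, so simply concatenating the per-partition results is enough.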