我的表是一个单值模型的分布式表(包含 time、id、value 三列)。现在有个小需求:传入参数为多个 id 和一个时间点。该时间点可能有值,也可能没有值;如果没有值,能否用左边最相邻的值、右边最相邻的值,或者左右两边最相邻两个值的线性插值,作为此时间点的值?查找的时间跨度也希望能作为一个参数传入。
//默认type = "prev"
//type 有三个选项,"prev","next","linear"
//默认search_range=1d
//search_range 是查找的时间跨度,可以是 2d(天)、7w(周)、5M(月)、1y(年)等,直接写字面量,不加引号
//用法:geInterpolatedDataPoint(dbName, tblName, 2019.01.02T02:00:01.042, ids, "linear",2d),geInterpolatedDataPoint(dbName, tblName, timestamp, ids),geInterpolatedDataPoint(dbName, tblName, timestamp, ids,,5w)等
// For every distinct id in `ids`, return one row (time, id, value) taken from rawDataTable:
//   type == "prev": the latest row with time in [timestamp - search_range, timestamp]
//   type == "next": the earliest row with time in [timestamp, timestamp + search_range]
// Ids with no row inside that window get a placeholder row (timestamp, id, NULL double).
// Any other `type` leaves original_table undefined, so the call fails.
// NOTE(review): append! below matches columns by position, so this assumes rawDataTable's
// columns are exactly (time, id, value) in that order - confirm against the table schema.
def prev_next_method(rawDataTable, timestamp, ids, type, search_range=1d){
    // First pass: a narrow window (at most 1 hour, clamped so it never exceeds search_range).
    // Most ids are expected to be found here cheaply.
    if (type == "prev") {
        farBound = temporalAdd(timestamp, duration("-" + string(search_range)))
        nearBound = max(temporalAdd(timestamp, -1H), farBound)
        // If the table data is already stored in ascending time order, drop `csort time`;
        // that is much faster.
        original_table = select * from rawDataTable where id in ids, time between nearBound : timestamp context by id csort time limit -1
    }
    if (type == "next") {
        farBound = temporalAdd(timestamp, search_range)
        nearBound = min(temporalAdd(timestamp, 1H), farBound)
        original_table = select * from rawDataTable where id in ids, time between timestamp : nearBound context by id csort time limit 1
    }

    exclude_id = (set(ids) - set(exec id from original_table)).keys()
    if (size(exclude_id) == 0) {
        return original_table
    }

    // Second pass over the full search_range, only for the ids the first pass missed.
    // Group by the `id` column (grouping by the local exclude_id vector is not a column grouping).
    if (type == "prev") {
        original_table_con = select * from rawDataTable where id in exclude_id, time between farBound : timestamp context by id csort time limit -1
    }
    if (type == "next") {
        original_table_con = select * from rawDataTable where id in exclude_id, time between timestamp : farBound context by id csort time limit 1
    }

    // Ids still without data get a NULL-valued placeholder stamped with the requested time.
    exclude_id_final = (set(exclude_id) - set(exec id from original_table_con)).keys()
    missingCount = size(exclude_id_final)
    exclude_table = table(take(timestamp, missingCount) as time, exclude_id_final as id, take(double(NULL), missingCount) as value)
    return original_table.append!(original_table_con).append!(exclude_table)
}

// Return one row (time, id, value) per requested id for the time point `timestamp`.
//   type "prev":   value of the nearest earlier-or-equal sample within search_range
//   type "next":   value of the nearest later-or-equal sample within search_range
//   type "linear": linear interpolation between the "prev" and "next" samples; an exact hit
//                  or a missing "next" sample yields the "prev" value, and a missing "prev"
//                  sample yields NULL
// The returned `time` column is always `timestamp`. Any other `type` returns nothing.
def geInterpolatedDataPoint(dbName, tblName, timestamp, ids, type="prev", search_range=1d){
    rawDataTable = loadTable(dbName, tblName)
    if (type == "prev" || type == "next") {
        res_tmp = prev_next_method(rawDataTable, timestamp, ids, type, search_range)
        // Report the requested time point, not the time of the sample that was found.
        update res_tmp set time = timestamp
        return res_tmp
    }
    if (type == "linear") {
        pre = prev_next_method(rawDataTable, timestamp, ids, "prev", search_range)
        nex = prev_next_method(rawDataTable, timestamp, ids, "next", search_range)
        prevRows = select id, time as prevTime, value as prevValue from pre
        nextRows = select id, time as nextTime, value as nextValue from nex
        // Each distinct id has exactly one row (real or placeholder) in pre and in nex,
        // so an equi-join pairs them without relying on any row ordering.
        joined = ej(prevRows, nextRows, `id)
        // 0\0 (prevTime == nextTime, i.e. an exact hit) and a NULL nextValue both become 0 via
        // nullFill, which leaves prevValue; a NULL prevValue propagates to a NULL result.
        res = select timestamp as time, id, nullFill((timestamp - prevTime) \ (nextTime - prevTime) * (nextValue - prevValue), 0) + prevValue as value from joined
        return res
    }
}