实时流数据处理的metrics很复杂,一个订阅处理不过来,如何解决消费阻塞问题?

如果我实时流数据处理的metrics很复杂,刚开始使用一个订阅,通过getStreamingStat().pubConns函数查看发布情况,发布队列queueDepth没有堆积,说明流数据发布没有阻塞。

但是我通过getStreamingStat().subWorkers函数查看消费情况,看到上面订阅的消费队列queueDepth使用有堆积,说明流数据消费发生堵塞。请问这种情况应该如何解决?

请先 登录 后评论

1 个回答

Xinhai Tang

基本解决思路:

(1)通过setStreamTableFilterColumn函数设置流数据表的过滤列

(2)创建n个相同处理动作的handler(metrics相同的流数据引擎)

(3)创建n个订阅,订阅目标是上述设置过滤列的streamTable,每隔订阅通过hash过滤订阅部分数据,同时指定到不同的线程处理

demo例子如下:

def sum_diff(x, y){
    return (x-y)/(x+y)
}
factor1 = <ema(1000 * sum_diff(ema(price, 20), ema(price, 40)),10) -  ema(1000 * sum_diff(ema(price, 20), ema(price, 40)), 20)>

share streamTable(1:0, `sym`price, [STRING,DOUBLE]) as tickStream
setStreamTableFilterColumn(tickStream, `sym)
share streamTable(1000:0, `sym`factor1, [STRING,DOUBLE]) as resultStream

for(i in 0..3){
    rse = createReactiveStateEngine(name="reactiveDemo"+string(i), metrics =factor1, dummyTable=tickStream, outputTable=resultStream, keyColumn="sym")
    subscribeTable(tableName=`tickStream, actionName="sub"+string(i), handler=tableInsert{rse}, msgAsTable = true, hash = i, filter = (4,i))
}

n=2000000
tmp = table(take("A"+string(1..4000), n) as sym, rand(10.0, n) as price)
tickStream.append!(tmp)
请先 登录 后评论