如何高效地关联维度表?流计算如何关联维表?

比如对流表A(`id`date`time`col1`col2...`col10)的各个字段做计算,并且希望将表A中做计算的范围限制在dfs中表B(`id`...`value)中value>5的id。请问如何实现?

请先 登录 后评论

1 个回答

Yating Xie

可以用字典实现,将表B读到内存里用字典存起来,用id做字典的key。具体实现示例如下:

// 模拟表A,表B
n = 5000
tableB = table("no."+string(1..n) as `id, rand(1..10, n) as value)
rowA = 100000000
rowA = 100000000
tableA = table(rand("no."+string(1..n), rowA) as `id, rand(1..100000, rowA) as `col1, rand(1..100000, rowA) as `col2, rand(1..100000, rowA) as `col3)

// 普通的维度表join方法
select tableA.* from lj(tableA, tableB, `id) where tableB.value > 5

// 用字典代替join,结果等同于上一行代码的实现
dict_B = dict(tableB.id, tableB.value)
select * from tableA where dict_B[tableA.id] > 5


很多维度表的join都可以用过字典来代替,既提升性能,代码也更简洁。流计算引擎里也可以使用。


// 模拟表A,表B
n = 5000
tableB = table("no."+string(1..n) as `id, rand(1..10, n) as value)
rowA = 1000000
streamtableA_temp = streamTable(rand("no."+string(1..n), rowA) as `id, rand(1..100000, rowA) as `col1, rand(1..100000, rowA) as `col2, rand(1..100000, rowA) as `col3, rand(1..100000, rowA) as `col4, rand(1..100000, rowA) as `col5, rand(1..100000, rowA) as `col6)
share streamtableA_temp as streamtableA

// 构造横截面引擎,过滤表A并计算
// 这里filter参数中均有传入dict_B[id],实现按表B中记录过滤表A
output = table(1:0, `code`v1`v2`v3`b_value, [STRING,INT, INT,DOUBLE,INT])
rse = createReactiveStateEngine(name="ReactiveStateEngine1", metrics =[<col1>, <col2-col3>, <(col4+col5)\col6>, <dict_B[id]>], dummyTable=streamtableA, outputTable=output, keyColumn=`id, filter= < dict_B[id]>5 >) 

// 订阅表A
subscribeTable(tableName = "streamtableA", actionName="filterAndCalculate", offset=0, handler=append!{rse}, msgAsTable=true, reconnect=true)


请先 登录 后评论
  • 1 关注
  • 0 收藏,1087 浏览
  • BMO 提出于 2021-11-28 17:53