请问如何在状态引擎中实现下列两个因子的增量计算

想在状态引擎中实现在滑动窗口为3行的两个因子的计算

因子f1 是array vector, 它是最新3行的窗口中,price列的值去重后,以最小值和最大值为头尾形成的一个array

因子f2 是array vector (它的长度总与f1相等,并且其中位置一一对应), 它是最新3行的窗口中,如果side=1 就把v的值累加到对应的price的位置上, 所形成的array

可以参考如下例子数据, 如何在状态引擎中实现?

行数 price side v f1 f2
1 9 1 1 [9] [1]
2 13 1 2 [9,10,11,12,13] [1,0,0,0,2]
3 11 -1 3 [9,10,11,12,13] [1,0,0,0,2]
4 10 1 4 [10,11,12,13] [4,0,0,2]
5 10 1 5 [10,11] [9,0]
请先 登录 后评论

1 个回答

Shepherd

用流数据订阅的自定义handler可以处理,没用上状态引擎,参考代码:

share(streamTable(10:0, `rowNum`price`side`v, [INT, INT, INT, INT]), `originStream)
result = table(10:0, `f1`f2, [INT[], INT[]])
msgCache = table(6:0, `rowNum`price`side`v, [INT, INT, INT, INT])

def updateMsgCache(mutable msgCache, row) {
    // maintain a window which length is 3
    insert into msgCache values(row.rowNum, row.price, row.side, row.v)
    msgCache = select top 3 * from msgCache order by rowNum desc
}

def caculateF1(mutable msgCache) {
    minPrice = 2147483647
    maxPrice = 0
    for (msg in msgCache) {
        if (msg.price < minPrice) {
            minPrice = msg.price
        }
        if (msg.price > maxPrice) {
            maxPrice = msg.price
        }
    }
    return minPrice..maxPrice
}

def caculateF2(mutable f1, mutable msgCache) {
    f1Size = size(f1)
    f2 = array(INT, f1Size, f1Size, 0)
    for (msg in msgCache) {
        if (msg.side == 1) {
            index = at(f1 == msg.price)
            f2[index[0]] += msg.v
        }
    }
    return f2
}

def subscribeHandler(mutable result, mutable msgCache, msg) {
    for (row in msg) {
        // update msgCache
        updateMsgCache(msgCache, row)
        // caculate f1 and f2 via msgCache
        f1 = caculateF1(msgCache)
        f2 = caculateF2(f1, msgCache)
        // store f1 and f2 to result
        insert into result values([f1], [f2])
    }
}

subscribeTable(tableName=`originStream, actionName=`caculateFactors, handler=subscribeHandler{result, msgCache}, msgAsTable=true)

//insert into originStream values(1, 9, 1, 1)
//insert into originStream values(2, 13, 1, 2)
//insert into originStream values(3, 11, -1, 3)
//insert into originStream values(4, 10, 1, 4)
//insert into originStream values(5, 10, 1, 5)
//select string(f1), string(f2) from result

//unsubscribeTable(tableName="originStream", actionName="caculateFactors")
//undef("originStream", SHARED)
//undef all
请先 登录 后评论
  • 1 关注
  • 0 收藏,969 浏览
  • ddbuserex 提出于 2022-08-01 17:50

相似问题