流计算中如何把同一只股票的两行的数据拼接到同一行中

我现在的表是一列时间列,一列股票代码列,一列标识列,50列数据。同一时间同一股票有两个不同的标识列,我想合并成一列时间列,一列股票代码列,100列数据的格式

以下是一个示例表格:

原始表格:

| 时间       | 股票代码 | 标识列 | 数据1 | 数据2 | ... | 数据50 |
|------------|----------|--------|------|------|-----|-------|
| 2022-01-01 | AAPL     | A      | ...  | ...  | ... | ...   |
| 2022-01-01 | AAPL     | B      | ...  | ...  | ... | ...   |
| 2022-01-02 | MSFT     | A      | ...  | ...  | ... | ...   |
| 2022-01-02 | MSFT     | B      | ...  | ...  | ... | ...   |
| ...        | ...      | ...    | ...  | ...  | ... | ...   |

在这个表格中,同一时间、同一股票可能有两行数据,它们的标识列分别为A和B。

目标表格:

| 时间       | 股票代码 | 数据1_A | 数据2_A | ... | 数据50_A | 数据1_B | 数据2_B | ... | 数据50_B |
|------------|----------|--------|--------|-----|---------|--------|--------|-----|---------|
| 2022-01-01 | AAPL     | ...    | ...    | ... | ...     | ...    | ...    | ... | ...     |
| 2022-01-02 | MSFT     | ...    | ...    | ... | ...     | ...    | ...    | ... | ...     |
| ...        | ...      | ...    | ...    | ... | ...     | ...    | ...    | ... | ...     |

请先 登录 后评论

1 个回答

peter

可以用equaljoinengine,对同一张表根据标识列做过滤订阅,分别把不同标识的行注入引擎左输入和右输入,

关联列是股票id和时间的等值连接。参考案例:

拼接不同数据源的实时分钟指标

下面这段代码将tmp1和tmp2两个表中相同时间同一股票的数据进行拼接计算:

share streamTable(1:0, `time`sym`price, [SECOND, SYMBOL, DOUBLE]) as leftTable
share streamTable(1:0, `time`sym`val, [SECOND, SYMBOL, DOUBLE]) as rightTable
output=table(100:0, `time`sym`price`val`total, [SECOND, SYMBOL, DOUBLE, DOUBLE, DOUBLE])
ejEngine=createEquiJoinEngine("test1", leftTable, rightTable, output, [<price>, <val>, <price*val>], `sym, `time)
subscribeTable(tableName="leftTable", actionName="joinLeft", offset=0, handler=appendForJoin{ejEngine, true}, msgAsTable=true)
subscribeTable(tableName="rightTable", actionName="joinRight", offset=0, handler=appendForJoin{ejEngine, false}, msgAsTable=true)

tmp1=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(1..20) as price)
leftTable.append!(tmp1)
tmp2=table(13:30:10+1..20 as time, take(`AAPL, 10) join take(`IBM, 10) as sym, double(50..31) as val)
rightTable.append!(tmp2)

select count(*) from output



请先 登录 后评论