分片读取数据

有分片读取DFS表的案例么?比如想按一定的数据大小,如1000条,依次读取库里的数据。

请先 登录 后评论

1 个回答

peter

方法一:分页查询

n = 100
SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
sym = rand(`A`B, n)
TradeDate = 2022.01.01 + rand(100,n)
TotalVolumeTrade = rand(1000..3000, n)
TotalValueTrade = rand(100.0, n)
schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade)
    .sortBy!(`SecurityID`TradeDate)
dbPath = "dfs://TSDB_STOCK"
if(existsDatabase(dbPath)){dropDatabase(dbPath)}
db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
snap.append!(schemaTable_snap)
offset = 0
pageSize = 1000
select * from loadTable("dfs://TSDB_STOCK", "snap") limit offset, pageSize

api端控制下offset、pageSize传参可以实现每次读取一页(1000行)的数据。


方法二:使用 BlockReader 方法

api封装了 BlockReader 类,以一定的数据量迭代读取数据,以java api为例:

DBConnection conn = new DBConnection();
conn.connect(SERVER, PORT, USER, PASSWORD);
EntityBlockReader v = (EntityBlockReader)conn.run("select * from loadTable(\"dfs://TSDB_STOCK\", \"snap\")",(ProgressListener) null,4,4,1000);
BasicTable data = (BasicTable)v.read(); while(v.hasNext()){ BasicTable t = (BasicTable)v.read(); data = data.combine(t); }

注意:若数据未读取完毕,需要放弃后续数据的读取时,必须调用 skipAll 方法来忽略后续数据。否则会导致套接字缓冲区滞留数据,引发后续数据的反序列化失败。

请先 登录 后评论