db2 = database("",RANGE,`AAPL`BABA`C`IBM)
RANGE(start, stop)包括start,不包括stop。因此上述RANGE分区没有包括IBM,所以IBM的数据就写入不了分布式数据库。建议修改为VALUE分区:
db2 = database("",VALUE,`AAPL`BABA`C`IBM)
我写了1000万条数据到流表,分布式表里一直700多万,流表里是对的,1000万条。
我的代码如下:
login("admin","123456") share streamTable(1000000:0,`id`time`v`q,[STRING,TIMESTAMP,DOUBLE,DOUBLE]) as sensorTemp enableTablePersistence(sensorTemp, true, false, 1000000) def writeData(){ station=`IBM`C`AAPL`BABA; deviceNumber = 1000 for (i in 0:10000) { data = table(take(station,deviceNumber) as id ,take(now(),deviceNumber) as time,rand(20..41,deviceNumber) as v,rand(20..41,deviceNumber) as q) sensorTemp.append!(data) sleep(10) } } if(exists("dfs://iotDemoDB")){ dropDatabase("dfs://iotDemoDB") } tableSchema = table(1000000:0,`id`time`v`q,[STRING,TIMESTAMP,DOUBLE,DOUBLE]) db1 = database("",VALUE,2020.09.20..2020.09.30) db2 = database("",RANGE,`AAPL`BABA`C`IBM) db = database("dfs://iotDemoDB",COMPO,[db1,db2]) dfsTable = db.createPartitionedTable(tableSchema,"sensorTemp",`time`id) subscribeTable(, "sensorTemp", "save_to_db", -1, append!{dfsTable}, true, 1000000,10)
db2 = database("",RANGE,`AAPL`BABA`C`IBM)
RANGE(start, stop)包括start,不包括stop。因此上述RANGE分区没有包括IBM,所以IBM的数据就写入不了分布式数据库。建议修改为VALUE分区:
db2 = database("",VALUE,`AAPL`BABA`C`IBM)