如何正确使用新配置enableChunkGranularityConfig?
我刚测试了发现我开启这个配置单库多表,多线程并发写入三个表比多库单表,单线程写入每个库的单个表的时间要长。看起来并没有起什么作用?
请问应该如何正确使用?
需要在建表建库的时候做什么特殊配置吗? 我现在建库建表的两个场景是1个库下3个表和3个库每个库下一个表。其他都不变,开启这个配置之后,多线程并发写入单库多表时间还更长。
下面是我之前问的问题。
login(`admin,`123456)
print("Begin time is : " + now())
//表名,文件名
pnodeRun(clearAllCache); // 清理缓存
dbName = "dfs://1db3tb-pload"
tbNameOq = "oq_pload"
fileNameOq="/db-storage/yhs/test_data/20220114/20220114_OrderQueue.csv"
tbNameMk = "mk_pload"
fileNameMk="/db-storage/yhs/test_data/20220114/20220114_MarketData.csv"
tbNameTs = "ts_pload"
fileNameTs="/db-storage/yhs/test_data/20220114/20220114_Transaction.csv"
//schema=extractTextSchema(fileName)
//schema
// 先以月份进行分区
//后以symbol进行hash分区
//有则删除
if(existsDatabase(dbName))
dropDatabase(dbName)
dbDate = database("", VALUE, 2022.01.01..2022.02.28)
dbSymbol=database("", HASH, [INT, 30])
db = database(dbName, COMPO, [dbDate,dbSymbol])
// 定义表字段
columnsTs =`DateTime`DataStatus`TradeIndex`TradeChan`SecurityID`TradTime`TradPrice`TradVolume`TradeMoney`TradeBuyNo`TradeSellNo`TradeBSFlag`BizIndex`LocalTime`SeqNo //字段
typeTs=[DATETIME,INT,INT,INT,INT,SECOND,DOUBLE,DOUBLE,DOUBLE,INT,INT,CHAR,INT,SECOND,INT]// 字段类型
columnsOq = `DateTime`UpdateTime`SecurityID`ImageStatus`Side`NoPriceLevel`PrcLvlOperator`Price`Volume`NumOrders`NoOrders`OrderQty1`OrderQty2`OrderQty3`OrderQty4`OrderQty5`OrderQty6`OrderQty7`OrderQty8`OrderQty9`OrderQty10`OrderQty11`OrderQty12`OrderQty13`OrderQty14`OrderQty15`OrderQty16`OrderQty17`OrderQty18`OrderQty19`OrderQty20`OrderQty21`OrderQty22`OrderQty23`OrderQty24`OrderQty25`OrderQty26`OrderQty27`OrderQty28`OrderQty29`OrderQty30`OrderQty31`OrderQty32`OrderQty33`OrderQty34`OrderQty35`OrderQty36`OrderQty37`OrderQty38`OrderQty39`OrderQty40`OrderQty41`OrderQty42`OrderQty43`OrderQty44`OrderQty45`OrderQty46`OrderQty47`OrderQty48`OrderQty49`OrderQty50`LocalTime`SeqNo //字段
typeOq=[DATETIME,TIME,INT,INT,CHAR,INT,INT,DOUBLE,DOUBLE,INT,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,TIME,INT] //字段类型
columnsMk = `DateTime`UpdateTime`SecurityID`ImageStatus`PreCloPrice`OpenPrice`HighPrice`LowPrice`LastPrice`ClosePrice`InstruStatus`TradNumber`TradVolume`Turnover`TotalBidVol`WAvgBidPri`AltWAvgBidPri`TotalAskVol`WAvgAskPri`AltWAvgAskPri`EtfBuyNumber`EtfBuyVolume`EtfBuyMoney`EtfSellNumber`EtfSellVolume`ETFSellMoney`YieldToMatu`TotWarExNum`WarLowerPri`WarUpperPri`WiDBuyNum`WiDBuyVol`WiDBuyMon`WiDSellNum`WiDSellVol`WiDSellMon`TotBidNum`TotSellNum`MaxBidDur`MaxSellDur`BidNum`SellNum`IOPV`AskPrice1`AskVolume1`AskPrice2`AskVolume2`AskPrice3`AskVolume3`AskPrice4`AskVolume4`AskPrice5`AskVolume5`AskPrice6`AskVolume6`AskPrice7`AskVolume7`AskPrice8`AskVolume8`AskPrice9`AskVolume9`AskPrice10`AskVolume10`BidPrice1`BidVolume1`BidPrice2`BidVolume2`BidPrice3`BidVolume3`BidPrice4`BidVolume4`BidPrice5`BidVolume5`BidPrice6`BidVolume6`BidPrice7`BidVolume7`BidPrice8`BidVolume8`BidPrice9`BidVolume9`BidPrice10`BidVolume10`NumOrdersB1`NumOrdersB2`NumOrdersB3`NumOrdersB4`NumOrdersB5`NumOrdersB6`NumOrdersB7`NumOrdersB8`NumOrdersB9`NumOrdersB10`NumOrdersS1`NumOrdersS2`NumOrdersS3`NumOrdersS4`NumOrdersS5`NumOrdersS6`NumOrdersS7`NumOrdersS8`NumOrdersS9`NumOrdersS10`LocalTime`SeqNo //字段
typeMk=[DATETIME,TIME,INT,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,TIME,INT] // 字段类型
//加载 OQ
oq = table(1:0, columnsOq,typeOq) // 创建transaction表结构;
db.createPartitionedTable(oq, tbNameOq,`DateTime`securityID) // 创建分区表;
tableObj1 = loadTable( dbName , tbNameOq)
schema1=extractTextSchema(fileNameOq)
schemaTB1 = tableObj1.schema().colDefs
t1=ploadText(fileNameOq,,schema1)
dt=files("/db-storage/yhs/test_data","202201%").filename[0]
dt=temporalParse(dt,"yyyyMMdd")
t1=select dt as DateTime,* from t1;
//tableObj1.append!(t1)
//加载Ts
ts = table(1:0, columnsTs,typeTs) // 创建transaction表结构;
db.createPartitionedTable(ts, tbNameTs,`DateTime`securityID) // 创建分区表;
tableObj2 = loadTable( dbName , tbNameTs)
schema2=extractTextSchema(fileNameTs)
schemaTB2 = tableObj2.schema().colDefs
t2=ploadText(fileNameTs,,schema2)
dt=files("/db-storage/yhs/test_data","202201%").filename[0]
dt=temporalParse(dt,"yyyyMMdd")
t2=select dt as DateTime,* from t2;
//tableObj2.append!(t2)
// 加载MK
mk = table(1:0, columnsMk,typeMk) // 创建transaction表结构;
db.createPartitionedTable(mk, tbNameMk,`DateTime`securityID) // 创建分区表;
tableObj3 = loadTable( dbName , tbNameMk)
schema3=extractTextSchema(fileNameMk)
schemaTB3 = tableObj3.schema().colDefs
t3=ploadText(fileNameMk,,schema3)
dt=files("/db-storage/yhs/test_data","202201%").filename[0]
dt=temporalParse(dt,"yyyyMMdd")
t3=select dt as DateTime,* from t3;
//tableObj3.append!(t3)
//
//
def writeData(dbName, tableName, data){
print("nowatime : " + now())
timer loadTable(dbName, tableName).append!(data)
}
submitJob("write01_", "write data of table oq,1 database 3 table write together", writeData, "dfs://1db3tb-pload", "oq_pload", t1)
submitJob("write02_", "write data of table ts,1 database 3 table write together", writeData, "dfs://1db3tb-pload", "ts_pload", t2)
submitJob("write03_", "write data of table mk,1 database 3 table write together", writeData, "dfs://1db3tb-pload", "mk_pload", t3)
getRecentJobs()
配置
P%-node01.volumes=/ssd/ssd0/,/ssd/ssd1/,/ssd/ssd2/,/ssd/ssd12/,/ssd/ssd13/,/ssd/ssd14/
P%-node02.volumes=/ssd/ssd3/,/ssd/ssd4/,/ssd/ssd5/,/ssd/ssd15/,/ssd/ssd16/,/ssd/ssd17/
P%-node03.volumes=/ssd/ssd6/,/ssd/ssd7/,/ssd/ssd8/,/ssd/ssd18/,/ssd/ssd19/,/ssd/ssd20/
P%-node04.volumes=/ssd/ssd9/,/ssd/ssd10/,/ssd/ssd11/,/ssd/ssd21/,/ssd/ssd22,/ssd/ssd23/
diskIOConcurrencyLevel=12
enableHTTPS=false
maxConnections=512
maxConnectionPerSite=4
tcpNoDelay=false
maxMsgNumPerBlock=1024
maxPersitenceQueueDepth=10000000
maxPubQueueDepthPerSite=10000000
maxPubConnections=32
persistenceWorkerNum=2
P%-node01.persistenceDir=/ssd/ssd0/node01/persist
P%-node02.persistenceDir=/ssd/ssd3/node02/persist
P%-node03.persistenceDir=/ssd/ssd6/node03/persist
P%-node04.persistenceDir=/ssd/ssd9/node04/persist
maxSubConnections=16
maxSubQueueDepth=10000000
P%-node01.persistOffsetDir=/ssd/ssd0/node01/persistOffset
P%-node02.persistOffsetDir=/ssd/ssd3/node02/persistOffset
P%-node03.persistOffsetDir=/ssd/ssd6/node03/persistOffset
P%-node04.persistOffsetDir=/ssd/ssd9/node04/persistOffset