新聞中心
RocketMQ存儲文件是怎樣的,針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都做網(wǎng)站、網(wǎng)站建設(shè)、黎平網(wǎng)絡(luò)推廣、微信小程序、黎平網(wǎng)絡(luò)營銷、黎平企業(yè)策劃、黎平品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供黎平建站搭建服務(wù),24小時(shí)服務(wù)熱線:028-86922220,官方網(wǎng)址:www.cdcxhl.com
RocketMQ存儲路徑默認(rèn)是${ROCKRTMQ_HOME}/store,主要存儲消息、主題對應(yīng)的消息隊(duì)列的索引等。
1、概述
查看其目錄文件
commitlog:消息的存儲目錄
config:運(yùn)行期間一些配置信息
consumequeue:消息消費(fèi)隊(duì)列存儲目錄
index:消息索引文件存儲目錄
abort:如果存在abort文件說明Broker非正常關(guān)閉,該文件默認(rèn)啟動時(shí)創(chuàng)建,正常退出時(shí)刪除
checkpoint:文件檢測點(diǎn)。存儲commitlog文件最后一次刷盤時(shí)間戳、consumequeue最后一次刷盤時(shí)間、index索引文件最后一次刷盤時(shí)間戳。
2、文件簡介
2.1、commitlog文件
commitlog文件的存儲地址:$HOME\store\commitlog${fileName},每個(gè)文件的大小默認(rèn)1G =102410241024,commitlog的文件名fileName,名字長度為20位,左邊補(bǔ)零,剩余為起始偏移量;比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)這個(gè)文件滿了,第二個(gè)文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個(gè)文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲的時(shí)候會順序?qū)懭胛募?,?dāng)文件滿了,寫入下一個(gè)文件。
commitlog目錄下的文件主要存儲消息,每條消息的長度不同,查看其存儲的邏輯視圖,每條消息的前面4個(gè)字節(jié)存儲該條消息的總長度。
文件的消息單元存儲詳細(xì)信息
編號 | 字段簡稱 | 字段大小(字節(jié)) | 字段含義 |
---|---|---|---|
1 | msgSize | 4 | 代表這個(gè)消息的大小 |
2 | MAGICCODE | 4 | MAGICCODE = daa320a7 |
3 | BODY CRC | 4 | 消息體BODY CRC 當(dāng)broker重啟recover時(shí)會校驗(yàn) |
4 | queueId | 4 | |
5 | flag | 4 | |
6 | QUEUEOFFSET | 8 | 這個(gè)值是個(gè)自增值不是真正的consume queue的偏移量,可以代表這個(gè)consumeQueue隊(duì)列或者tranStateTable隊(duì)列中消息的個(gè)數(shù),若是非事務(wù)消息或者commit事務(wù)消息,可以通過這個(gè)值查找到consumeQueue中數(shù)據(jù),QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事務(wù),則可以通過該值從tranStateTable中查找數(shù)據(jù) |
7 | PHYSICALOFFSET | 8 | 代表消息在commitLog中的物理起始地址偏移量 |
8 | SYSFLAG | 4 | 指明消息是事物事物狀態(tài)等消息特征,二進(jìn)制為四個(gè)字節(jié)從右往左數(shù):當(dāng)4個(gè)字節(jié)均為0(值為0)時(shí)表示非事務(wù)消息;當(dāng)?shù)?個(gè)字節(jié)為1(值為1)時(shí)表示表示消息是壓縮的(Compressed);當(dāng)?shù)?個(gè)字節(jié)為1(值為2)表示多消息(MultiTags);當(dāng)?shù)?個(gè)字節(jié)為1(值為4)時(shí)表示prepared消息;當(dāng)?shù)?個(gè)字節(jié)為1(值為8)時(shí)表示commit消息;當(dāng)?shù)?/4個(gè)字節(jié)均為1時(shí)(值為12)時(shí)表示rollback消息;當(dāng)?shù)?/4個(gè)字節(jié)均為0時(shí)表示非事務(wù)消息 |
9 | BORNTIMESTAMP | 8 | 消息產(chǎn)生端(producer)的時(shí)間戳 |
10 | BORNHOST | 8 | 消息產(chǎn)生端(producer)地址(address:port) |
11 | STORETIMESTAMP | 8 | 消息在broker存儲時(shí)間 |
12 | STOREHOSTADDRESS | 8 | 消息存儲到broker的地址(address:port) |
13 | RECONSUMETIMES | 8 | 消息被某個(gè)訂閱組重新消費(fèi)了幾次(訂閱組之間獨(dú)立計(jì)數(shù)),因?yàn)橹卦囅l(fā)送到了topic名字為%retry%groupName的隊(duì)列queueId=0的隊(duì)列中去了,成功消費(fèi)一次記錄為0; |
14 | PreparedTransaction Offset | 8 | 表示是prepared狀態(tài)的事物消息 |
15 | messagebodyLength | 4 | 消息體大小值 |
16 | messagebody | bodyLength | 消息體內(nèi)容 |
17 | topicLength | 1 | topic名稱內(nèi)容大小 |
18 | topic | topicLength | topic的內(nèi)容值 |
19 | propertiesLength | 2 | 屬性值大小 |
20 | properties | propertiesLength | propertiesLength大小的屬性數(shù)據(jù) |
2.2、consumequeue
RocketMQ基于主題訂閱模式實(shí)現(xiàn)消息的消費(fèi),消費(fèi)者關(guān)心的是主題下的所有消息。但是由于不同的主題的消息不連續(xù)的存儲在commitlog文件中,如果只是檢索該消息文件可想而知會有多慢,為了提高效率,對應(yīng)的主題的隊(duì)列建立了索引文件,為了加快消息的檢索和節(jié)省磁盤空間,每一個(gè)consumequeue條目存儲了消息的關(guān)鍵信息commitog文件中的偏移量、消息長度、tag的hashcode值。
查看目錄結(jié)構(gòu):
單個(gè)consumequeue文件中默認(rèn)包含30萬個(gè)條目,每個(gè)條目20個(gè)字節(jié),所以每個(gè)文件的大小是固定的20w x 20字節(jié),單個(gè)consumequeue文件可認(rèn)為是一個(gè)數(shù)組,下標(biāo)即為邏輯偏移量,消息的消費(fèi)進(jìn)度存儲的偏移量即邏輯偏移量。
2.3、IndexFile
IndexFile:用于為生成的索引文件提供訪問服務(wù),通過消息Key值查詢消息真正的實(shí)體內(nèi)容。在實(shí)際的物理存儲上,文件名則是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引;
2.3.1、IndexFile結(jié)構(gòu)分析
IndexHead 數(shù)據(jù): beginTimestamp:該索引文件包含消息的最小存儲時(shí)間 endTimestamp:該索引文件包含消息的最大存儲時(shí)間 beginPhyoffset:該索引文件中包含消息的最小物理偏移量(commitlog 文件偏移量) endPhyoffset:該索引文件中包含消息的最大物理偏移量(commitlog 文件偏移量) hashSlotCount:hashslot個(gè)數(shù),并不是 hash 槽使用的個(gè)數(shù),在這里意義不大, indexCount:已使用的 Index 條目個(gè)數(shù)
Hash 槽: 一個(gè) IndexFile 默認(rèn)包含 500W 個(gè) Hash 槽,每個(gè) Hash 槽存儲的是落在該 Hash 槽的 hashcode 最新的 Index 的索引
Index 條目列表hashcode:key 的 hashcode phyoffset:消息對應(yīng)的物理偏移量 timedif:該消息存儲時(shí)間與第一條消息的時(shí)間戳的差值,小于 0 表示該消息無效 preIndexNo:該條目的前一條記錄的 Index 索引,hash 沖突時(shí),根據(jù)該值構(gòu)建鏈表結(jié)構(gòu)
2.3.2、IndexFile條目存儲
RocketMQ將消息索引鍵與消息的偏移量映射關(guān)系寫入IndexFile中,其核心的實(shí)現(xiàn)方法是public boolean putKey(final String key, final long phyOffset, final long storeTimestamp);參數(shù)含義分別是消息的索引、消息的物理偏移量、消息的存儲時(shí)間。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { //判斷當(dāng)前的條目數(shù)是否大于最大的允許的條目數(shù) if (this.indexHeader.getIndexCount() < this.indexNum) { //獲取KEY的hash值(正整數(shù)) int keyHash = indexKeyHashMethod(key); //計(jì)算hash槽的下標(biāo) int slotPos = keyHash % this.hashSlotNum; //獲取hash槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, // false); //獲取hash槽中存儲的數(shù)據(jù) int slotValue = this.mappedByteBuffer.getInt(absSlotPos); //判斷值是否小于等于0或者 大于當(dāng)前索引文件的最大條目 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } //計(jì)算當(dāng)前消息存儲時(shí)間與第一條消息時(shí)間戳的時(shí)間差 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); //秒 timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } //計(jì)算條目的物理地址 = 索引頭部大?。?0字節(jié)) + hash槽的大小(4字節(jié))*槽的數(shù)量(500w) + 當(dāng)前索引最大條目的個(gè)數(shù)*每index的大小(20字節(jié)) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; //依次存入 key的hash值(4字節(jié))+消息的物理偏移量(8字節(jié))+消息存儲時(shí)間戳和index文件的時(shí)間戳差(4字節(jié))+當(dāng)前hash槽的值(4字節(jié)) this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); //存儲當(dāng)前index中包含的條目數(shù)量存入hash槽中,覆蓋原先hash槽的值 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } //更新文件索引的頭信息,hash槽的總數(shù)、index條目的總數(shù)、最后消息的物理偏移量、最后消息的存儲時(shí)間 this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; }
以上詳細(xì)了分析了IndexFile條目存儲的業(yè)務(wù)邏輯
2.3.3、通過KEY查找消息
DefaultMessageStore類中的public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) 中其核心方法是QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);獲取消息的物理存儲地址,通過偏移量去commitLog中獲取消息集。
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end)核心方法又是IndexFile類中的public void selectPhyOffset(final List
public void selectPhyOffset(final ListphyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { //獲取key的hash信息 int keyHash = indexKeyHashMethod(key); //獲取hash槽的下標(biāo) int slotPos = keyHash % this.hashSlotNum; //獲取hash槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); } //獲取hash槽的值 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // } //判斷值是否小于等于0或者 大于當(dāng)前索引文件的最大條目 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } //計(jì)算條目的物理地址 = 索引頭部大小(40字節(jié)) + hash槽的大小(4字節(jié))*槽的數(shù)量(500w) + 當(dāng)前索引最大條目的個(gè)數(shù)*每index的大?。?0字節(jié)) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; //獲取key的hash值 int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); //獲取消息的物理偏移量 long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); //獲取當(dāng)前消息的存儲時(shí)間戳與index文件的時(shí)間戳差值 long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); //獲取前一個(gè)條目的信息(鏈表結(jié)構(gòu)) int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); if (timeDiff < 0) { break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; //判斷該消息是否在查詢的區(qū)間 boolean timeMatched = (timeRead >= begin) && (timeRead <= end); //判斷key的hash值是否相等并且在查詢的時(shí)間區(qū)間內(nèi) if (keyHash == keyHashRead && timeMatched) { //加入到物理偏移量的List中 phyOffsets.add(phyOffsetRead); } if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } //繼續(xù)前一個(gè)條目信息獲取進(jìn)行匹配 nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }
根據(jù)查詢的 key 的 hashcode%slotNum 得到具體的槽的位置( slotNum 是一個(gè)索引文件里面包含的最大槽的數(shù)目,例如圖中所示 slotNum=5000000)。
根據(jù) slotValue( slot 位置對應(yīng)的值)查找到索引項(xiàng)列表的最后一項(xiàng)(倒序排列, slotValue 總是指向最新的一個(gè) 索引項(xiàng))。
遍歷索引項(xiàng)列表返回查詢時(shí)間范圍內(nèi)的結(jié)果集(默認(rèn)一次最大返回的 32 條記彔)
Hash 沖突;尋找 key 的 slot 位置時(shí)相當(dāng)于執(zhí)行了兩次散列函數(shù),一次 key 的 hash,一次 key 的 hash 值取模,因此返里存在兩次沖突的情況;第一種, key 的 hash 不同但模數(shù)相同,此時(shí)查詢的時(shí)候會在比較一次key 的hash 值(每個(gè)索引項(xiàng)保存了 key 的 hash 值),過濾掉 hash 值不相等的項(xiàng)。第二種, hash 值相等但 key 不等,出于性能的考慮沖突的檢測放到客戶端處理( key 的原始值是存儲在消息文件中的,避免對數(shù)據(jù)文件的解析),客戶端比較一次消息體的 key 是否相同
2.4、checkpoint
checkpoint文件的作用是記錄commitlog、consumequeue、index文件的刷盤時(shí)間點(diǎn),文件固定長度4k,其中只用了該文件的前24個(gè)字節(jié)。查看其存儲格式
physicMsgTimestamp:commitlog文件刷盤時(shí)間點(diǎn)
logicsMsgTimestamp:消息的消費(fèi)隊(duì)列文件刷盤時(shí)間點(diǎn)
indexMsgTimestamp:索引文件刷盤時(shí)間點(diǎn)
關(guān)于RocketMQ存儲文件是怎樣的問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。
分享名稱:RocketMQ存儲文件是怎樣的
文章起源:http://fisionsoft.com.cn/article/ijdopg.html