新聞中心
[[383867]]

成都創(chuàng)新互聯(lián)主要從事成都網(wǎng)站建設(shè)、做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)長(zhǎng)寧,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專(zhuān)業(yè),歡迎來(lái)電咨詢(xún)建站服務(wù):18982081108
本文轉(zhuǎn)載自微信公眾號(hào)「SH的全棧筆記」,作者SH。轉(zhuǎn)載本文請(qǐng)聯(lián)系SH的全棧筆記公眾號(hào)。
基礎(chǔ)概念Broker
首先我們要知道,使用RocketMQ時(shí)我們經(jīng)歷了什么。那就是生產(chǎn)者發(fā)送一條消息給RocketMQ,RocketMQ拿到這條消息之后將其持久化存儲(chǔ)起來(lái),然后消費(fèi)者去找MQ消費(fèi)這條消息。
RocketMQ操作
上圖中,RocketMQ被標(biāo)識(shí)為了一個(gè)單點(diǎn),但事實(shí)上肯定不是如此,對(duì)于可以隨時(shí)橫向擴(kuò)展的服務(wù)來(lái)說(shuō),生產(chǎn)者向MQ生產(chǎn)消息的數(shù)量也會(huì)隨之而變化,所以一個(gè)合格成熟的MQ必然是要能夠處理這種情況的;而且MQ自身需要做到高可用,否則一旦這個(gè)單點(diǎn)宕機(jī),那所有存儲(chǔ)在MQ中的消息就全部丟失且無(wú)法找回了。
所以在實(shí)際的生產(chǎn)環(huán)境中,肯定是會(huì)部署一個(gè)MQ的集群。而在RocketMQ中,這個(gè)“實(shí)例”有個(gè)專(zhuān)屬名詞,叫做Broker。并且,每個(gè)Broker都會(huì)部署一個(gè)Slave Broker,Master Broker會(huì)定時(shí)的向Slave Broker同步數(shù)據(jù),形成一個(gè)Broker的主從架構(gòu)。
那么問(wèn)題來(lái)了,在微服務(wù)的架構(gòu)中,部署的服務(wù)也存在多實(shí)例部署的情況,服務(wù)之間相互調(diào)用是通過(guò)注冊(cè)中心來(lái)獲取對(duì)應(yīng)服務(wù)的實(shí)例列表的。
拿Spring Cloud舉例,服務(wù)通過(guò)Eureka注冊(cè)中心獲取到某個(gè)服務(wù)的全部實(shí)例,然后交給Ribbon,Ribbon聯(lián)動(dòng)Eureka,從Eureka處獲取到服務(wù)實(shí)例的列表,然后通過(guò)負(fù)載均衡算法選出一個(gè)實(shí)例,最后發(fā)起請(qǐng)求。
同理,此時(shí)MQ中存在多個(gè)Broker實(shí)例,那生產(chǎn)者如何得知MQ集群中有多少Broker實(shí)例呢?自己應(yīng)該連接哪個(gè)實(shí)例?
首先我們直接排除在代碼里Hard Code,具體原因我覺(jué)得應(yīng)該不用再贅述了。RocketMQ是如何解決這個(gè)問(wèn)題呢?這就是接下來(lái)我們要介紹的NameServer了。
NameServer
NameServer可以被簡(jiǎn)單的理解為上一小節(jié)中提到的注冊(cè)中心,所有的Broker的在啟動(dòng)的時(shí)候都會(huì)向NameServer進(jìn)行注冊(cè),將自己的信息上報(bào)。這些信息除了Broker的IP、端口相關(guān)數(shù)據(jù),還有RocketMQ集群的路由信息,路由信息后面再聊。
RocketMQ操作
有了NameServer,客戶(hù)端啟動(dòng)之后會(huì)和NameServer交互,獲取到當(dāng)前RocketMQ集群中所有的Broker信息、路由信息。這樣一來(lái),生產(chǎn)者就知道自己需要連接的Broker信息了,就可以進(jìn)行消息投遞。
那么問(wèn)題來(lái)了,如果在運(yùn)行過(guò)程中,如果某個(gè)Broker突然宕機(jī),NameServer會(huì)如何處理?
這需要提到RocketMQ的這續(xù)約機(jī)制和故障感知機(jī)制。Broker在完成向NameServer的注冊(cè)之后,會(huì)每隔30秒向NameServer發(fā)送心跳進(jìn)行續(xù)約;如果NameServer感知到了某個(gè)Broker超過(guò)了120秒都沒(méi)有發(fā)送心跳,則會(huì)認(rèn)為這個(gè)Broker不可用,將其從自己維護(hù)的信息中移除。
這套機(jī)制,和Spring Cloud中的Eureka的實(shí)現(xiàn)如出一轍。Eureka中的Service在啟動(dòng)之后也會(huì)向Eureka注冊(cè)自己,這樣一來(lái)其他的服務(wù)就可以向該服務(wù)發(fā)起請(qǐng)求,交換數(shù)據(jù)。Service每隔30秒會(huì)向Eureka發(fā)送心跳續(xù)約,如果某個(gè)Service超過(guò)了90秒沒(méi)有發(fā)送心跳,Eureka就會(huì)認(rèn)為該服務(wù)宕機(jī),將其從Eureka維護(hù)的注冊(cè)表中移除。
上面圖中我聊到了多實(shí)例部署,這個(gè)多實(shí)例部署和微服務(wù)中的多實(shí)例部署還不太一樣,微服務(wù)中,所有的服務(wù)都是無(wú)狀態(tài)的,可以橫向的擴(kuò)展,而在RocketMQ中,每個(gè)Broker所存的數(shù)據(jù)可能都不一樣。
我們來(lái)看一下RocketMQ的簡(jiǎn)單使用。
- Message msg = new Message(
- "TopicTest",
- "TagA",
- ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
- );
- SendResult sendResult = producer.send(msg);
可以看到,Message的第一個(gè)參數(shù),為當(dāng)前這條消息指定了一個(gè)Topic,那Topic又是什么呢?
Topic
Topic是對(duì)發(fā)送到RocketMQ中的消息的邏輯分類(lèi),例如我們的訂單系統(tǒng)、積分系統(tǒng)、倉(cāng)儲(chǔ)系統(tǒng)都會(huì)用到這個(gè)MQ,為了對(duì)其進(jìn)行區(qū)分,我們就可以為不同的系統(tǒng)建立不同的Topic。
那為什么說(shuō)是邏輯分區(qū)呢?因?yàn)镽ocketMQ在真實(shí)存儲(chǔ)中,并不是一個(gè)Broker就存儲(chǔ)一個(gè)Topic的數(shù)據(jù),道理很簡(jiǎn)單,如果當(dāng)前這個(gè)Broker宕機(jī),甚至極端情況磁盤(pán)壞了,那這個(gè)Topic的數(shù)據(jù)就會(huì)永久丟失。
所以在真實(shí)存儲(chǔ)中,消息是分布式的存儲(chǔ)在多個(gè)Broker上的,這這些分散在多個(gè)Broker上的存儲(chǔ)介質(zhì)叫MessageQueue,如果你熟悉Kafka的底層原理,就知道這個(gè)跟Kafka中的Partition是同類(lèi)的實(shí)現(xiàn)。
Message Queue存儲(chǔ)
通過(guò)上圖可以看出,同一個(gè)Topic的數(shù)據(jù),被分成了好幾份,分別存儲(chǔ)在不同的Broker上,那RocketMQ為什么要這么實(shí)現(xiàn)?
首先,一個(gè)Topic中如果只有一個(gè)Queue,那么消費(fèi)者在消費(fèi)時(shí)的速度必然受到影響;而如果一個(gè)Topic有很多個(gè)Queue,那么Consumer就可以將消費(fèi)操作同時(shí)進(jìn)行,從而扛住更多的并發(fā)。
除此之外,單臺(tái)機(jī)器的資源是有限的。一個(gè)Topic的消息量可能會(huì)非常之巨大,一臺(tái)機(jī)器的磁盤(pán)很快就會(huì)被塞滿(mǎn)。所以RocketMQ將一個(gè)Topic的數(shù)據(jù)分?jǐn)偨o了多臺(tái)機(jī)器,進(jìn)行分散存儲(chǔ)。其本質(zhì)上就是一個(gè)數(shù)據(jù)分片存儲(chǔ)的一種機(jī)制。
所以我們知道了,發(fā)送到某個(gè)Topic的數(shù)據(jù)是分布式的存儲(chǔ)在多個(gè)Broker中的MessageQueue上的。
Broker消息存儲(chǔ)原理
那Producer發(fā)送到Broker中的消息,到底是以什么方式存儲(chǔ)的呢?答案是Commit Log,Broker收到消息,會(huì)將該消息采用順序?qū)懭氲姆绞?,追加到磁盤(pán)上的Commit Log文件中,每個(gè)Commit Log大小為1G,如果寫(xiě)滿(mǎn)了1G則會(huì)新建一個(gè)Commit Log繼續(xù)寫(xiě),Commit Log文件的特點(diǎn)是順序?qū)?、隨機(jī)讀。
Topic詳情
這就是最底層的存儲(chǔ)的方式,那么問(wèn)題來(lái)了,Consumer來(lái)取消息的時(shí)候,Broker是如何從這一堆的Commit Log中找到相應(yīng)的數(shù)據(jù)呢?眾所周知,一提到磁盤(pán)的I/O操作,就會(huì)聯(lián)想到耗時(shí)這兩個(gè)字,而RocketMQ的一大特點(diǎn)就是高吞吐,看似很矛盾,RocketMQ是如何做的呢?
答案是ConsumeQueue,Broker在寫(xiě)入Commit Log的同時(shí),還會(huì)將當(dāng)前這條消息在Commit Log中的Offset、消息的Size和對(duì)應(yīng)的Tag的Hash寫(xiě)入到ConsumeQueue文件中。每個(gè)Message Queue會(huì)有相對(duì)應(yīng)的ConsumeQueue文件存儲(chǔ)在磁盤(pán)上。
和Commit Log一樣,一個(gè)ConsumeQueue包含了30W條消息,每條消息的大小為20字節(jié),所以每個(gè)ConsumeQueue文件的大小約為5.72M;當(dāng)其寫(xiě)滿(mǎn)了之后,會(huì)再新建一個(gè)ConsumeQueue文件繼續(xù)寫(xiě)入。
ConsumeQueue是一種邏輯隊(duì)列,更是一種索引,讓Consumer來(lái)消費(fèi)的時(shí)候可以快速的從磁盤(pán)文件中定位到這條消息。
看到這你可能會(huì)想,上面提到的Tag又是個(gè)什么東西?
Tag
Tag,標(biāo)簽,用于對(duì)同一個(gè)Topic內(nèi)的消息進(jìn)行分類(lèi),為什么還需要對(duì)Topic進(jìn)行消息類(lèi)型劃分呢?
舉一個(gè)極端的例子,某一個(gè)新的服務(wù),需要去消費(fèi)訂單系統(tǒng)的MQ,但是由于業(yè)務(wù)的特殊性,只需要去消費(fèi)商品類(lèi)型為數(shù)碼產(chǎn)品的訂單消息,如果沒(méi)有Tag,那么該Consumer就會(huì)去做判斷,該訂單消息是否是數(shù)碼產(chǎn)品類(lèi),如果不是,則丟棄,如果是則進(jìn)行消費(fèi)。
這樣一來(lái),Consumer側(cè)就執(zhí)行了大量的無(wú)用功。引入了Tag之后,Producer在生產(chǎn)消息的時(shí)候會(huì)給訂單打上Tag,Consumer進(jìn)行消費(fèi)的時(shí)候,可以配置只消費(fèi)指定的Tag的消息。這樣一來(lái)就不需要Consumer自己去做這個(gè)事情了,RocketMQ會(huì)幫我們實(shí)現(xiàn)這個(gè)過(guò)濾。
那其過(guò)濾的原理是什么?首先在Broker側(cè)是通過(guò)消息中保存的Tag的Hash值進(jìn)行過(guò)濾,然后Consumer側(cè)在去拉取消息的時(shí)候還需要再過(guò)濾一次。
為什么在Broker過(guò)濾了,還需要在Consumer側(cè)再過(guò)濾一次?因?yàn)镠ash沖突,不同的Tag經(jīng)過(guò)Hash算法之后可能會(huì)得到一樣的值,所以Consumer側(cè)在拉取消息的時(shí)候會(huì)通過(guò)字符串進(jìn)行二次過(guò)濾。
Producer發(fā)送消息源碼分析
流程總覽
首先給出整個(gè)發(fā)送消息的大致流程,先熟悉這個(gè)流程看源碼,會(huì)更加的清晰一點(diǎn)。
總體流程
初始化Prodcuer
還是按照下面這個(gè)例子出發(fā)。
producer使用樣例
首先我們會(huì)初始化一個(gè)DefaultMQProducer,RocketMQ會(huì)給這個(gè)Producer一個(gè)默認(rèn)的實(shí)現(xiàn)DefaultMQProducerImpl。然后producer.start()會(huì)啟動(dòng)一個(gè)線(xiàn)程池。
合法性校驗(yàn)
接下來(lái)就是比較核心的producer.send(msg)了,首先RocketMQ會(huì)調(diào)用checkMessage來(lái)檢測(cè)發(fā)送的消息是否合法。
send消息
這些檢測(cè)包含了待發(fā)送的消息是否為空,Topic是否為空、Topic是否包含了非法的字符串、Topic的長(zhǎng)度是否超過(guò)了最大限制127,然后會(huì)去檢查Body是否符合發(fā)送要求,例如msg的Body是否為空、msg的Body是否超過(guò)了最大的限制等等,這里消息的Body最大不能超過(guò)4M。
檢查消息合法性源碼
調(diào)用發(fā)送消息
對(duì)于msg的Topic,RocketMQ會(huì)用NameSpace將其包裝一層,然后就會(huì)調(diào)用DefaultMQProducerImpl中的sendDefaultImpl默認(rèn)實(shí)現(xiàn),發(fā)送消息給Broker,默認(rèn)的發(fā)送消息Timeout是3秒。
發(fā)送消息默認(rèn)實(shí)現(xiàn)
發(fā)送消息中,MQ會(huì)再次調(diào)用checkMessage對(duì)消息的合法性再次進(jìn)行檢查,然后就會(huì)去嘗試獲取Topic的詳細(xì)信息。
所有的Topic的信息都會(huì)存在一個(gè)叫topicPublishInfoTable的 ConcurrentHashMap中,這個(gè)Map中Key就是Topic的字符串,而Value則是TopicPublishInfo。
這個(gè)TopicPublishInfo中就包含了之前在基礎(chǔ)概念中提到的,從Broker中獲取到的相應(yīng)的元數(shù)據(jù),其中就包含了關(guān)鍵的MessageQueue和集群元數(shù)據(jù),其基礎(chǔ)的結(jié)構(gòu)如下。
Topic詳情
messageQueueList包含了該Topic下的所有的MessageQueue,每個(gè)MessageQueue的所屬Topic,每個(gè)MessageQueue所在的Broker的名稱(chēng)以及專(zhuān)屬的queueId。
topicRouteData包含了該Topic下的所有的Queue、Broker相關(guān)的數(shù)據(jù)。
獲取Topic詳細(xì)數(shù)據(jù)
在最終發(fā)送消息前,需要獲取到Topic的詳情,例如像Broker地址這樣的數(shù)據(jù),Producer中是通過(guò)tryToFindTopicPublishInfo方法獲取的,詳細(xì)的注釋我已經(jīng)寫(xiě)在了下圖中。
獲取topic詳情
對(duì)于首次使用的Topic,在上面的Map肯定是不存在的。所以RocketMQ會(huì)將其加入到Map中去,并且調(diào)用方法updateTopicRouteInfoFromNameServer從NameServer處獲取該Topic的元數(shù)據(jù),將其一并寫(xiě)入Map。初次之外,還會(huì)將路由信息、Broker的詳細(xì)信息分別放入topicRouteTable和brokerAddrTable中,這兩個(gè)都是Producer維護(hù)在內(nèi)存中的ConcurrentHashMap。
獲取到了Topic的詳細(xì)信息之后,接下來(lái)會(huì)確認(rèn)一個(gè)發(fā)送的重試次數(shù)timesTotal,假設(shè)timesTotal為N,那么發(fā)送消息如果失敗就會(huì)重試N次。不過(guò)當(dāng)且僅當(dāng)發(fā)送失敗的時(shí)候才會(huì)進(jìn)行重試,其余的case都不會(huì),例如超時(shí)、或者沒(méi)有選擇到合適的MessageQueue。
這個(gè)重試的次數(shù)timesTotal受到參數(shù)communicationMode的影響;CommunicationMode有三個(gè)值,分別是SYNC、ASYNC和ONEWAY。RocketMQ默認(rèn)的實(shí)現(xiàn)中,選擇了SYNC同步。
計(jì)算重試次數(shù)
通過(guò)代碼我們可以看到,如果是communicationMode是SYNC的話(huà),timesTotal的值為1+retryTimesWhenSendFailed,而retryTimesWhenSendFailed的值默認(rèn)為2,代表在消息發(fā)送失敗之后的重試次數(shù)。
這樣一來(lái),如果我們選擇了SYNC的方式,Producer在發(fā)送消息的時(shí)候默認(rèn)的重試次數(shù)就為3。不過(guò)當(dāng)且僅當(dāng)發(fā)送失敗的時(shí)候才會(huì)進(jìn)行重試,其余的case都不會(huì)。
MessageQueue選擇機(jī)制
我們之前聊過(guò),一個(gè)Topic的數(shù)據(jù)是分片存儲(chǔ)在一個(gè)或者多個(gè)Broker上的,底層的存儲(chǔ)介質(zhì)為MessageQueue,之前的圖中,我們沒(méi)有給出Producer是如何選擇具體發(fā)送到哪個(gè)MessageQueue,這里我們通過(guò)源碼來(lái)看一下。
Producer中是通過(guò)selectOneMessageQueue來(lái)進(jìn)行的Message Queue選擇,該方法通過(guò)Topic的詳細(xì)元數(shù)據(jù)和上次選擇的MessageQueue所在的Broker,來(lái)決定下一個(gè)的選擇。
核心的選擇邏輯
其核心的選擇邏輯是什么呢?用大白話(huà)來(lái)說(shuō),就是選出一個(gè)index,然后將其和當(dāng)前Topic的MessageQueue數(shù)量取模。這個(gè)index在首次選擇的時(shí)候,肯定是沒(méi)有的, RocketMQ會(huì)搞一個(gè)隨機(jī)數(shù)出來(lái)。然后在該值的基礎(chǔ)上+1,因?yàn)闉榱送ㄓ?,在外層看?lái),這個(gè)index上次已經(jīng)用過(guò)了,所以每次獲取你都直接幫我+1就好了。
核心的選擇機(jī)制
上圖就是MessageQueue最核心的、最底層的原則機(jī)制了。但是由于實(shí)際的業(yè)務(wù)情況十分復(fù)雜, RocketMQ在實(shí)現(xiàn)中還額外的做了很多的事情。
發(fā)送故障延遲下的選擇邏輯
在實(shí)際的選擇過(guò)程中,會(huì)判斷當(dāng)前是否啟用了發(fā)送延遲故障,這個(gè)由變量sendLatencyFaultEnable的值決定,其默認(rèn)值是false,也就是默認(rèn)是不開(kāi)啟的,從代碼里我暫時(shí)沒(méi)找到其開(kāi)啟的位置。
不過(guò)我們可以聊聊開(kāi)啟之后,會(huì)發(fā)生什么。它同樣會(huì)開(kāi)啟for循環(huán),次數(shù)為MessageQueue的數(shù)量,計(jì)算拿到確定的Queue之后,會(huì)通過(guò)內(nèi)存的一張表faultItemTable去判斷當(dāng)前這個(gè)Broker是否可用,該表是每次發(fā)送消息的時(shí)候都會(huì)去更新它。
如果當(dāng)前沒(méi)有可用的Broker,則會(huì)觸發(fā)其兜底的邏輯,再選擇一個(gè)MessageQueue出來(lái)。
選擇queue的源碼
常規(guī)的選擇邏輯
如果當(dāng)前發(fā)送故障延遲沒(méi)有啟用,則會(huì)走常規(guī)邏輯,同樣的會(huì)去for循環(huán)計(jì)算,循環(huán)中取到了MessageQueue之后會(huì)去判斷是否和上次選擇的MessageQueue屬于同一個(gè)Broker,如果是同一個(gè)Broker,則會(huì)重新選擇,直到選擇到不屬于同一個(gè)Broker的MessageQueue,或者直到循環(huán)結(jié)束。這也是為了將消息均勻的分發(fā)存儲(chǔ),防止數(shù)據(jù)傾斜。
常規(guī)邏輯下的選擇邏輯
消息發(fā)送
最后就會(huì)調(diào)用Netty相關(guān)的組件,將消息發(fā)送出去了。
EOF關(guān)于RocketMQ中的一些基礎(chǔ)的概念,和RocketMQ的Producer發(fā)送消息的源碼就先分析到這里,后續(xù)看緣分再分享其他部分的源碼吧。
當(dāng)前題目:RocketMQ基礎(chǔ)概念剖析,并分析一下Producer的底層源碼
本文網(wǎng)址:http://fisionsoft.com.cn/article/djcjeoi.html


咨詢(xún)
建站咨詢(xún)
