新聞中心
一篇帶給你Kafka核心知識(shí)總結(jié)!
作者:日常加油站 2021-09-06 08:31:11
開(kāi)發(fā)
架構(gòu)
Kafka Apache Kafka是由LinkedIn采用Scala和Java開(kāi)發(fā)的開(kāi)源流處理軟件平臺(tái),并捐贈(zèng)給了Apache Software Foundation。

成都創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿(mǎn)足客戶(hù)于互聯(lián)網(wǎng)時(shí)代的永修網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
基本簡(jiǎn)介
Apache Kafka是由LinkedIn采用Scala和Java開(kāi)發(fā)的開(kāi)源流處理軟件平臺(tái),并捐贈(zèng)給了Apache Software Foundation。
該項(xiàng)目旨在提供統(tǒng)一的、高吞吐量、低延遲的平臺(tái)來(lái)處理實(shí)時(shí)數(shù)據(jù)流。
Kafka可以通過(guò)Kafka Connect連接到外部系統(tǒng),并提供了Kafka Streams。
「Kafka的特性」
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng),主要特性如下:
版本號(hào)
「Kafka版本命名」
我們?cè)诠倬W(wǎng)上下載Kafka時(shí),會(huì)看到這樣的版本:
前面的版本號(hào)是編譯Kafka源代碼的Scala編譯器版本。
Kafka服務(wù)器端的代碼完全由Scala語(yǔ)言編寫(xiě),Scala同時(shí)支持面向?qū)ο缶幊毯秃瘮?shù)式編程,用Scala寫(xiě)成的源代碼編譯之后也是普通的.class文件,因此我們說(shuō)Scala是JVM系的語(yǔ)言。
真正的Kafka版本號(hào)實(shí)際上是2.1.1。
- 那么這個(gè)2.1.1又表示什么呢?
前面的2表示大版本號(hào),即Major Version;中間的1表示小版本號(hào)或次版本號(hào),即Minor Version;最后的1表示修訂版本號(hào),也就是Patch號(hào)。
Kafka社區(qū)在發(fā)布1.0.0版本后寫(xiě)過(guò)一篇文章,宣布Kafka版本命名規(guī)則正式從4位演進(jìn)到3位,比如0.11.0.0版本就是4位版本號(hào)。
有個(gè)建議,不論用的是哪個(gè)版本,都請(qǐng)盡量保持服務(wù)器端版本和客戶(hù)端版本一致,否則你將損失很多Kafka為你提供的性能優(yōu)化收益。
「版本演進(jìn)」
0.7版本:只提供了最基礎(chǔ)的消息隊(duì)列功能。
0.8版本:引入了副本機(jī)制,至此kafka成為了一個(gè)整整意義上完備的分布式可靠消息隊(duì)列解決方案
0.9.0.0版本:增加了基礎(chǔ)的安全認(rèn)證/權(quán)限功能;使用Java重新了新版本消費(fèi)者API;引入了Kafka Connect組件。
0.11.0.0版本:提供了冪等性Producer API以及事務(wù)API;對(duì)Kafka消息格式做了重構(gòu)。
1.0和2.0版本:主要還是Kafka Streams的各種改進(jìn)
基本概念
「主題」
發(fā)布訂閱的對(duì)象是主題(Topic),可以為每 個(gè)業(yè)務(wù)、每個(gè)應(yīng)用甚至是每類(lèi)數(shù)據(jù)都創(chuàng)建專(zhuān)屬的主題
「生產(chǎn)者和消費(fèi)者」
向主題發(fā)布消息的客戶(hù)端應(yīng)用程序稱(chēng)為生產(chǎn)者,生產(chǎn)者程序通常持續(xù)不斷地 向一個(gè)或多個(gè)主題發(fā)送消息
訂閱這些主題消息的客戶(hù)端應(yīng)用程序就被稱(chēng)為消費(fèi)者,消費(fèi)者也能夠同時(shí)訂閱多個(gè)主題的消息
「Broker」
集群由多個(gè) Broker 組成,Broker 負(fù)責(zé)接收和處理客戶(hù)端發(fā)送過(guò)來(lái)的請(qǐng)求,以及對(duì)消息進(jìn)行持久化
雖然多個(gè) Broker 進(jìn)程能夠運(yùn)行在同一臺(tái)機(jī)器上,但更常見(jiàn)的做法是將 不同的 Broker 分散運(yùn)行在不同的機(jī)器上,這樣如果集群中某一臺(tái)機(jī)器宕機(jī),即使在它上面 運(yùn)行的所有 Broker 進(jìn)程都掛掉了,其他機(jī)器上的 Broker 也依然能夠?qū)ν馓峁┓?wù)
「?jìng)浞輽C(jī)制」
備份的思想很簡(jiǎn)單,就是把相同的數(shù)據(jù)拷貝到多臺(tái)機(jī)器上,而這些相同的數(shù)據(jù)拷貝被稱(chēng)為副本
定義了兩類(lèi)副本:領(lǐng)導(dǎo)者副本和追隨者副本
前者對(duì)外提供服務(wù),這里的對(duì)外指的是與 客戶(hù)端程序進(jìn)行交互;而后者只是被動(dòng)地追隨領(lǐng)導(dǎo)者副本而已,不能與外界進(jìn)行交互
「分區(qū)」
分區(qū)機(jī)制指的是將每個(gè)主題劃分成多個(gè)分區(qū),每個(gè)分區(qū)是一組有序的消息日志
生產(chǎn)者生產(chǎn)的每條消息只會(huì)被發(fā)送到一個(gè)分區(qū)中,也就是說(shuō)如果向一個(gè)雙分區(qū)的主題發(fā)送一條消息,這條消息要么在分區(qū) 0 中,要么在分區(qū) 1 中
每個(gè)分區(qū)下可以配置若干個(gè)副本,其中只能有 1 個(gè)領(lǐng) 導(dǎo)者副本和 N-1 個(gè)追隨者副本
生產(chǎn)者向分區(qū)寫(xiě)入消息,每條消息在分區(qū)中的位置信息叫位移
「消費(fèi)者組」
多個(gè)消費(fèi)者實(shí)例共同組成一個(gè)組來(lái) 消費(fèi)一組主題
這組主題中的每個(gè)分區(qū)都只會(huì)被組內(nèi)的一個(gè)消費(fèi)者實(shí)例消費(fèi),其他消費(fèi)者實(shí)例不能消費(fèi)它
同時(shí)實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型:
如果所有實(shí)例都屬于同一個(gè) Group, 那么它實(shí)現(xiàn)的就是消息隊(duì)列模型;
如果所有實(shí)例分別屬于不 同的 Group,那么它實(shí)現(xiàn)的就是發(fā)布/訂閱模型
「Coordinator:協(xié)調(diào)者」
所謂協(xié)調(diào)者,它專(zhuān)門(mén)為Consumer Group服務(wù),負(fù)責(zé)為Group執(zhí)行Rebalance以及提供位移管理和組成員管理等。
具體來(lái)講,Consumer端應(yīng)用程序在提交位移時(shí),其實(shí)是向Coordinator所在的Broker提交位移,同樣地,當(dāng)Consumer應(yīng)用啟動(dòng)時(shí),也是向Coordinator所在的Broker發(fā)送各種請(qǐng)求,然后由Coordinator負(fù)責(zé)執(zhí)行消費(fèi)者組的注冊(cè)、成員管理記錄等元數(shù)據(jù)管理操作。
所有Broker在啟動(dòng)時(shí),都會(huì)創(chuàng)建和開(kāi)啟相應(yīng)的Coordinator組件。
也就是說(shuō),「所有Broker都有各自的Coordinator組件」。
那么,Consumer Group如何確定為它服務(wù)的Coordinator在哪臺(tái)Broker上呢?
通過(guò)Kafka內(nèi)部主題__consumer_offsets。
目前,Kafka為某個(gè)Consumer Group確定Coordinator所在的Broker的算法有2個(gè)步驟。
- 第1步:確定由__consumer_offsets主題的哪個(gè)分區(qū)來(lái)保存該Group數(shù)據(jù):partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
- 第2步:找出該分區(qū)Leader副本所在的Broker,該Broker即為對(duì)應(yīng)的Coordinator。
首先,Kafka會(huì)計(jì)算該Group的group.id參數(shù)的哈希值。
比如你有個(gè)Group的group.id設(shè)置成了test-group,那么它的hashCode值就應(yīng)該是627841412。
其次,Kafka會(huì)計(jì)算__consumer_offsets的分區(qū)數(shù),通常是50個(gè)分區(qū),之后將剛才那個(gè)哈希值對(duì)分區(qū)數(shù)進(jìn)行取模加求絕對(duì)值計(jì)算,即abs(627841412 % 50) = 12。
此時(shí),我們就知道了__consumer_offsets主題的分區(qū)12負(fù)責(zé)保存這個(gè)Group的數(shù)據(jù)。
有了分區(qū)號(hào),我們只需要找出__consumer_offsets主題分區(qū)12的Leader副本在哪個(gè)Broker上就可以了,這個(gè)Broker,就是我們要找的Coordinator。
「消費(fèi)者位移:Consumer Offset」
消費(fèi)者消費(fèi)進(jìn)度,每個(gè)消費(fèi)者都有自己的消費(fèi)者位移。
「重平衡:Rebalance」
消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過(guò)程。
Rebalance是Kafka消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。
「AR(Assigned Replicas)」:分區(qū)中的所有副本統(tǒng)稱(chēng)為AR。
所有消息會(huì)先發(fā)送到leader副本,然后follower副本才能從leader中拉取消息進(jìn)行同步。
但是在同步期間,follower對(duì)于leader而言會(huì)有一定程度的滯后,這個(gè)時(shí)候follower和leader并非完全同步狀態(tài)
「OSR(Out Sync Replicas)」:follower副本與leader副本沒(méi)有完全同步或滯后的副本集合
「ISR(In Sync Replicas):「AR中的一個(gè)子集,ISR中的副本都」是與leader保持完全同步的副本」,如果某個(gè)在ISR中的follower副本落后于leader副本太多,則會(huì)被從ISR中移除,否則如果完全同步,會(huì)從OSR中移至ISR集合。
在默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時(shí),只有在ISR集合中的follower副本才有資格被選舉為新leader,而OSR中的副本沒(méi)有機(jī)會(huì)(可以通過(guò)unclean.leader.election.enable進(jìn)行配置)
「HW(High Watermark)」:高水位,它標(biāo)識(shí)了一個(gè)特定的消息偏移量(offset),消費(fèi)者只能拉取到這個(gè)水位 offset 之前的消息
下圖表示一個(gè)日志文件,這個(gè)日志文件中只有9條消息,第一條消息的offset(LogStartOffset)為0,最有一條消息的offset為8,offset為9的消息使用虛線(xiàn)表示的,代表下一條待寫(xiě)入的消息。
日志文件的 HW 為6,表示消費(fèi)者只能拉取offset在 0 到 5 之間的消息,offset為6的消息對(duì)消費(fèi)者而言是不可見(jiàn)的。
「LEO(Log End Offset)」:標(biāo)識(shí)當(dāng)前日志文件中下一條待寫(xiě)入的消息的offset
上圖中offset為9的位置即為當(dāng)前日志文件的 LEO,LEO 的大小相當(dāng)于當(dāng)前日志分區(qū)中最后一條消息的offset值加1
分區(qū) ISR 集合中的每個(gè)副本都會(huì)維護(hù)自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對(duì)消費(fèi)者而言只能消費(fèi) HW 之前的消息。
系統(tǒng)架構(gòu)
「kafka設(shè)計(jì)思想」
一個(gè)最基本的架構(gòu)是生產(chǎn)者發(fā)布一個(gè)消息到Kafka的一個(gè)Topic ,該Topic的消息存放于的Broker中,消費(fèi)者訂閱這個(gè)Topic,然后從Broker中消費(fèi)消息,下面這個(gè)圖可以更直觀(guān)的描述這個(gè)場(chǎng)景:
「消息狀態(tài):」 在Kafka中,消息是否被消費(fèi)的狀態(tài)保存在Consumer中,Broker不會(huì)關(guān)心消息是否被消費(fèi)或被誰(shuí)消費(fèi),Consumer會(huì)記錄一個(gè)offset值(指向partition中下一條將要被消費(fèi)的消息位置),如果offset被錯(cuò)誤設(shè)置可能導(dǎo)致同一條消息被多次消費(fèi)或者消息丟失。
「消息持久化:」 Kafka會(huì)把消息持久化到本地文件系統(tǒng)中,并且具有極高的性能。
「批量發(fā)送:」 Kafka支持以消息集合為單位進(jìn)行批量發(fā)送,以提高效率。
「Push-and-Pull:」 Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer從Broker Pull消息。
「分區(qū)機(jī)制(Partition):」 Kafka的Broker端支持消息分區(qū),Producer可以決定把消息發(fā)到哪個(gè)Partition,在一個(gè)Partition中消息的順序就是Producer發(fā)送消息的順序,一個(gè)Topic中的Partition數(shù)是可配置的,Partition是Kafka高吞吐量的重要保證。
「系統(tǒng)架構(gòu)」
通常情況下,一個(gè)kafka體系架構(gòu)包括「多個(gè)Producer」、「多個(gè)Consumer」、「多個(gè)broker」以及「一個(gè)Zookeeper集群」。
「Producer」:生產(chǎn)者,負(fù)責(zé)將消息發(fā)送到kafka中。
「Consumer」:消費(fèi)者,負(fù)責(zé)從kafka中拉取消息進(jìn)行消費(fèi)。
「Broker」:Kafka服務(wù)節(jié)點(diǎn),一個(gè)或多個(gè)Broker組成了一個(gè)Kafka集群
「Zookeeper集群」:負(fù)責(zé)管理kafka集群元數(shù)據(jù)以及控制器選舉等。
生產(chǎn)者分區(qū)
「為什么分區(qū)?」
Kafka的消息組織方式實(shí)際上是三級(jí)結(jié)構(gòu):主題-分區(qū)-消息。
主題下的每條消息只會(huì)保存在某一個(gè)分區(qū)中,而不會(huì)在多個(gè)分區(qū)中被保存多份。
其實(shí)分區(qū)的作用就是提供負(fù)載均衡的能力,或者說(shuō)對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性(Scalability)。
不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫(xiě)操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的讀寫(xiě)請(qǐng)求處理,并且,我們還可以通過(guò)添加新的節(jié)點(diǎn)機(jī)器來(lái)增加整體系統(tǒng)的吞吐量。
「都有哪些分區(qū)策略?」
「所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)的算法?!?/p>
Kafka為我們提供了默認(rèn)的分區(qū)策略,同時(shí)它也支持你自定義分區(qū)策略。
「自定義分區(qū)策略」
如果要自定義分區(qū)策略,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class。
在編寫(xiě)生產(chǎn)者程序時(shí),你可以編寫(xiě)一個(gè)具體的類(lèi)實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口。
這個(gè)接口也很簡(jiǎn)單,只定義了兩個(gè)方法:partition()和close(),通常你只需要實(shí)現(xiàn)最重要的partition方法。
我們來(lái)看看這個(gè)方法的方法簽名:
- int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
這里的topic、key、keyBytes、value和valueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當(dāng)前Kafka集群共有多少主題、多少Broker等)。
Kafka給你這么多信息,就是希望讓你能夠充分地利用這些信息對(duì)消息進(jìn)行分區(qū),計(jì)算出它要被發(fā)送到哪個(gè)分區(qū)中。
只要你自己的實(shí)現(xiàn)類(lèi)定義好了partition方法,同時(shí)設(shè)置partitioner.class參數(shù)為你自己實(shí)現(xiàn)類(lèi)的Full Qualified Name,那么生產(chǎn)者程序就會(huì)按照你的代碼邏輯對(duì)消息進(jìn)行分區(qū)。
「輪詢(xún)策略」
也稱(chēng)Round-robin策略,即順序分配。
比如一個(gè)主題下有3個(gè)分區(qū),那么第一條消息被發(fā)送到分區(qū)0,第二條被發(fā)送到分區(qū)1,第三條被發(fā)送到分區(qū)2,以此類(lèi)推。當(dāng)生產(chǎn)第4條消息時(shí)又會(huì)重新開(kāi)始,即將其分配到分區(qū)0
這就是所謂的輪詢(xún)策略。輪詢(xún)策略是Kafka Java生產(chǎn)者API默認(rèn)提供的分區(qū)策略。
「輪詢(xún)策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一?!?/p>
「隨機(jī)策略」
也稱(chēng)Randomness策略。所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上。
如果要實(shí)現(xiàn)隨機(jī)策略版的partition方法,很簡(jiǎn)單,只需要兩行代碼即可:
- List partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
先計(jì)算出該主題總的分區(qū)數(shù),然后隨機(jī)地返回一個(gè)小于它的正整數(shù)。
本質(zhì)上看隨機(jī)策略也是力求將數(shù)據(jù)均勻地打散到各個(gè)分區(qū),但從實(shí)際表現(xiàn)來(lái)看,它要遜于輪詢(xún)策略,所以「如果追求數(shù)據(jù)的均勻分布,還是使用輪詢(xún)策略比較好」。事實(shí)上,隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢(xún)了。
「按消息鍵保序策略」
Kafka允許為每條消息定義消息鍵,簡(jiǎn)稱(chēng)為Key。
這個(gè)Key的作用非常大,它可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,比如客戶(hù)代碼、部門(mén)編號(hào)或是業(yè)務(wù)ID等;也可以用來(lái)表征消息元數(shù)據(jù)。
特別是在Kafka不支持時(shí)間戳的年代,在一些場(chǎng)景中,工程師們都是直接將消息創(chuàng)建時(shí)間封裝進(jìn)Key里面的。
一旦消息被定義了Key,那么你就可以保證同一個(gè)Key的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱(chēng)為按消息鍵保序策略
實(shí)現(xiàn)這個(gè)策略的partition方法同樣簡(jiǎn)單,只需要下面兩行代碼即可:
- List partitions = cluster.partitionsForTopic(topic);
- return Math.abs(key.hashCode()) % partitions.size();
前面提到的Kafka默認(rèn)分區(qū)策略實(shí)際上同時(shí)實(shí)現(xiàn)了兩種策略:如果指定了Key,那么默認(rèn)實(shí)現(xiàn)按消息鍵保序策略;如果沒(méi)有指定Key,則使用輪詢(xún)策略。
「其他分區(qū)策略」
其實(shí)還有一種比較常見(jiàn)的,即所謂的基于地理位置的分區(qū)策略。
當(dāng)然這種策略一般只針對(duì)那些大規(guī)模的Kafka集群,特別是跨城市、跨國(guó)家甚至是跨大洲的集群。
我們可以根據(jù)Broker所在的IP地址實(shí)現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:
- List partitions = cluster.partitionsForTopic(topic);
- return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
我們可以從所有分區(qū)中找出那些Leader副本在南方的所有分區(qū),然后隨機(jī)挑選一個(gè)進(jìn)行消息發(fā)送。
生產(chǎn)者壓縮算法
「Kafka是如何壓縮消息的呢?」
目前Kafka共有兩大類(lèi)消息格式,社區(qū)分別稱(chēng)之為V1版本和V2版本。
V2版本是Kafka 0.11.0.0中正式引入的。
不論是哪個(gè)版本,Kafka的消息層次都分為兩層:消息集合以及消息。
一個(gè)消息集合中包含若干條日志項(xiàng),而日志項(xiàng)才是真正封裝消息的地方。
Kafka底層的消息日志由一系列消息集合日志項(xiàng)組成。
Kafka通常不會(huì)直接操作具體的一條條消息,它總是在消息集合這個(gè)層面上進(jìn)行寫(xiě)入操作。
「那么社區(qū)引入V2版本的目的是什么呢?」
V2版本主要是針對(duì)V1版本的一些弊端做了修正,比如把消息的公共部分抽取出來(lái)放到外層消息集合里面,這樣就不用每條消息都保存這些信息了。
舉個(gè)例子:原來(lái)在V1版本中,每條消息都需要執(zhí)行CRC校驗(yàn),但有些情況下消息的CRC值是會(huì)發(fā)生變化的。
比如在Broker端可能會(huì)對(duì)消息時(shí)間戳字段進(jìn)行更新,那么重新計(jì)算之后的CRC值也會(huì)相應(yīng)更新;再比如Broker端在執(zhí)行消息格式轉(zhuǎn)換時(shí)(主要是為了兼容老版本客戶(hù)端程序),也會(huì)帶來(lái)CRC值的變化。
鑒于這些情況,再對(duì)每條消息都執(zhí)行CRC校驗(yàn)就有點(diǎn)沒(méi)必要了,不僅浪費(fèi)空間還耽誤CPU時(shí)間,因此在V2版本中,消息的CRC校驗(yàn)工作就被移到了消息集合這一層。
V2版本還有一個(gè)和壓縮息息相關(guān)的改進(jìn),就是保存壓縮消息的方法發(fā)生了變化。
之前V1版本中保存壓縮消息的方法是把多條消息進(jìn)行壓縮然后保存到外層消息的消息體字段中;而V2版本的做法是對(duì)整個(gè)消息集合進(jìn)行壓縮,顯然后者應(yīng)該比前者有更好的壓縮效果。
「何時(shí)壓縮?」
在Kafka中,壓縮可能發(fā)生在兩個(gè)地方:生產(chǎn)者端和Broker端。
生產(chǎn)者程序中配置compression.type參數(shù)即表示啟用指定類(lèi)型的壓縮算法。
比如下面這段程序代碼展示了如何構(gòu)建一個(gè)開(kāi)啟GZIP的Producer對(duì)象:
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("acks", "all");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 開(kāi)啟GZIP壓縮
- props.put("compression.type", "gzip");
- Producer producer = new KafkaProducer<>(props);
這里比較關(guān)鍵的代碼行是props.put(“compression.type”, “gzip”),它表明該P(yáng)roducer的壓縮算法使用的是GZIP。
這樣Producer啟動(dòng)后生產(chǎn)的每個(gè)消息集合都是經(jīng)GZIP壓縮過(guò)的,故而能很好地節(jié)省網(wǎng)絡(luò)傳輸帶寬以及Kafka Broker端的磁盤(pán)占用。
有兩種例外情況就可能讓Broker重新壓縮消息:
「情況一:Broker端指定了和Producer端不同的壓縮算法?!?/h3>
一旦你在Broker端設(shè)置了不同的compression.type值,就一定要小心了,因?yàn)榭赡軙?huì)發(fā)生預(yù)料之外的壓縮/解壓縮操作,通常表現(xiàn)為Broker端CPU使用率飆升。
「情況二:Broker端發(fā)生了消息格式轉(zhuǎn)換?!?/h3>
所謂的消息格式轉(zhuǎn)換主要是為了兼容老版本的消費(fèi)者程序。
在一個(gè)生產(chǎn)環(huán)境中,Kafka集群中同時(shí)保存多種版本的消息格式非常常見(jiàn)。
為了兼容老版本的格式,Broker端會(huì)對(duì)新版本消息執(zhí)行向老版本格式的轉(zhuǎn)換。
這個(gè)過(guò)程中會(huì)涉及消息的解壓縮和重新壓縮。
一般情況下這種消息格式轉(zhuǎn)換對(duì)性能是有很大影響的,除了這里的壓縮之外,它還讓Kafka喪失了Zero Copy特性。
「何時(shí)解壓縮?」
有壓縮必有解壓縮!通常來(lái)說(shuō)解壓縮發(fā)生在消費(fèi)者程序中,也就是說(shuō)Producer發(fā)送壓縮消息到Broker后,Broker照單全收并原樣保存起來(lái)。當(dāng)Consumer程序請(qǐng)求這部分消息時(shí),Broker依然原樣發(fā)送出去,當(dāng)消息到達(dá)Consumer端后,由Consumer自行解壓縮還原成之前的消息。
「基本過(guò)程:Producer端壓縮、Broker端保持、Consumer端解壓縮?!?/h3>
注意:除了在Consumer端解壓縮,Broker端也會(huì)進(jìn)行解壓縮。
每個(gè)壓縮過(guò)的消息集合在Broker端寫(xiě)入時(shí)都要發(fā)生解壓縮操作,目的就是為了對(duì)消息執(zhí)行各種驗(yàn)證。
我們必須承認(rèn)這種解壓縮對(duì)Broker端性能是有一定影響的,特別是對(duì)CPU的使用率而言。
「各種壓縮算法對(duì)比」
在Kafka 2.1.0版本之前,Kafka支持3種壓縮算法:GZIP、Snappy和LZ4。
從2.1.0開(kāi)始,Kafka正式支持Zstandard算法(簡(jiǎn)寫(xiě)為zstd)。
它是Facebook開(kāi)源的一個(gè)壓縮算法,能夠提供超高的壓縮比。
在實(shí)際使用中,GZIP、Snappy、LZ4和zstd的表現(xiàn)各有千秋。
但對(duì)于Kafka而言,在吞吐量方面:LZ4 > Snappy > zstd和GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
具體到物理資源,使用Snappy算法占用的網(wǎng)絡(luò)帶寬最多,zstd最少;
在CPU使用率方面,各個(gè)算法表現(xiàn)得差不多,只是在壓縮時(shí)Snappy算法使用的CPU較多一些,而在解壓縮時(shí)GZIP算法則可能使用更多的CPU。
「最佳實(shí)踐」
何時(shí)啟用壓縮是比較合適的時(shí)機(jī)呢?
啟用壓縮的一個(gè)條件就是Producer程序運(yùn)行機(jī)器上的CPU資源要很充足。
除了CPU資源充足這一條件,如果你的環(huán)境中帶寬資源有限,那么建議你開(kāi)啟壓縮。
消費(fèi)者組
「Consumer Group是Kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制」。
既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例,它們共享一個(gè)公共的ID,這個(gè)ID被稱(chēng)為Group ID。
組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂閱主題的所有分區(qū)。
- 每個(gè)分區(qū)只能由同一個(gè)消費(fèi)者組內(nèi)的一個(gè)Consumer實(shí)例來(lái)消費(fèi)。
「Consumer Group三個(gè)特性:」
- Consumer Group下可以有一個(gè)或多個(gè)Consumer實(shí)例,這里的實(shí)例可以是一個(gè)單獨(dú)的進(jìn)程,也可以是同一進(jìn)程下的線(xiàn)程。
- Group ID是一個(gè)字符串,在一個(gè)Kafka集群中,它標(biāo)識(shí)唯一的一個(gè)Consumer Group。
- Consumer Group下所有實(shí)例訂閱的主題的單個(gè)分區(qū),只能分配給組內(nèi)的某個(gè)Consumer實(shí)例消費(fèi),這個(gè)分區(qū)當(dāng)然也可以被其他的Group消費(fèi)。
當(dāng)Consumer Group訂閱了多個(gè)主題后,組內(nèi)的每個(gè)實(shí)例不要求一定要訂閱主題的所有分區(qū),它只會(huì)消費(fèi)部分分區(qū)中的消息。
Consumer Group之間彼此獨(dú)立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。
「Kafka僅僅使用Consumer Group這一種機(jī)制,卻同時(shí)實(shí)現(xiàn)了傳統(tǒng)消息引擎系統(tǒng)的兩大模型」:
如果所有實(shí)例都屬于同一個(gè)Group,那么它實(shí)現(xiàn)的就是消息隊(duì)列模型;
如果所有實(shí)例分別屬于不同的Group,那么它實(shí)現(xiàn)的就是發(fā)布/訂閱模型。
「一個(gè)Group下該有多少個(gè)Consumer實(shí)例呢?」
「理想情況下,Consumer實(shí)例的數(shù)量應(yīng)該等于該Group訂閱主題的分區(qū)總數(shù)。」
假設(shè)一個(gè)Consumer Group訂閱了3個(gè)主題,分別是A、B、C,它們的分區(qū)數(shù)依次是1、2、3,那么通常情況下,為該Group設(shè)置6個(gè)Consumer實(shí)例是比較理想的情形,因?yàn)樗茏畲笙薅鹊貙?shí)現(xiàn)高伸縮性。
「針對(duì)Consumer Group,Kafka是怎么管理位移的呢?」
「位移Offset」
老版本的Consumer Group把位移保存在ZooKeeper中。
Apache ZooKeeper是一個(gè)分布式的協(xié)調(diào)服務(wù)框架,Kafka重度依賴(lài)它實(shí)現(xiàn)各種各樣的協(xié)調(diào)管理。
將位移保存在ZooKeeper外部系統(tǒng)的做法,最顯而易見(jiàn)的好處就是減少了Kafka Broker端的狀態(tài)保存開(kāi)銷(xiāo)。
不過(guò),慢慢地發(fā)現(xiàn)了一個(gè)問(wèn)題,即ZooKeeper這類(lèi)元框架其實(shí)并不適合進(jìn)行頻繁的寫(xiě)更新,而Consumer Group的位移更新卻是一個(gè)非常頻繁的操作。
這種大吞吐量的寫(xiě)操作會(huì)極大地拖慢ZooKeeper集群的性能。
于是,在新版本的Consumer Group中,Kafka社區(qū)重新設(shè)計(jì)了Consumer Group的位移管理方式,采用了將位移保存在Kafka內(nèi)部主題的方法。
這個(gè)內(nèi)部主題就是__consumer_offsets。
消費(fèi)者策略
「第一種是Round」
默認(rèn),也叫輪循,說(shuō)的是對(duì)于同一組消費(fèi)者來(lái)說(shuō),使用輪訓(xùn)分配的方式,決定消費(fèi)者消費(fèi)的分區(qū)
「第二種叫做Range」
對(duì)一個(gè)消費(fèi)者組來(lái)說(shuō)決定消費(fèi)方式是以分區(qū)總數(shù)除以消費(fèi)者總數(shù)來(lái)決定,一般如果不能整除,往往是從頭開(kāi)始將剩余的分區(qū)分配開(kāi)
「第三種叫Sticky」
是在0.11.x,新增的,它和前面兩個(gè)不是很一樣,它是在Range上的一種升華,且前面兩個(gè)當(dāng)同組內(nèi)有新的消費(fèi)者加入或者舊的消費(fèi)者退出的時(shí)候,會(huì)從新開(kāi)始決定消費(fèi)者消費(fèi)方式,但是Sticky,在同組中有新的新的消費(fèi)者加入或者舊的消費(fèi)者退出時(shí),不會(huì)直接開(kāi)始新的Range分配,而是保留現(xiàn)有消費(fèi)者原來(lái)的消費(fèi)策略,將退出的消費(fèi)者所消費(fèi)的分區(qū)平均分配給現(xiàn)有消費(fèi)者,新增消費(fèi)者同理,同其他現(xiàn)存消費(fèi)者的消費(fèi)策略中分離
位移提交
假設(shè)一個(gè)分區(qū)中有10條消息,位移分別是0到9。
某個(gè)Consumer應(yīng)用已消費(fèi)了5條消息,這就說(shuō)明該Consumer消費(fèi)了位移為0到4的5條消息,此時(shí)Consumer的位移是5,指向了下一條消息的位移。
因?yàn)镃onsumer能夠同時(shí)消費(fèi)多個(gè)分區(qū)的數(shù)據(jù),所以位移的提交實(shí)際上是在分區(qū)粒度上進(jìn)行的,即「Consumer需要為分配給它的每個(gè)分區(qū)提交各自的位移數(shù)據(jù)」。
「位移提交分為自動(dòng)提交和手動(dòng)提交;從Consumer端的角度來(lái)說(shuō),位移提交分為同步提交和異步提交」。
開(kāi)啟自動(dòng)提交位移的方法:Consumer端有個(gè)參數(shù)enable.auto.commit,把它設(shè)置為true或者壓根不設(shè)置它就可以了。
因?yàn)樗哪J(rèn)值就是true,即Java Consumer默認(rèn)就是自動(dòng)提交位移的。
如果啟用了自動(dòng)提交,Consumer端還有個(gè)參數(shù):auto.commit.interval.ms。
它的默認(rèn)值是5秒,表明Kafka每5秒會(huì)為你自動(dòng)提交一次位移。
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "2000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList("foo", "bar"));
- while (true) {
- ConsumerRecords records = consumer.poll(100);
- for (ConsumerRecord record : records)
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
上面的第3、第4行代碼,就是開(kāi)啟自動(dòng)提交位移的方法。
開(kāi)啟手動(dòng)提交位移的方法就是設(shè)置enable.auto.commit為false。
還需要調(diào)用相應(yīng)的API手動(dòng)提交位移。最簡(jiǎn)單的API就是「KafkaConsumer#commitSync()」。
該方法會(huì)提交KafkaConsumer#poll()返回的最新位移。
從名字上來(lái)看,它是一個(gè)同步操作,即該方法會(huì)一直等待,直到位移被成功提交才會(huì)返回。
如果提交過(guò)程中出現(xiàn)異常,該方法會(huì)將異常信息拋出。
下面這段代碼展示了commitSync()的使用方法:
- while (true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- process(records); // 處理消息
- try {
- consumer.commitSync();
- } catch (CommitFailedException e) {
- handle(e); // 處理提交失敗異常
- }
- }
一旦設(shè)置了enable.auto.commit為true,Kafka會(huì)保證在開(kāi)始調(diào)用poll方法時(shí),提交上次poll返回的所有消息。
從順序上來(lái)說(shuō),poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現(xiàn)消費(fèi)丟失的情況。
但自動(dòng)提交位移的一個(gè)問(wèn)題在于,
「它可能會(huì)出現(xiàn)重復(fù)消費(fèi)」。
而手動(dòng)提交位移,它的好處就在于更加靈活,你完全能夠把控位移提交的時(shí)機(jī)和頻率。
但是,它也有一個(gè)缺陷,就是在調(diào)用commitSync()時(shí),Consumer程序會(huì)處于阻塞狀態(tài),直到遠(yuǎn)端的Broker返回提交結(jié)果,這個(gè)狀態(tài)才會(huì)結(jié)束。
鑒于這個(gè)問(wèn)題,Kafka社區(qū)為手動(dòng)提交位移提供了另一個(gè)API方法:
「KafkaConsumer#commitAsync()」。
從名字上來(lái)看它就不是同步的,而是一個(gè)異步操作。
調(diào)用commitAsync()之后,它會(huì)立即返回,不會(huì)阻塞,因此不會(huì)影響Consumer應(yīng)用的TPS。
由于它是異步的,Kafka提供了回調(diào)函數(shù)(callback),供你實(shí)現(xiàn)提交之后的邏輯,比如記錄日志或處理異常等。
下面這段代碼展示了調(diào)用commitAsync()的方法:
- while (true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- process(records); // 處理消息
- consumer.commitAsync((offsets, exception) -> {
- if (exception != null)
- handle(exception);
- });
- }
commitAsync的問(wèn)題在于,出現(xiàn)問(wèn)題時(shí)它不會(huì)自動(dòng)重試。
顯然,如果是手動(dòng)提交,我們需要將commitSync和commitAsync組合使用才能到達(dá)最理想的效果,原因有兩個(gè):
- 我們可以利用commitSync的自動(dòng)重試來(lái)規(guī)避那些瞬時(shí)錯(cuò)誤,比如網(wǎng)絡(luò)的瞬時(shí)抖動(dòng),Broker端GC等,因?yàn)檫@些問(wèn)題都是短暫的,自動(dòng)重試通常都會(huì)成功。
- 我們不希望程序總處于阻塞狀態(tài),影響TPS。
我們來(lái)看一下下面這段代碼,它展示的是如何將兩個(gè)API方法結(jié)合使用進(jìn)行手動(dòng)提交。
- try {
- while(true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- process(records); // 處理消息
- commitAysnc(); // 使用異步提交規(guī)避阻塞
- }
- } catch(Exception e) {
- handle(e); // 處理異常
- } finally {
- try {
- consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
- } finally {
- consumer.close();
- }
- }
這樣一個(gè)場(chǎng)景:你的poll方法返回的不是500條消息,而是5000條。
那么,你肯定不想把這5000條消息都處理完之后再提交位移,因?yàn)橐坏┲虚g出現(xiàn)差錯(cuò),之前處理的全部都要重來(lái)一遍。
比如前面這個(gè)5000條消息的例子,你可能希望每處理完100條消息就提交一次位移,這樣能夠避免大批量的消息重新消費(fèi)。
Kafka Consumer API為手動(dòng)提交提供了這樣的方法:commitSync(Map)和commitAsync(Map)。
它們的參數(shù)是一個(gè)Map對(duì)象,鍵就是TopicPartition,即消費(fèi)的分區(qū),而值是一個(gè)OffsetAndMetadata對(duì)象,保存的主要是位移數(shù)據(jù)。
- 如何每處理100條消息就提交一次位移呢?
在這里,我以commitAsync為例,展示一段代碼,實(shí)際上,commitSync的調(diào)用方法和它是一模一樣的。
- private Map offsets = new HashMap<>();
- int count = 0;
- ……
- while (true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord record: records) {
- process(record); // 處理消息
- offsets.put(new TopicPartition(record.topic(), record.partition()),
- new OffsetAndMetadata(record.offset() + 1);
- if(count % 100 == 0)
- consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
- count++;
- }
- }
程序先是創(chuàng)建了一個(gè)Map對(duì)象,用于保存Consumer消費(fèi)處理過(guò)程中要提交的分區(qū)位移,之后開(kāi)始逐條處理消息,并構(gòu)造要提交的位移值。
代碼的最后部分是做位移的提交。設(shè)置了一個(gè)計(jì)數(shù)器,每累計(jì)100條消息就統(tǒng)一提交一次位移。
與調(diào)用無(wú)參的commitAsync不同,這里調(diào)用了帶Map對(duì)象參數(shù)的commitAsync進(jìn)行細(xì)粒度的位移提交。
這樣,這段代碼就能夠?qū)崿F(xiàn)每處理100條消息就提交一次位移,不用再受poll方法返回的消息總數(shù)的限制了。
重平衡
「(重平衡)Rebalance本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè)Consumer Group下的所有Consumer如何達(dá)成一致,來(lái)分配訂閱Topic的每個(gè)分區(qū)」。
比如某個(gè)Group下有20個(gè)Consumer實(shí)例,它訂閱了一個(gè)具有100個(gè)分區(qū)的Topic。
正常情況下,Kafka平均會(huì)為每個(gè)Consumer分配5個(gè)分區(qū)。這個(gè)分配的過(guò)程就叫Rebalance。
「Rebalance的觸發(fā)條件有3個(gè)?!?/h3>- 組成員數(shù)發(fā)生變更。比如有新的Consumer實(shí)例加入組或者離開(kāi)組,或是有Consumer實(shí)例崩潰被踢出組。
- 訂閱主題數(shù)發(fā)生變更。Consumer Group可以使用正則表達(dá)式的方式訂閱主題,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明該Group訂閱所有以字母t開(kāi)頭、字母c結(jié)尾的主題,在Consumer Group的運(yùn)行過(guò)程中,你新創(chuàng)建了一個(gè)滿(mǎn)足這樣條件的主題,那么該Group就會(huì)發(fā)生Rebalance。
- 訂閱主題的分區(qū)數(shù)發(fā)生變更。Kafka當(dāng)前只能允許增加一個(gè)主題的分區(qū)數(shù),當(dāng)分區(qū)數(shù)增加時(shí),就會(huì)觸發(fā)訂閱該主題的所有Group開(kāi)啟Rebalance。
Rebalance發(fā)生時(shí),Group下所有的Consumer實(shí)例都會(huì)協(xié)調(diào)在一起共同參與。
「分配策略」
當(dāng)前Kafka默認(rèn)提供了3種分配策略,每種策略都有一定的優(yōu)勢(shì)和劣勢(shì),社區(qū)會(huì)不斷地完善這些策略,保證提供最公平的分配策略,即每個(gè)Consumer實(shí)例都能夠得到較為平均的分區(qū)數(shù)。
比如一個(gè)Group內(nèi)有10個(gè)Consumer實(shí)例,要消費(fèi)100個(gè)分區(qū),理想的分配策略自然是每個(gè)實(shí)例平均得到10個(gè)分區(qū)。
這就叫公平的分配策略。
舉個(gè)簡(jiǎn)單的例子來(lái)說(shuō)明一下Consumer Group發(fā)生Rebalance的過(guò)程。
假設(shè)目前某個(gè)Consumer Group下有兩個(gè)Consumer,比如A和B,當(dāng)?shù)谌齻€(gè)成員C加入時(shí),Kafka會(huì)觸發(fā)Rebalance,并根據(jù)默認(rèn)的分配策略重新為A、B和C分配分區(qū)
Rebalance之后的分配依然是公平的,即每個(gè)Consumer實(shí)例都獲得了2個(gè)分區(qū)的消費(fèi)權(quán)。
在Rebalance過(guò)程中,所有Consumer實(shí)例都會(huì)停止消費(fèi),等待Rebalance完成,這是Rebalance為人詬病的一個(gè)方面。
目前Rebalance的設(shè)計(jì)是所有Consumer實(shí)例共同參與,全部重新分配所有分區(qū)。
「Coordinator會(huì)在什么情況下認(rèn)為某個(gè)Consumer實(shí)例已掛從而要退組呢?」
當(dāng)Consumer Group完成Rebalance之后,每個(gè)Consumer實(shí)例都會(huì)定期地向Coordinator發(fā)送心跳請(qǐng)求,表明它還存活著。
如果某個(gè)Consumer實(shí)例不能及時(shí)地發(fā)送這些心跳請(qǐng)求,Coordinator就會(huì)認(rèn)為該Consumer已經(jīng)死了,從而將其從Group中移除,然后開(kāi)啟新一輪Rebalance。
Consumer端有個(gè)參數(shù),叫session.timeout.ms。
該參數(shù)的默認(rèn)值是10秒,即如果Coordinator在10秒之內(nèi)沒(méi)有收到Group下某Consumer實(shí)例的心跳,它就會(huì)認(rèn)為這個(gè)Consumer實(shí)例已經(jīng)掛了。
除了這個(gè)參數(shù),Consumer還提供了一個(gè)允許你控制發(fā)送心跳請(qǐng)求頻率的參數(shù),就是heartbeat.interval.ms。
這個(gè)值設(shè)置得越小,Consumer實(shí)例發(fā)送心跳請(qǐng)求的頻率就越高。
頻繁地發(fā)送心跳請(qǐng)求會(huì)額外消耗帶寬資源,但好處是能夠更加快速地知曉當(dāng)前是否開(kāi)啟Rebalance,因?yàn)?,目前Coordinator通知各個(gè)Consumer實(shí)例開(kāi)啟Rebalance的方法,就是將REBALANCE_NEEDED標(biāo)志封裝進(jìn)心跳請(qǐng)求的響應(yīng)體中。
除了以上兩個(gè)參數(shù),Consumer端還有一個(gè)參數(shù),用于控制Consumer實(shí)際消費(fèi)能力對(duì)Rebalance的影響,即max.poll.interval.ms參數(shù)。
它限定了Consumer端應(yīng)用程序兩次調(diào)用poll方法的最大時(shí)間間隔。
它的默認(rèn)值是5分鐘,表示你的Consumer程序如果在5分鐘之內(nèi)無(wú)法消費(fèi)完poll方法返回的消息,那么Consumer會(huì)主動(dòng)發(fā)起離開(kāi)組的請(qǐng)求,Coordinator也會(huì)開(kāi)啟新一輪Rebalance。
「可避免Rebalance的配置」
第一類(lèi)Rebalance是因?yàn)槲茨芗皶r(shí)發(fā)送心跳,導(dǎo)致Consumer被踢出Group而引發(fā)的
因此可以設(shè)置「session.timeout.ms和heartbeat.interval.ms」的值。
- 設(shè)置session.timeout.ms = 6s。
- 設(shè)置heartbeat.interval.ms = 2s。
- 要保證Consumer實(shí)例在被判定為dead之前,能夠發(fā)送至少3輪的心跳請(qǐng)求,即session.timeout.ms >= 3 * heartbeat.interval.ms。
將session.timeout.ms設(shè)置成6s主要是為了讓Coordinator能夠更快地定位已經(jīng)掛掉的Consumer。
「第二類(lèi)Rebalance是Consumer消費(fèi)時(shí)間過(guò)長(zhǎng)導(dǎo)致的」。
你要為你的業(yè)務(wù)處理邏輯留下充足的時(shí)間,這樣Consumer就不會(huì)因?yàn)樘幚磉@些消息的時(shí)間太長(zhǎng)而引發(fā)Rebalance了。
ConsumerOffsets
「Kafka將Consumer的位移數(shù)據(jù)作為一條條普通的Kafka消息,提交到__consumer_offsets中?!?/h3>「__consumer_offsets的主要作用是保存Kafka消費(fèi)者的位移信息?!?/h3>
它要求這個(gè)提交過(guò)程不僅要實(shí)現(xiàn)高持久性,還要支持高頻的寫(xiě)操作。
__consumer_offsets主題就是普通的Kafka主題。你可以手動(dòng)地創(chuàng)建它、修改它,甚至是刪除它。
雖說(shuō)__consumer_offsets主題是一個(gè)普通的Kafka主題,但「它的消息格式卻是Kafka自己定義的」,用戶(hù)不能修改,也就是說(shuō)你不能隨意地向這個(gè)主題寫(xiě)消息,因?yàn)橐坏┠銓?xiě)入的消息不滿(mǎn)足Kafka規(guī)定的格式,那么Kafka內(nèi)部無(wú)法成功解析,就會(huì)造成Broker的崩潰。
Kafka Consumer有API幫你提交位移,也就是向__consumer_offsets主題寫(xiě)消息,千萬(wàn)不要自己寫(xiě)個(gè)Producer隨意向該主題發(fā)送消息。
__consumer_offsets有3種消息格式:
- 用于保存Consumer Group信息的消息。
- 用于刪除Group過(guò)期位移甚至是刪除Group的消息。
- 保存了位移值。
第2種格式它有個(gè)專(zhuān)屬的名字:tombstone消息,即墓碑消息,也稱(chēng)delete mark,它的主要特點(diǎn)是它的消息體是null,即空消息體。
一旦某個(gè)Consumer Group下的所有Consumer實(shí)例都停止了,而且它們的位移數(shù)據(jù)都已被刪除時(shí),Kafka會(huì)向__consumer_offsets主題的對(duì)應(yīng)分區(qū)寫(xiě)入tombstone消息,表明要徹底刪除這個(gè)Group的信息。
__consumer_offsets是怎么被創(chuàng)建的?
通常來(lái)說(shuō),「當(dāng)Kafka集群中的第一個(gè)Consumer程序啟動(dòng)時(shí),Kafka會(huì)自動(dòng)創(chuàng)建位移主題」。
「默認(rèn)該主題的分區(qū)數(shù)是50,副本數(shù)是3」。
目前Kafka Consumer提交位移的方式有兩種:「自動(dòng)提交位移和手動(dòng)提交位移。」
Consumer端有個(gè)參數(shù)叫enable.auto.commit,如果值是true,則Consumer在后臺(tái)默默地為你定期提交位移,提交間隔由一個(gè)專(zhuān)屬的參數(shù)auto.commit.interval.ms來(lái)控制。
自動(dòng)提交位移有一個(gè)顯著的優(yōu)點(diǎn),就是省事,你不用操心位移提交的事情,就能保證消息消費(fèi)不會(huì)丟失。
但這一點(diǎn)同時(shí)也是缺點(diǎn),喪失了很大的靈活性和可控性,你完全沒(méi)法把控Consumer端的位移管理。
Kafka Consumer API為你提供了位移提交的方法,如consumer.commitSync等。
當(dāng)調(diào)用這些方法時(shí),Kafka會(huì)向__consumer_offsets主題寫(xiě)入相應(yīng)的消息。
如果你選擇的是自動(dòng)提交位移,那么就可能存在一個(gè)問(wèn)題:只要Consumer一直啟動(dòng)著,它就會(huì)無(wú)限期地向位移主題寫(xiě)入消息。
「舉個(gè)極端一點(diǎn)的例子?!?/h3>
假設(shè)Consumer當(dāng)前消費(fèi)到了某個(gè)主題的最新一條消息,位移是100,之后該主題沒(méi)有任何新消息產(chǎn)生,故Consumer無(wú)消息可消費(fèi)了,所以位移永遠(yuǎn)保持在100。
由于是自動(dòng)提交位移,位移主題中會(huì)不停地寫(xiě)入位移=100的消息。
顯然Kafka只需要保留這類(lèi)消息中的最新一條就可以了,之前的消息都是可以刪除的。
這就要求Kafka必須要有針對(duì)位移主題消息特點(diǎn)的消息刪除策略,否則這種消息會(huì)越來(lái)越多,最終撐爆整個(gè)磁盤(pán)。
「Compact策略」
Kafka使用「Compact策略」來(lái)刪除__consumer_offsets主題中的過(guò)期消息,避免該主題無(wú)限期膨脹。
比如對(duì)于同一個(gè)Key的兩條消息M1和M2,如果M1的發(fā)送時(shí)間早于M2,那么M1就是過(guò)期消息。
Compact的過(guò)程就是掃描日志的所有消息,剔除那些過(guò)期的消息,然后把剩下的消息整理在一起。
我在這里貼一張來(lái)自官網(wǎng)的圖片,來(lái)說(shuō)明Compact過(guò)程。
圖中位移為0、2和3的消息的Key都是K1,Compact之后,分區(qū)只需要保存位移為3的消息,因?yàn)樗亲钚掳l(fā)送的。
「Kafka提供了專(zhuān)門(mén)的后臺(tái)線(xiàn)程定期地巡檢待Compact的主題,看看是否存在滿(mǎn)足條件的可刪除數(shù)據(jù)」。
這個(gè)后臺(tái)線(xiàn)程叫Log Cleaner。
很多實(shí)際生產(chǎn)環(huán)境中都出現(xiàn)過(guò)位移主題無(wú)限膨脹占用過(guò)多磁盤(pán)空間的問(wèn)題,如果你的環(huán)境中也有這個(gè)問(wèn)題,建議你去檢查一下Log Cleaner線(xiàn)程的狀態(tài),通常都是這個(gè)線(xiàn)程掛掉了導(dǎo)致的。
副本機(jī)制
根據(jù)Kafka副本機(jī)制的定義,同一個(gè)分區(qū)下的所有副本保存有相同的消息序列,這些副本分散保存在不同的Broker上,從而能夠?qū)共糠諦roker宕機(jī)帶來(lái)的數(shù)據(jù)不可用。
下面展示的是一個(gè)有3臺(tái)Broker的Kafka集群上的副本分布情況。
從這張圖中,我們可以看到,主題1分區(qū)0的3個(gè)副本分散在3臺(tái)Broker上,其他主題分區(qū)的副本也都散落在不同的Broker上,從而實(shí)現(xiàn)數(shù)據(jù)冗余。
「副本角色」
在Kafka中,副本分成兩類(lèi):領(lǐng)導(dǎo)者副本(Leader Replica)和追隨者副本(Follower Replica)。
每個(gè)分區(qū)在創(chuàng)建時(shí)都要選舉一個(gè)副本,稱(chēng)為領(lǐng)導(dǎo)者副本,其余的副本自動(dòng)稱(chēng)為追隨者副本。
在Kafka中,追隨者副本是不對(duì)外提供服務(wù)的。這就是說(shuō),任何一個(gè)追隨者副本都不能響應(yīng)消費(fèi)者和生產(chǎn)者的讀寫(xiě)請(qǐng)求。所有的請(qǐng)求都必須由領(lǐng)導(dǎo)者副本來(lái)處理,或者說(shuō),所有的讀寫(xiě)請(qǐng)求都必須發(fā)往領(lǐng)導(dǎo)者副本所在的Broker,由該Broker負(fù)責(zé)處理。
追隨者副本不處理客戶(hù)端請(qǐng)求,它唯一的任務(wù)就是從領(lǐng)導(dǎo)者副本「異步拉取」消息,并寫(xiě)入到自己的提交日志中,從而實(shí)現(xiàn)與領(lǐng)導(dǎo)者副本的同步。
當(dāng)領(lǐng)導(dǎo)者副本掛掉了,或者說(shuō)領(lǐng)導(dǎo)者副本所在的Broker宕機(jī)時(shí),Kafka依托于ZooKeeper提供的監(jiān)控功能能夠?qū)崟r(shí)感知到,并立即開(kāi)啟新一輪的領(lǐng)導(dǎo)者選舉,從追隨者副本中選一個(gè)作為新的領(lǐng)導(dǎo)者。老Leader副本重啟回來(lái)后,只能作為追隨者副本加入到集群中。
對(duì)于客戶(hù)端用戶(hù)而言,Kafka的追隨者副本沒(méi)有任何作用,Kafka為什么要這樣設(shè)計(jì)呢?
這種副本機(jī)制有兩個(gè)方面的好處。
1.「方便實(shí)現(xiàn)Read-your-writes」。
所謂Read-your-writes,顧名思義就是,當(dāng)你使用生產(chǎn)者API向Kafka成功寫(xiě)入消息后,馬上使用消費(fèi)者API去讀取剛才生產(chǎn)的消息。
2.「方便實(shí)現(xiàn)單調(diào)讀(Monotonic Reads)」。
假設(shè)當(dāng)前有2個(gè)追隨者副本F1和F2,它們異步地拉取領(lǐng)導(dǎo)者副本數(shù)據(jù)。倘若F1拉取了Leader的最新消息而F2還未及時(shí)拉取,那么,此時(shí)如果有一個(gè)消費(fèi)者先從F1讀取消息之后又從F2拉取消息,它可能會(huì)看到這樣的現(xiàn)象:第一次消費(fèi)時(shí)看到的最新消息在第二次消費(fèi)時(shí)不見(jiàn)了,這就不是單調(diào)讀一致性。
但是,如果所有的讀請(qǐng)求都是由Leader來(lái)處理,那么Kafka就很容易實(shí)現(xiàn)單調(diào)讀一致性。
ISR機(jī)制
In-sync Replicas,也就是所謂的ISR副本集合。
ISR中的副本都是與Leader同步的副本,相反,不在ISR中的追隨者副本就被認(rèn)為是與Leader不同步的。
- 什么副本能夠進(jìn)入到ISR中呢?
Leader副本天然就在ISR中。也就是說(shuō),「ISR不只是追隨者副本集合,它必然包括Leader副本。甚至在某些情況下,ISR只有Leader這一個(gè)副本」。
另外,能夠進(jìn)入到ISR的追隨者副本要滿(mǎn)足一定的條件。
「通過(guò)Broker端參數(shù)replica.lag.time.max.ms參數(shù)值」。
這個(gè)參數(shù)的含義是Follower副本能夠落后Leader副本的最長(zhǎng)時(shí)間間隔,當(dāng)前默認(rèn)值是10秒。
這就是說(shuō),只要一個(gè)Follower副本落后Leader副本的時(shí)間不連續(xù)超過(guò)10秒,那么Kafka就認(rèn)為該Follower副本與Leader是同步的,即使此時(shí)Follower副本中保存的消息明顯少于Leader副本中的消息。
Follower副本唯一的工作就是不斷地從Leader副本拉取消息,然后寫(xiě)入到自己的提交日志中。
倘若該副本后面慢慢地追上了Leader的進(jìn)度,那么它是能夠重新被加回ISR的。
ISR是一個(gè)動(dòng)態(tài)調(diào)整的集合,而非靜態(tài)不變的。
Unclean領(lǐng)導(dǎo)者選舉「Kafka把所有不在ISR中的存活副本都稱(chēng)為非同步副本」。
通常來(lái)說(shuō),非同步副本落后Leader太多,因此,如果選擇這些副本作為新Leader,就可能出現(xiàn)數(shù)據(jù)的丟失。
畢竟,這些副本中保存的消息遠(yuǎn)遠(yuǎn)落后于老Leader中的消息。
在Kafka中,選舉這種副本的過(guò)程稱(chēng)為Unclean領(lǐng)導(dǎo)者選舉。
「Broker端參數(shù)unclean.leader.election.enable控制是否允許Unclean領(lǐng)導(dǎo)者選舉」。
開(kāi)啟Unclean領(lǐng)導(dǎo)者選舉可能會(huì)造成數(shù)據(jù)丟失,但好處是,它使得分區(qū)Leader副本一直存在,不至于停止對(duì)外提供服務(wù),因此提升了高可用性。反之,禁止Unclean領(lǐng)導(dǎo)者選舉的好處在于維護(hù)了數(shù)據(jù)的一致性,避免了消息丟失,但犧牲了高可用性。
副本選舉
對(duì)于kafka集群中對(duì)于任意的topic的分區(qū)以及副本leader的設(shè)定,都需要考慮到集群整體的負(fù)載能力的平衡性,會(huì)盡量分配每一個(gè)partition的副本leader在不同的broker中,這樣會(huì)避免多個(gè)leader在同一個(gè)broker,導(dǎo)致集群中的broker負(fù)載不平衡
kafka引入了優(yōu)先副本的概念,優(yōu)先副本的意思在A(yíng)R(分區(qū)中的所有副本)集合列表中的第一個(gè)副本,在理想狀態(tài)下該副本就是該分區(qū)的leader副本
例如kafka集群由3臺(tái)broker組成,創(chuàng)建了一個(gè)名為topic-partitions的topic,設(shè)置partition為3,副本數(shù)為3,partition0中AR列表為 [1,2,0],那么分區(qū)0的優(yōu)先副本為1
kafka使用多副本機(jī)制提高可靠性,但是只有l(wèi)eader副本對(duì)外提供讀寫(xiě)服務(wù),follow副本只是做消息同步。
「如果一個(gè)分區(qū)的leader副本不可用,就意味著整個(gè)分區(qū)不可用,此時(shí)需要從follower副本中選舉出新的leader副本提供服務(wù)」。
「在創(chuàng)建主題的時(shí)候,該分區(qū)的主題和副本會(huì)盡可能的均勻發(fā)布到kafka的各個(gè)broker上」。
比如我們?cè)诎?個(gè)broker節(jié)點(diǎn)的kafka集群上創(chuàng)建一個(gè)分區(qū)數(shù)為3,副本因子為3的主題topic-partitions時(shí),leader副本會(huì)均勻的分布在3臺(tái)broker節(jié)點(diǎn)上。
「針對(duì)同一個(gè)分區(qū),在同一個(gè)broker節(jié)點(diǎn)上不可能出現(xiàn)它的多個(gè)副本」。
我們可以把leader副本所在的節(jié)點(diǎn)叫作分區(qū)的leader節(jié)點(diǎn),把follower副本所在的節(jié)點(diǎn)叫作follower節(jié)點(diǎn)。
在上面的例子中,分區(qū)0的leader節(jié)點(diǎn)是broker1,分區(qū)1的leader節(jié)點(diǎn)是broker2,分區(qū)2的leader節(jié)點(diǎn)是broker0。
當(dāng)分區(qū)leader節(jié)點(diǎn)發(fā)生故障時(shí),其中的一個(gè)follower節(jié)點(diǎn)就會(huì)選舉為新的leader節(jié)點(diǎn)。
當(dāng)原來(lái)leader的節(jié)點(diǎn)恢復(fù)之后,它只能成為一個(gè)follower節(jié)點(diǎn),此時(shí)就導(dǎo)致了集群負(fù)載不均衡。
比如分區(qū)1的leader節(jié)點(diǎn)broker2崩潰了,此時(shí)選舉了在broker1上的分區(qū)1follower節(jié)點(diǎn)作為新的leader節(jié)點(diǎn)。
當(dāng)broker2重新恢復(fù)時(shí),此時(shí)的kafka集群狀態(tài)如下:
可以看到,此時(shí)broker1上負(fù)載更大,而broker2上沒(méi)有負(fù)載。
「為了解決上述負(fù)載不均衡的情況,kafka支持了優(yōu)先副本選舉,優(yōu)先副本指的是一個(gè)分區(qū)所在的AR集合的第一個(gè)副本」。
比如上面的分區(qū)1,它的AR集合是[2,0,1],表示分區(qū)1的優(yōu)先副本就是在broker2上。
理想情況下,優(yōu)先副本應(yīng)該就是leader副本,kafka保證了優(yōu)先副本的均衡分布,而這與broker節(jié)點(diǎn)宕機(jī)與否沒(méi)有關(guān)系。
「優(yōu)先副本選舉就是對(duì)分區(qū)leader副本進(jìn)行選舉的時(shí)候,盡可能讓優(yōu)先副本成為leader副本」,針對(duì)上述的情況,只要再觸發(fā)一次優(yōu)先副本選舉就能保證分區(qū)負(fù)載均衡。
kafka支持自動(dòng)優(yōu)先副本選舉功能,默認(rèn)每5分鐘觸發(fā)一次優(yōu)先副本選舉操作。
網(wǎng)絡(luò)通信模型
Broker 中有個(gè)Acceptor(mainReactor)監(jiān)聽(tīng)新連接的到來(lái),與新連接建連之后輪詢(xún)選擇一個(gè)Processor(subReactor)管理這個(gè)連接。
而Processor會(huì)監(jiān)聽(tīng)其管理的連接,當(dāng)事件到達(dá)之后,讀取封裝成Request,并將Request放入共享請(qǐng)求隊(duì)列中。
然后IO線(xiàn)程池不斷的從該隊(duì)列中取出請(qǐng)求,執(zhí)行真正的處理。處理完之后將響應(yīng)發(fā)送到對(duì)應(yīng)的Processor的響應(yīng)隊(duì)列中,然后由Processor將Response返還給客戶(hù)端。
每個(gè)listener只有一個(gè)Acceptor線(xiàn)程,因?yàn)樗皇亲鳛樾逻B接建連再分發(fā),沒(méi)有過(guò)多的邏輯,很輕量。
Processor 在Kafka中稱(chēng)之為網(wǎng)絡(luò)線(xiàn)程,默認(rèn)網(wǎng)絡(luò)線(xiàn)程池有3個(gè)線(xiàn)程,對(duì)應(yīng)的參數(shù)是num.network.threads,并且可以根據(jù)實(shí)際的業(yè)務(wù)動(dòng)態(tài)增減。
還有個(gè) IO 線(xiàn)程池,即KafkaRequestHandlerPool,執(zhí)行真正的處理,對(duì)應(yīng)的參數(shù)是num.io.threads,默認(rèn)值是 8。
IO線(xiàn)程處理完之后會(huì)將Response放入對(duì)應(yīng)的Processor中,由Processor將響應(yīng)返還給客戶(hù)端。
可以看到網(wǎng)絡(luò)線(xiàn)程和IO線(xiàn)程之間利用的經(jīng)典的生產(chǎn)者 - 消費(fèi)者模式,不論是用于處理Request的共享請(qǐng)求隊(duì)列,還是IO處理完返回的Response。
冪等性
「冪等性Producer」
在Kafka中,Producer默認(rèn)不是冪等性的,但我們可以創(chuàng)建冪等性Producer。
它其實(shí)是0.11.0.0版本引入的新功能,在此之前,Kafka向分區(qū)發(fā)送數(shù)據(jù)時(shí),可能會(huì)出現(xiàn)同一條消息被發(fā)送了多次,導(dǎo)致消息重復(fù)的情況。
在0.11之后,指定Producer冪等性的方法很簡(jiǎn)單,僅需要設(shè)置一個(gè)參數(shù)即可,即
- props.put(“enable.idempotence”, ture),
- 或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence被設(shè)置成true后,Producer自動(dòng)升級(jí)成冪等性Producer,其他所有的代碼邏輯都不需要改變。
Kafka自動(dòng)幫你做消息的重復(fù)去重。
底層具體的原理很簡(jiǎn)單,就是經(jīng)典的用空間去換時(shí)間的優(yōu)化思路,即在Broker端多保存一些字段。
當(dāng)Producer發(fā)送了具有相同字段值的消息后,Broker能夠自動(dòng)知曉這些消息已經(jīng)重復(fù)了,于是可以在后臺(tái)默默地把它們丟棄掉。
「冪等性Producer的作用范圍」
首先,它只能保證單分區(qū)上的冪等性,即一個(gè)冪等性Producer能夠保證某個(gè)主題的一個(gè)分區(qū)上不出現(xiàn)重復(fù)消息,它無(wú)法實(shí)現(xiàn)多個(gè)分區(qū)的冪等性。
其次,它只能實(shí)現(xiàn)單會(huì)話(huà)上的冪等性,不能實(shí)現(xiàn)跨會(huì)話(huà)的冪等性。
這里的會(huì)話(huà),你可以理解為Producer進(jìn)程的一次運(yùn)行,當(dāng)你重啟了Producer進(jìn)程之后,這種冪等性保證就喪失了。
事務(wù)
Kafka自0.11版本開(kāi)始也提供了對(duì)事務(wù)的支持,目前主要是在read committed隔離級(jí)別上做事情。
它能保證多條消息原子性地寫(xiě)入到目標(biāo)分區(qū),同時(shí)也能保證Consumer只能看到事務(wù)成功提交的消息。
「事務(wù)型Producer」
事務(wù)型Producer能夠保證將消息原子性地寫(xiě)入到多個(gè)分區(qū)中。
這批消息要么全部寫(xiě)入成功,要么全部失敗,另外,事務(wù)型Producer也不懼進(jìn)程的重啟。
Producer重啟回來(lái)后,Kafka依然保證它們發(fā)送消息的精確一次處理。
設(shè)置事務(wù)型Producer的方法也很簡(jiǎn)單,滿(mǎn)足兩個(gè)要求即可:
- 和冪等性Producer一樣,開(kāi)啟enable.idempotence = true。
- 設(shè)置Producer端參數(shù)transactional. id,最好為其設(shè)置一個(gè)有意義的名字。
此外,你還需要在Producer代碼中做一些調(diào)整,如這段代碼所示:
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- producer.commitTransaction();
- } catch (KafkaException e) {
- producer.abortTransaction();
- }
和普通Producer代碼相比,事務(wù)型Producer的顯著特點(diǎn)是調(diào)用了一些事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它們分別對(duì)應(yīng)事務(wù)的初始化、事務(wù)開(kāi)始、事務(wù)提交以及事務(wù)終止。
這段代碼能夠保證Record1和Record2被當(dāng)作一個(gè)事務(wù)統(tǒng)一提交到Kafka,要么它們?nèi)刻峤怀晒?,要么全部?xiě)入失敗。
實(shí)際上即使寫(xiě)入失敗,Kafka也會(huì)把它們寫(xiě)入到底層的日志中,也就是說(shuō)Consumer還是會(huì)看到這些消息。
有一個(gè)isolation.level參數(shù),這個(gè)參數(shù)有兩個(gè)取值:
- read_uncommi
網(wǎng)站名稱(chēng):一篇帶給你Kafka核心知識(shí)總結(jié)!
文章URL:http://fisionsoft.com.cn/article/djciiep.html


咨詢(xún)
建站咨詢(xún)
