新聞中心
Kafka是由Apache軟件基金會(huì)開(kāi)發(fā)的一款開(kāi)源的分布式消息系統(tǒng),它的高吞吐率、高并發(fā)性能以及良好的水平擴(kuò)展性,使得它在數(shù)據(jù)處理領(lǐng)域中備受青睞。在Linux下使用Kafka,您可能需要查看一些與Kafka相關(guān)的信息,包括已安裝的Kafka版本、Topic、Partition和Broker等信息。那么,在本文中,我們將介紹如何在linux下查看kafka的相關(guān)信息。

查看已安裝的Kafka版本
在Linux中查看已安裝的Kafka版本可以通過(guò)以下命令實(shí)現(xiàn):
“`shell
$ kafka-topics.sh –version
“`
該命令將會(huì)返回當(dāng)前Kafka的版本信息,如下:
“`
kafka-topics.sh v2.4.0 (Commit: c57222ae8cd7866d)
“`
查看Topic信息
在Linux中查看Kafka的Topic信息可以通過(guò)以下命令實(shí)現(xiàn):
“`shell
$ kafka-topics.sh –describe –topic –zookeeper :
“`
該命令將會(huì)返回與指定Topic相關(guān)的所有信息,包括Partition數(shù)量、Replication Factor、Partition分布情況等,如下:
“`
Topic: PartitionCount:3 ReplicationFactor:2 Configs:
Topic: Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
“`
其中,“Leader”表示管理該P(yáng)artition的Broker,而“Replicas”表示Partition的副本集。
查看Broker信息
在Linux中查看Kafka的Broker信息可以通過(guò)以下命令實(shí)現(xiàn):
“`shell
$ kafka-broker-api-versions.sh –bootstrap-server : –command-config
“`
該命令將會(huì)返回當(dāng)前Kafka的Broker信息,包括API版本、Broker ID、版本等,如下:
“`
{ version: 4, leader_epoch: -1, attributes: 0 }
… omitted the rest …
( note that the output may include other revisions, you have to look it up).
“`
查看Partition信息
在Linux中查看Kafka的Partition信息可以通過(guò)以下命令實(shí)現(xiàn):
“`shell
$ kafka-run-class.sh kafka.tools.GetOffsetShell –broker-list : –topic –time -2
“`
該命令將會(huì)返回指定Topic的Partition相關(guān)信息,包括Offset信息,如下:
“`
topic_name:0:100
“`
其中,“100”是指該P(yáng)artition的最新Offset。
查看Consumer Group信息
在Linux中查看Kafka的Consumer Group信息可以通過(guò)以下命令實(shí)現(xiàn):
“`shell
$ kafka-consumer-groups.sh –bootstrap-server : –group –describe
“`
該命令將會(huì)返回指定Group Name的相關(guān)信息,包括當(dāng)前消費(fèi)者數(shù)量、Offset信息以及Lag信息等,如下:
“`
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_topic 0 12023 12100 33 YjSIHz94Lua6a-EU0Ti9Xw-1593316979417 /192.168.0.13 YjSIHz94Lua6a-EU0Ti9Xw-1593316979417
“`
其中,“Consumer-ID”是指該消費(fèi)者當(dāng)前的ID,而“HOST”是指該消費(fèi)者所在的IP地址。
相關(guān)問(wèn)題拓展閱讀:
- 深入理解kafka(五)日志存儲(chǔ)
- Linux中Cache內(nèi)存占用過(guò)高清理
- Kafka數(shù)據(jù)丟失分析
深入理解kafka(五)日志存儲(chǔ)
5.1文件目錄布局
根目錄下有以下5個(gè)checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分區(qū)目錄下有以下目錄: 0000xxx.index(偏移量為64位長(zhǎng)整形,長(zhǎng)度固定為20位), 0000xxx.log, 0000xxx.timeindex.
還有可能包含.deleted .cleaned .swap等臨時(shí)文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日志格式演變
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校驗(yàn)值
magic(1B):消息版本號(hào)0
attributes(1B):消息屬性。低3位表示壓縮類型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的長(zhǎng)度。-1代表null
key: 可選
value length(4B):實(shí)際消息體的長(zhǎng)度。-1代表null
value: 消息體。可以為空,如墓族顫碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)字段,表示消息的時(shí)間戳
attributes的第4位也被利用起來(lái),0表示timestamp的類型為CreateTime,1表示timestamp的類型為L(zhǎng)ogAppendTime
timestamp類型由broker端參數(shù)log.message.timestamp.type來(lái)配置,默認(rèn)為CreateTime,即采用生產(chǎn)者創(chuàng)建的時(shí)間戳
5.2.3 消息壓縮
保證端到端的壓縮,服務(wù)端配置compression.type,默認(rèn)為”producer”,表示保留生產(chǎn)者使用的壓縮方式,還可以配置為”gzip”,”snappy”,”lz4″
多條消息壓縮至value字段,以提高壓縮率
5.2.4 變長(zhǎng)字段
變長(zhǎng)整缺隱形(Varints):每一個(gè)字節(jié)都有一個(gè)位于更高位的m位(most significant bit),除了最后一個(gè)字節(jié)為1,其余都為0,字節(jié)倒序排列
為了使編碼更加高效,Varints使用ZigZag編碼:sint32對(duì)應(yīng) (n>31) sint64對(duì)應(yīng) (n>63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定為2
attributes:兩個(gè)字節(jié)。低3位表示壓縮格式,第4位表示時(shí)間戳類型,第5位表示事務(wù)(0-非事務(wù)1-事務(wù)),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
producer id:
producer epoch:
first sequence:
records count:
v2版本的消息去掉了crc字段,另外增加了length(消息總長(zhǎng)度)、timestamp delta(時(shí)間戳增量)、offset delta(位移增伏穗廳量)和headers信息,并且棄用了attributes
Record
length:
attributes:棄用,但仍占據(jù)1B
timestamp delta:
offset delta:
headers:
5.3日志索引
稀疏索引(sparse index):每當(dāng)寫入一定量(broker端參數(shù)log.index.interval.bytes指定,默認(rèn)為4096B),偏移量索引文件和時(shí)間索引文件分別對(duì)應(yīng)一個(gè)索引項(xiàng)
日志段切分策略:
1.大小超過(guò)broker端參數(shù)log.segment.bytes配置的值,默認(rèn)為(1GB)
2.當(dāng)前日志段消息的更大時(shí)間戳與當(dāng)前系統(tǒng)的時(shí)間戳差值大于log.roll.ms或者log.roll.hours,ms優(yōu)先級(jí)高,默認(rèn)log.roll.hours=168(7天)
3.索引文件或者時(shí)間戳索引文件的大小大于log.index.size.max.bytes配置的值,默認(rèn)為(10MB)
4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE
5.3.1 偏移量索引
每個(gè)索引項(xiàng)占用8個(gè)字節(jié),分為兩個(gè)部分:1.relativeOffset相對(duì)偏移量(4B) 2.position物理地址(4B)
使用kafka-dump-log.sh腳本來(lái)解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端參數(shù)log.index.size.max.bytes不是8的倍數(shù),內(nèi)部會(huì)自動(dòng)轉(zhuǎn)換為8的倍數(shù)
5.3.2 時(shí)間戳索引
每個(gè)索引項(xiàng)占用12個(gè)字節(jié),分為兩個(gè)部分:1.timestamp當(dāng)前日志分段的更大時(shí)間戳(12B) 2.relativeOffset時(shí)間戳對(duì)應(yīng)的相對(duì)偏移量(4B)
如果broker端參數(shù)log.index.size.max.bytes不是12的倍數(shù),內(nèi)部會(huì)自動(dòng)轉(zhuǎn)換為12的倍數(shù)
5.4日志清理
日志清理策略可以控制到主題級(jí)別
5.4.1 日志刪除
broker端參數(shù)log.cleanup.policy設(shè)置為delete(默認(rèn)為delete)
檢測(cè)周期broker端參數(shù)log.retention.check.interval.ms=300000(默認(rèn)5分鐘)
1.基于時(shí)間
broker端參數(shù)log.retention.hours,log.retention.minutes,log.retention.ms,優(yōu)先級(jí)ms>minutes>hours
刪除時(shí)先增加.delete后綴,延遲刪除根據(jù)file.delete.delay.ms(默認(rèn)60000)配置
2.基于日志大小
日志總大小為broker端參數(shù)log.retention.bytes(默認(rèn)為-1,表示無(wú)窮大)
日志段大小為broker端參數(shù)log.segment.bytes(默認(rèn)為,1GB)
3.基于日志起始偏移量
DeleteRecordRequest請(qǐng)求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh腳本
5.4.2 日志壓縮
broker端參數(shù)log.cleanup.policy設(shè)置為compact,且log.cleaner.enable設(shè)置為true(默認(rèn)為true)
5.5磁盤存儲(chǔ)
相關(guān)測(cè)試:一個(gè)由6塊7200r/min的RAID-5陣列組成的磁盤簇的線性寫入600MB/s,隨機(jī)寫入100KB/s,隨機(jī)內(nèi)存寫入400MB/s,線性內(nèi)存3.6GB/s
5.5.1 頁(yè)緩存
Linux操作系統(tǒng)的vm.dirty_background_ratio參數(shù)用來(lái)指定臟頁(yè)數(shù)量達(dá)到系統(tǒng)的百分比之后就觸發(fā)pdflush/flush/kdmflush,一般小于10,不建議為0
vm.dirty_ratio表示臟頁(yè)百分比之后刷盤,但是阻塞新IO請(qǐng)求
kafka同樣提供同步刷盤及間斷性強(qiáng)制刷盤(fsync)功能,可以通過(guò)log.flush.interval.messages、log.flush.interval.ms等參數(shù)來(lái)控制
kafka不建議使用swap分區(qū),vm.swappiness參數(shù)上限為100,下限為0,建議設(shè)置為1
5.5.2 磁盤I/O流程
一般磁盤IO的場(chǎng)景有以下4種:
1.用戶調(diào)用標(biāo)準(zhǔn)C庫(kù)進(jìn)行IO操作,數(shù)據(jù)流為:應(yīng)用程序Buffer->C庫(kù)標(biāo)準(zhǔn)IOBuffer->文件系統(tǒng)也緩存->通過(guò)具體文件系統(tǒng)到磁盤
2.用戶調(diào)用文件IO,數(shù)據(jù)流為:應(yīng)用程序Buffer->文件系統(tǒng)也緩存->通過(guò)具體文件系統(tǒng)到磁盤
3.用戶打開(kāi)文件時(shí)使用O_DIRECT,繞過(guò)頁(yè)緩存直接讀寫磁盤
4.用戶使用類似dd工具,并使用direct參數(shù),繞過(guò)系統(tǒng)cache與文件系統(tǒng)直接讀寫磁盤
Linux系統(tǒng)中IO調(diào)度策略有4種:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷貝
指數(shù)據(jù)直接從磁盤文件復(fù)制到網(wǎng)卡設(shè)備中,不需要經(jīng)應(yīng)用程序
對(duì)linux而言依賴于底層的sendfile()
對(duì)java而言,F(xiàn)ileChannal.transferTo()的底層實(shí)現(xiàn)就是sendfile()
Linux中Cache內(nèi)存占用過(guò)高清理
在Linux中每次用free查看的時(shí)候,發(fā)現(xiàn)free的空間都只有500M左右。同樣的環(huán)境32G只剩下這點(diǎn),64G的也只剩下這么一點(diǎn)。后來(lái)發(fā)現(xiàn)都被Cache占用了,因?yàn)榉?wù)器上運(yùn)行了Kafka環(huán)境,每周的日志文件都有一二百G的,估計(jì)就是他占用了page cache吧。
Free中的buffer和cache:(它們都是占用內(nèi)存):
buffer : 作為buffer cache的內(nèi)存,是塊設(shè)備的讀寫緩沖區(qū)
cache: 作為page cache的內(nèi)存, 文件系統(tǒng)的cache
如果亂鏈握 cache 的值很大,說(shuō)明cache住的文件數(shù)很多。如果頻繁訪問(wèn)到的文件都能被cache住,那么磁盤的讀IO bi會(huì)非常小。
Linux內(nèi)核會(huì)在內(nèi)存將要喚大耗盡的時(shí)候,觸發(fā)內(nèi)存回收的工作,以便釋放出內(nèi)存給急需內(nèi)存的進(jìn)程使用。也可以用動(dòng)釋放,釋放的時(shí)候需要對(duì)cache中的數(shù)據(jù)跟對(duì)應(yīng)文件中的數(shù)據(jù)一致。
釋放的方式有下以幾種
一般情況下釋放pagecache就可以了。這樣可以寫一個(gè)sh腳本來(lái)在服務(wù)器空閑的時(shí)候定時(shí)執(zhí)行
使用嘩慶crontab來(lái)設(shè)置定時(shí)任務(wù),如每天4點(diǎn)開(kāi)始清理
本文參考:
Kafka數(shù)據(jù)丟失分析
Kafka存在丟消息的問(wèn)題,消息丟失會(huì)發(fā)生在Broker,Producer和Consumer三種。
Broker丟失消息是由于Kafka本身的原因造成的,kafka為了得到更高的性能和吞吐量,將數(shù)據(jù)異步批量的存儲(chǔ)在磁盤中。消息的刷盤過(guò)程,為了提高性能,減少刷盤次數(shù),kafka采用了批量刷盤的做法。即,按照一定的消息量,和時(shí)間間隔進(jìn)行刷盤。這種機(jī)制也是由于linux操作系統(tǒng)決定的。將數(shù)據(jù)存儲(chǔ)到linux操作系統(tǒng)種,會(huì)先存儲(chǔ)到頁(yè)緩存(Page cache)中,按照時(shí)間或者其他條件進(jìn)行刷盤(從page cache到file),或者通過(guò)fsync命令強(qiáng)制刷盤。數(shù)據(jù)在page cache中時(shí),如果系統(tǒng)掛掉,數(shù)據(jù)會(huì)丟失。
Broker在linux服務(wù)器上高速讀寫以及同步到Replica
上圖簡(jiǎn)述了broker寫數(shù)據(jù)以及同步的一個(gè)過(guò)程。broker寫數(shù)據(jù)只寫到PageCache中,而pageCache位于內(nèi)存。這部分?jǐn)?shù)據(jù)在斷電后是會(huì)丟失的。pageCache的數(shù)據(jù)通過(guò)linux的flusher程序進(jìn)行刷盤。刷盤觸發(fā)條件有三:
Broker配置刷盤機(jī)制,是通過(guò)調(diào)用fsync函數(shù)接管了刷盤動(dòng)作。從單個(gè)Broker來(lái)看,pageCache的數(shù)據(jù)會(huì)丟失。
Kafka沒(méi)有提供同步刷盤的方式。同步刷盤在RocketMQ中有實(shí)現(xiàn),實(shí)現(xiàn)原理是將異步刷盤的流程進(jìn)行阻塞,等待響應(yīng),類似ajax的callback或者是java的future。下面是一段rocketmq的源碼。
也就是說(shuō),理論上,要完全讓kafka保證單個(gè)broker不丟失消息是做不到的,只能通過(guò)調(diào)整刷盤機(jī)制的參數(shù)緩解該情況。比如,減少刷盤間隔,減少刷盤數(shù)據(jù)量大小。時(shí)間越短,性能越差,可靠性越好(盡可能可靠)。這是一個(gè)選擇題。
為了解決該問(wèn)題,kafka通過(guò)producer和broker協(xié)同處理單個(gè)broker丟失參數(shù)的情況。一旦producer發(fā)現(xiàn)broker消息丟失,即可自動(dòng)進(jìn)行retry。除非retry次數(shù)超過(guò)閥值乎橘(可配置),消息才會(huì)丟失。此時(shí)需要生產(chǎn)者客戶端手動(dòng)處理該情況。那么producer是如何檢測(cè)到數(shù)據(jù)丟失的呢?是通過(guò)ack機(jī)制,類似于http的三次握手的方式。
以上的引用是kafka官方對(duì)于參數(shù) acks 的解釋(在老版本中,該參數(shù)是 request.required.acks )。
上面第三點(diǎn)提到了ISR的列表的follower,需要配合另一個(gè)參數(shù)才能更好的保證ack的有效性。ISR是Broker維護(hù)的一個(gè)“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個(gè)參數(shù): min.insync.replicas 。該參數(shù)表示ISR中最少的副本數(shù)。如果不設(shè)置該值,ISR中的follower列表可能為空。此時(shí)相當(dāng)于acks=1。
如上圖中:
性能依次遞減,可靠性依次升高。
Producer丟失消息,發(fā)生在生產(chǎn)者客戶端。
為了提升效率,減少IO,producer在發(fā)送數(shù)據(jù)時(shí)可以將多個(gè)請(qǐng)求進(jìn)行合并后發(fā)送。被合并的請(qǐng)求咋發(fā)送一線緩存在本地buffer中。緩存的方式和前文提到的刷盤類似,producer可以將請(qǐng)求打包成“塊”或者按照時(shí)間間隔,將buffer中的數(shù)據(jù)發(fā)出。通過(guò)buffer我們可以將生產(chǎn)者改造為異步的方式,而這可以提升我們的發(fā)送效率。
但是,buffer中的數(shù)據(jù)就是危險(xiǎn)的。在正常情況下,客戶端的異步調(diào)用可以通過(guò)callback來(lái)處理消息發(fā)送失敗或者超時(shí)的情況,但是,一旦producer被非法的停止了歲談團(tuán),那么buffer中的數(shù)據(jù)將丟失,broker將無(wú)法收到該部分?jǐn)?shù)據(jù)。又或者,當(dāng)Producer客戶端內(nèi)存不夠時(shí),如果采取的策略是丟棄消息(另一種策略是block阻塞),消息也會(huì)被丟失。抑或,消息產(chǎn)生(異步產(chǎn)生)過(guò)快,導(dǎo)致掛起線程過(guò)多,內(nèi)存不足,侍戚導(dǎo)致程序崩潰,消息丟失。
producer
根據(jù)上圖,可以想到幾個(gè)解決的思路:
Consumer消費(fèi)消息有下面幾個(gè)步驟:
Consumer的消費(fèi)方式主要分為兩種:
上面的示例是自動(dòng)提交的例子。如果此時(shí),insertIntoDB(record)發(fā)生異常,消息將會(huì)出現(xiàn)丟失。接下來(lái)是手動(dòng)提交的例子:
將提交類型改為手動(dòng)以后,可以保證消息“至少被消費(fèi)一次”(at least once)。但此時(shí)可能出現(xiàn)重復(fù)消費(fèi)的情況,重復(fù)消費(fèi)不屬于本篇討論范圍。
上面兩個(gè)例子,是直接使用Consumer的High level API,客戶端對(duì)于offset等控制是透明的。也可以采用Low level API的方式,手動(dòng)控制offset,也可以保證消息不丟,不過(guò)會(huì)更加復(fù)雜。
關(guān)于linux下查看kafka的介紹到此就結(jié)束了,不知道你從中找到你需要的信息了嗎 ?如果你還想了解更多這方面的信息,記得收藏關(guān)注本站。
香港服務(wù)器選創(chuàng)新互聯(lián),2H2G首月10元開(kāi)通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)互聯(lián)網(wǎng)服務(wù)提供商,擁有超過(guò)10年的服務(wù)器租用、服務(wù)器托管、云服務(wù)器、虛擬主機(jī)、網(wǎng)站系統(tǒng)開(kāi)發(fā)經(jīng)驗(yàn)。專業(yè)提供云主機(jī)、虛擬主機(jī)、域名注冊(cè)、VPS主機(jī)、云服務(wù)器、香港云服務(wù)器、免備案服務(wù)器等。
分享標(biāo)題:如何在Linux下查看Kafka的相關(guān)信息?(linux下查看kafka)
URL鏈接:http://fisionsoft.com.cn/article/cdcgpjd.html


咨詢
建站咨詢
