新聞中心
今天我們就來聊聊 Kafka 是如何對 Java NIO 進行封裝的,本系列總共分為3篇,主要剖析以下幾個問題:

創(chuàng)新互聯(lián)2013年開創(chuàng)至今,是專業(yè)互聯(lián)網(wǎng)技術服務公司,擁有項目成都做網(wǎng)站、網(wǎng)站設計、外貿營銷網(wǎng)站建設網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元織金做網(wǎng)站,已為上家服務,為織金各地企業(yè)和個人服務,聯(lián)系電話:13518219792
- 針對 Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來實現(xiàn)最基礎的網(wǎng)絡連接以及讀寫操作的?
- 剖析 KafkaChannel 是如何對傳輸層、讀寫 buffer 操作進行封裝的?
- 剖析工業(yè)級 NIO 實戰(zhàn):如何基于位運算來控制事件的監(jiān)聽以及拆包、粘包是如何實現(xiàn)的?
- 剖析 Kafka 是如何封裝 Selector 多路復用器的?
- 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進行連接以及網(wǎng)絡讀寫的?
- 剖析 Kafka 網(wǎng)絡發(fā)送消息和接收響應的整個過程是怎樣的?
本篇只討論前3個問題,剩余的放到后2篇中。
認真讀完這篇文章,我相信你會對 Kafka 封裝 Java NIO 源碼有更加深刻的理解。
這篇文章干貨很多,希望你可以耐心讀完。
一、總體概述
??上篇??剖析了「生產者元數(shù)據(jù)的拉取和管理的全過程」,此時發(fā)送消息的時候就有了元數(shù)據(jù),但是還沒有進行網(wǎng)絡通信,而網(wǎng)絡通信是一個相對復雜的過程,對于 Java 系統(tǒng)來說網(wǎng)絡通信一般會采用 NIO 庫來實現(xiàn),所以 Kafka 對 Java NIO 封裝了統(tǒng)一的框架,來實現(xiàn)多路復用的網(wǎng)絡 I/O 操作。
為了方便大家理解,所有的源碼只保留骨干。
二、Kafka 對 Java NIO 的封裝
如果大家對 Java NIO 不了解的話,可以看下這個文檔,這里就不過多介紹了。
https://pdai.tech/md/java/io/java-io-nio.html。
我們來看看 Kafka 對 Java NIO 組件做了哪些封裝? 這里先說下結果,后面會深度剖析。
- TransportLayer:它是一個接口,封裝了底層 NIO 的 SocketChannel。
- NetworkReceive:封裝了 NIO 的 ByteBuffer 中的讀 Buffer,對網(wǎng)絡編程中的粘包、拆包經典實現(xiàn)。
- NetworkSend:封裝了 NIO 的 ByteBuffer 中的寫 Buffer。
- KafkaChannel:對 TransportLayer、NetworkReceive、NetworkSend 進一步封裝,屏蔽了底層的實現(xiàn)細節(jié),對上層更友好。
- KafkaSelector:封裝了 NIO 的 Selector 多路復用器組件。
接下來我們挨個對上面組件進行剖析。
三、TransportLayer 封裝過程
TransportLayer 接口是對 NIO 中 「SocketChannel」 的封裝。它的實現(xiàn)類總共有 2 個:
- PlaintextTransportLayer:明文網(wǎng)絡傳輸實現(xiàn)。
- SslTransportLayer:SSL 加密網(wǎng)絡傳輸實現(xiàn)。
本篇只剖析 PlaintextTransportLayer 的實現(xiàn)。
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java。
public class PlaintextTransportLayer implements TransportLayer {
// java nio 中 SelectionKey 事件
private final SelectionKey key;
// java nio 中的SocketChannel
private final SocketChannel socketChannel;
// 安全相關
private final Principal principal = KafkaPrincipal.ANONYMOUS;
// 初始化
public PlaintextTransportLayer(SelectionKey key) throws IOException {
// 對 NIO 中 SelectionKey 類的對象引用
this.key = key;
// 對 NIO 中 SocketChannel 類的對象引用
this.socketChannel = (SocketChannel) key.channel();
}
}從上面代碼可以看出,該類就是對底層 NIO 的 socketChannel 封裝引用。將構造函數(shù)的 SelectionKey 類對象賦值給 key,然后從 key 中取出對應的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。
接下來,我們看看幾個重要方法是如何使用這2個 NIO 組件的。
1、finishConnect()
@Override
// 判斷網(wǎng)絡連接是否完成
public boolean finishConnect() throws IOException {
// 1. 調用socketChannel的finishConnect方法,返回該連接是否已經連接完成
boolean connected = socketChannel.finishConnect();
// 2. 如果網(wǎng)絡連接完成以后就刪除對OP_CONNECT事件的監(jiān)聽,同時添加對OP_READ事件的監(jiān)聽
if (connected)
// 事件操作
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
// 3. 最后返回網(wǎng)絡連接
return connected;
}
該方法主要用來判斷網(wǎng)絡連接是否完成,如果完成就關注 「OP_READ」 事件,并取消 「OP_CONNECT」 事件。
- 首先調用 socketChannel 通道的 finishConnect() 判斷連接是否完成。
- 如果網(wǎng)絡連接完成以后就刪除對 OP_CONNECT 事件的監(jiān)聽,同時添加對 OP_READ 事件的監(jiān)聽,因為連接完成后就可能接收數(shù)據(jù)了。
- 最后返回網(wǎng)絡連接 connected。
二進制位運算事件監(jiān)聽
這里通過「二進制位運算」巧妙的解決了網(wǎng)絡事件的監(jiān)聽操作,實現(xiàn)非常經典。
通過 socketChannel 在 Selector 多路復用器注冊事件返回 SelectionKey ,SelectionKey 的類型包括:
- OP_READ:可讀事件,值為:1<<0 == 1 == 00000001。
- OP_WRITE:可寫事件,值為:1<<2 == 4 == 00000100。
- OP_CONNECT:客戶端連接服務端的事件,一般為創(chuàng)建 SocketChannel 客戶端 channel,值為:1<<3 == 8 ==00001000。
- OP_ACCEPT:服務端接收客戶端連接的事件,一般為創(chuàng)建 ServerSocketChannel 服務端 channel,值為:1<<4 == 16 == 00010000。
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
首先"~"符號代表按位取反,"&"代表按位取與,通過 key.interestOps() 獲取當前的事件,然后和 OP_CONNECT事件取反「11110111」 后按位與操作。
所以,"& ~xx" 代表刪除 xx 事件,有就刪除,沒有就不變;而 "| xx" 代表將 xx 事件添加進去。
2、read()
@Override
public int read(ByteBuffer dst) throws IOException {
// 調用 NIO 的通道實現(xiàn)數(shù)據(jù)的讀取
return socketChannel.read(dst);
}
該方法主要用來把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里,通過調用 socketChannel.read() 實現(xiàn)。
3、write()
@Override
public int write(ByteBuffer src) throws IOException {
return socketChannel.write(src);
}
該方法主要用來把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫到 SocketChannel 里,通過調用 socketChannel.write() 實現(xiàn)。
大家都知道在網(wǎng)絡編程中,一次讀寫操作并一定能把數(shù)據(jù)讀寫完,所以就需要判斷是否讀寫完成,勢必會涉及數(shù)據(jù)的「拆包」、「粘包」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫操作進行重新封裝,分別對應 NetworkReceive 讀操作、NetworkSend 寫操作,對于上層調用無需判斷是否讀寫完成,更加友好。
接下來我們就來分別剖析下這2個類的實現(xiàn)。
四、NetworkReceive 封裝過程
public class NetworkReceive implements Receive {
....
// 空 ByteBuffer
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
private final String source;
// 存儲響應消息數(shù)據(jù)長度
private final ByteBuffer size;
// 響應消息數(shù)據(jù)的最大長度
private final int maxSize;
// ByteBuffer 內存池
private final MemoryPool memoryPool;
// 已讀取字節(jié)大小
private int requestedBufferSize = -1;
// 存儲響應消息數(shù)據(jù)體
private ByteBuffer buffer;
// 初始化構造函數(shù)
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
this.source = source;
// 分配4個字節(jié)大小的數(shù)據(jù)長度
this.size = ByteBuffer.allocate(4);
this.buffer = null;
// 能接收消息的最大長度
this.maxSize = maxSize;
this.memoryPool = memoryPool;
}
}
- EMPTY_BUFFER:空 Buffer,值為 ByteBuffer.allocate(0)。
- source:final類型,用來確定對應 channel id。
- size:final類型,存儲響應消息數(shù)據(jù)長度,大小為4字節(jié)。
- maxSize:final類型,接收響應消息數(shù)據(jù)的最大長度。
- memoryPool:final類型,ByteBuffer 內存池。
- requestedBufferSize:已讀取字節(jié)大小。
- buffer:存儲響應消息數(shù)據(jù)體。
從屬性可以看出,包含2個 ByteBuffer,分別是 size 和 buffer。這里重點說下源碼中的size字段的初始化。通過長度編碼方式實現(xiàn),上來就先分配了4字節(jié)大小的 ByteBuffer 來存儲響應消息數(shù)據(jù)長度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長度的值。
介紹完字段后,我們來深度剖析下該類的幾個重要的方法。
1、readFrom()
public long readFrom(ScatteringByteChannel channel) throws IOException {
// 讀取數(shù)據(jù)總大小
int read = 0;
// 1.判斷響應消息數(shù)據(jù)長度的 ByteBuffer 是否讀完
if (size.hasRemaining()) {
// 2.還有剩余,直接讀取消息數(shù)據(jù)的長度
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
// 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里
read += bytesRead;
// 4.判斷響應消息數(shù)據(jù)長度的緩存是否讀完了
if (!size.hasRemaining()) {
// 5.重置position
size.rewind();
// 6.讀取響應消息數(shù)據(jù)長度
int receiveSize = size.getInt();
// 7.如果有異常就拋出
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
// 8.將讀到數(shù)據(jù)長度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小
requestedBufferSize = receiveSize;
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
// 9.如果數(shù)據(jù)體buffer還沒有分配,且響應消息數(shù)據(jù)頭已讀完
if (buffer == null && requestedBufferSize != -1) {
// 10.分配requestedBufferSize字節(jié)大小的內存空間給數(shù)據(jù)體buffer
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
// 11.判斷buffer是否分配成功
if (buffer != null) {
// 12.把channel里的數(shù)據(jù)讀到buffer中
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
// 13.累計讀取數(shù)據(jù)總大小
read += bytesRead;
}
// 14. 返回總大小
return read;
}該方法主要用來把對應 channel 中的數(shù)據(jù)讀到 ByteBuffer 中,包括響應消息數(shù)據(jù)長度的 size 和響應消息數(shù)據(jù)體長度的 buffer,可能會被多次調用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。
在讀取時,先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內存,然后讀滿整個 buffer 時就表示讀取完成了。
通過短短的30行左右代碼就解決了工業(yè)級「拆包」 、「粘包」問題,相當?shù)慕浀洹?/p>
如果要解決「粘包」問題,就是在每個響應數(shù)據(jù)中間插入一個特殊的字節(jié)大小的「分隔符」,這里就在響應消息體前面插入4個字節(jié),代表響應消息自己本身的數(shù)據(jù)大小,如下圖所示:
具體「拆包」的操作步驟如下:
- 調用 size.hasRemaining() 返回position 至 limit 之間的字節(jié)大小來判斷響應消息數(shù)據(jù)長度的 ByteBuffer 是否讀完。
- 當未讀完則通過調用 NIO 的方法 channel.read(size),直接把讀取4字節(jié)的響應消息數(shù)據(jù)的長度寫入到 ByteBuffer size 中,如果已經讀取到了4字節(jié),此時 position=4,與 limit 相同,表示 ByteBuffer size 已經讀滿了。
- 每次讀取后,累加到總讀取數(shù)據(jù)大小里
- 再次判斷響應消息數(shù)據(jù)長度的緩存是否讀完了。
- 如果讀完了,先重置 position 位置為0,此時就可以從 ByteBuffer 中讀取數(shù)據(jù)了,然后調用 size.getInt() 從 ByteBuffer 當前 position 位置讀取4個字節(jié),并轉化成int 類型數(shù)值賦給 receiveSize,即響應體的長度。
- 如果有異常就拋出,包括響應數(shù)據(jù)體的長度無效或者大于最大長度等。
- 將讀到響應數(shù)據(jù)長度賦值 requestedBufferSize,即數(shù)據(jù)體的大小。
- 如果響應數(shù)據(jù)體 buffer 還沒有分配,且響應數(shù)據(jù)頭已讀完,分配 requestedBufferSize 字節(jié)大小的內存空間給數(shù)據(jù)體 buffer。
- 如果 buffer 分配成功,表示 size 已讀完,此時直接把 channel 里的響應數(shù)據(jù)讀到跟它大小一致的 ByteBuffer 中,再次累計讀取數(shù)據(jù)總大小。
- 最后返回數(shù)據(jù)總大小。
2、complete()
@Override
public boolean complete() {
// 響應消息頭已讀完 && 響應消息體已讀完
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
該方法主要用來判斷是否都讀取完成,即響應頭大小和響應體大小都讀取完。
3、size()
// 返回大小
public int size() {
return payload().limit() + size.limit();
}
public ByteBuffer payload() {
return this.buffer;
}
該方法主要用來返回響應頭和響應體還有多少數(shù)據(jù)需要讀出。
此時已經剖析完讀 Buffer 的封裝,接下來我們看看寫 Buffer。
五、NetworkSend 封裝過程
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java。
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java。
調用關系圖如下:
1、Send 接口
我們先看一下接口 Send 都定義了哪些方法。
public interface Send {
// 要把數(shù)據(jù)寫入目標的 channel id
String destination();
// 要發(fā)送的數(shù)據(jù)是否發(fā)送完了
boolean completed();
// 把數(shù)據(jù)寫到對應 channel 中
long writeTo(GatheringByteChannel channel) throws IOException;
// 發(fā)送數(shù)據(jù)的大小
long size();
}Send 作為要發(fā)送數(shù)據(jù)的接口, 子類 ByteBufferSend 實現(xiàn) complete() 方法用于判斷是否已經發(fā)送完成,實現(xiàn) writeTo() 方法來實現(xiàn)寫入數(shù)據(jù)到Channel中。
2、ByteBufferSend 類
ByteBufferSend 類實現(xiàn)了 Send 接口,即實現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel:
public class ByteBufferSend implements Send {
private final String destination;
// 總共要寫多少字節(jié)數(shù)據(jù)
private final int size;
// 用于寫入channel里的ByteBuffer數(shù)組,說明kafka一次最大傳輸字節(jié)是有限定的
protected final ByteBuffer[] buffers;
// 總共還剩多少字節(jié)沒有寫完
private int remaining;
private boolean pending = false;
public ByteBufferSend(String destination, ByteBuffer... buffers) {
this.destination = destination;
this.buffers = buffers;
for (ByteBuffer buffer : buffers)
remaining += buffer.remaining();
// 計算需要寫入字節(jié)的總和
this.size = remaining;
}
}我們來看下這個類中的幾個重要字段:
- destination:數(shù)據(jù)寫入的目標 channel id。
- size:總共需要往 channel 里寫多少字節(jié)數(shù)據(jù)。
- buffers:ByteBuffer數(shù)組類型,用來存儲要寫入 channel 里的數(shù)據(jù)。
- remaining:ByteBuffer數(shù)組所有的ByteBuffer 還剩多少字節(jié)沒有寫完。
介紹完字段后,我們來深度剖析下該類的幾個重要的方法。
(1)writeTo()
@Override
// 將字節(jié)流數(shù)據(jù)寫入到channel中
public long writeTo(GatheringByteChannel channel) throws IOException {
// 1.調用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù)
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
// 2.計算還剩多少字節(jié)沒有寫入傳輸層
remaining -= written;
// 每次發(fā)送 都檢查是否
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
該方法主要用來把 buffers 數(shù)組寫入到 SocketChannel里,因為在網(wǎng)絡編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調用底層 channel.write(buffers) 方法會返回「已經寫入成功多少字節(jié)」的返回值,這樣調用一次后就知道已經寫入多少字節(jié)了。
(2)some other
@Override
public String destination() {
// 返回對應的channel id
return destination;
}
@Override
public boolean completed() {
// 判斷是否完成 即沒有剩余&pending=false
return remaining <= 0 && !pending;
}
/**
* always returns false as there will be not be any
* pending writes since we directly write to socketChannel.
*/
@Override
public boolean hasPendingWrites() {
// 在PLAINTEXT下 pending 始終為 false
return false;
}
@Override
public long size() {
// 返回寫入字節(jié)的總和
return this.size;
}
3、NetworkSend 類
NetworkSend 類繼承了 ByteBufferSend 類,真正用來寫 Buffer。
public class NetworkSend extends ByteBufferSend {
// 實例化
public NetworkSend(String destination, ByteBuffer buffer) {
// 調用父類的方法初始化
super(destination, sizeBuffer(buffer.remaining()), buffer);
}
// 用來構造4個字節(jié)的 sizeBuffer
private static ByteBuffer sizeBuffer(int size) {
// 先分配一個4個字節(jié)的ByteBuffer
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
// 寫入size長度值
sizeBuffer.putInt(size);
// 重置 position
sizeBuffer.rewind();
// 返回 sizeBuffer
return sizeBuffer;
}
}該類相對簡單些,就是構建一個發(fā)往 channel 對應的節(jié)點 id 的消息數(shù)據(jù),它的實例化過程如下:
- 先分配一個4個字節(jié)的 ByteBuffer 的變量 sizeBuffer,再把要發(fā)送的數(shù)據(jù)長度賦值給 sizeBuffer。
- 此時 sizeBuffer 的響應頭字節(jié)數(shù)和 sizeBuffer 的響應數(shù)據(jù)就都有了。
- 然后調用父類 ByteBufferSend 的方法進行初始化。
另外 ByteBuffer[] 為兩個 buffer,可以理解為一個消息頭 buffer 即 size,一個消息體 buffer。消息頭 buffer 的長度為4byte,存放的是消息體 buffer 的長度。而消息體 buffer 是上層傳入的業(yè)務數(shù)據(jù),所以 send 就是持有一個待發(fā)送的 ByteBuffer。
接下來我們來看看 KafkaChannel 是如何對上面幾個類進行封裝的。
六、KafkaChannel 封裝過程
github 源碼地址如下:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java。
public class KafkaChannel implements AutoCloseable {
....
// 節(jié)點 id
private final String id;
// 傳輸層對象
private final TransportLayer transportLayer;
....
// 最大能接收請求的字節(jié)數(shù)
private final int maxReceiveSize;
// 內存池,用來分配指定大小的 ByteBuffer
private final MemoryPool memoryPool;
// NetworkReceive 類的實例
private NetworkReceive receive;
// NetworkSend 類的實例
private Send send;
// 是否關閉連接
private boolean disconnected;
....
// 連接狀態(tài)
private ChannelState state;
// 需要連接的遠端地址
private SocketAddress remoteAddress;
// 初始化
public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) {
this.id = id;
this.transportLayer = transportLayer;
this.authenticatorCreator = authenticatorCreator;
this.authenticator = authenticatorCreator.get();
this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
this.memoryPool = memoryPool;
this.metadataRegistry = metadataRegistry;
this.disconnected = false;
this.muteState = ChannelMuteState.NOT_MUTED;
this.state = ChannelState.NOT_CONNECTED;
}
} 我們來看下這個類中的幾個重要字段:
- id:channel 對應的節(jié)點 id。
- transportLayer:傳輸層對象。
- maxReceiveSize:最大能接收請求的字節(jié)數(shù)。
- memoryPool:內存池,用來分配指定大小的 ByteBuffer。
- receive:NetworkReceive 類的實例。
- send:NetworkSend 類的實例。
- disconnected:是否關閉連接。
- state:KafkaChannel 的狀態(tài)。
- remoteAddress:需要連接的遠端地址。
從屬性可以看出,有3個最重要的成員變量:TransportLayer、NetworkReceive、Send。KafkaChannel 通過 TransportLayer 進行讀寫操作,NetworkReceive 用來讀取,Send 用來寫出。
為了封裝普通和加密的Channel「TransportLayer根據(jù)網(wǎng)絡協(xié)議的不同,提供不同的子類」而對于 KafkaChannel 提供統(tǒng)一的接口,「這是策略模式很好的應用」。
- 每個 NetworkReceive 代表一個單獨的響應,KafkaChannel 讀取的數(shù)據(jù)會存儲到 NetworkReceive 中,當 NetworkReceive 讀滿,一個請求就完整讀取了。
- 每個 Send 代表一個單獨的請求,需要寫出時只需賦值此變量,之后調用 write() 方法將其中的數(shù)據(jù)寫出。
介紹完字段后,我們來深度剖析下其網(wǎng)絡讀寫操作是如何實現(xiàn)的?
1、setSend()
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
// 設置要發(fā)送消息的字段
this.send = send;
// 調用傳輸層增加寫事件
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// PlaintextTransportLayer 類方法
@Override
public void addInterestOps(int ops) {
//通過 key.interestOps() | ops 來添加事件
key.interestOps(key.interestOps() | ops);
}該方法主要用來預發(fā)送,即在發(fā)送網(wǎng)絡請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中,然后調用傳輸層方法增加對這個 channel 上「OP_WRITE」事件的關注。當真正執(zhí)行發(fā)送的時候,會從 send 中讀取數(shù)據(jù)。
2、write()
public long write() throws IOException {
// 判斷 send 是否為空,如果為空表示已經發(fā)送完畢了
if (send == null)
return 0;
midWrite = true;
// 調用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去
return send.writeTo(transportLayer);
}該方法主要用來把保存在 send 上的數(shù)據(jù)真正發(fā)送出去。
- 首先判斷要發(fā)送的 send 是否為空,如果為空則表示在 KafkaChannel 的 Buffer 的數(shù)據(jù)都發(fā)送完畢了。
- 如果不為空就調用ByteBufferSend.writeTo() 方法通過網(wǎng)絡 I/O 操作將數(shù)據(jù)發(fā)送出去。
3、read()
public long read() throws IOException {
// 如果receive為空表示數(shù)據(jù)已經讀完,需要重新實例化對象
if (receive == null) {
當前文章:圖解 Kafka 網(wǎng)絡層實現(xiàn)機制(一)
標題鏈接:http://fisionsoft.com.cn/article/dhipood.html


咨詢
建站咨詢
