新聞中心
本文轉(zhuǎn)載 https://www.javadoop.com
創(chuàng)新互聯(lián)公司是一家成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設(shè),提供網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),網(wǎng)站制作,建網(wǎng)站,按需求定制開(kāi)發(fā),網(wǎng)站開(kāi)發(fā)公司,成立與2013年是互聯(lián)行業(yè)建設(shè)者,服務(wù)者。以提升客戶(hù)品牌價(jià)值為核心業(yè)務(wù),全程參與項(xiàng)目的網(wǎng)站策劃設(shè)計(jì)制作,前端開(kāi)發(fā),后臺(tái)程序制作以及后期項(xiàng)目運(yùn)營(yíng)并提出專(zhuān)業(yè)建議和思路。
本系列文章將整理到我在GitHub上的《Java面試指南》倉(cāng)庫(kù),更多精彩內(nèi)容請(qǐng)到我的倉(cāng)庫(kù)里查看
https://github.com/h3pl/Java-Tutorial
喜歡的話(huà)麻煩點(diǎn)下Star哈
文章將同步到我的個(gè)人博客:
www.how2playlife.com
本文是微信公眾號(hào)【Java技術(shù)江湖】的《不可輕視的Java網(wǎng)絡(luò)編程》其中一篇,本文部分內(nèi)容來(lái)源于網(wǎng)絡(luò),為了把本文主題講得清晰透徹,也整合了很多我認(rèn)為不錯(cuò)的技術(shù)博客內(nèi)容,引用其中了一些比較好的博客文章,如有侵權(quán),請(qǐng)聯(lián)系作者。
該系列博文會(huì)告訴你如何從計(jì)算機(jī)網(wǎng)絡(luò)的基礎(chǔ)知識(shí)入手,一步步地學(xué)習(xí)Java網(wǎng)絡(luò)基礎(chǔ),從socket到nio、bio、aio和netty等網(wǎng)絡(luò)編程知識(shí),并且進(jìn)行實(shí)戰(zhàn),網(wǎng)絡(luò)編程是每一個(gè)Java后端工程師必須要學(xué)習(xí)和理解的知識(shí)點(diǎn),進(jìn)一步來(lái)說(shuō),你還需要掌握Linux中的網(wǎng)絡(luò)編程原理,包括IO模型、網(wǎng)絡(luò)編程框架netty的進(jìn)階原理,才能更完整地了解整個(gè)Java網(wǎng)絡(luò)編程的知識(shí)體系,形成自己的知識(shí)框架。
為了更好地總結(jié)和檢驗(yàn)?zāi)愕膶W(xué)習(xí)成果,本系列文章也會(huì)提供部分知識(shí)點(diǎn)對(duì)應(yīng)的面試題以及參考答案。
如果對(duì)本系列文章有什么建議,或者是有什么疑問(wèn)的話(huà),也可以關(guān)注公眾號(hào)【Java技術(shù)江湖】聯(lián)系作者,歡迎你參與本系列博文的創(chuàng)作和修訂。
前言
之前寫(xiě)了兩篇關(guān)于 NIO 的文章,第一篇介紹了 NIO 的 Channel、Buffer、Selector 使用,第二篇介紹了非阻塞 IO 和異步 IO,并展示了簡(jiǎn)單的用例。
本文將介紹 Tomcat 中的 NIO 使用,使大家對(duì) Java NIO 的生產(chǎn)使用有更加直觀的認(rèn)識(shí)。
雖然本文的源碼篇幅也不短,但是 Tomcat 的源碼畢竟不像 Doug Lea 的并發(fā)源碼那么“變態(tài)”,對(duì)于大部分讀者來(lái)說(shuō),閱讀難度比之前介紹的其他并發(fā)源碼要簡(jiǎn)單一些,所以讀者不要覺(jué)得有什么壓力。
本文基于 Tomcat 當(dāng)前(2018-03-20) 最新版本 9.0.6。
先簡(jiǎn)單畫(huà)一張圖示意一下本文的主要內(nèi)容:
目錄
源碼環(huán)境準(zhǔn)備
Tomcat 9.0.6 下載地址: https://tomcat.apache.org/download-90.cgi
由于上面下載的 tomcat 的源碼并沒(méi)有使用 maven 進(jìn)行組織,不方便我們看源碼,也不方便我們進(jìn)行調(diào)試。這里我們將使用 maven 倉(cāng)庫(kù)中的 tomcat-embed-core,自己編寫(xiě)代碼進(jìn)行啟動(dòng)的方式來(lái)進(jìn)行調(diào)試。
首先,創(chuàng)建一個(gè)空的 maven 工程,然后添加以下依賴(lài)。
org.apache.tomcat.embed
tomcat-embed-core
9.0.6
上面的依賴(lài),只會(huì)將 tomcat-embed-core-9.0.6.jar 和 tomcat-annotations-api-9.0.6.jar 兩個(gè)包引進(jìn)來(lái),對(duì)于本文來(lái)說(shuō),已經(jīng)足夠了,如果你需要其他功能,需要額外引用其他的依賴(lài),如 Jasper。
然后,使用以下啟動(dòng)方法:
public static void main(String[] args) throws LifecycleException {
Tomcat tomcat = new Tomcat();
Connector connector = new Connector("HTTP/1.1");
connector.setPort(8080);
tomcat.setConnector(connector);
tomcat.start();
tomcat.getServer().await();
}
經(jīng)過(guò)以上的代碼,我們的 Tomcat 就啟動(dòng)起來(lái)了。
Tomcat 中的其他接口感興趣的讀者請(qǐng)自行探索,如設(shè)置 webapp 目錄,設(shè)置 resources 等
這里,介紹第一個(gè)重要的概念: Connector。在 Tomcat 中,使用 Connector 來(lái)處理連接,一個(gè) Tomcat 可以配置多個(gè) Connector,分別用于監(jiān)聽(tīng)不同端口,或處理不同協(xié)議。
在 Connector 的構(gòu)造方法中,我們可以傳
HTTP/1.1
或
AJP/1.3
用于指定協(xié)議,也可以傳入相應(yīng)的協(xié)議處理類(lèi),畢竟協(xié)議不是重點(diǎn),將不同端口進(jìn)來(lái)的連接對(duì)應(yīng)不同處理類(lèi)才是正道。典型地,我們可以指定以下幾個(gè)協(xié)議處理類(lèi):
- org.apache.coyote.http11.Http11NioProtocol:對(duì)應(yīng)非阻塞 IO
- org.apache.coyote.http11.Http11Nio2Protocol:對(duì)應(yīng)異步 IO
- org.apache.coyote.http2.Http2Protocol:對(duì)應(yīng) http2 協(xié)議,對(duì) http2 感興趣的讀者,趕緊看起來(lái)吧。
本文的重點(diǎn)當(dāng)然是非阻塞 IO 了,之前已經(jīng)介紹過(guò)
異步 IO
的基礎(chǔ)知識(shí)了,讀者看完本文后,如果對(duì)異步 IO 的處理流程感興趣,可以自行去分析一遍。
如果你使用 9.0 以前的版本,Tomcat 在啟動(dòng)的時(shí)候是會(huì)自動(dòng)配置一個(gè) connector 的,我們可以不用顯示配置。
9.0 版本的 Tomcat#start() 方法:
public void start() throws LifecycleException { getServer(); server.start(); }
8.5 及之前版本的 Tomcat#start() 方法:
public void start() throws LifecycleException { getServer(); // 自動(dòng)配置一個(gè)使用非阻塞 IO 的 connector getConnector(); server.start(); }
endpoint
前面我們說(shuō)過(guò)一個(gè) Connector 對(duì)應(yīng)一個(gè)協(xié)議,當(dāng)然這描述也不太對(duì),NIO 和 NIO2 就都是處理 HTTP/1.1 的,只不過(guò)一個(gè)使用非阻塞,一個(gè)使用異步。進(jìn)到指定 protocol 代碼,我們就會(huì)發(fā)現(xiàn),它們的代碼及其簡(jiǎn)單,只不過(guò)是指定了特定的 endpoint。
打開(kāi)
Http11NioProtocol
和
Http11Nio2Protocol
源碼,我們可以看到,在構(gòu)造方法中,它們分別指定了 NioEndpoint 和 Nio2Endpoint。
// 非阻塞模式
public class Http11NioProtocol extends AbstractHttp11JsseProtocol {
public Http11NioProtocol() {
// NioEndpoint
super(new NioEndpoint());
}
...
}
// 異步模式
public class Http11Nio2Protocol extends AbstractHttp11JsseProtocol {
public Http11Nio2Protocol() {
// Nio2Endpoint
super(new Nio2Endpoint());
}
...
}
這里介紹第二個(gè)重要的概念: endpoint。Tomcat 使用不同的 endpoint 來(lái)處理不同的協(xié)議請(qǐng)求,今天我們的重點(diǎn)是 NioEndpoint,其使用 非阻塞 IO來(lái)進(jìn)行處理 HTTP/1.1 協(xié)議的請(qǐng)求。
NioEndpoint繼承 =>
AbstractJsseEndpoint繼承 =>
AbstractEndpoint。中間的 AbstractJsseEndpoint 主要是提供了一些關(guān)于
HTTPS
的方法,這塊我們暫時(shí)忽略它,后面所有關(guān)于 HTTPS 的我們都直接忽略,感興趣的讀者請(qǐng)自行分析。
init 過(guò)程分析
下面,我們看看從 tomcat.start() 一直到 NioEndpoint 的過(guò)程。
1. AbstractProtocol# init
@Override
public void init() throws Exception {
...
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
// endpoint 的 name=http-nio-8089,domain=Tomcat
endpoint.init();
}
2. AbstractEndpoint# init
public final void init() throws Exception {
if (bindOnInit) {
bind(); // 這里對(duì)應(yīng)的當(dāng)然是子類(lèi) NioEndpoint 的 bind() 方法
bindState = BindState.BOUND_ON_INIT;
}
...
}
3. NioEndpoint# bind
這里就到我們的 NioEndpoint 了,要使用到我們之前學(xué)習(xí)的 NIO 的知識(shí)了。
@Override
public void bind() throws Exception {
// initServerSocket(); 原代碼是這行,我們 “內(nèi)聯(lián)” 過(guò)來(lái)一起說(shuō)
// 開(kāi)啟 ServerSocketChannel
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
// getPort() 會(huì)返回我們最開(kāi)始設(shè)置的 8080,得到我們的 address 是 0.0.0.0:8080
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
// ServerSocketChannel 綁定地址、端口,
// 第二個(gè)參數(shù) backlog 默認(rèn)為 100,超過(guò) 100 的時(shí)候,新連接會(huì)被拒絕(不過(guò)源碼注釋也說(shuō)了,這個(gè)值的真實(shí)語(yǔ)義取決于具體實(shí)現(xiàn))
serverSock.socket().bind(addr,getAcceptCount());
// ※※※ 設(shè)置 ServerSocketChannel 為阻塞模式 ※※※
serverSock.configureBlocking(true);
// 設(shè)置 acceptor 和 poller 的數(shù)量,至于它們是什么角色,待會(huì)說(shuō)
// acceptorThreadCount 默認(rèn)為 1
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
// 作者想表達(dá)的意思應(yīng)該是:使用多個(gè) acceptor 線程并不見(jiàn)得性能會(huì)更好
acceptorThreadCount = 1;
}
// poller 線程數(shù),默認(rèn)值定義如下,所以在多核模式下,默認(rèn)為 2
// pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());
if (pollerThreadCount <= 0) {
pollerThreadCount = 1;
}
//
setStopLatch(new CountDownLatch(pollerThreadCount));
// 初始化 ssl,我們忽略 ssl
initialiseSsl();
// 打開(kāi) NioSelectorPool,先忽略它
selectorPool.open();
}
- ServerSocketChannel 已經(jīng)打開(kāi),并且綁定要了之前指定的 8080 端口,設(shè)置成了 阻塞模式。
- 設(shè)置了 acceptor 的線程數(shù)為 1
- 設(shè)置了 poller 的線程數(shù),單核 CPU 為 1,多核為 2
- 打開(kāi)了一個(gè) SelectorPool,我們先忽略這個(gè)
到這里,我們還不知道 Acceptor 和 Poller 是什么東西,我們只是設(shè)置了它們的數(shù)量,我們先來(lái)看看最后面提到的 SelectorPool。
start 過(guò)程分析
剛剛我們分析完了 init() 過(guò)程,下面是啟動(dòng)過(guò)程 start() 分析。
AbstractProtocol # start
@Override
public void start() throws Exception {
...
// 調(diào)用 endpoint 的 start 方法
endpoint.start();
// Start async timeout thread
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
int priority = endpoint.getThreadPriority();
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
priority = Thread.NORM_PRIORITY;
}
timeoutThread.setPriority(priority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
AbstractEndpoint # start
public final void start() throws Exception {
// 按照我們的流程,剛剛 init 的時(shí)候,已經(jīng)把 bindState 改為 BindState.BOUND_ON_INIT 了,
// 所以下面的 if 分支我們就不進(jìn)去了
if (bindState == BindState.UNBOUND) {
bind();
bindState = BindState.BOUND_ON_START;
}
// 往里看 NioEndpoint 的實(shí)現(xiàn)
startInternal();
}
下面這個(gè)方法還是比較重要的,這里會(huì)創(chuàng)建前面說(shuō)過(guò)的 acceptor 和 poller。
NioEndpoint # startInternal
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// 以下幾個(gè)是緩存用的,之后我們也會(huì)看到很多這樣的代碼,為了減少 new 很多對(duì)象出來(lái)
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// 創(chuàng)建【工作線程池】,Tomcat 自己包裝了一下 ThreadPoolExecutor,
// 1\. 為了在創(chuàng)建線程池以后,先啟動(dòng) corePoolSize 個(gè)線程(這個(gè)屬于線程池的知識(shí)了,不熟悉的讀者可以看看我之前的文章)
// 2\. 自己管理線程池的增長(zhǎng)方式(默認(rèn) corePoolSize 10, maxPoolSize 200),不是本文重點(diǎn),不分析
if ( getExecutor() == null ) {
createExecutor();
}
// 設(shè)置一個(gè)柵欄(tomcat 自定義了類(lèi) LimitLatch),控制最大的連接數(shù),默認(rèn)是 10000
initializeConnectionLatch();
// 開(kāi)啟 poller 線程
// 還記得之前 init 的時(shí)候,默認(rèn)地設(shè)置了 poller 的數(shù)量為 2,所以這里啟動(dòng) 2 個(gè) poller 線程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i
到這里,我們啟動(dòng)了 工作線程池、 poller 線程組、 acceptor 線程組。同時(shí),工作線程池初始就已經(jīng)啟動(dòng)了 10 個(gè)線程。我們用 jconsole來(lái)看看此時(shí)的線程,請(qǐng)看下圖:
從 jconsole 中,我們可以看到,此時(shí)啟動(dòng)了 BlockPoller、worker、poller、acceptor、AsyncTimeout,大家應(yīng)該都已經(jīng)清楚了每個(gè)線程是哪里啟動(dòng)的吧。
Tomcat 中并沒(méi)有 Worker 這個(gè)類(lèi),此名字是我瞎編。
此時(shí),我們還是不知道 acceptor、poller 甚至 worker 到底是干嘛的,下面,我們從 acceptor 線程開(kāi)始看起。
Acceptor
它的結(jié)構(gòu)非常簡(jiǎn)單,在構(gòu)造函數(shù)中,已經(jīng)把 endpoint 傳進(jìn)來(lái)了,此外就只有 threadName 和 state 兩個(gè)簡(jiǎn)單的屬性。
private final AbstractEndpoint,U> endpoint;
private String threadName;
protected volatile AcceptorState state = AcceptorState.NEW;
public Acceptor(AbstractEndpoint,U> endpoint) {
this.endpoint = endpoint;
}
threadName就是一個(gè)線程名字而已,Acceptor 的狀態(tài) state主要是隨著 endpoint 來(lái)的。
public enum AcceptorState {
NEW, RUNNING, PAUSED, ENDED
}
我們直接來(lái)看 acceptor 的 run 方法吧:
Acceptor # run
@Override
public void run() {
int errorDelay = 0;
// 只要 endpoint 處于 running,這里就一直循環(huán)
while (endpoint.isRunning()) {
// 如果 endpoint 處于 pause 狀態(tài),這邊 Acceptor 用一個(gè) while 循環(huán)將自己也掛起
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
// endpoint 結(jié)束了,Acceptor 自然也要結(jié)束嘛
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
// 如果此時(shí)達(dá)到了最大連接數(shù)(之前我們說(shuō)過(guò),默認(rèn)是10000),就等待
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// 這里就是接收下一個(gè)進(jìn)來(lái)的 SocketChannel
// 之前我們?cè)O(shè)置了 ServerSocketChannel 為阻塞模式,所以這邊的 accept 是阻塞的
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// accept 成功,將 errorDelay 設(shè)置為 0
errorDelay = 0;
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() 是這里的關(guān)鍵方法,也就是說(shuō)前面千辛萬(wàn)苦都是為了能到這里進(jìn)行處理
if (!endpoint.setSocketOptions(socket)) {
// 如果上面的方法返回 false,關(guān)閉 SocketChannel
endpoint.closeSocket(socket);
}
} else {
// 由于 endpoint 不 running 了,或者處于 pause 了,將此 SocketChannel 關(guān)閉
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
大家應(yīng)該發(fā)現(xiàn)了,Acceptor 繞來(lái)繞去,都是在調(diào)用 NioEndpoint 的方法,我們簡(jiǎn)單分析一下這個(gè)。
在 NioEndpoint init 的時(shí)候,我們開(kāi)啟了一個(gè) ServerSocketChannel,后來(lái) start 的時(shí)候,我們開(kāi)啟多個(gè) acceptor(實(shí)際上,默認(rèn)是 1 個(gè)),每個(gè) acceptor 啟動(dòng)以后就開(kāi)始循環(huán)調(diào)用 ServerSocketChannel 的 accept() 方法獲取新的連接,然后調(diào)用 endpoint.setSocketOptions(socket) 處理新的連接,之后再進(jìn)入循環(huán) accept 下一個(gè)連接。
到這里,大家應(yīng)該也就知道了,為什么這個(gè)叫 acceptor 了吧?接下來(lái),我們來(lái)看看 setSocketOptions 方法到底做了什么。
NioEndpoint # setSocketOptions
@Override
protected boolean setSocketOptions(SocketChannel socket) {
try {
// 設(shè)置該 SocketChannel 為非阻塞模式
socket.configureBlocking(false);
Socket sock = socket.socket();
// 設(shè)置 socket 的一些屬性
socketProperties.setProperties(sock);
// 還記得 startInternal 的時(shí)候,說(shuō)過(guò)了 nioChannels 是緩存用的。
// 限于篇幅,這里的 NioChannel 就不展開(kāi)了,它包括了 socket 和 buffer
NioChannel channel = nioChannels.pop();
if (channel == null) {
// 主要是創(chuàng)建讀和寫(xiě)的兩個(gè) buffer,默認(rèn)地,讀和寫(xiě) buffer 都是 8192 字節(jié),8k
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
// getPoller0() 會(huì)選取所有 poller 中的一個(gè) poller
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
我們看到,這里又沒(méi)有進(jìn)行實(shí)際的處理,而是將這個(gè) SocketChannel 注冊(cè)到了其中一個(gè) poller 上。因?yàn)槲覀冎?,acceptor 應(yīng)該盡可能的簡(jiǎn)單,只做 accept 的工作,簡(jiǎn)單處理下就往后面扔。acceptor 還得回到之前的循環(huán)去 accept 新的連接呢。
我們只需要明白,此時(shí),往 poller 中注冊(cè)了一個(gè) NioChannel 實(shí)例,此實(shí)例包含客戶(hù)端過(guò)來(lái)的 SocketChannel 和一個(gè) SocketBufferHandler 實(shí)例。
Poller
之前我們看到 acceptor 將一個(gè) NioChannel 實(shí)例 register 到了一個(gè) poller 中。在看 register 方法之前,我們需要先對(duì) poller 要有個(gè)簡(jiǎn)單的認(rèn)識(shí)。
public class Poller implements Runnable {
public Poller() throws IOException {
// 每個(gè) poller 開(kāi)啟一個(gè) Selector
this.selector = Selector.open();
}
private Selector selector;
// events 隊(duì)列,此類(lèi)的核心
private final SynchronizedQueue events =
new SynchronizedQueue<>();
private volatile boolean close = false;
private long nextExpiration = 0;//optimize expiration handling
// 這個(gè)值后面有用,記住它的初始值為 0
private AtomicLong wakeupCounter = new AtomicLong(0);
private volatile int keyCount = 0;
...
}
敲重點(diǎn):每個(gè) poller 關(guān)聯(lián)了一個(gè) Selector。
Poller 內(nèi)部圍著一個(gè) events 隊(duì)列轉(zhuǎn),來(lái)看看其 events() 方法:
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
// 逐個(gè)執(zhí)行 event.run()
pe.run();
// 該 PollerEvent 還得給以后用,這里 reset 一下(還是之前說(shuō)過(guò)的緩存)
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}
return result;
}
events() 方法比較簡(jiǎn)單,就是取出當(dāng)前隊(duì)列中的 PollerEvent 對(duì)象,逐個(gè)執(zhí)行 event.run() 方法。
然后,現(xiàn)在來(lái)看 Poller 的 run() 方法,該方法會(huì)一直循環(huán),直到 poller.destroy() 被調(diào)用。
Poller # run
public void run() {
while (true) {
boolean hasEvents = false;
try {
if (!close) {
// 執(zhí)行 events 隊(duì)列中每個(gè) event 的 run() 方法
hasEvents = events();
// wakeupCounter 的初始值為 0,這里設(shè)置為 -1
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
// timeout 默認(rèn)值 1 秒
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
// 篇幅所限,我們就不說(shuō) close 的情況了
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
// 這里沒(méi)什么好說(shuō)的,頂多就再執(zhí)行一次 events() 方法
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
// 如果剛剛 select 有返回 ready keys,進(jìn)行處理
Iterator iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
// ※※※※※ 處理 ready key ※※※※※
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
poller 的 run() 方法主要做了調(diào)用 events() 方法和處理注冊(cè)到 Selector 上的 ready key,這里我們暫時(shí)不展開(kāi) processKey 方法,因?yàn)榇朔椒ū囟ㄊ羌捌鋸?fù)雜的。
我們回過(guò)頭來(lái)看之前從 acceptor 線程中調(diào)用的 register 方法。
Poller # register
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
// 注意第三個(gè)參數(shù)值 OP_REGISTER
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// 添加 event 到 poller 中
addEvent(r);
}
這里將這個(gè) socket(包含 socket 和 buffer 的 NioChannel 實(shí)例) 包裝為一個(gè) PollerEvent,然后添加到 events 中,此時(shí)調(diào)用此方法的 acceptor 結(jié)束返回,去處理新的 accepted 連接了。
接下來(lái),我們已經(jīng)知道了,poller 線程在循環(huán)過(guò)程中會(huì)不斷調(diào)用 events() 方法,那么 PollerEvent 的 run() 方法很快就會(huì)被執(zhí)行,我們就來(lái)看看剛剛這個(gè)新的連接被 注冊(cè)到這個(gè) poller 后,會(huì)發(fā)生什么。
PollerEvent # run
@Override
public void run() {
// 對(duì)于新來(lái)的連接,前面我們說(shuō)過(guò),interestOps == OP_REGISTER
if (interestOps == OP_REGISTER) {
try {
// 這步很關(guān)鍵?。。? // 將這個(gè)新連接 SocketChannel 注冊(cè)到該 poller 的 Selector 中,
// 設(shè)置監(jiān)聽(tīng) OP_READ 事件,
// 將 socketWrapper 設(shè)置為 attachment 進(jìn)行傳遞(這個(gè)對(duì)象可是什么鬼都有,往上看就知道了)
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
/* else 這塊不介紹,省得大家頭大 */
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {}
}
}
}
到這里,我們?cè)倩仡櫼幌拢簞倓傇?PollerEvent 的 run() 方法中,我們看到,新的 SocketChannel 注冊(cè)到了 Poller 內(nèi)部的 Selector 中,監(jiān)聽(tīng) OP_READ 事件,然后我們?cè)倩氐?Poller 的 run() 看下,一旦該 SocketChannel 是 readable 的狀態(tài),那么就會(huì)進(jìn)入到 poller 的 processKey 方法。
processKey
Poller # processKey
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
// 忽略 sendfile
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
// unregister 相應(yīng)的 interest set,
// 如接下來(lái)是處理 SocketChannel 進(jìn)來(lái)的數(shù)據(jù),那么就不再監(jiān)聽(tīng)該 channel 的 OP_READ 事件
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 處理讀
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
// 處理寫(xiě)
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
接下來(lái)是 processSocket 方法,注意第三個(gè)參數(shù),上面進(jìn)來(lái)的時(shí)候是 true。
AbstractEndpoint # processSocket
public boolean processSocket(SocketWrapperBase socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase sc = processorCache.pop();
if (sc == null) {
// 創(chuàng)建一個(gè) SocketProcessor 的實(shí)例
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
// 將任務(wù)放到之前建立的 worker 線程池中執(zhí)行
executor.execute(sc);
} else {
sc.run(); // ps: 如果 dispatch 為 false,那么就當(dāng)前線程自己執(zhí)行
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
NioEndpoint # createSocketProcessor
@Override
protected SocketProcessorBase createSocketProcessor(
SocketWrapperBase socketWrapper, SocketEvent event) {
return new SocketProcessor(socketWrapper, event);
}
我們看到,提交到 worker 線程池中的是 NioEndpoint.SocketProcessor 的實(shí)例,至于它的 run() 方法之后的邏輯,我們就不再繼續(xù)往里分析了。
總結(jié)
最后,再祭出文章開(kāi)始的那張圖來(lái)總結(jié)一下:
這里簡(jiǎn)單梳理下前面我們說(shuō)的流程,幫大家回憶一下:
- 指定 Protocol,初始化相應(yīng)的 Endpoint,我們分析的是 NioEndpoint;
- init 過(guò)程:在 NioEndpoint 中做 bind 操作;
- start 過(guò)程:?jiǎn)?dòng) worker 線程池,啟動(dòng) 1 個(gè) Acceptor 和 2 個(gè) Poller,當(dāng)然它們都是默認(rèn)值,可配;
- Acceptor 獲取到新的連接后,getPoller0() 獲取其中一個(gè) Poller,然后 register 到 Poller 中;
- Poller 循環(huán) selector.select(xxx),如果有通道 readable,那么在 processKey 中將其放到 worker 線程池中。
后續(xù)的流程,感興趣的讀者請(qǐng)自行分析,本文就說(shuō)到這里了。
(全文完)
文章標(biāo)題:Java網(wǎng)絡(luò)編程與NIO詳解11:Tomcat中的Connector源碼分析(NIO)
標(biāo)題URL:http://fisionsoft.com.cn/article/jdhioh.html