新聞中心
前言
本次打算直接上干貨分享 RabbitMQ Java 客戶端一系列的源碼分析 (com.rabbitmq:amqp-client:4.8.3)。

為賽罕等地區(qū)用戶提供了全套網(wǎng)頁設計制作服務,及賽罕網(wǎng)站建設行業(yè)解決方案。主營業(yè)務為網(wǎng)站建設、成都網(wǎng)站制作、賽罕網(wǎng)站設計,以傳統(tǒng)方式定制建設網(wǎng)站,并提供域名空間備案等一條龍服務,秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!
ps:最近接收到公司的任務就是閱讀和分析 spring-rabbit、amqp-client,因此打算一同和大家分享 amqp-client。由于 RabbitMQ 是 Erlang 語言開發(fā)(暫時沒有對這塊分享的計劃)。
友情提醒:本次分享適合的人群,需要對 RabbitMQ 有一定的了解。
- RabbitMQ Getstarted: https://www.rabbitmq.com/#getstarted。
- Java Client API Guide: https://www.rabbitmq.com/api-guide.html。
廢話不多話,開整!
Java Client Connection Demo
我們先看一個官網(wǎng)提供的 Java Client Connecting to RabbitMQ Demo。
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
Channel channel = connection.createChannel();
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();
AMQP 協(xié)議交互流程
已經(jīng)使用過 RabbitMQ 的同學相信已經(jīng)不陌生,因此就簡單的描述下:與 RabbitMQ Broker 建立 Connection 和 Channel,發(fā)送消息后,關閉 Connection 和 Channel 的過程。下圖是 針對這個過程使用 Wireshark 抓包查看整個 AMQP 協(xié)議的交互流程(172.30.0.74 為客戶端即本機 ip;192.168.17.160 為 RabbitMQ Broker 的 ip)。
「client 與 broker 創(chuàng)建Connection、Channel、發(fā)送消息」
「client 與 broker 發(fā)送心跳(Heartbeat)、關閉Connection、Channel」
為了讓讀者更容易看得源碼,我先給大家描述下 client 與 broker 之間 AMQP 協(xié)議的交互流程描述(AMQP 協(xié)議中 不少命令都是成對存在的,抓包協(xié)議中 Info 里的命令是 -,而代碼里的是 駝峰式 此處以代碼為準):
- 將 AMQP 0-9-1 的連接頭寫入底層套接字,包含指定的版本信息(客戶端告訴 broker 自己使用的協(xié)議及版本,底層使用 java 自帶的 socket)。
- 客戶端等待 broker 發(fā)送的 Connection.Start (broker 告訴客戶端 通信的協(xié)議和版本、SASL認證機制(詳細見)、語言環(huán)境以及RabbitMQ的版本信息和支持能力)。
- 客戶端接收后 發(fā)送 Connection.StartOk (客戶端告訴 broker 連接使用的帳號和密碼、認證機制、語言環(huán)境、客戶的信息以及能力)。
- 客戶端等待 broker 發(fā)送的 Connection.Tune (broker 與 客戶端 進行參數(shù)協(xié)商)。
- 客戶端接收后 發(fā)送 Connection.TuneOk (客戶端 參數(shù) [ChannelMax、FrameMax、Heartbeat] 協(xié)商完成后告訴 broker)。
- 客戶端發(fā)送 Connection.Open (客戶端 告訴 broker 打開一個連接,并請求設置_virtualHost [vhost])。
- broker 接收到后返回 Connection.OpenOk (client 對 vhost 進行驗證,成功則返回如下此信息)。
- 客戶端發(fā)送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客戶端 創(chuàng)建通道;broker 收到并創(chuàng)建通道完成)。
- 客戶端發(fā)送 Confirm.Select,broker 接收到后返回 Confirm.SelectOk(客戶端告訴 broker 消息需要使用 confirm的機制,broker收到并回復)。。
- 客戶端發(fā)送消息 Basic.Publish,broker 應答返回 Basic.Ack。
- 期間 客戶端和 broker 會相互檢查彼此的心跳 heartbeat。
- 客戶端 關閉通道 Channel.Close,broker 應答返回 Channel.CloseOk。
- 客戶端 關閉連接 Connection.Close,broker 應答返回 Connection.CloseOk。
源碼分析
熟悉完AMQP 協(xié)議的交互流程易于后續(xù)理解源碼,開始本次主要介紹 Connection 相關的源碼:ConnectionFactory.newConnection --> AMQConnection.start。
「ConnectionFactory.newConnection()」
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
throws IOException, TimeoutException {
if(this.metricsCollector == null) {
this.metricsCollector = new NoOpMetricsCollector();
}
// make sure we respect the provided thread factory
// 創(chuàng)建 socketFactory 和 初始化相應的配置
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// 初始化 Connection 涉及到的參數(shù)
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Mapproperties = new HashMap (params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
// 這塊邏輯屬于 rabbit提供自動回復連接的邏輯
if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
List addrs = addressResolver.getAddresses();
Exception lastException = null;
for (Address addr : addrs) {
try {
// 創(chuàng)建、連接 socket 并封裝成 返回 SocketFrameHandler (socket 不采用Negale算法[Negale算法,大家有興趣可以了解下這塊針對socket緩存性能的優(yōu)化])
FrameHandler handler = fhFactory.create(addr);
// 初始化配置、_channel0、_channelManager等等
AMQConnection conn = createConnection(params, handler, metricsCollector);
// 啟動 AMQConnection 后續(xù)會進行詳細介紹
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
if (lastException != null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
throw (TimeoutException) lastException;
}
}
throw new IOException("failed to connect");
}
}
AMQP 協(xié)議的交互流程中 1~6 的邏輯屬于 AMQConnection.start() 的重點邏輯,也是本次給大家主要介紹的點。
public void start()
throws IOException, TimeoutException {
// 初始化工作線程
initializeConsumerWorkService();
// 初始化心跳發(fā)送
initializeHeartbeatSender();
// 將 Connection標志位 啟動
this._running = true;
// 確認客戶端 第一件事 發(fā)送header頭部協(xié)議
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
new AMQChannel.SimpleBlockingRpcContinuation();
// 進入Rpc隊列進行阻塞,等待broker返回 connection.start method
_channel0.enqueueRpc(connStartBlocker);
try {
// The following two lines are akin to AMQChannel's
// transmit() method for this pseudo-RPC.
_frameHandler.setTimeout(handshakeTimeout);
// 1. 發(fā)送header頭部協(xié)議 AMQP 0-9-1
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}
// 初始化啟動 startMainLoop -- 為了接收和處理broker發(fā)送的消息
this._frameHandler.initialize(this);
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
try {
// 2. 客戶端等待 broker 發(fā)送的 Connection.Start
connStart =
(AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();
// 通信的協(xié)議和版本、SASL認證機制(詳細見)、語言環(huán)境以及RabbitMQ的版本信息和支持能力
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
Version serverVersion =
new Version(connStart.getVersionMajor(),
connStart.getVersionMinor());
// 版本比對
if (!Version.checkVersion(clientVersion, serverVersion)) {
throw new ProtocolVersionMismatchException(clientVersion,
serverVersion);
}
String[] mechanisms = connStart.getMechanisms().toString().split(" ");
SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
if (sm == null) {
throw new IOException("No compatible authentication mechanism found - " +
"server offered [" + connStart.getMechanisms() + "]");
}
String username = credentialsProvider.getUsername();
String password = credentialsProvider.getPassword();
LongString challenge = null;
LongString response = sm.handleChallenge(null, username, password);
do {
// 3. 客戶端接收后 發(fā)送 `Connection.StartOk`
Method method = (challenge == null)
? new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();
try {
Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
if (serverResponse instanceof AMQP.Connection.Tune) {
// 4. 客戶端等待 broker 發(fā)送的 Connection.Tune
connTune = (AMQP.Connection.Tune) serverResponse;
} else {
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
response = sm.handleChallenge(challenge, username, password);
}
} catch (ShutdownSignalException e) {
Method shutdownMethod = e.getReason();
if (shutdownMethod instanceof AMQP.Connection.Close) {
AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
throw new AuthenticationFailureException(shutdownClose.getReplyText());
}
}
throw new PossibleAuthenticationFailureException(e);
}
} while (connTune == null);
} catch (TimeoutException te) {
_frameHandler.close();
throw te;
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}
try {
// 最大通道數(shù)
int channelMax =
negotiateChannelMax(this.requestedChannelMax,
connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);
// 幀最大的大小
int frameMax =
negotiatedMaxValue(this.requestedFrameMax,
connTune.getFrameMax());
this._frameMax = frameMax;
// 心跳
int heartbeat =
negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());
setHeartbeat(heartbeat);
// 5. 客戶端接收后 發(fā)送 Connection.TuneOk
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
// 6. 客戶端發(fā)送 Channel.Open
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
.virtualHost(_virtualHost)
.build());
} catch (IOException ioe) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}
// We can now respond to errors having finished tailoring the connection
this._inConnectionNegotiation = false;
}
最后
本次分享的目的,先讓讀者對于 RabbitMQ Client 與 RabbitMQ Broker 根據(jù) AMQP 協(xié)議交互流程有個大體的認識,并根據(jù)分析 Connection 源碼有一定認知,其中還有很多 Connection 細節(jié)源碼需要讀者慢慢體會。
本文標題:RabbitMQ客戶端源碼系列-Connection
文章分享:http://fisionsoft.com.cn/article/dhecied.html


咨詢
建站咨詢
