新聞中心
今天來分享一個最近生產(chǎn)環(huán)境遇到的一個 RocketMQ 異常:

首先,我們回顧一下 RockemtMQ 的架構(gòu):
Broker 的主從節(jié)點都會注冊到 Name Server 集群,Name Server 集群保存了 Broker 相關(guān)信息。RocketMQ client 會在本地維護一份 topic 和 Broker 地址的映射關(guān)系,放在 MQClientInstance#brokerAddrTable。
發(fā)送消息
RocketMQ client 在發(fā)送消息時,會根據(jù) topic 首先從本地緩存(brokerAddrTable)獲取 Broker,如果獲取不到,就會到 Name Server 集群中獲取。
//DefaultMQProducerImpl 類
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//從 Name Server 中獲取
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
if (brokerAddr != null) {
//省略處理邏輯
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
從上面的代碼可以看出,如果本地緩存和 Name Server 都沒有保存 Broker 信息,則會拋出 Broker 不存在的異常。這種情況解決思路就是從 Broker 啟動時是否注冊成功來著手分析。
消息偏移量
獲取偏移量
客戶端獲取消息偏移量(Consume Offset)的時候,也可能會拋出這個異常:
//RemoteBrokerOffsetStore 類
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
//省略實現(xiàn)邏輯
}
case READ_FROM_STORE: {
//省略
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
//省略
}
default:
break;
}
}
return -1;
}
從上面的代碼中可以看到:獲取偏移量的方式有 3 種:
- MEMORY_FIRST_THEN_STORE:先從內(nèi)存中獲取,如果獲取不到,再從 Broker 請求;
- READ_FROM_MEMORY:直接從內(nèi)存中獲取;
- READ_FROM_STORE:直接從 Broker 請求。
從 Broker 請求的代碼如下:
//RemoteBrokerOffsetStore 類
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
}
if (findBrokerResult != null) {
//忽略處理邏輯
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
這段代碼跟上一節(jié)發(fā)送消息時獲取 Broker 地址的代碼一樣,首先從本地內(nèi)存中獲取,如果過去不到,就從 Name Server 中獲取,如果取不到,就拋出 Broker 不存在的異常。
其他獲取偏移量方法
除了上面的獲取偏移量的方法外,還有 3 個獲取偏移量的方法,在 MQAdminImpl 類:
- searchOffset:從 Broker 獲取 Message-Queue 偏移量,跟上面方法類似;
- maxOffset:從 Broker 獲取 MessageQ-ueue 最大偏移量;
- minOffset:從 Broker 獲取 MessageQu-eue 最小偏移量。
這些方法的使用都在源碼【rocketmq-tools】這個模塊中。
獲取最早消息的保存時間
還有一個跟偏移量相關(guān)的方法,獲取最早的一條消息的保存時間,代碼如下:
//RemoteBrokerOffsetStore 類
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
//省略處理邏輯
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
這個方法是獲取一個 MessageQueue 中最小偏移量消息的保存時間。
這些方法的使用都在源碼【rocketmq-tools】這個模塊中。
拉取消息
正常拉取消息
拉取消息的核心代碼如下:
// PullAPIWrapper 類
public PullResult pullKernelImpl(
//省略參數(shù)
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
//省略處理邏輯
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
從上面的代碼可以看到,客戶端從 Broker 拉取消息之前,首先會從本地緩存獲取 Broker 地址,如果獲取不到,就從 Name Server 獲取 Broker 地址,如果獲取失敗,則拋出 Broker 不存在的異常。
偏移量不合法
如果拉取消息時返回偏移量不合法(OFFSET_ILLEGAL),這時就需要重新處理偏移量??蛻舳舜a的調(diào)用關(guān)系如下:
這個發(fā)生在事務(wù)消息的場景,RocketMQ client 向 Broker 拉取消息時,如果 Broker 返回 PULL_OFFSET_MOVED,client 就會通過異步線程(定時 10s 后執(zhí)行)通知 Broker 更新 offset 為 nextPullOffset(上次 pull 消息時 broker 返回)。代碼如下:
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean
isOneway) throws RemotingException,MQBrokerException, InterruptedException,
MQClientException { FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
MixAll.MASTER_ID, true); if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
MixAll.MASTER_ID, false); } if (findBrokerResult != null) { //省略業(yè)務(wù)代碼 } else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist",
null); }}
總結(jié)
今天根據(jù)之前遇到的一次生產(chǎn)環(huán)境的異常日志研究了出現(xiàn)【The broker[xxx] not exis】的 6 個場景,每個場景都類似,首先從本地緩存獲取 Broker 地址,如果獲取不到,就從 Name Server 獲取。
出現(xiàn)這種情況一般有下面三個原因:
- Broker 掛了,客戶端定時任務(wù)會判斷到 Broker 離線,就會從本地緩存中移除(MQClientInstance#cleanOfflineBroker);
- Broker 網(wǎng)絡(luò)異常;
- Broker 發(fā)生了主備切換,客戶端獲取 Broker 地址時切換還沒有完成。
這些場景其實也有定時任務(wù)刷新本地緩存,見下面代碼:
//MQClientInstance 類
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
本文名稱:這六個場景下RocketMQ會找不到Broker
標(biāo)題來源:http://fisionsoft.com.cn/article/cojesej.html


咨詢
建站咨詢
