新聞中心
大家好,我是君哥。今天聊一聊 RocketMQ 的順序消息實(shí)現(xiàn)機(jī)制。

為廊坊等地區(qū)用戶(hù)提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及廊坊網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都網(wǎng)站建設(shè)、成都做網(wǎng)站、廊坊網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專(zhuān)業(yè)、用心的態(tài)度為用戶(hù)提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶(hù)的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
在有些場(chǎng)景下,使用 MQ 需要保證消息的順序性,比如在電商系統(tǒng)中,用戶(hù)提交訂單、支付訂單、訂單出庫(kù)這 3 個(gè)消息應(yīng)該保證順序性,如下圖:
對(duì)于 RocketMQ 來(lái)說(shuō),主要是通過(guò) Producer 和 Consumer 來(lái)保證消息順序的。
1、Producer
下面代碼是 Producer 發(fā)送順序消息的官方示例:
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
跟發(fā)送并發(fā)消息不一樣的是,發(fā)送消息時(shí)傳入了 MessageQueueSelector,這里可以指定消息發(fā)送到固定的 MessageQueue。
注意:上面的代碼把 orderId 相同的消息都會(huì)發(fā)送到同一個(gè) MessageQueue,這樣同一個(gè) orderId 的消息是有序的,這也叫做局部有序。對(duì)應(yīng)的另一種是全局有序,這需要把所有的消息都發(fā)到同一個(gè) MessageQueue。
下面再來(lái)看一下發(fā)送的代碼:
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略部分邏輯
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
ListmessageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
//省略部分邏輯
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
//省略部分邏輯
}
可以看到,在發(fā)送的時(shí)候,使用 MessageQueueSelector 選擇一個(gè) MessageQueue,然后發(fā)送消息到這個(gè) MessageQueue。對(duì)于并發(fā)消息,這里不傳 MessageQueueSelector,如果發(fā)送方法沒(méi)有指定 MessageQueue,就會(huì)按照默認(rèn)的策略選擇一個(gè)。
2、Consumer
以 RocketMQ 推模式為例,消費(fèi)者會(huì)注冊(cè)一個(gè)監(jiān)聽(tīng)器,進(jìn)行消息的拉取和消費(fèi)處理,下面的 UML 類(lèi)圖顯示了調(diào)用關(guān)系:
上圖中包含了對(duì)順序消息和對(duì)并發(fā)消息的處理。其中 MessageListenerOrderly 和 ConsumeMessageOrderlyService 對(duì)順序消息進(jìn)行處理。跟并發(fā)消息不一樣的是,順序消息定義了一個(gè) MessageQueueLock 類(lèi),這個(gè)類(lèi)保存了每個(gè) MessageQueue 對(duì)應(yīng)的鎖,代碼如下:
private ConcurrentMapmqLockTable = new ConcurrentHashMap ();
下面代碼是順序消費(fèi)的官方示例:
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
下面看一下順序消息的消費(fèi)端處理邏輯。
(1)注冊(cè)監(jiān)聽(tīng)
上面的代碼定義了順序消息監(jiān)聽(tīng)器 MessageListenerOrderly,并且注冊(cè)到 DefaultMQPushConsumer,這個(gè)注冊(cè)同時(shí)也注冊(cè)到了 DefaultMQPushConsumerImpl。
(2)PushConsumer 初始化
在 DefaultMQPushConsumerImpl 類(lèi)初始化的時(shí)候,會(huì)判斷注冊(cè)的 MessageListener 是不是 MessageListenerOrderly,如果是,就把 consumeOrderly 變量設(shè)置為 true,以此來(lái)標(biāo)記是順序消息拉取還是并發(fā)消息拉取。然后把 ConsumeMessageService 初始化為 ConsumeMessageOrderlyService。
(3)鎖定 mq
要保證消息的順序性,就需要保證同一個(gè) MessageQueue 只能被同一個(gè) Consumer 消費(fèi)。
ConsumeMessageOrderlyService 初始化的時(shí)候,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),周期性(默認(rèn) 20s)地向 Broker 發(fā)送鎖定消息(請(qǐng)求類(lèi)型 LOCK_BATCH_MQ),Broker 收到后,就會(huì)把 MessageQueue、group 和 clientId 進(jìn)行綁定,這樣其他客戶(hù)端就不能從這個(gè) MessageQueue 拉取消息。
注意:Broker 的鎖定是有過(guò)期時(shí)間的,默認(rèn) 60s,可以配置,鎖定過(guò)期后,有可能被其他 Consumer 進(jìn)行消費(fèi)。
Broker 端鎖結(jié)構(gòu)如下圖:
(4)拉取消息
消費(fèi)者啟動(dòng)時(shí),啟動(dòng)消費(fèi)拉取線(xiàn)程 PullMessageService,里面死循環(huán)不停地從 Broker 拉取消息。這里調(diào)用了 DefaultMQPushConsumerImpl 類(lèi)的 pullMessage 方法。這里拉取消息的邏輯跟并發(fā)消息邏輯是一樣的。
拉取到消息后,調(diào)用 PullCallback 的 onSuccess 方法處理結(jié)果,這里調(diào)用了 ConsumeMessageOrderlyService 的 submitConsumeRequest 方法,里面用線(xiàn)程池提交了 ConsumeRequest 線(xiàn)程。
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//省略
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//省略
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//省略
}
//省略
break;
//省略
}
}
}
//省略
};上面拉取到消息后,先把消息放到了 ProcessQueue,然后調(diào)用了 submitConsumeRequest 方法。跟并發(fā)消息處理方式不同的是,submitConsumeRequest 方法并沒(méi)有處理拉取到的消息,而真正處理的時(shí)候是從 ProcessQueue 獲取。
(5)處理消息
處理消息的邏輯在 ConsumeMessageOrderlyService 的內(nèi)部類(lèi) ConsumeRequest,這是一個(gè)線(xiàn)程類(lèi),run 方法如下:
public void run() {
//省略部分邏輯
//1.獲取到 MessageQueueLock 對(duì)應(yīng)的鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
//省略延后執(zhí)行的邏輯
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//2.從 processQueue 拉取消息
List msgs = this.processQueue.takeMessages(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
//省略部分邏輯
boolean hasException = false;
try {
//3.獲取處理鎖
this.processQueue.getConsumeLock().lock();
//4.執(zhí)行消費(fèi)處理邏輯
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
//5.釋放處理鎖
this.processQueue.getConsumeLock().unlock();
}
//省略部分邏輯
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
//省略部分邏輯
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
上面的代碼總結(jié)一下,Consumer 消費(fèi)消息的邏輯如下:
- 對(duì) MessageQueueLock 進(jìn)行加鎖,這樣就保證只有一個(gè)線(xiàn)程在處理當(dāng)前 MessageQueue。
- 從 ProcessQueue 拉取一批消息。
- 獲取 ProcessQueue 鎖,這樣保證了只有當(dāng)前線(xiàn)程可以進(jìn)行消息處理,同時(shí)也可以防止 Rebalance 線(xiàn)程把當(dāng)前處理的 MessageQueue 移除掉。
- 執(zhí)行消費(fèi)處理邏輯。
- 釋放 ProcessQueue 處理鎖;6.processConsumeResult 方法更新消息偏移量。
注意:ProcessQueue 中的鎖是 ReentrantLock。
3、重試
跟并發(fā)消息不一樣的是,順序消息消費(fèi)失敗后并不會(huì)把消息發(fā)送到 Broker,而是直接在 Consumer 端進(jìn)行重試,如果重試次數(shù)超過(guò)了最大重試次數(shù)(16 次),則發(fā)送到 Broker,Broker 則將消息推入死信隊(duì)列。如下圖:
4、總結(jié)
RocketMQ 順序消息的原理是在 Producer 端把一批需要保證順序的消息發(fā)送到同一個(gè) MessageQueue,Consumer 端則通過(guò)加鎖的機(jī)制來(lái)保證消息消費(fèi)的順序性,Broker 端通過(guò)對(duì) MessageQueue 進(jìn)行加鎖,保證同一個(gè) MessageQueue 只能被同一個(gè) Consumer 進(jìn)行消費(fèi)。
根據(jù)實(shí)現(xiàn)原理可以看到,RocketMQ 的順序消息可能存在兩個(gè)坑:
- 有順序性的消息需要發(fā)送到同一個(gè) MessageQueue,可能導(dǎo)致單個(gè) MessageQueue 消息量很大,而 Consumer 端消費(fèi)的時(shí)候只能單線(xiàn)程消費(fèi),很可能導(dǎo)致當(dāng)前 MessageQueue 消息積壓。
- 如果順序消息 MessageQueue 所在的 broker 掛了,這時(shí) Producer 只能把消息發(fā)送到其他 Broker 的 MessageQueue 上,而如果新的 MessageQueue 被其他 Consumer 消費(fèi),這樣兩個(gè) Consumer 消費(fèi)的消息就不能保證順序性了。如下圖:
Broker1 發(fā)生故障,把訂單出庫(kù)的消息發(fā)送到了 Broker2,由 Consumer2 來(lái)進(jìn)行消費(fèi),消息順序很可能會(huì)錯(cuò)亂。
網(wǎng)站標(biāo)題:五張圖帶你理解RocketMQ順序消息實(shí)現(xiàn)機(jī)制
文章起源:http://fisionsoft.com.cn/article/dhgcggc.html


咨詢(xún)
建站咨詢(xún)
