小編給大家分享一下RocketMQ消息消費(fèi)與重平衡問(wèn)題的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)新互聯(lián)公司專(zhuān)注于企業(yè)全網(wǎng)整合營(yíng)銷(xiāo)推廣、網(wǎng)站重做改版、固原網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、成都h5網(wǎng)站建設(shè)、電子商務(wù)商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性?xún)r(jià)比高,為固原等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。
之前發(fā)表說(shuō)了Kafka 重平衡機(jī)制,有說(shuō)到 RocketMQ 重平衡機(jī)制是每隔 20s 從任意一個(gè) Broker 節(jié)點(diǎn)獲取消費(fèi)組的消費(fèi) ID 以及訂閱信息,再根據(jù)這些訂閱信息進(jìn)行分配,然后將分配到的信息封裝成 pullRequest 對(duì)象 pull 到 pullRequestQueue 隊(duì)列中,拉取線程喚醒后執(zhí)行拉取任務(wù),流程圖如下:

但是其中有一些是沒(méi)有詳細(xì)說(shuō)的,比如每次拉消息都要等 20s 嗎?真的有個(gè)網(wǎng)友問(wèn)了我如下問(wèn)題:

很顯然他的項(xiàng)目是用了 push 模式進(jìn)行消息拉取,要回答這個(gè)問(wèn)題,就要從 RockeMQ 的消息拉取說(shuō)起:
RocketMQ 的 push 模式的實(shí)現(xiàn)是基于 pull 模式,只不過(guò)在 pull 模式上套了一層,所以RocketMQ push 模式并不是真正意義上的 ”推模式“,因此,在 push 模式下,消費(fèi)者拉取完消息后,立馬就有開(kāi)始下一個(gè)拉取任務(wù),并不會(huì)真的等 20s 重平衡后才拉取,至于 push 模式是怎么實(shí)現(xiàn)的,那就從源碼去找答案。
之前有寫(xiě)過(guò)一篇文章:「RocketMQ為什么要保證訂閱關(guān)系的一致性?」,里面有說(shuō)過(guò) 消息拉取是從 PullRequestQueue 阻塞隊(duì)列中取出 PullRequest 拉取任務(wù)進(jìn)行消息拉取的,但 PullRequest 是怎么放進(jìn) PullRequestQueue 阻塞隊(duì)列中的呢?
RocketMQ 一共提供了以下方法:
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}從調(diào)用鏈發(fā)現(xiàn),除了重平衡會(huì)調(diào)用該方法之外,在 push 模式下,PullCallback 回調(diào)對(duì)象中的 onSuccess 方法在消息消費(fèi)時(shí),也調(diào)用了該方法:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
case FOUND:
// 如果本次拉取消息為空,則繼續(xù)將pullRequest放入阻塞隊(duì)列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
// 將消息放入消費(fèi)者消費(fèi)線程去執(zhí)行
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
// 將pullRequest放入阻塞隊(duì)列中
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}當(dāng)從 broker 拉取到消息后,如果消息被過(guò)濾掉,則繼續(xù)將pullRequest放入阻塞隊(duì)列中繼續(xù)循環(huán)執(zhí)行消息拉取任務(wù),否則將消息放入消費(fèi)者消費(fèi)線程去執(zhí)行,在pullRequest放入阻塞隊(duì)列中。
case NO_NEW_MESSAGE:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
如果從 broker 端沒(méi)有可拉取的新消息或者沒(méi)有匹配到消息,則將pullRequest放入阻塞隊(duì)列中繼續(xù)循環(huán)執(zhí)行消息拉取任務(wù)。
從以上消息消費(fèi)邏輯可以看出,當(dāng)消息處理完后,立即將 pullRequest 重新放入阻塞隊(duì)列中,因此這就很好解釋為什么 push 模式可以持續(xù)拉取消息了:
在 push 模式下消息消費(fèi)完后,還會(huì)調(diào)用該方法重新將 PullRequest 對(duì)象放進(jìn) PullRequestQueue 阻塞隊(duì)列中,不斷地從 broker 中拉取消息,實(shí)現(xiàn) push 效果。
繼續(xù)再想一個(gè)問(wèn)題,如果重平衡后,發(fā)現(xiàn)某個(gè)隊(duì)列被新的消費(fèi)者分配了,怎么辦,總不能繼續(xù)從該隊(duì)列中拉取消息吧?
RocketMQ 重平衡后會(huì)檢查 pullRequest 是否還在新分配的列表中,如果不在,則丟棄,調(diào)用 isDrop() 可查出該pullRequest是否已丟棄:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}在消息拉取之前,首先判斷該隊(duì)列是否被丟棄,如果已丟棄,則直接放棄本次拉取任務(wù)。
那什么時(shí)候隊(duì)列被丟棄呢?
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// 判斷當(dāng)前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不存在則將隊(duì)列丟棄
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
// 如果隊(duì)列拉取過(guò)期則丟棄
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}updateProcessQueueTableInRebalance 方法在重平衡時(shí)執(zhí)行,用于更新 processQueueTable,它是當(dāng)前消費(fèi)者的隊(duì)列緩存列表,以上方法邏輯判斷當(dāng)前緩存 MessageQueue 是否包含在最新的 mqSet 中,如果不包含其中,則說(shuō)明經(jīng)過(guò)這次重平衡后,該隊(duì)列被分配給其它消費(fèi)者了,或者拉取時(shí)間間隔太大過(guò)期了,則調(diào)用 setDropped(true) 方法將隊(duì)列置為丟棄狀態(tài)。
可能你會(huì)問(wèn),processQueueTable 跟 pullRequest 里面 processQueue 有什么關(guān)聯(lián),往下看:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
// 新建 ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// 將ProcessQueue放入processQueueTable中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
// 將ProcessQueue放入pullRequest拉取任務(wù)對(duì)象中
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}可以看出,重平衡時(shí)會(huì)創(chuàng)建 ProcessQueue 對(duì)象,將其放入 processQueueTable 緩存隊(duì)列表中,再將其放入 pullRequest 拉取任務(wù)對(duì)象中,也就是 processQueueTable 中的 ProcessQueue 與 pullRequest 的中 ProcessQueue 是同一個(gè)對(duì)象。
之前在群里有個(gè)網(wǎng)友提了這個(gè)問(wèn)題:

我當(dāng)時(shí)回答他 RocketMQ 正常也是沒(méi)有重復(fù)消費(fèi),但后來(lái)發(fā)現(xiàn)其實(shí) RocketMQ 在某些情況下,也是會(huì)出現(xiàn)消息重復(fù)消費(fèi)的現(xiàn)象。
前面講到,RocketMQ 消息消費(fèi)時(shí),會(huì)將消息放進(jìn)消費(fèi)線程中去執(zhí)行,代碼如下:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// pullResult.getMsgFoundList(), // processQueue, // pullRequest.getMessageQueue(), // dispathToConsume);
ConsumeMessageService 類(lèi)實(shí)現(xiàn)消息消費(fèi)的邏輯,它有兩個(gè)實(shí)現(xiàn)類(lèi):
// 并發(fā)消息消費(fèi)邏輯實(shí)現(xiàn)類(lèi) org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; // 順序消息消費(fèi)邏輯實(shí)現(xiàn)類(lèi) org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
先看并發(fā)消息消費(fèi)相關(guān)處理邏輯:
ConsumeMessageConcurrentlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// 消息消費(fèi)邏輯
// ...
// 如果隊(duì)列被設(shè)置為丟棄狀態(tài),則不提交消息消費(fèi)進(jìn)度
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}ConsumeRequest 是一個(gè)繼承了 Runnable 的類(lèi),它是消息消費(fèi)核心邏輯的實(shí)現(xiàn)類(lèi),submitConsumeRequest 方法將 ConsumeRequest 放入 消費(fèi)線程池中執(zhí)行消息消費(fèi),從它的 run 方法中可看出,如果在執(zhí)行消息消費(fèi)邏輯中有節(jié)點(diǎn)加入,重平衡后該隊(duì)列被分配給其它節(jié)點(diǎn)進(jìn)行消費(fèi)了,此時(shí)的隊(duì)列被丟棄,則不提交消息消費(fèi)進(jìn)度,因?yàn)橹耙呀?jīng)消費(fèi)了,此時(shí)就會(huì)造成消息重復(fù)消費(fèi)的情況。
再來(lái)看看順序消費(fèi)相關(guān)處理邏輯:
ConsumeMessageOrderlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:
public void run() {
// 判斷隊(duì)列是否被丟棄
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 如果不是廣播模式,且隊(duì)列已加鎖且鎖沒(méi)有過(guò)期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// 再次判斷隊(duì)列是否被丟棄
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 消息消費(fèi)處理邏輯
// ...
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}RocketMQ 順序消息消費(fèi)會(huì)將隊(duì)列鎖定,當(dāng)隊(duì)列獲取鎖之后才能進(jìn)行消費(fèi),所以,即使消息在消費(fèi)過(guò)程中有節(jié)點(diǎn)加入,重平衡后該隊(duì)列被分配給其它節(jié)點(diǎn)進(jìn)行消費(fèi)了,此時(shí)的隊(duì)列被丟棄,依然不會(huì)造成重復(fù)消費(fèi)。
以上是“RocketMQ消息消費(fèi)與重平衡問(wèn)題的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
當(dāng)前名稱(chēng):RocketMQ消息消費(fèi)與重平衡問(wèn)題的示例分析
網(wǎng)頁(yè)地址:http://www.chinadenli.net/article40/iiocho.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、軟件開(kāi)發(fā)、定制開(kāi)發(fā)、網(wǎng)頁(yè)設(shè)計(jì)公司、網(wǎng)站改版、微信小程序
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)