這篇文章主要介紹RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼,文中介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們一定要看完!
成都創(chuàng)新互聯(lián)主營彝良網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都App定制開發(fā),彝良h5重慶小程序開發(fā)公司搭建,彝良網(wǎng)站營銷推廣歡迎彝良等地區(qū)企業(yè)咨詢
push consumer啟動方法DefaultMQPushConsumerImpl.start最后一步會觸發(fā)MQClientInstance.rebalanceImmediately,該調(diào)用最終會進(jìn)入到RebalanceImpl.doRebalance中,它會根據(jù)topic當(dāng)前的實(shí)際consumer數(shù)量(從nameserver獲取)通過負(fù)載均衡原則來決定自己所要訂閱的message queue。然后在本地創(chuàng)建對應(yīng)的消息緩存隊(duì)列(ProcessQueue),并觸發(fā)消息拉取操作。
RebalanceImpl是整個consumer的核心,它即保存本消費(fèi)者訂閱的topic信息,又緩存了topic中的message數(shù)據(jù)。RebalanceImpl相關(guān)的幾個核心類如下:

MessageQueue代表的是遠(yuǎn)端broker上一個topic下的某個message queue
ProcessQueue是對遠(yuǎn)端message queue的一個本地緩存,拉取下來的消息都存在一個TreeMap中,其中key是commitlog中的offset
RebalanceImpl中保存了三種關(guān)系:message queue和process queue的映射關(guān)系;topic和message queue的映射關(guān)系;topic的訂閱關(guān)系
doRebalance方法會調(diào)用rebalanceByTopic來決定本消費(fèi)者具體要訂閱一個topic下的哪些message queue,以達(dá)到負(fù)載均衡的效果。
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
// 廣播模式,訂閱所有message queue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// 集群模式,獲取該topic下所有的message queue + 該topic所有的consumer
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 通過負(fù)載均衡策略計(jì)算出當(dāng)前消費(fèi)者所需訂閱的message queue子集
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新Process Queue緩存列表
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}rebalanceByTopic中通過負(fù)載均衡策略計(jì)算出當(dāng)前消費(fèi)者對于一個topic實(shí)際訂閱的message queue子集之后,就會在updateProcessQueueTableInBalance方法中創(chuàng)建ProcessQueue,并啟動消息拉取。
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
// 本地Process Queue存在,但不再訂閱,則廢棄改process queue
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // process queue過期,也廢棄,等待新建
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// 新訂閱,本地不存在對應(yīng)的process queue,則新建
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 初始化首次拉取請求
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 批量觸發(fā)首次拉取請求
this.dispatchPullRequest(pullRequestList);
return changed;
}消息拉取的初始化過程如下圖:

以上是“RocketMQ中push consumer啟動之觸發(fā)消息拉取的示例代碼”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
網(wǎng)頁題目:RocketMQ中pushconsumer啟動之觸發(fā)消息拉取的示例代碼
文章出自:http://www.chinadenli.net/article48/gidpep.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航、網(wǎng)站收錄、企業(yè)建站、云服務(wù)器、品牌網(wǎng)站設(shè)計(jì)、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)