org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-client 4.9.3
發(fā)送消息// 同步發(fā)送
SendResult sendResult = producer.send(msg);
// 異步發(fā)送,指定回調(diào)
producer.send(msg, new SendCallback() {// 當(dāng)producer接收到MQ發(fā)送來的ACK后就會(huì)觸發(fā)該回調(diào)方法的執(zhí)行
@Override
public void onSuccess(SendResult sendResult) {System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {e.printStackTrace();
}
});
// 消息發(fā)送的狀態(tài)
public enum SendStatus {SEND_OK, // 發(fā)送成功
FLUSH_DISK_TIMEOUT, // 刷盤超時(shí)。當(dāng)Broker設(shè)置的刷盤策略為同步刷盤時(shí)才可能出現(xiàn)這種異常狀態(tài)。異步刷盤不會(huì)出現(xiàn)
FLUSH_SLAVE_TIMEOUT, // Slave同步超時(shí)。當(dāng)Broker集群設(shè)置的Master-Slave的復(fù)制方式為同步復(fù)制時(shí)才可能出現(xiàn)這種異常狀態(tài)。異步復(fù)制不會(huì)出現(xiàn)
SLAVE_NOT_AVAILABLE, // 沒有可用的Slave。當(dāng)Broker集群設(shè)置為Master-Slave的復(fù)制方式為同步復(fù)制時(shí)才可能出現(xiàn)這種異常狀態(tài)。異步復(fù)制不會(huì)出現(xiàn)
}
消費(fèi)消息DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("rocketmqOS:9876");
// 指定 從第一條消息開始消費(fèi)
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("someTopicA", "*");
// 指定 每次可以 消費(fèi)10條消息,默認(rèn)為1
consumer.setConsumeMessageBatchMaxSize (10);
// 指定 每次 可以從Broker拉取40條消息, 默認(rèn)為32
consumer.setPullBatchSize (40);
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println (msg);
}
// 消費(fèi)成功的返回結(jié)果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消費(fèi)異常時(shí)的返回結(jié)果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start ();
消息的可靠性(不丟失、高可用)消息可能在哪些階段丟失呢?可能會(huì)在這三個(gè)階段發(fā)生丟失:生產(chǎn)階段、存儲(chǔ)階段、消費(fèi)階段
生產(chǎn)階段:事務(wù)消息,寫到OS Cache就返回 成功發(fā)送!
存儲(chǔ)階段:調(diào)整 MQ的刷盤策略,我們需要調(diào)整broker.conf配置文件,將其中的flushDiskType配置設(shè)置為:SYNC_FLUSH,默認(rèn)他的值是ASYNC_FLUSH,即默認(rèn)是異步刷盤
消費(fèi)階段
不能 異步 消費(fèi)消息
// 開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 業(yè)務(wù) 處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動(dòng)接收消息的服務(wù)
consumer.start ();
也可以 使用如下結(jié)構(gòu):
目前RocketMQ支持的延時(shí)級別:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
生產(chǎn)者:
// 這是 訂單系統(tǒng)的生產(chǎn)者
DefaultMQProducer producer = new DefaultMQProducer ("OrderGroup");
// 啟動(dòng)生產(chǎn)者
producer.start ();
Message msg = new Message (
"CreateOrderInformTopic",
"create success".getBytes ());
// 設(shè)置消息為延時(shí)消息,延遲級別為 16
msg.setDelayTimeLevel (16);
producer.send (msg);
消費(fèi)者:
// 訂單掃描服務(wù) 的消費(fèi)者
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer ("OrderScanServiceGroup");
// 訂閱 訂單創(chuàng)建 通知Topic
consumer.subscribe ("CreateOrderInformTopic", "*");
// 注冊 消息監(jiān)聽者
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listlist,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {// 將 msg轉(zhuǎn)為 訂單對象
Order order = buildOrder (msg);
// 根據(jù)Id 查詢數(shù)據(jù)庫
order = getById (order.getId ());
// 如果 已支付
if (order.getStatus () == 1) {}
// 如果 未支付
else {}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
消息過濾// 創(chuàng)建消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("aGroup");
// 訂閱主題,tag為 a,b,c的都監(jiān)聽
consumer.subscribe ("orderInfoTopic", "a || b || c");
修改broker.conf 配置文件才能生效
// 創(chuàng)建消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("bGroup");
// 只有訂閱的消息有這個(gè)屬性a, a >=0 and a<= 3
consumer.subscribe ("TopicTest", MessageSelector.bySql ("a between 0 and 3"));
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {// 處理業(yè)務(wù)邏輯
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start ();
RocketMQ還是 支持比較豐富的數(shù)據(jù)過濾語法的,如下所示:
(1)數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比較,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)邏輯符號 AND,OR,NOT;
(5)數(shù)值,比如:123,3.1415;
(6)字符,比如:‘a(chǎn)bc’,必須用單引號包裹起來;
(7)NULL,特殊的常量
(8)布爾值,TRUE 或 FALSE
順序消息是指消息的消費(fèi)順序和產(chǎn)生順序相同,在有些業(yè)務(wù)邏輯下,必須保證順序,比如訂單的生成、付款、發(fā)貨,這個(gè)消息必須按順序處理才行。
「消息亂序解決方案 不能和重試隊(duì)列混用。」
全局順序消息創(chuàng)建一個(gè) Topic ,默認(rèn)八個(gè)寫隊(duì)列,八個(gè)讀隊(duì)列
要保證全局順序消息, 需要先把 Topic 的讀寫隊(duì)列數(shù)設(shè)置為 一,然后Producer Consumer 的并發(fā)設(shè)置,也要是一
部分順序消息相對比較好實(shí)現(xiàn)
,生產(chǎn)端需要做到把同 ID 的消息發(fā)送到同一個(gè) Message Queue ;在消費(fèi)過程中,要做到從同一個(gè)Message Queue讀取的消息順序處理
DefaultMQProducer producer = new DefaultMQProducer ();
producer.setSendLatencyFaultEnable (true);
producer.setSendMsgTimeout (50);
producer.setNamesrvAddr ("192.168.111.101:9876");
producer.start ();
Message message = new Message ("Topic", "hello world".getBytes ());
SendResult sendResult = producer.send (message, new MessageQueueSelector () {@Override
public MessageQueue select(Listmqs,
Message message,
Object arg) {Long orderId = (Long) arg; // 根據(jù) 訂單id選擇發(fā)送 queue
long index = orderId % mqs.size (); // 用訂單id 對MessageQueue數(shù)量取模
return mqs.get ((int) index); // 返回一個(gè)MessageQueue
}
}, orderId //這里傳入訂單id
);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("192.168.111.101:9876");
// 從頭 開始消費(fèi)
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("Topic", "*");
consumer.registerMessageListener (new MessageListenerOrderly () {@Override
public ConsumeOrderlyStatus consumeMessage(Listmsgs,
ConsumeOrderlyContext context) { context.setAutoCommit (true);
try { for (MessageExt msg : msgs) { // 對有序的消息 進(jìn)行處理
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) { // 如果 消息處理 有問題
// 讓這批消息 暫停一會(huì)兒 再繼續(xù)處理這批消息
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
consumer.start ();
消息積壓發(fā)生了消息積壓,這時(shí)候就得想辦法趕緊把積壓的消息消費(fèi)完,就得考慮提高消費(fèi)能力,一般有兩種辦法
處理消息重復(fù)問題,主要有業(yè)務(wù)端自己保證,主要的方式有兩種:業(yè)務(wù)去重 和 消息去重。
業(yè)務(wù)去重:保證 業(yè)務(wù)消費(fèi)的冪等性即可
消息去重:依據(jù)在 生產(chǎn)者方設(shè)置消息的 messageKey,然后 每一條消息 在消費(fèi)方依據(jù)這個(gè)唯一的 messageKey,進(jìn)行冪等判斷
流程:** insert 失敗,說明已經(jīng)消費(fèi)---->捕獲異常返回已消費(fèi),insert 成功---->處理業(yè)務(wù) 提交事務(wù)后 再確認(rèn)成功消費(fèi)~~ **
// 設(shè)置 同步發(fā)送失敗時(shí) 重試發(fā)送的次數(shù),默認(rèn)為2次
producer.setRetryTimesWhenSendFailed(3);
// 設(shè)置 發(fā)送 超時(shí)時(shí)限為5s,默認(rèn)3s
producer.setSendMsgTimeout(5000);
// 指定 異步發(fā)送失敗后 不進(jìn)行重試發(fā)送
producer.setRetryTimesWhenSendAsyncFailed(0);
// 順序消息消費(fèi)失敗的消費(fèi)重試時(shí)間間隔,單位毫秒,默認(rèn)為1000,其取值范圍為[10,30000]
// 重試期間 應(yīng)用會(huì)出現(xiàn) 消息消費(fèi)被阻塞
consumer.setSuspendCurrentQueueTimeMillis(100);
// 修改消費(fèi)重試次數(shù),默認(rèn)16次
consumer.setMaxReconsumeTimes(10);
消息 刷盤失敗策略
消息刷盤超時(shí)(Master或Slave)或slave不可用(slave在做數(shù)據(jù)同步時(shí)向master返回狀態(tài)不是SEND_OK)時(shí),默認(rèn)是不會(huì)將消息嘗試發(fā)送到其他Broker的。不過,對于重要消息可以通過在Broker的配置文件設(shè)置retryAnotherBrokerWhenNotStoreOK
屬性為true
來開啟
重試隊(duì)列里面的消息會(huì)再次發(fā)給消費(fèi)組,默認(rèn) 最多重試 16 次,如果重試 16 次失敗則進(jìn)入「死信隊(duì)列」
「死信隊(duì)列:」
對于死信隊(duì)列,一般我們可以專門開一個(gè)后臺線程,訂閱這個(gè)死信隊(duì)列,對死信隊(duì)列中的消息,一直不停的嘗試。
消息量比較大,不建議同步,影響消息消費(fèi)速度,造成消息堆積
消費(fèi)者處理失敗后,立刻寫入重試表,有個(gè) 定時(shí)任務(wù)專門重試
消費(fèi)失敗,自己給同一個(gè)topic發(fā)一條消息—>對消息順序要求不高的場景可以使用
原理
Rocketmq 未收到rollback、commit也會(huì) 補(bǔ)償回調(diào),MQ也會(huì)有補(bǔ)償機(jī)制 :checkLocalTransaction方法
讓我們自己處理
TransactionListenerImpl 事務(wù)監(jiān)聽器
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {// 發(fā)送成功給 broker后,可以 執(zhí)行本地事務(wù)
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {// 執(zhí)行 訂單本地事務(wù)
try {// 如果 本地事務(wù)都執(zhí)行成功了,返回 commit
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {// 本地事務(wù) 執(zhí)行失敗,回滾 本地事務(wù)
// 更新 broker中的消息狀態(tài)為 刪除
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 因?yàn)槌瑫r(shí) 等原因,沒有返回 commit或者 rollback
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 這里默認(rèn)是 回滾
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
生產(chǎn)者:
// 接受 RocketMQ 回調(diào)的一個(gè) 監(jiān)聽器接口
// 會(huì)執(zhí)行 訂單本地事務(wù),commit、rollback,回調(diào)查詢 等邏輯
TransactionListenerImpl transactionListener = new TransactionListenerImpl ();
TransactionMQProducer producer = new TransactionMQProducer ();
ThreadPoolExecutor executorService = new ThreadPoolExecutor (
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000)
);
// 設(shè)置 對應(yīng)的線程池,負(fù)責(zé)執(zhí)行 回調(diào)請求
producer.setExecutorService (executorService);
// 開啟 容錯(cuò)機(jī)制
producer.setSendLatencyFaultEnable (true);
// 設(shè)置 事務(wù)監(jiān)聽器
producer.setTransactionListener (transactionListener);
// 設(shè)置 發(fā)送失敗時(shí),最多重試幾次
producer.setRetryTimesWhenSendFailed (2);
// 構(gòu)建 消息體
Message message = new Message (
"PayOrderSuccessTopic",
"Tag",
"MyKey",
"Pay Success".getBytes ());
// 可以 查詢發(fā)送結(jié)果
SendResult sendResult = producer.send (message, 10);
問題
MQ集群掛掉降級方案通常的思路:
發(fā)送消息到MQ代碼里去try catch捕獲異常,如果你發(fā)現(xiàn)發(fā)送消息到MQ有異常,此時(shí)你需要進(jìn)行重試
重試了,比如超過3次還是失敗,說明此時(shí)可能就是你的MQ集群徹底崩潰了
此時(shí)你必須把這條重要的消息寫入到本地存儲(chǔ)中去,可以是寫入數(shù)據(jù)庫里
要不停的嘗試發(fā)送消息到MQ去
發(fā)現(xiàn)MQ集群恢復(fù)了,你 必須有一個(gè)后臺線程可以 把之前持久化存儲(chǔ)的消息都查詢出來,然后依次 按照順序 發(fā)送到MQ集群里去
這里要有一個(gè)很關(guān)鍵的注意點(diǎn),就是你把消息寫入存儲(chǔ)中 暫存時(shí),一定要保證他的順序
,比如按照順序一條一條的寫入本地磁盤文件去
暫存消息
**流量 太多:**解決?法就是 對線上系統(tǒng)擴(kuò)容雙段緩沖的??,從 512kb 擴(kuò)容到?個(gè)緩沖區(qū) 10mb。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
當(dāng)前文章:RocketMQ-創(chuàng)新互聯(lián)
文章出自:http://www.chinadenli.net/article30/djsdpo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、網(wǎng)站收錄、用戶體驗(yàn)、小程序開發(fā)、網(wǎng)站建設(shè)、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容