欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

RocketMQ-創(chuàng)新互聯(lián)

文章目錄
  • 引入依賴
  • 發(fā)送消息
  • 消費(fèi)消息
  • 消息的可靠性(不丟失、高可用)
  • 延時(shí)消息
  • 消息過濾
  • 順序消息
    • 全局順序消息
    • 部分順序消息
  • 消息積壓
  • 消息重復(fù)(保證消息的冪等性)
  • 消息 重試
  • 事務(wù)消息
  • 問題
    • MQ集群掛掉

創(chuàng)新互聯(lián)網(wǎng)站建設(shè)服務(wù)商,為中小企業(yè)提供成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)服務(wù),網(wǎng)站設(shè)計(jì),網(wǎng)站托管運(yùn)營等一站式綜合服務(wù)型公司,專業(yè)打造企業(yè)形象網(wǎng)站,讓您在眾多競爭對手中脫穎而出創(chuàng)新互聯(lián)。引入依賴
org.springframework.bootspring-boot-starter-weborg.apache.rocketmqrocketmq-client4.9.3
發(fā)送消息
// 同步發(fā)送
SendResult sendResult = producer.send(msg);
// 異步發(fā)送,指定回調(diào)
producer.send(msg, new SendCallback() {// 當(dāng)producer接收到MQ發(fā)送來的ACK后就會(huì)觸發(fā)該回調(diào)方法的執(zhí)行
    @Override
    public void onSuccess(SendResult sendResult) {System.out.println(sendResult);
    }
    @Override
    public void onException(Throwable e) {e.printStackTrace();
    }
});
// 消息發(fā)送的狀態(tài)
public enum SendStatus {SEND_OK,       // 發(fā)送成功
    FLUSH_DISK_TIMEOUT,  // 刷盤超時(shí)。當(dāng)Broker設(shè)置的刷盤策略為同步刷盤時(shí)才可能出現(xiàn)這種異常狀態(tài)。異步刷盤不會(huì)出現(xiàn)
    FLUSH_SLAVE_TIMEOUT, // Slave同步超時(shí)。當(dāng)Broker集群設(shè)置的Master-Slave的復(fù)制方式為同步復(fù)制時(shí)才可能出現(xiàn)這種異常狀態(tài)。異步復(fù)制不會(huì)出現(xiàn)
    SLAVE_NOT_AVAILABLE, // 沒有可用的Slave。當(dāng)Broker集群設(shè)置為Master-Slave的復(fù)制方式為同步復(fù)制時(shí)才可能出現(xiàn)這種異常狀態(tài)。異步復(fù)制不會(huì)出現(xiàn)
}
消費(fèi)消息
DefaultMQPushConsumer consumer = new
        DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("rocketmqOS:9876");
// 指定 從第一條消息開始消費(fèi)
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("someTopicA", "*");

// 指定 每次可以 消費(fèi)10條消息,默認(rèn)為1
consumer.setConsumeMessageBatchMaxSize (10);
// 指定 每次 可以從Broker拉取40條消息, 默認(rèn)為32
consumer.setPullBatchSize (40);
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
    public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println (msg);
        }
        // 消費(fèi)成功的返回結(jié)果
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        // 消費(fèi)異常時(shí)的返回結(jié)果
        // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
consumer.start ();
消息的可靠性(不丟失、高可用)

消息可能在哪些階段丟失呢?可能會(huì)在這三個(gè)階段發(fā)生丟失:生產(chǎn)階段、存儲(chǔ)階段、消費(fèi)階段

生產(chǎn)階段:事務(wù)消息,寫到OS Cache就返回 成功發(fā)送!

存儲(chǔ)階段:調(diào)整 MQ的刷盤策略,我們需要調(diào)整broker.conf配置文件,將其中的flushDiskType配置設(shè)置為:SYNC_FLUSH,默認(rèn)他的值是ASYNC_FLUSH,即默認(rèn)是異步刷盤

消費(fèi)階段

不能 異步 消費(fèi)消息

// 開啟監(jiān)聽,用于接收消息
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
    public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 業(yè)務(wù) 處理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 啟動(dòng)接收消息的服務(wù)
consumer.start ();

也可以 使用如下結(jié)構(gòu):
在這里插入圖片描述

延時(shí)消息

在這里插入圖片描述
目前RocketMQ支持的延時(shí)級別:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

生產(chǎn)者:

// 這是 訂單系統(tǒng)的生產(chǎn)者
DefaultMQProducer producer = new DefaultMQProducer ("OrderGroup");
// 啟動(dòng)生產(chǎn)者
producer.start ();
Message msg = new Message (
        "CreateOrderInformTopic",
        "create success".getBytes ());
// 設(shè)置消息為延時(shí)消息,延遲級別為 16
msg.setDelayTimeLevel (16);
producer.send (msg);

消費(fèi)者:

// 訂單掃描服務(wù) 的消費(fèi)者
DefaultMQPushConsumer consumer =
        new DefaultMQPushConsumer ("OrderScanServiceGroup");
// 訂閱 訂單創(chuàng)建 通知Topic
consumer.subscribe ("CreateOrderInformTopic", "*");
// 注冊 消息監(jiān)聽者
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
    public ConsumeConcurrentlyStatus consumeMessage(Listlist,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {// 將 msg轉(zhuǎn)為 訂單對象
            Order order = buildOrder (msg);
            // 根據(jù)Id 查詢數(shù)據(jù)庫
            order = getById (order.getId ());
            // 如果 已支付
            if (order.getStatus () == 1) {}
            // 如果 未支付
            else {}
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
消息過濾
  • 根據(jù)Tag過濾
// 創(chuàng)建消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("aGroup");
// 訂閱主題,tag為 a,b,c的都監(jiān)聽
consumer.subscribe ("orderInfoTopic", "a || b || c");
  • SQL 表達(dá)式過濾

修改broker.conf 配置文件才能生效
在這里插入圖片描述

// 創(chuàng)建消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("bGroup");
// 只有訂閱的消息有這個(gè)屬性a, a >=0 and a<= 3
consumer.subscribe ("TopicTest", MessageSelector.bySql ("a between 0 and 3"));
consumer.registerMessageListener (new MessageListenerConcurrently () {@Override
    public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {// 處理業(yè)務(wù)邏輯
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start ();

RocketMQ還是 支持比較豐富的數(shù)據(jù)過濾語法的,如下所示:
(1)數(shù)值比較,比如:>,>=,<,<=,BETWEEN,=;
(2)字符比較,比如:=,<>,IN;
(3)IS NULL 或者 IS NOT NULL;
(4)邏輯符號 AND,OR,NOT;
(5)數(shù)值,比如:123,3.1415;
(6)字符,比如:‘a(chǎn)bc’,必須用單引號包裹起來;
(7)NULL,特殊的常量
(8)布爾值,TRUE 或 FALSE

順序消息

順序消息是指消息的消費(fèi)順序和產(chǎn)生順序相同,在有些業(yè)務(wù)邏輯下,必須保證順序,比如訂單的生成、付款、發(fā)貨,這個(gè)消息必須按順序處理才行。

「消息亂序解決方案 不能和重試隊(duì)列混用。」

全局順序消息

創(chuàng)建一個(gè) Topic ,默認(rèn)八個(gè)寫隊(duì)列,八個(gè)讀隊(duì)列

要保證全局順序消息, 需要先把 Topic 的讀寫隊(duì)列數(shù)設(shè)置為 一,然后Producer Consumer 的并發(fā)設(shè)置,也要是一

在這里插入圖片描述

部分順序消息

部分順序消息相對比較好實(shí)現(xiàn),生產(chǎn)端需要做到把同 ID 的消息發(fā)送到同一個(gè) Message Queue ;在消費(fèi)過程中,要做到從同一個(gè)Message Queue讀取的消息順序處理

DefaultMQProducer producer = new DefaultMQProducer ();
producer.setSendLatencyFaultEnable (true);
producer.setSendMsgTimeout (50);
producer.setNamesrvAddr ("192.168.111.101:9876");
producer.start ();
Message message = new Message ("Topic", "hello world".getBytes ());
SendResult sendResult = producer.send (message, new MessageQueueSelector () {@Override
            public MessageQueue select(Listmqs,
                                       Message message,
                                       Object arg) {Long orderId = (Long) arg; // 根據(jù) 訂單id選擇發(fā)送 queue
                long index = orderId % mqs.size (); // 用訂單id 對MessageQueue數(shù)量取模
                return mqs.get ((int) index);  // 返回一個(gè)MessageQueue
            }
        }, orderId //這里傳入訂單id
);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("cg");
consumer.setNamesrvAddr ("192.168.111.101:9876");
// 從頭 開始消費(fèi)
consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe ("Topic", "*");
consumer.registerMessageListener (new MessageListenerOrderly () {@Override
  public ConsumeOrderlyStatus consumeMessage(Listmsgs,
                                             ConsumeOrderlyContext context) {  context.setAutoCommit (true);
      try {  for (MessageExt msg : msgs) {  // 對有序的消息 進(jìn)行處理
          }
          return ConsumeOrderlyStatus.SUCCESS;
      } catch (Exception e) {  // 如果 消息處理 有問題
          // 讓這批消息 暫停一會(huì)兒 再繼續(xù)處理這批消息
          return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
      }
  }
});
consumer.start ();
消息積壓

發(fā)生了消息積壓,這時(shí)候就得想辦法趕緊把積壓的消息消費(fèi)完,就得考慮提高消費(fèi)能力,一般有兩種辦法
在這里插入圖片描述

  • 消費(fèi)者擴(kuò)容:如果當(dāng)前Topic的Message Queue的數(shù)量大于消費(fèi)者數(shù)量,就可以對消費(fèi)者進(jìn)行擴(kuò)容,增加消費(fèi)者,來提高消費(fèi)能力,盡快把積壓的消息消費(fèi)玩。
  • 消息遷移Queue擴(kuò)容:如果當(dāng)前Topic的Message Queue的數(shù)量小于或者等于消費(fèi)者數(shù)量,這種情況,再擴(kuò)容消費(fèi)者就沒什么用,就得考慮擴(kuò)容Message Queue。可以新建一個(gè)臨時(shí)的Topic,臨時(shí)的Topic多設(shè)置一些Message Queue,然后先用一些消費(fèi)者把消費(fèi)的數(shù)據(jù)丟到臨時(shí)的Topic,因?yàn)椴挥脴I(yè)務(wù)處理,只是轉(zhuǎn)發(fā)一下消息,還是很快的。接下來用擴(kuò)容的消費(fèi)者去消費(fèi)新的Topic里的數(shù)據(jù),消費(fèi)完了之后,恢復(fù)原狀
    在這里插入圖片描述
消息重復(fù)(保證消息的冪等性)

在這里插入圖片描述

處理消息重復(fù)問題,主要有業(yè)務(wù)端自己保證,主要的方式有兩種:業(yè)務(wù)去重 和 消息去重。

  • 業(yè)務(wù)去重:保證 業(yè)務(wù)消費(fèi)的冪等性即可

  • 消息去重:依據(jù)在 生產(chǎn)者方設(shè)置消息的 messageKey,然后 每一條消息 在消費(fèi)方依據(jù)這個(gè)唯一的 messageKey,進(jìn)行冪等判斷
    流程:** insert 失敗,說明已經(jīng)消費(fèi)---->捕獲異常返回已消費(fèi),insert 成功---->處理業(yè)務(wù) 提交事務(wù)后 再確認(rèn)成功消費(fèi)~~ **
    在這里插入圖片描述

消息 重試
// 設(shè)置 同步發(fā)送失敗時(shí)  重試發(fā)送的次數(shù),默認(rèn)為2次
producer.setRetryTimesWhenSendFailed(3);
// 設(shè)置 發(fā)送 超時(shí)時(shí)限為5s,默認(rèn)3s
producer.setSendMsgTimeout(5000);
// 指定 異步發(fā)送失敗后 不進(jìn)行重試發(fā)送
producer.setRetryTimesWhenSendAsyncFailed(0);



// 順序消息消費(fèi)失敗的消費(fèi)重試時(shí)間間隔,單位毫秒,默認(rèn)為1000,其取值范圍為[10,30000]
// 重試期間 應(yīng)用會(huì)出現(xiàn) 消息消費(fèi)被阻塞
consumer.setSuspendCurrentQueueTimeMillis(100);
// 修改消費(fèi)重試次數(shù),默認(rèn)16次
consumer.setMaxReconsumeTimes(10);

消息 刷盤失敗策略

消息刷盤超時(shí)(Master或Slave)或slave不可用(slave在做數(shù)據(jù)同步時(shí)向master返回狀態(tài)不是SEND_OK)時(shí),默認(rèn)是不會(huì)將消息嘗試發(fā)送到其他Broker的。不過,對于重要消息可以通過在Broker的配置文件設(shè)置retryAnotherBrokerWhenNotStoreOK屬性為true來開啟

重試隊(duì)列里面的消息會(huì)再次發(fā)給消費(fèi)組,默認(rèn) 最多重試 16 次,如果重試 16 次失敗則進(jìn)入「死信隊(duì)列」

「死信隊(duì)列:」
對于死信隊(duì)列,一般我們可以專門開一個(gè)后臺線程,訂閱這個(gè)死信隊(duì)列,對死信隊(duì)列中的消息,一直不停的嘗試。

消息量比較大,不建議同步,影響消息消費(fèi)速度,造成消息堆積

消費(fèi)者處理失敗后,立刻寫入重試表,有個(gè) 定時(shí)任務(wù)專門重試

消費(fèi)失敗,自己給同一個(gè)topic發(fā)一條消息—>對消息順序要求不高的場景可以使用

在這里插入圖片描述

事務(wù)消息

原理
在這里插入圖片描述

在這里插入圖片描述
Rocketmq 未收到rollback、commit也會(huì) 補(bǔ)償回調(diào),MQ也會(huì)有補(bǔ)償機(jī)制 :checkLocalTransaction方法
讓我們自己處理

TransactionListenerImpl 事務(wù)監(jiān)聽器

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {// 發(fā)送成功給 broker后,可以 執(zhí)行本地事務(wù)
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {// 執(zhí)行 訂單本地事務(wù)
        try {// 如果 本地事務(wù)都執(zhí)行成功了,返回 commit
            return LocalTransactionState.COMMIT_MESSAGE;

        } catch (Exception e) {// 本地事務(wù) 執(zhí)行失敗,回滾 本地事務(wù)
            // 更新 broker中的消息狀態(tài)為 刪除

            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

    }

    // 因?yàn)槌瑫r(shí) 等原因,沒有返回 commit或者 rollback
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 這里默認(rèn)是 回滾
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

生產(chǎn)者:

// 接受 RocketMQ 回調(diào)的一個(gè) 監(jiān)聽器接口
// 會(huì)執(zhí)行 訂單本地事務(wù),commit、rollback,回調(diào)查詢 等邏輯
TransactionListenerImpl transactionListener = new TransactionListenerImpl ();
TransactionMQProducer producer = new TransactionMQProducer ();
ThreadPoolExecutor executorService = new ThreadPoolExecutor (
        2,
        5,
        100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(2000)
);
// 設(shè)置 對應(yīng)的線程池,負(fù)責(zé)執(zhí)行 回調(diào)請求
producer.setExecutorService (executorService);
// 開啟 容錯(cuò)機(jī)制
producer.setSendLatencyFaultEnable (true);
// 設(shè)置 事務(wù)監(jiān)聽器
producer.setTransactionListener (transactionListener);
// 設(shè)置 發(fā)送失敗時(shí),最多重試幾次
producer.setRetryTimesWhenSendFailed (2);

// 構(gòu)建 消息體
Message message = new Message (
        "PayOrderSuccessTopic",
        "Tag",
        "MyKey",
        "Pay Success".getBytes ());
// 可以 查詢發(fā)送結(jié)果
SendResult sendResult = producer.send (message, 10);
問題 MQ集群掛掉

降級方案通常的思路:
發(fā)送消息到MQ代碼里去try catch捕獲異常,如果你發(fā)現(xiàn)發(fā)送消息到MQ有異常,此時(shí)你需要進(jìn)行重試

重試了,比如超過3次還是失敗,說明此時(shí)可能就是你的MQ集群徹底崩潰了
此時(shí)你必須把這條重要的消息寫入到本地存儲(chǔ)中去,可以是寫入數(shù)據(jù)庫里

要不停的嘗試發(fā)送消息到MQ去
發(fā)現(xiàn)MQ集群恢復(fù)了,你 必須有一個(gè)后臺線程可以 把之前持久化存儲(chǔ)的消息都查詢出來,然后依次 按照順序 發(fā)送到MQ集群里去

這里要有一個(gè)很關(guān)鍵的注意點(diǎn),就是你把消息寫入存儲(chǔ)中 暫存時(shí),一定要保證他的順序,比如按照順序一條一條的寫入本地磁盤文件去
暫存消息
在這里插入圖片描述
**流量 太多:**解決?法就是 對線上系統(tǒng)擴(kuò)容雙段緩沖的??,從 512kb 擴(kuò)容到?個(gè)緩沖區(qū) 10mb。

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

當(dāng)前文章:RocketMQ-創(chuàng)新互聯(lián)
文章出自:http://www.chinadenli.net/article30/djsdpo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站網(wǎng)站收錄用戶體驗(yàn)小程序開發(fā)網(wǎng)站建設(shè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計(jì)公司