欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

如何處理消息消費失敗

本篇內(nèi)容介紹了“如何處理消息消費失敗”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

網(wǎng)站建設哪家好,找創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、微信平臺小程序開發(fā)、集團企業(yè)網(wǎng)站建設等服務項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了商城免費建站歡迎大家使用!

一、介紹

在介紹消息中間件 MQ 之前,我們先來簡單的了解一下,為何要引用消息中間件。

例如,在電商平臺中,常見的用戶下單,會經(jīng)歷以下幾個流程。

當用戶下單時,創(chuàng)建完訂單之后,會調(diào)用第三方支付平臺,對用戶的賬戶金額進行扣款,如果平臺支付扣款成功,會將結果通知到對應的業(yè)務系統(tǒng),接著業(yè)務系統(tǒng)會更新訂單狀態(tài),同時調(diào)用倉庫接口,進行減庫存,通知物流進行發(fā)貨!

試想一下,從訂單狀態(tài)更新、到扣減庫存、通知物流發(fā)貨都在一個方法內(nèi)同步完成,假如用戶支付成功、訂單狀態(tài)更新也成功,但是在扣減庫存或者通知物流發(fā)貨步驟失敗了,那么就會造成一個問題,用戶已經(jīng)支付成功了,只是在倉庫扣減庫存方面失敗,從而導致整個交易失敗!

一單失敗,老板可以假裝看不見,但是如果上千個單子都因此失敗,那么因系統(tǒng)造成的業(yè)務損失,將是巨大的,老板可能坐不住了!

因此,針對這種業(yè)務場景,架構師們引入了異步通信技術方案,從而保證服務的高可用,大體流程如下:

如何處理消息消費失敗

當訂單系統(tǒng)收到支付平臺發(fā)送的扣款結果之后,會將訂單消息發(fā)送到 MQ 消息中間件,同時也會更新訂單狀態(tài)。

在另一端,由倉庫系統(tǒng)來異步監(jiān)聽訂單系統(tǒng)發(fā)送的消息,當收到訂單消息之后,再操作扣減庫存、通知物流公司發(fā)貨等服務!

在優(yōu)化后的流程下,即使扣減庫存服務失敗,也不會影響用戶交易。

正如《人月神話》中所說的,軟件工程,沒有銀彈!

當引入了 MQ 消息中間件之后,同樣也會帶來另一個問題,假如 MQ  消息中間件突然宕機了,導致消息無法發(fā)送出去,那倉庫系統(tǒng)就無法接受到訂單消息,進而也無法發(fā)貨!

針對這個問題,業(yè)界主流的解決辦法是采用集群部署,一主多從模式,從而實現(xiàn)服務的高可用,即使一臺機器突然宕機了,也依然能保證服務可用,在服務器故障期間,通過運維手段,將服務重新啟動,之后服務依然能正常運行!

但是還有另一個問題,假如倉庫系統(tǒng)已經(jīng)收到訂單消息了,但是業(yè)務處理異常,或者服務器異常,導致當前商品庫存并沒有扣減,也沒有發(fā)貨!

這個時候又改如何處理呢?

今天我們所要介紹的正是這種場景,假如消息消費失敗,我們應該如何處理?

二、解決方案

針對消息消費失敗的場景,我們一般會通過如下方式進行處理:

  • 當消息消費失敗時,會對消息進行重新推送

  • 如果重試次數(shù)超過最大值,會將異常消息存儲到數(shù)據(jù)庫,然后人工介入排查問題,進行手工重試

如何處理消息消費失敗

當消息在客戶端消費失敗時,我們會將異常的消息加入到一個消息重試對象中,同時設置最大重試次數(shù),并將消息重新推送到 MQ  消息中間件里,當重試次數(shù)超過最大值時,會將異常的消息存儲到 MongoDB數(shù)據(jù)庫中,方便后續(xù)查詢異常的信息。

基于以上系統(tǒng)模型,我們可以編寫一個公共重試組件,話不多說,直接干!

三、代碼實踐

本次補償服務采用 rabbitmq 消息中間件進行處理,其他消息中間件處理思路也類似!

3.1、創(chuàng)建一個消息重試實體類

@Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) public class MessageRetryDTO implements Serializable {      private static final long serialVersionUID = 1L;      /**      * 原始消息body      */     private String bodyMsg;      /**      * 消息來源ID      */     private String sourceId;      /**      * 消息來源描述      */     private String sourceDesc;      /**      * 交換器      */     private String exchangeName;      /**      * 路由鍵      */     private String routingKey;      /**      * 隊列      */     private String queueName;      /**      * 狀態(tài),1:初始化,2:成功,3:失敗      */     private Integer status = 1;      /**      * 最大重試次數(shù)      */     private Integer maxTryCount = 3;      /**      * 當前重試次數(shù)      */     private Integer currentRetryCount = 0;      /**      * 重試時間間隔(毫秒)      */     private Long retryIntervalTime = 0L;      /**      * 任務失敗信息      */     private String errorMsg;      /**      * 創(chuàng)建時間      */     private Date createTime;      @Override     public String toString() {         return "MessageRetryDTO{" +                 "bodyMsg='" + bodyMsg + '\'' +                 ", sourceId='" + sourceId + '\'' +                 ", sourceDesc='" + sourceDesc + '\'' +                 ", exchangeName='" + exchangeName + '\'' +                 ", routingKey='" + routingKey + '\'' +                 ", queueName='" + queueName + '\'' +                 ", status=" + status +                 ", maxTryCount=" + maxTryCount +                 ", currentRetryCount=" + currentRetryCount +                 ", retryIntervalTime=" + retryIntervalTime +                 ", errorMsg='" + errorMsg + '\'' +                 ", createTime=" + createTime +                 '}';     }      /**      * 檢查重試次數(shù)是否超過最大值      *      * @return      */     public boolean checkRetryCount() {         retryCountCalculate();         //檢查重試次數(shù)是否超過最大值         if (this.currentRetryCount < this.maxTryCount) {             return true;         }         return false;     }      /**      * 重新計算重試次數(shù)      */     private void retryCountCalculate() {         this.currentRetryCount = this.currentRetryCount + 1;     }  }

3.2、編寫服務重試抽象類

public abstract class CommonMessageRetryService {      private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);      @Autowired     private RabbitTemplate rabbitTemplate;      @Autowired     private MongoTemplate mongoTemplate;       /**      * 初始化消息      *      * @param message      */     public void initMessage(Message message) {         log.info("{} 收到消息: {},業(yè)務數(shù)據(jù):{}", this.getClass().getName(), message.toString(), new String(message.getBody()));         try {             //封裝消息             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);             if (log.isInfoEnabled()) {                 log.info("反序列化消息:{}", messageRetryDto.toString());             }             prepareAction(messageRetryDto);         } catch (Exception e) {             log.warn("處理消息異常,錯誤信息:", e);         }     }      /**      * 準備執(zhí)行      *      * @param retryDto      */     protected void prepareAction(MessageRetryDTO retryDto) {         try {             execute(retryDto);             doSuccessCallBack(retryDto);         } catch (Exception e) {             log.error("當前任務執(zhí)行異常,業(yè)務數(shù)據(jù):" + retryDto.toString(), e);             //執(zhí)行失敗,計算是否還需要繼續(xù)重試             if (retryDto.checkRetryCount()) {                 if (log.isInfoEnabled()) {                     log.info("重試消息:{}", retryDto.toString());                 }                 retrySend(retryDto);             } else {                 if (log.isWarnEnabled()) {                     log.warn("當前任務重試次數(shù)已經(jīng)到達最大次數(shù),業(yè)務數(shù)據(jù):" + retryDto.toString(), e);                 }                 doFailCallBack(retryDto.setErrorMsg(e.getMessage()));             }         }     }      /**      * 任務執(zhí)行成功,回調(diào)服務(根據(jù)需要進行重寫)      *      * @param messageRetryDto      */     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {         try {             successCallback(messageRetryDto);         } catch (Exception e) {             log.warn("執(zhí)行成功回調(diào)異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());         }     }      /**      * 任務執(zhí)行失敗,回調(diào)服務(根據(jù)需要進行重寫)      *      * @param messageRetryDto      */     private void doFailCallBack(MessageRetryDTO messageRetryDto) {         try {             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));             failCallback(messageRetryDto);         } catch (Exception e) {             log.warn("執(zhí)行失敗回調(diào)異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());         }     }      /**      * 執(zhí)行任務      *      * @param messageRetryDto      */     protected abstract void execute(MessageRetryDTO messageRetryDto);      /**      * 成功回調(diào)      *      * @param messageRetryDto      */     protected abstract void successCallback(MessageRetryDTO messageRetryDto);      /**      * 失敗回調(diào)      *      * @param messageRetryDto      */     protected abstract void failCallback(MessageRetryDTO messageRetryDto);      /**      * 構建消息補償實體      * @param message      * @return      */     private MessageRetryDTO buildMessageRetryInfo(Message message){         //如果頭部包含補償消息實體,直接返回         Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();         if(messageHeaders.containsKey("message_retry_info")){             Object retryMsg = messageHeaders.get("message_retry_info");             if(Objects.nonNull(retryMsg)){                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);             }         }         //自動將業(yè)務消息加入補償實體         MessageRetryDTO messageRetryDto = new MessageRetryDTO();         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());         messageRetryDto.setCreateTime(new Date());         return messageRetryDto;     }      /**      * 異常消息重新入庫      * @param retryDto      */     private void retrySend(MessageRetryDTO retryDto){         //將補償消息實體放入頭部,原始消息內(nèi)容保持不變         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);     }        /**      * 將異常消息存儲到mongodb中      * @param retryDto      */     private void saveMessageRetryInfo(MessageRetryDTO retryDto){         try {             mongoTemplate.save(retryDto, "message_retry_info");         } catch (Exception e){             log.error("將異常消息存儲到mongodb失敗,消息數(shù)據(jù):" + retryDto.toString(), e);         }     } }

3.3、編寫監(jiān)聽服務類

在消費端應用的時候,也非常簡單,例如,針對扣減庫存操作,我們可以通過如下方式進行處理!

@Component public class OrderServiceListener extends CommonMessageRetryService {      private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);      /**      * 監(jiān)聽訂單系統(tǒng)下單成功消息      * @param message      */     @RabbitListener(queues = "mq.order.add")     public void consume(Message message) {         log.info("收到訂單下單成功消息: {}", message.toString());         super.initMessage(message);     }       @Override     protected void execute(MessageRetryDTO messageRetryDto) {         //調(diào)用扣減庫存服務,將業(yè)務異常拋出來     }      @Override     protected void successCallback(MessageRetryDTO messageRetryDto) {         //業(yè)務處理成功,回調(diào)     }      @Override     protected void failCallback(MessageRetryDTO messageRetryDto) {         //業(yè)務處理失敗,回調(diào)     } }

當消息消費失敗,并超過最大次數(shù)時,會將消息存儲到 mongodb 中,然后像常規(guī)數(shù)據(jù)庫操作一樣,可以通過 web  接口查詢異常消息,并針對具體場景進行重試!

“如何處理消息消費失敗”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質量的實用文章!

網(wǎng)站欄目:如何處理消息消費失敗
標題路徑:http://www.chinadenli.net/article20/iphcjo.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站網(wǎng)站建設小程序開發(fā)移動網(wǎng)站建設外貿(mào)網(wǎng)站建設網(wǎng)站營銷

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設