這篇文章主要介紹“RocketMQ的事務(wù)消息是什么意思”,在日常操作中,相信很多人在RocketMQ的事務(wù)消息是什么意思問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ的事務(wù)消息是什么意思”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
成都創(chuàng)新互聯(lián)服務(wù)項目包括勃利網(wǎng)站建設(shè)、勃利網(wǎng)站制作、勃利網(wǎng)頁制作以及勃利網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,勃利網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到勃利省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
阿里的RocketMQ以前版本閹割的消息回查,在新版又重新加入了,解決小公司沒能力做可靠消息中間件產(chǎn)品。同時RocketMQ也參考了Kafka實現(xiàn),性能上也很不錯。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
官方demo
@SpringBootApplication public class ProducerApplication implements CommandLineRunner { private static final String TX_PGROUP_NAME = "myTxProducerGroup"; @Resource private RocketMQTemplate rocketMQTemplate; @Value("${demo.rocketmq.transTopic}") private String springTransTopic; public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Override public void run(String... args) throws Exception { // Send transactional messages testTransaction(); } private void testTransaction() throws MessagingException { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = MessageBuilder .withPayload("Hello RocketMQ " + i) .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i) .build(); SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, springTransTopic + ":" + tags[i % tags.length], msg, null); System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n", msg.getPayload(), sendResult.getSendStatus()); Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } } } @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME) class TransactionListenerImpl implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId); int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(transId, status); if (status == 0) { // Return local transaction with success(commit), in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload()); return RocketMQLocalTransactionState.COMMIT; } if (status == 1) { // Return local transaction with failure(rollback) , in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload()); return RocketMQLocalTransactionState.ROLLBACK; } System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break; case 2: retState = RocketMQLocalTransactionState.ROLLBACK; break; } } System.out.printf("------ !!! checkLocalTransaction is executed once," + " msgTransactionId=%s, TransactionState=%s status=%s %n", transId, retState, status); return retState; } } }
事務(wù)消息調(diào)用的是RocketMQTemplate.sendMessageInTransaction()
,那么就從這里開始
//RocketMQTemplate public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException { try { //從本地緩存中獲取生產(chǎn)者組名的生產(chǎn)者 TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup); org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); return txProducer.sendMessageInTransaction(rocketMsg, arg); } catch (MQClientException e) { throw RocketMQUtil.convert(e); } }
進入txProducer.sendMessageInTransaction(rocketMsg, arg)
//TransactionMQProducer public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { //是否已經(jīng)設(shè)置事務(wù)監(jiān)聽器(本地事務(wù)、回調(diào)查詢) if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }
進入defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg)
//DefaultMQProducerImpl public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; //設(shè)置為預(yù)消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { //發(fā)送消息 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { //發(fā)送成功,當消息對象中的isWaitStoreMsgOK=true(默認true),如果 isWaitStoreMsgOK=false,當沒有捕獲到異常,那么將返回SEND_OK case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } //執(zhí)行傳入的本地分支事務(wù) if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); //執(zhí)行注解或者生產(chǎn)者構(gòu)造傳入的事務(wù)監(jiān)聽器 } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; //刷盤超時 case FLUSH_DISK_TIMEOUT: //數(shù)據(jù)同步到Slave服務(wù)器?超時 case FLUSH_SLAVE_TIMEOUT: //無Slave服務(wù)器?可用 case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { //發(fā)送二次確認消息 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } //封裝執(zhí)行結(jié)果 TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; } public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { //提交事務(wù) case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; //提交事務(wù) case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; //提交事務(wù) case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; //單向發(fā)送二次確認消息,不需要服務(wù)端相應(yīng),由消息回查監(jiān)聽補償 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
剛開始看RocketMQ的事務(wù)消息Example時,用的監(jiān)聽器執(zhí)行本地事務(wù),還以為是通過向服務(wù)端發(fā)送預(yù)消息,異步監(jiān)聽服務(wù)端響應(yīng)再處理本地事務(wù),那客戶端根本沒法實時響應(yīng)。
到此,關(guān)于“RocketMQ的事務(wù)消息是什么意思”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
網(wǎng)頁名稱:RocketMQ的事務(wù)消息是什么意思
文章URL:http://www.chinadenli.net/article28/ieoojp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁設(shè)計公司、搜索引擎優(yōu)化、企業(yè)建站、全網(wǎng)營銷推廣、外貿(mào)建站、手機網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)