日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

瀏覽:50日期:2023-05-08 09:29:31

本文收錄在個人博客:www.chengxy-nds.top,技術資源共享,一起進步

最近部門號召大伙多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術交流還是很有助于個人成長的。

于是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點KPI,就是想和大伙一起學習學習!

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

這次我分享的是 springboot + rabbitmq 如何實現消息確認機制,以及在實際開發中的一點踩坑經驗,其實整體的內容比較簡單,有時候事情就是這么神奇,越是簡單的東西就越容易出錯。

可以看到使用了 RabbitMQ 以后,我們的業務鏈路明顯變長了,雖然做到了系統間的解耦,但可能造成消息丟失的場景也增加了。例如:

消息生產者 - > rabbitmq服務器(消息發送失敗) rabbitmq服務器自身故障導致消息丟失 消息消費者 - > rabbitmq服務(消費消息失敗)

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟消息確認機制以后,盡管很大程度上保證了消息的準確送達,但由于頻繁的確認交互,rabbitmq 整體效率變低,吞吐量下降嚴重,不是非常重要的消息真心不建議你用消息確認機制。

下邊我們先來實現springboot + rabbitmq消息確認機制,再對遇到的問題做具體分析。

一、準備環境

1、引入 rabbitmq 依賴包

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、修改 application.properties 配置

配置中需要開啟 發送端和 消費端 的消息確認。

spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest# 發送者開啟 confirm 確認機制spring.rabbitmq.publisher-confirms=true# 發送者開啟 return 確認機制spring.rabbitmq.publisher-returns=true##################################################### 設置消費端手動 ackspring.rabbitmq.listener.simple.acknowledge-mode=manual# 是否支持重試spring.rabbitmq.listener.simple.retry.enabled=true

3、定義 Exchange 和 Queue

定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,并將隊列綁定在交換機上。

@Configurationpublic class QueueConfig { @Bean(name = 'confirmTestQueue') public Queue confirmTestQueue() { return new Queue('confirm_test_queue', true, false, false); } @Bean(name = 'confirmTestExchange') public FanoutExchange confirmTestExchange() { return new FanoutExchange('confirmTestExchange'); } @Bean public Binding confirmTestFanoutExchangeAndQueue( @Qualifier('confirmTestExchange') FanoutExchange confirmTestExchange, @Qualifier('confirmTestQueue') Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); }}

rabbitmq 的消息確認分為兩部分:發送消息確認 和 消息接收確認。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

二、消息發送確認

發送消息確認:用來確認生產者 producer 將消息發送到 broker ,broker 上的交換機 exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。

消息從 producer 到 rabbitmq broker有一個 confirmCallback 確認模式。

消息從 exchange 到 queue 投遞失敗有一個 returnCallback 退回模式。

我們可以利用這兩個Callback來確保消的100%送達。

1、 ConfirmCallback確認模式

消息只要被 rabbitmq broker 接收到就會觸發 confirmCallback 回調 。

@Slf4j@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error('消息發送異常!'); } else { log.info('發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}', correlationData.getId(), ack, cause); } }}

實現接口 ConfirmCallback ,重寫其confirm()方法,方法內有三個參數correlationData、ack、cause。

correlationData:對象內部只有一個 id 屬性,用來表示當前消息的唯一性。 ack:消息投遞到broker 的狀態,true表示成功。 cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經到達 MQ服務器,并不能保證消息一定會被投遞到目標 queue 里。所以接下來需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投遞到目標 queue 里將觸發回調 returnCallback ,一旦向 queue 投遞消息未成功,這里一般會記錄下當前消息的詳細投遞數據,方便后續做重發或者補償等操作。

@Slf4j@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info('returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}', replyCode, replyText, exchange, routingKey); }}

實現接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey(隊列)。

下邊是具體的消息發送,在rabbitTemplate中設置 Confirm 和 Return 回調,我們通過setDeliveryMode()對消息做持久化處理,為了后續測試創建一個 CorrelationData對象,添加一個id 為10000000000。

@Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { /** * 確保消息發送失敗后可以重新返回到隊列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消費者確認收到消息后,手動ack回執回調處理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投遞到隊列失敗回調處理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 發送消息 */ rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); }

三、消息接收確認

消息接收確認要比消息發送確認簡單一點,因為只有一個消息回執(ack)的過程。使用@RabbitHandler注解標注的方法要增加 channel(信道)、message 兩個參數。

@Slf4j@Component@RabbitListener(queues = 'confirm_test_queue')public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info('小富收到消息:{}', msg); //TODO 具體業務 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error('消息已重復處理失敗,拒絕再次接收...'); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息 } else { log.error('消息即將再次返回隊列處理...'); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }}

消費消息有三種回執方法,我們來分析一下每種方法的含義。

1、basicAck

basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行ack、nack、reject等操作。

multiple:是否批量確認,值為 true 則會一次性 ack所有小于當前消息 deliveryTag 的消息。

舉個栗子: 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。

2、basicNack

basicNack :表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投遞序號。

multiple:是否批量確認。

requeue:值為 true 消息將重新入隊列。

3、basicReject

basicReject:拒絕消息,與basicNack區別在于不能進行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投遞序號。

requeue:值為 true 消息將重新入隊列。

四、測試

發送消息測試一下消息確認機制是否生效,從執行結果上看發送者發消息后成功回調,消費端成功的消費了消息。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

用抓包工具Wireshark 觀察一下rabbitmq amqp協議交互的變化,也多了 ack 的過程。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

五、踩坑日志

1、不消息確認

這是一個非常沒技術含量的坑,但卻是非常容易犯錯的地方。

開啟消息確認機制,消費消息別忘了channel.basicAck,否則消息會一直存在,導致重復消費。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

2、消息無限投遞

在我最開始接觸消息確認機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯后確認消息, int a = 1 / 0 發生異常后將消息重新投入隊列。

@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info('消費者 2 號收到:{}', msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }

但是有個問題是,業務代碼一旦出現 bug 99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,導致了死循環。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

本地的CPU被瞬間打滿了,大家可以想象一下當時在生產環境導致服務死機,我是有多慌。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

而且rabbitmq management 只有一條未被確認的消息。

springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)

經過測試分析發現,當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

消費者會立刻消費這條消息,業務處理再拋出異常,消息再重新入隊,如此反復進行。導致消息隊列處理出現阻塞,導致正常消息也無法運行。

而我們當時的解決方案是,先將消息進行應答,此時消息隊列會刪除該條消息,同時我們再次發送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新發送消息到隊尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(msg));

但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優化設置了消息重試次數,達到了重試上限以后,手動確認,隊列刪除此消息,并將消息持久化入MySQL并推送報警,進行人工處理和定時任務做補償。

3、重復消費

如何保證 MQ 的消費是冪等性,這個需要根據具體業務而定,可以借助MySQL、或者redis 將消息持久化,通過再消息中的唯一性屬性校驗。

demo的 GitHub 地址 https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm

總結

到此這篇關于springboot + rabbitmq 如何實現消息確認機制(踩坑經驗)的文章就介紹到這了,更多相關springboot rabbitmq 消息確認機制內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
亚洲福利免费| 国产一区二区三区91| 日韩网站在线| 国内精品99| 国产精品蜜芽在线观看| 青青伊人久久| 欧美精品观看| 国产伦精品一区二区三区千人斩| 免费精品国产的网站免费观看| 精品亚洲自拍| 欧美aa在线观看| 日韩伦理一区| 福利欧美精品在线| 久久麻豆视频| 国产专区一区| 激情综合自拍| 成人国产精选| 日韩国产精品久久久久久亚洲| 日本免费在线视频不卡一不卡二| 在线观看免费一区二区| 国产成人精品一区二区三区免费 | 免费黄网站欧美| 一区在线视频观看| 国产精品**亚洲精品| 久久亚洲黄色| 久久精品毛片| 欧美黑人做爰爽爽爽| 色爱av综合网| 久久福利精品| 国产精品乱战久久久| 精品99在线| 亚洲午夜一级| 国产精品白浆| 国产传媒在线观看| 亚洲性色av| 美国欧美日韩国产在线播放| 天海翼精品一区二区三区| 午夜电影亚洲| 日本国产亚洲| 三上亚洲一区二区| 在线日韩av| 伊人精品视频| 久久精品国产999大香线蕉| 欧美国产专区| 亚洲日本网址| 久久精品网址| 久久亚洲一区| 久久精品资源| 欧美日韩国产综合网| 国产极品模特精品一二| 国产高清一区| 日韩av中文字幕一区二区三区| 国内不卡的一区二区三区中文字幕| 欧美一级专区| 欧美日韩一区二区综合| 国产精成人品2018| 国产精品www.| 国产视频一区在线观看一区免费| 欧美a一区二区| 中文精品在线| 成人在线丰满少妇av| 亚洲综合不卡| 韩日一区二区三区| 久久夜夜操妹子| 国内一区二区三区| 国产精品777777在线播放 | 成人免费网站www网站高清| 日韩毛片视频| 久久永久免费| 色狠狠一区二区三区| 99在线精品视频在线观看| 欧美日韩视频| 久久精品 人人爱| 欧美天堂视频| 青青青国产精品| 亚洲激情五月| 另类欧美日韩国产在线| 伊人精品一区| 日韩高清一区| 色88888久久久久久影院| 亚洲理论在线| 欧美日韩一区二区综合| 国产精品欧美大片| 亚洲激情不卡| 欧美国产三级| 亚洲色图网站| 欧美一区二区性| 国产伦精品一区二区三区视频 | 亚洲专区视频| 日本免费新一区视频| 欧美精品91| 99精品综合| 国产一区二区高清| 亚洲一区欧美| 国产精品久久国产愉拍| 国产一区二区三区国产精品| 国产精品资源| 国产一区观看| 日韩中文视频| 蜜桃视频一区二区| 精品久久免费| 蜜臀91精品一区二区三区| 欧美久久香蕉| 午夜日韩福利| 国产一区二区三区四区二区| 蜜桃视频在线观看一区二区| 亚洲a一区二区三区| 国产亚洲精品美女久久 | 欧美交a欧美精品喷水| 欧美肉体xxxx裸体137大胆| 欧美日韩一区二区国产| 亚洲精品91| av资源新版天堂在线| 日韩高清在线观看一区二区| 欧美日韩在线网站| 国产精品极品在线观看| 亚洲尤物在线| 97精品一区| 国产精品伦一区二区| 丝袜美腿亚洲一区| 亚洲播播91| 国产精选久久| 久久精品97| 美腿丝袜亚洲一区| 亚洲va中文在线播放免费| 亚洲理论在线| 久久不卡国产精品一区二区| 久久久久国产精品一区三寸| 激情91久久| 精品一区二区男人吃奶 | 亚洲精品在线a| 成人av动漫在线观看| 欧美成人综合| 国产精品1区在线| 日本va欧美va欧美va精品| 国产精品最新自拍| 日韩国产在线| 免费一区二区视频| 美女视频网站久久| 好吊日精品视频| 欧美在线首页| av高清不卡| 亚洲手机视频| 涩涩涩久久久成人精品| 久久在线免费| 精品久久久久中文字幕小说| 久久精品1区| 91大神在线观看线路一区| 日韩三区在线| 日韩在线a电影| 国产精品成人a在线观看| 亚洲一区二区毛片| 日韩成人精品一区二区| 亚洲精品乱码日韩| 国产粉嫩在线观看| 日本亚洲欧美天堂免费| 国产一区观看| 久久亚洲国产精品尤物| 亚洲精品乱码| av不卡在线| 日韩高清中文字幕一区二区| 日韩激情网站| 亚洲一区中文| 久久亚洲国产| 久久精品二区亚洲w码 | 一区二区精品伦理...| 日韩高清电影一区| 亚洲一区欧美二区| 99精品电影| 日韩欧美一区二区三区在线视频| 91精品丝袜国产高跟在线| 日韩在线一二三区| 久久亚洲精品中文字幕蜜潮电影| 国产一区二区三区四区五区传媒| 欧美日韩一区二区高清| 蜜臀av一区二区三区| 午夜精品亚洲| 在线精品小视频| 日韩av首页| 日韩一区欧美| 欧美aⅴ一区二区三区视频| 国产精品女主播一区二区三区| 亚洲国产一区二区在线观看 | 日韩一区精品字幕| 欧美日韩一区二区三区四区在线观看 | 青青草伊人久久| 亚洲a在线视频| 欧美日韩一区二区三区不卡视频| 亚洲乱码一区| 国产精品多人| 国产欧美日韩精品高清二区综合区| 欧美综合二区| 黑人精品一区| 伊人成人网在线看| 亚洲欧美日韩国产综合精品二区| 一本大道色婷婷在线| 日韩在线观看不卡| 国际精品欧美精品| 麻豆久久久久久| 精品久久影院|