Labs 導(dǎo)讀
隨著大數(shù)據(jù)和云計(jì)算時(shí)代的到來,我國(guó)的各個(gè)產(chǎn)業(yè)每天都在產(chǎn)生不可估計(jì)的數(shù)據(jù),以及對(duì)數(shù)據(jù)的各式各樣的需求,消息中間件在處理數(shù)據(jù)、消費(fèi)數(shù)據(jù)的過程中越來越受到重視。在高并發(fā)、微服務(wù)、分布式的場(chǎng)景下,如何合理地利用消息中間件,如何保證MQ消費(fèi)消息的冪等性?所謂知其然,才能知其所以然,本文將通過RocketMQ作為例子,來扒一扒什么情況下會(huì)導(dǎo)致重復(fù)消費(fèi)。
作者:李佳斌
單位:中國(guó)移動(dòng)智慧家庭運(yùn)營(yíng)中心
Part 01 ●RocketMQ如何生產(chǎn)和消費(fèi)消息●
先簡(jiǎn)單介紹下RocketMQ正常生產(chǎn)消息和消費(fèi)消息的流程,如下圖。
1.生產(chǎn)者在發(fā)送消息之前根據(jù)負(fù)載均衡策略(默認(rèn)是輪詢)選擇一個(gè)Queue,然后跟這個(gè)Queue所在的機(jī)器建立連接,把消息發(fā)送到這個(gè)Queue上。
2.消費(fèi)者消費(fèi)這個(gè)Queue,就能獲取到對(duì)應(yīng)的消息。
- 問題出現(xiàn)
當(dāng)異常情況出現(xiàn)時(shí),如消息發(fā)送超時(shí)或者消息消費(fèi)超時(shí),RocketMQ為保證消息發(fā)送成功,會(huì)啟動(dòng)重試機(jī)制,選擇另一臺(tái)機(jī)器的Queue重發(fā)。現(xiàn)在假設(shè)有這樣一種情況,消費(fèi)者實(shí)際正確接收到了消息,只是由于網(wǎng)絡(luò)波動(dòng)導(dǎo)致響應(yīng)超時(shí)了,那就會(huì)出現(xiàn)消息重復(fù)發(fā)送,導(dǎo)致消費(fèi)者重復(fù)消費(fèi)的情況出現(xiàn)。
那除此之外,還有沒有其他情況會(huì)導(dǎo)致消息重復(fù)消費(fèi)的情況呢?總結(jié)起來一共有如下幾種情況。
1)消息發(fā)送異常時(shí)的重復(fù)消費(fèi)
2)消費(fèi)消息時(shí)拋出了異常
3)消費(fèi)者提交offset失敗
4)Broker持久化offset失敗
5)主從同步失敗
6)重平衡
7)清理長(zhǎng)時(shí)間消費(fèi)的消息
Part 02 ●淺析各類情況●
- 消費(fèi)消息時(shí)拋出異常
問題分析一
RocketMQ在并發(fā)消費(fèi)的模式下會(huì)調(diào)用MessageListenerConcurrently的consumeMessage方法,入?yún)⑹莔sgs集合。當(dāng)調(diào)用該方法消費(fèi)消息出現(xiàn)異常時(shí),返回的結(jié)果status就會(huì)是null。這種情況下會(huì)導(dǎo)致status被設(shè)置為RECONSUME_LATER,也就是說消息之后會(huì)被重復(fù)消費(fèi)。
問題分析二
傳入的是msgs集合。上述原因一中消息處理之后,不管成功失敗,都會(huì)對(duì)結(jié)果進(jìn)行處理。而集合中的任意一個(gè)失敗,都會(huì)導(dǎo)致status被設(shè)置為RECONSUME_LATER。在對(duì)結(jié)果處理是,判斷到RECONSUME_LATER時(shí),就會(huì)對(duì)msgs重新遍歷并發(fā)送消息,重新消費(fèi),從而導(dǎo)致之前成功處理的消息都會(huì)被重復(fù)消費(fèi)。不過好在msgs消息的數(shù)量默認(rèn)情況下是1。
- 消費(fèi)者提交offset失敗
何為offset
producer發(fā)送消息到broker,Rocketmq會(huì)將消息的內(nèi)容持久化到commitLog文件中,再分發(fā)到topic下的消費(fèi)隊(duì)列consume Queue,消費(fèi)者提交消費(fèi)請(qǐng)求時(shí),broker從該consumer負(fù)責(zé)的消費(fèi)隊(duì)列中根據(jù)請(qǐng)求參數(shù)傳入的起始o(jì)ffset來獲取需要消費(fèi)的消息索引信息,再?gòu)腸ommitLog中獲取具體的消息內(nèi)容返回給consumer。消費(fèi)成功之后,消費(fèi)者提交offset,來記錄這個(gè)queue消費(fèi)到哪個(gè)位置了。
問題分析
RocketMq設(shè)計(jì)的時(shí)候,消費(fèi)完消息,并不是同步提交offset,而是將offset保存到內(nèi)存中,通過一個(gè)定時(shí)任務(wù)(默認(rèn)是5S一次),以網(wǎng)絡(luò)請(qǐng)求的方式將offset提交給broker。如果最新的offset還沒提交,此時(shí)服務(wù)器宕機(jī)了,那么重啟之后,就會(huì)從broker中讀取到之前的提交的offset,并從此處開始消費(fèi),此時(shí)就會(huì)出現(xiàn)重復(fù)消費(fèi)的情況了。
- broker持久化offset失敗
問題分析
與消費(fèi)者提交offset同理,Broker為了防止數(shù)據(jù)丟失,會(huì)將offset持久化到磁盤中。同樣的也是通過一個(gè)默認(rèn)5S的定時(shí)任務(wù)來處理持久化操作。所以offset的完整過程就如下圖。當(dāng)broker宕機(jī)時(shí),就會(huì)導(dǎo)致offset丟失,此時(shí)如果消費(fèi)者重新拉取消費(fèi)進(jìn)度,就會(huì)比實(shí)際消費(fèi)的進(jìn)度要低,導(dǎo)致重復(fù)消費(fèi)。
- 主從同步失敗
問題分析
為保證RocketMQ服務(wù)的高可用,一般項(xiàng)目中都會(huì)啟用主從備份的模式,當(dāng)主節(jié)點(diǎn)掛掉之后,從節(jié)點(diǎn)就會(huì)升級(jí)為主節(jié)點(diǎn)對(duì)外提供服務(wù)。因此就需要進(jìn)行主從同步,保證數(shù)據(jù)的一致性。默認(rèn)情況下每隔10S,從節(jié)點(diǎn)會(huì)向主節(jié)點(diǎn)請(qǐng)求,同步元數(shù)據(jù),包括消費(fèi)進(jìn)度。此時(shí)如果主節(jié)點(diǎn)宕機(jī)了,從節(jié)點(diǎn)就無(wú)法獲取到10S之內(nèi)的消費(fèi)進(jìn)度,自然也就會(huì)導(dǎo)致重復(fù)消費(fèi)。
- 重平衡
何為重平衡
RocketMQ的消費(fèi)者有兩種模式,集群消費(fèi)模式和廣播消費(fèi)模式,絕大多數(shù)場(chǎng)景采用的都是集群消費(fèi)模式。前面提到的消費(fèi)進(jìn)度就是在集群消費(fèi)模式下才會(huì)存在。集群消費(fèi)模式中有一個(gè)消費(fèi)組的概念。一個(gè)消費(fèi)組可以有多個(gè)消費(fèi)者,不同消費(fèi)組之間消費(fèi)消息互不干擾,而同一消費(fèi)組的消費(fèi)者按照一定的算法分配消息隊(duì)列進(jìn)行消息消費(fèi),保證一個(gè)消息只能被一個(gè)消費(fèi)組消費(fèi)一次。當(dāng)消費(fèi)組中的消費(fèi)組增加或者減少時(shí)就會(huì)觸發(fā)重平衡。如圖,原先消費(fèi)組中有兩個(gè)消費(fèi)者,平均消費(fèi)4個(gè)隊(duì)列,每個(gè)消費(fèi)組2個(gè)隊(duì)列;當(dāng)加入了一個(gè)新的消費(fèi)者時(shí),為了保證新的消費(fèi)者能夠消費(fèi)消息,就會(huì)進(jìn)行重平衡,重新分配消息隊(duì)列。
問題分析
假設(shè)在重平衡發(fā)生時(shí),此時(shí)消費(fèi)者2還在正常消費(fèi)Queue4,當(dāng)消費(fèi)者3加入,重平衡完成時(shí),此時(shí)消費(fèi)者2判斷到Queue4已經(jīng)不屬于自己消費(fèi)了,就會(huì)將Queue4設(shè)置為dropped,消費(fèi)完成時(shí),發(fā)現(xiàn)隊(duì)列是dropped狀態(tài),那么消費(fèi)者2的消費(fèi)進(jìn)度offset就不會(huì)被提交。成功消費(fèi)了消息,但是消費(fèi)進(jìn)度卻沒有被提交,于是當(dāng)消費(fèi)者3開始消費(fèi)消息時(shí),就會(huì)從服務(wù)端拉取到之前的消費(fèi)進(jìn)度,造成隊(duì)列4的消息被重復(fù)消費(fèi)。
- 清理長(zhǎng)時(shí)間消費(fèi)的消息
清理機(jī)制講解
RocketMQ中有一個(gè)機(jī)制會(huì)定時(shí)清理長(zhǎng)時(shí)間正在消費(fèi)的消息,默認(rèn)是15分鐘執(zhí)行一次清理任務(wù)。之所以這么做,是有原因的。我們說過,消息被消費(fèi)之后,就會(huì)提交offset。當(dāng)一個(gè)線程消費(fèi)了所有消息時(shí),就會(huì)把消息從集合中移除,提交的消息進(jìn)度offset就是msg5的offset+1。
假設(shè),現(xiàn)在是兩個(gè)線程消費(fèi),線程2消費(fèi)完成,之后提交offset,但是此時(shí)線程1還在處理前兩條消息,因此為了保證消費(fèi)消息的不丟失,移除之后發(fā)現(xiàn)集合中還有剩余消息,就會(huì)把msg1的offset返回提交上去。而一旦集合最前面的消息長(zhǎng)時(shí)間處理,就會(huì)導(dǎo)致這個(gè)消費(fèi)進(jìn)度一直在最前面。此時(shí)如果服務(wù)器重啟,就會(huì)導(dǎo)致很多消費(fèi)過的消息都會(huì)被重復(fù)消費(fèi)。因此引入了清理長(zhǎng)時(shí)間消費(fèi)的機(jī)制。
問題分析
引入清理長(zhǎng)時(shí)間消費(fèi)的消息機(jī)制后,一旦發(fā)現(xiàn)某個(gè)消息已經(jīng)處理超過15分鐘了,就會(huì)將消息移除,保障后續(xù)消息消費(fèi)進(jìn)度的正常提交,之后會(huì)隔一定的時(shí)間再次消費(fèi)這個(gè)被移除的消息。但是,這個(gè)消息雖然被移除了,卻并不是沒有消費(fèi)過,因此再次消費(fèi)就會(huì)導(dǎo)致重復(fù)消費(fèi)的問題出現(xiàn)。
Part 03 ● 總結(jié)●
RocketMq的官方文檔中對(duì)消息傳遞有這樣的解釋:RocketMq確保所有消息至少被傳遞一次,在大多數(shù)情況下,消息不會(huì)重復(fù)。可見RocketMq為了保證消息的不丟失,犧牲了消息投遞的重復(fù)率。因此我們?cè)谑褂肦okcetMq時(shí)需要合理使用它的特點(diǎn),設(shè)計(jì)合理的冪等技術(shù)方案來解決重復(fù)消費(fèi)的問題。
審核編輯:湯梓紅
-
云計(jì)算
+關(guān)注
關(guān)注
39文章
7800瀏覽量
137401 -
Queue
+關(guān)注
關(guān)注
0文章
16瀏覽量
7261 -
負(fù)載均衡
+關(guān)注
關(guān)注
0文章
111瀏覽量
12368 -
大數(shù)據(jù)
+關(guān)注
關(guān)注
64文章
8889瀏覽量
137442
原文標(biāo)題:技術(shù) | RocketMQ中各類重復(fù)消費(fèi)的原理淺析
文章出處:【微信號(hào):5G通信,微信公眾號(hào):5G通信】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論