0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何保證kafka消息不丟失

馬哥Linux運(yùn)維 ? 來源:稀土掘金技術(shù)社區(qū) ? 2023-12-19 09:52 ? 次閱讀

如果在簡(jiǎn)歷上寫了使用過kafka消息中間件,面試官大概80%的概率會(huì)問你:"如何保證kafka消息不丟失?"反正我是屢試不爽。
如果你的核心業(yè)務(wù)數(shù)據(jù),比如訂單數(shù)據(jù),或者其它核心交易業(yè)務(wù)數(shù)據(jù),在使用kafka時(shí),要保證消息不丟失,并讓下游消費(fèi)系統(tǒng)一定能獲得訂單數(shù)據(jù),只靠kafka中間件來保證,是并不可靠的。

kafka已經(jīng)這么的優(yōu)秀 了,為什么還會(huì)丟消息了?這一定是初學(xué)者或者初級(jí)使用者心中的疑惑

kafka 已經(jīng)這么的優(yōu)秀了,為啥還會(huì)丟消息了?----太不省心了

2f1eac08-9d9d-11ee-8b88-92fbcf53809c.jpg

圖一 生產(chǎn)者,broker,消費(fèi)者

要解決kafka丟失消息的情況,需要從使用kafka涉及的主流程和主要組件進(jìn)行分析。kafka的核心業(yè)務(wù)流程很簡(jiǎn)單:發(fā)送消息,暫存消息,消費(fèi)消息。而這中間涉及到的主要組件,分別是生產(chǎn)端,broker端,消費(fèi)端。

生產(chǎn)端丟失消息的情況和解決方法

生產(chǎn)端丟失消息的第一個(gè)原因主要來源于kafka的特性:批量發(fā)送異步提交。我們知道,kafka在發(fā)送消息時(shí),是由底層的IO SEND線程進(jìn)行消息的批量發(fā)送,不是由業(yè)務(wù)代碼線程執(zhí)行發(fā)送的。即業(yè)務(wù)代碼線程執(zhí)行完send方法后,就返回了。消息到底發(fā)送給broker側(cè)沒有了?通過send方法其實(shí)是無法知道的。2f2f6444-9d9d-11ee-8b88-92fbcf53809c.jpg

那么如何解決了?kafka提供了一個(gè)帶有callback回調(diào)函數(shù)的方法,如果消息成功/(失敗的)發(fā)送給broker端了,底層的IO線程是可以知道的,所以此時(shí)IO線程可以回調(diào)callback函數(shù),通知上層業(yè)務(wù)應(yīng)用。我們也一般在callback函數(shù)里,根據(jù)回調(diào)函數(shù)的參數(shù),就能知道消息是否發(fā)送成功了,如果發(fā)送失敗了,那么我們還可以在callback函數(shù)里重試。一般業(yè)務(wù)場(chǎng)景下 通過重試的方法保證消息再次發(fā)送出去。

90%的面試者都能給出上面的標(biāo)準(zhǔn)回答。

但在一些嚴(yán)格的交易場(chǎng)景:僅僅依靠回調(diào)函數(shù)的通知和重試,是不能保證消息一定能發(fā)送到broker端的

理由如下:
1、callback函數(shù)是在jvm層面由IO SEND線程執(zhí)行的,如果剛好遇到在執(zhí)行回調(diào)函數(shù)時(shí),jvm宕機(jī)了,或者恰好長(zhǎng)時(shí)間的GC,最終導(dǎo)致OOM,或者jvm假死的情況;那么回調(diào)函數(shù)是不能被執(zhí)行的。恰好你的消息數(shù)據(jù),是一個(gè)帶有交易屬性核心業(yè)務(wù)數(shù)據(jù),必須要通知給下游。比如下單或者支付后,需要通知傭金系統(tǒng),或者積分系統(tǒng),去計(jì)算訂單傭金。此時(shí)一個(gè)JVM宕機(jī)或者OOM,給下游的數(shù)據(jù)就丟了,那么計(jì)算聯(lián)盟客的訂單傭金數(shù)據(jù)也就丟了,造成聯(lián)盟客資損了。

2、IO SEND線程和broker之間是通過網(wǎng)絡(luò)進(jìn)行通信的,而網(wǎng)絡(luò)通信并不一定都能保證一直都是順暢的,比如網(wǎng)絡(luò)丟包,網(wǎng)絡(luò)中的交換機(jī)壞了,由底層網(wǎng)絡(luò)硬件的故障,導(dǎo)致上層IO線程發(fā)送消息失?。淮藭r(shí)發(fā)送端配置的重試參數(shù) retries 也不好使了。

如何解決生產(chǎn)端在極端嚴(yán)格的交易場(chǎng)景下,消息丟失了?

如果要解決jvm宕機(jī),或者JVM假死;又或者底層網(wǎng)絡(luò)問題,帶來的消息丟失;是需要上層應(yīng)用額外的機(jī)制來保證消息數(shù)據(jù)發(fā)送的完整性。大概流程如下圖

2f3eead6-9d9d-11ee-8b88-92fbcf53809c.jpg

1、在發(fā)送消息之前,加一個(gè)發(fā)送記錄,并且初始化為待發(fā)送;并且把發(fā)送記錄進(jìn)行存儲(chǔ)(可以存儲(chǔ)在DB里,或者其它存儲(chǔ)引擎里);2、利用帶有回調(diào)函數(shù)的callback通知,在業(yè)務(wù)代碼里感知到消息是否發(fā)送成功;如果消息發(fā)送成功,則把存儲(chǔ)引擎里對(duì)應(yīng)的消息標(biāo)記為已發(fā)送 3、利用延遲的定時(shí)任務(wù),每隔5分鐘(可根據(jù)實(shí)際情況調(diào)整掃描頻率)定時(shí)掃描5分鐘前未發(fā)送或者發(fā)送失敗的消息,再次進(jìn)行發(fā)送。

這樣即使應(yīng)用的jvm宕機(jī),或者底層網(wǎng)絡(luò)出現(xiàn)故障,消息是否發(fā)送的記錄,都進(jìn)行了保存。通過持續(xù)的定時(shí)任務(wù)掃描和重試,能最終保證消息一定能發(fā)送出去。

broker端丟失消息的情況和解決方法

broker端接收到生產(chǎn)端的消息后,并成功應(yīng)答生產(chǎn)端后,消息會(huì)丟嗎?如果broker能像mysql服務(wù)器一樣,在成功應(yīng)答給客戶端前,能把消息寫入到了磁盤進(jìn)行持久化,并且在宕機(jī)斷電后,有恢復(fù)機(jī)制,那么我們能說broker端不會(huì)丟消息。

2f4c4000-9d9d-11ee-8b88-92fbcf53809c.jpg

但broker端提供數(shù)據(jù)不丟的保障和mysql是不一樣的。broker端在接受了一批消息數(shù)據(jù)后,是不會(huì)馬上寫入磁盤的,而是先寫入到page cache里,這個(gè)page cache是操作系統(tǒng)的頁緩存(也就是另外一個(gè)內(nèi)存,只是由操作系統(tǒng)管理,不屬于JVM管理的內(nèi)存),通過定時(shí)或者定量的的方式( log.flush.interval.messages和log.flush.interval.ms)會(huì)把page cache里的數(shù)據(jù)寫入到磁盤里。

如果page cache在持久化到磁盤前,broker進(jìn)程宕機(jī)了,這個(gè)時(shí)候不會(huì)丟失消息,重啟broker即可;如果此時(shí)操作系統(tǒng)宕機(jī)或者物理機(jī)宕機(jī)了,page cache里的數(shù)據(jù)還沒有持久化到磁盤里,此種情況數(shù)據(jù)就丟了。

kafka應(yīng)對(duì)此種情況,建議是通過多副本機(jī)制來解決的,核心思想也挺簡(jiǎn)單的:如果數(shù)據(jù)保存在一臺(tái)機(jī)器上你覺得可靠性不夠,那么我就把相同的數(shù)據(jù)保存到多臺(tái)機(jī)器上,某臺(tái)機(jī)器宕機(jī)了可以由其它機(jī)器提供相同的服務(wù)和數(shù)據(jù)。

要想達(dá)到上面效果,有三個(gè)關(guān)鍵參數(shù)需要配置
第一:生產(chǎn)端參數(shù)ack 設(shè)置為all
代表消息需要寫入到“大多數(shù)”的副本分區(qū)后,leader broker才給生產(chǎn)端應(yīng)答消息寫入成功。(即寫入了“大多數(shù)”機(jī)器的page cache里)

第二:在broker端 配置min.insync.replicas參數(shù)設(shè)置至少為2
此參數(shù)代表了 上面的“大多數(shù)”副本。為2表示除了寫入leader分區(qū)外,還需要寫入到一個(gè)follower 分區(qū)副本里,broker端才會(huì)應(yīng)答給生產(chǎn)端消息寫入成功。此參數(shù)設(shè)置需要搭配第一個(gè)參數(shù)使用。

第三:在broker端配置replicator.factor參數(shù)至少3
此參數(shù)表示:topic每個(gè)分區(qū)的副本數(shù)。如果配置為2,表示每個(gè)分區(qū)只有2個(gè)副本,在加上第二個(gè)參數(shù)消息寫入時(shí)至少寫入2個(gè)分區(qū)副本,則整個(gè)寫入邏輯就表示集群中topic的分區(qū)副本不能有一個(gè)宕機(jī)。如果配置為3,則topic的每個(gè)分區(qū)副本數(shù)為3,再加上第二個(gè)參數(shù)min.insync.replicas為2,即每次,只需要寫入2個(gè)分區(qū)副本即可,另外一個(gè)宕機(jī)也不影響,在保證了消息不丟的情況下,也能提高分區(qū)的可用性;只是有點(diǎn)費(fèi)空間,畢竟多保存了一份相同的數(shù)據(jù)到另外一臺(tái)機(jī)器上。

另外在broker端,還有個(gè)參數(shù)unclean.leader.election.enable
此參數(shù)表示:沒有和leader分區(qū)保持?jǐn)?shù)據(jù)同步的副本分區(qū)是否也能參與leader分區(qū)的選舉,建議設(shè)置為false,不允許。如果允許,這這些落后的副本分區(qū)競(jìng)選為leader分區(qū)后,則之前l(fā)eader分區(qū)已保存的最新數(shù)據(jù)就有丟失的風(fēng)險(xiǎn)。注意在0.11版本之前默認(rèn)為TRUE。

消費(fèi)端側(cè)丟失消息的情況和解決方法

消費(fèi)端丟失消息的情況:消費(fèi)端丟失消息的情況,主要是設(shè)置了 autoCommit為true,即消費(fèi)者消費(fèi)消息的位移,由消費(fèi)者自動(dòng)提交。
自動(dòng)提交,表面上看起來挺高大上的,但這是消費(fèi)端丟失消息的主要原因。實(shí)例代碼如下


while(true){
 consumer.poll(); #①拉取消息
  XXX #②進(jìn)行業(yè)務(wù)處理;
 }

如果在第一步拉取消息后,即提交了消息位移;而在第二步處理消息的時(shí)候發(fā)生了業(yè)務(wù)異常,或者jvm宕機(jī)了。則第二次在從消費(fèi)端poll消息時(shí),會(huì)從最新的位移拉取后面的消息,這樣就造成了消息的丟失。

消費(fèi)端解決消息丟失也不復(fù)雜,設(shè)置autoCommit為false;然后在消費(fèi)完消息后手工提交位移即可 實(shí)例代碼如下:


while(true){
 consumer.poll(); #①拉取消息
  XXX #②處理消息;
  consumer.commit();
 }

在第二步進(jìn)行了業(yè)務(wù)處理后,在提交消費(fèi)的消息位移;這樣即使第二步或者第三步提交位移失敗了又或者宕機(jī)了,第二次再從poll拉取消息時(shí),則會(huì)以第一次拉取消息的位移處獲取后面的消息,以此保證了消息的不丟失。

總結(jié)

在生產(chǎn)端所在的jvm運(yùn)行正常,底層網(wǎng)絡(luò)通順的情況下,通過kafka 生產(chǎn)端自身的retries機(jī)制和call back回調(diào)能減少一部分消息丟失情況;但并不能保證在應(yīng)用層,網(wǎng)絡(luò)層有問題時(shí),也能100%確保消息不丟失;如果要解決此問題,可以試試 記錄消息發(fā)送狀態(tài)+定時(shí)任務(wù)掃描+重試的機(jī)制。

在broker端,要保證消息數(shù)據(jù)不丟失;kafka提供了多副本機(jī)制來進(jìn)行保證。關(guān)鍵核心參數(shù)三個(gè),一個(gè)生產(chǎn)端ack=all,兩個(gè)broker端參數(shù)min.insync.replicas 寫入數(shù)據(jù)到分區(qū)最小副本數(shù)為2,并且每個(gè)分區(qū)的副本集最小為3

在消費(fèi)端,要保證消息不丟失,需要設(shè)置消費(fèi)端參數(shù) autoCommit為false,并且在消息消費(fèi)完后,再手工提交消息位置

審核編輯:湯梓紅

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4788

    瀏覽量

    68612
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    504

    瀏覽量

    19683
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    51

    瀏覽量

    5222

原文標(biāo)題:kafka 消息“零丟失”的配方

文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    kafka設(shè)計(jì)原理的深度探討

    Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能 高吞吐率。即使
    的頭像 發(fā)表于 10-08 07:50 ?2009次閱讀
    <b class='flag-5'>kafka</b>設(shè)計(jì)原理的深度探討

    kafka數(shù)據(jù)可靠性深度解讀

    集群內(nèi)部的可靠性,但是在生產(chǎn)者向kafka集群發(fā)送時(shí),數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)傳輸,也是不可靠的,可能因?yàn)榫W(wǎng)絡(luò)延遲、閃斷等原因造成數(shù)據(jù)的丟失kafka為生產(chǎn)者提供了如下的三種可靠性級(jí)別,通過不同策略保證
    發(fā)表于 05-08 16:29

    流水線設(shè)計(jì)提高數(shù)據(jù)處理有沒有辦法保證數(shù)據(jù)丟失

    丟失?據(jù)說流水線可以實(shí)現(xiàn),但是本人對(duì)流水線只知道個(gè)大概,實(shí)際運(yùn)用還是有點(diǎn)摸不著頭腦。因?yàn)槊總€(gè)周期有幾千個(gè)數(shù)據(jù),是不是用FIFO多級(jí)延遲??
    發(fā)表于 08-16 11:50

    sja1000跟51外部中斷只能保證8幀丟失

    我用SJA1000收到信號(hào)就給stcf11f08xe一個(gè)外部中斷1(int1)信號(hào)收到一幀給一個(gè)信號(hào)- -但是連續(xù)發(fā)8幀以上 中斷就接收到了1次或者3次不等頂多只能保證8幀丟失- -這是為什么呢
    發(fā)表于 04-29 00:36

    淺析kafka

    kafka常見問題
    發(fā)表于 09-29 10:09

    基于發(fā)布與訂閱的消息系統(tǒng)Kafka

    Kafka權(quán)威指南》——初識(shí) Kafka
    發(fā)表于 03-05 13:46

    Kafka基礎(chǔ)入門文檔

    kafka系統(tǒng)入門教程(原理、配置、集群搭建、Java應(yīng)用、Kafka-manager)
    發(fā)表于 03-12 07:22

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    基于臭氧的Kafka自適應(yīng)調(diào)優(yōu)方法ENLHS

    ,如果針對(duì)實(shí)際資源環(huán)境進(jìn)行調(diào)優(yōu), Kafka使用默認(rèn)的配置參數(shù)無法保證其在毎個(gè)生產(chǎn)環(huán)境下的性能。因?yàn)镵aka自身的配置項(xiàng)非常大,傳統(tǒng)的自適應(yīng)算法在大規(guī)模生產(chǎn)系統(tǒng)中的性能較差。為了提高 Kaf
    發(fā)表于 05-13 11:39 ?7次下載

    Kafka的概念及Kafka的宕機(jī)

    問題要從一次Kafka的宕機(jī)開始說起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計(jì)之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?2103次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    Kafka 的簡(jiǎn)介

    ,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能 高吞吐率。即使在非常廉價(jià)的機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)
    的頭像 發(fā)表于 07-03 11:10 ?614次閱讀
    <b class='flag-5'>Kafka</b> 的簡(jiǎn)介

    物通博聯(lián)5G-kafka工業(yè)網(wǎng)關(guān)實(shí)現(xiàn)kafka協(xié)議對(duì)接到云平臺(tái)

    Kafka協(xié)議是一種基于TCP層的網(wǎng)絡(luò)協(xié)議,用于在分布式消息傳遞系統(tǒng)Apache Kafka中發(fā)送和接收消息。Kafka協(xié)議定義了客戶端和服務(wù)器之間的通信方式和數(shù)據(jù)格式,允許客戶端發(fā)送消息到K
    的頭像 發(fā)表于 07-11 10:44 ?515次閱讀

    Kafka架構(gòu)技術(shù):Kafka的架構(gòu)和客戶端API設(shè)計(jì)

    Kafka 給自己的定位是事件流平臺(tái)(event stream platform)。因此在消息隊(duì)列中經(jīng)常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?2380次閱讀
    <b class='flag-5'>Kafka</b>架構(gòu)技術(shù):<b class='flag-5'>Kafka</b>的架構(gòu)和客戶端API設(shè)計(jì)

    kafka相關(guān)命令詳解

    kafka常用命令詳解
    的頭像 發(fā)表于 10-20 11:34 ?942次閱讀

    面試官:Kafka會(huì)丟消息嗎?

    許多開發(fā)人員普遍認(rèn)為,Kafka 的設(shè)計(jì)本身就能保證不會(huì)丟失消息。然而,Kafka 架構(gòu)和配置的細(xì)微差別會(huì)導(dǎo)致消息的丟失。我們需要了解它如何
    的頭像 發(fā)表于 04-29 17:32 ?1027次閱讀
    面試官:<b class='flag-5'>Kafka</b>會(huì)丟消息嗎?