前言
1. 需求
2. 性能優(yōu)化
3. 出問(wèn)題了
4. 多線程消費(fèi)
5. 順序消費(fèi)
6. 唯一索引
7. 分布式鎖
8. 統(tǒng)一mq異步處理
9. insert on duplicate key update
10. insert ignore
11. 防重表
前言
最近測(cè)試給我提了一個(gè)bug,說(shuō)我之前提供的一個(gè)批量復(fù)制商品的接口,產(chǎn)生了重復(fù)的商品數(shù)據(jù)。
追查原因之后發(fā)現(xiàn),這個(gè)事情沒(méi)想象中簡(jiǎn)單,可以說(shuō)一波多折。
1. 需求
產(chǎn)品有個(gè)需求:用戶選擇一些品牌,點(diǎn)擊確定按鈕之后,系統(tǒng)需要基于一份默認(rèn)品牌的商品數(shù)據(jù),復(fù)制出一批新的商品。
拿到這個(gè)需求時(shí)覺(jué)得太簡(jiǎn)單了,三下五除二就搞定。
我提供了一個(gè)復(fù)制商品的基礎(chǔ)接口,給商城系統(tǒng)調(diào)用。
當(dāng)時(shí)的流程圖如下:
如果每次復(fù)制的商品數(shù)量不多,使用同步接口調(diào)用的方案問(wèn)題也不大。
2. 性能優(yōu)化
但由于每次需要復(fù)制的商品數(shù)量比較多,可能有幾千。
如果每次都是用同步接口的方式復(fù)制商品,可能會(huì)有性能問(wèn)題。
因此,后來(lái)我把復(fù)制商品的邏輯改成使用mq異步處理。
改造之后的流程圖:
復(fù)制商品的結(jié)果還需要通知商城系統(tǒng):
這個(gè)方案看起來(lái),挺不錯(cuò)的。
但后來(lái)出現(xiàn)問(wèn)題了。
3. 出問(wèn)題了
測(cè)試給我們提了一個(gè)bug,說(shuō)我之前提供的一個(gè)批量復(fù)制商品的接口,產(chǎn)生了重復(fù)的商品數(shù)據(jù)。
經(jīng)過(guò)追查之后發(fā)現(xiàn),商城系統(tǒng)為了性能考慮,也改成異步了。
他們沒(méi)有在接口中直接調(diào)用基礎(chǔ)系統(tǒng)的復(fù)制商品接口,而是在job中調(diào)用的。
站在他們的視角流程圖是這樣的:
用戶調(diào)用商城的接口,他們會(huì)往請(qǐng)求記錄表中寫(xiě)入一條數(shù)據(jù),然后在另外一個(gè)job中,異步調(diào)用基礎(chǔ)系統(tǒng)的接口去復(fù)制商品。
但實(shí)際情況是這樣的:商城系統(tǒng)內(nèi)部出現(xiàn)了bug,在請(qǐng)求記錄表中,同一條請(qǐng)求產(chǎn)生了重復(fù)的數(shù)據(jù)。這樣導(dǎo)致的結(jié)果是,在job中調(diào)用基礎(chǔ)系統(tǒng)復(fù)制商品接口時(shí),發(fā)送了重復(fù)的請(qǐng)求。
剛好基礎(chǔ)系統(tǒng)現(xiàn)在是使用RocketMQ異步處理的。由于商城的job一次會(huì)取一批數(shù)據(jù)(比如:20條記錄),在極短的時(shí)間內(nèi)(其實(shí)就是在一個(gè)for循環(huán)中)多次調(diào)用接口,可能存在相同的請(qǐng)求參數(shù)連續(xù)調(diào)用復(fù)制商品接口情況。于是,出現(xiàn)了并發(fā)插入重復(fù)數(shù)據(jù)的問(wèn)題。
為什么會(huì)出現(xiàn)這個(gè)問(wèn)題呢?
4. 多線程消費(fèi)
RocketMQ的消費(fèi)者,為了性能考慮,默認(rèn)是用多線程并發(fā)消費(fèi)的,最大支持64個(gè)線程。
例如:
@RocketMQMessageListener(topic="${com.susan.topic:PRODUCT_TOPIC}", consumerGroup="${com.susan.group:PRODUCT_TOPIC_GROUP}") @Service publicclassMessageReceiverimplementsRocketMQListener{ @Override publicvoidonMessage(MessageExtmessage){ Stringmessage=newString(message.getBody(),StandardCharsets.UTF_8); doSamething(message); } }
也就是說(shuō),如果在極短的時(shí)間內(nèi),連續(xù)發(fā)送重復(fù)的消息,就會(huì)被不同的線程消費(fèi)。
即使在代碼中有這樣的判斷:
ProductoldProduct=query(hashCode); if(oldProduct==null){ productMapper.insert(product); }
在插入數(shù)據(jù)之前,先判斷該數(shù)據(jù)是否已經(jīng)存在,只有不存在才會(huì)插入。
但由于在并發(fā)情況下,不同的線程都判斷商品數(shù)據(jù)不存在,于是同時(shí)進(jìn)行了插入操作,所以就產(chǎn)生了重復(fù)數(shù)據(jù)。
如下圖所示:
5. 順序消費(fèi)
為了解決上述并發(fā)消費(fèi)重復(fù)消息的問(wèn)題,我們從兩方面著手:
商城系統(tǒng)修復(fù)產(chǎn)生重復(fù)記錄的bug。
基礎(chǔ)系統(tǒng)將消息改成單線程順序消費(fèi)。
我仔細(xì)思考了一下,如果只靠商城系統(tǒng)修復(fù)bug,以后很難避免不出現(xiàn)類似的重復(fù)商品問(wèn)題,比如:如果用戶在極短的時(shí)間內(nèi)點(diǎn)擊創(chuàng)建商品按鈕多次,或者商城系統(tǒng)主動(dòng)發(fā)起重試。
所以,基礎(chǔ)系統(tǒng)還需進(jìn)一步處理。
其實(shí)RocketMQ本身是支持順序消費(fèi)的,需要消息的生產(chǎn)者和消費(fèi)者一起改。
生產(chǎn)者改為:
rocketMQTemplate.asyncSendOrderly(topic,message,hashKey,newSendCallback(){ @Override publicvoidonSuccess(SendResultsendResult){ log.info("sendMessagesuccess"); } @Override publicvoidonException(Throwablee){ log.error("sendMessagefailed!"); } });
重點(diǎn)是要調(diào)用rocketMQTemplate對(duì)象的asyncSendOrderly方法,發(fā)送順序消息。
消費(fèi)者改為:
@RocketMQMessageListener(topic="${com.susan.topic:PRODUCT_TOPIC}", consumeMode=ConsumeMode.ORDERLY, consumerGroup="${com.susan.group:PRODUCT_TOPIC_GROUP}") @Service publicclassMessageReceiverimplementsRocketMQListener{ @Override publicvoidonMessage(MessageExtmessage){ Stringmessage=newString(message.getBody(),StandardCharsets.UTF_8); doSamething(message); } }
接收消息的重點(diǎn)是RocketMQMessageListener注解中的consumeMode參數(shù),要設(shè)置成ConsumeMode.ORDERLY,這樣就能順序消費(fèi)消息了。
兩邊都修改之后,復(fù)制商品這一塊就沒(méi)有再出現(xiàn)重復(fù)商品的問(wèn)題了。
But,修完bug之后,我又思考了良久。
復(fù)制商品只是創(chuàng)建商品的其中一個(gè)入口,如果有其他入口,跟復(fù)制商品功能同時(shí)創(chuàng)建新商品呢?
不也會(huì)出現(xiàn)重復(fù)商品問(wèn)題?
雖說(shuō),這種概率非常非常小。
但如果一旦出現(xiàn)重復(fù)商品問(wèn)題,后續(xù)涉及到要合并商品的數(shù)據(jù),非常麻煩。
經(jīng)過(guò)這一次的教訓(xùn),一定要防微杜漸。
不管是用戶,還是自己的內(nèi)部系統(tǒng),從不同的入口創(chuàng)建商品,都需要解決重復(fù)商品創(chuàng)建問(wèn)題。
那么,如何解決這個(gè)問(wèn)題呢?
6. 唯一索引
解決重復(fù)商品數(shù)據(jù)問(wèn)題,最快成本最低最有效的辦法是:給表建唯一索引。
想法是好的,但我們這邊有個(gè)規(guī)范就是:業(yè)務(wù)表必須都是邏輯刪除。
而我們都知道,要?jiǎng)h除表的某條記錄的話,如果用delete語(yǔ)句操作的話。
例如:
deletefromproductwhereid=123;
這種delete操作是物理刪除,即該記錄被刪除之后,后續(xù)通過(guò)sql語(yǔ)句基本查不出來(lái)。(不過(guò)通過(guò)其他技術(shù)手段可以找回,那是后話了)
還有另外一種是邏輯刪除,主要是通過(guò)update語(yǔ)句操作的。
例如:
updateproductsetdelete_status=1,edit_time=now(3) whereid=123;
邏輯刪除需要在表中額外增加一個(gè)刪除狀態(tài)字段,用于記錄數(shù)據(jù)是否被刪除。在所有的業(yè)務(wù)查詢的地方,都需要過(guò)濾掉已經(jīng)刪除的數(shù)據(jù)。
通過(guò)這種方式刪除數(shù)據(jù)之后,數(shù)據(jù)任然還在表中,只是從邏輯上過(guò)濾了刪除狀態(tài)的數(shù)據(jù)而已。
其實(shí)對(duì)于這種邏輯刪除的表,是沒(méi)法加唯一索引的。
為什么呢?
假設(shè)之前給商品表中的name和model加了唯一索引,如果用戶把某條記錄刪除了,delete_status設(shè)置成1了。后來(lái),該用戶發(fā)現(xiàn)不對(duì),又重新添加了一模一樣的商品。
由于唯一索引的存在,該用戶第二次添加商品會(huì)失敗,即使該商品已經(jīng)被刪除了,也沒(méi)法再添加了。
這個(gè)問(wèn)題顯然有點(diǎn)嚴(yán)重。
有人可能會(huì)說(shuō):把name、model和delete_status三個(gè)字段同時(shí)做成唯一索引不就行了?
答:這樣做確實(shí)可以解決用戶邏輯刪除了某個(gè)商品,后來(lái)又重新添加相同的商品時(shí),添加不了的問(wèn)題。但如果第二次添加的商品,又被刪除了。該用戶第三次添加相同的商品,不也出現(xiàn)問(wèn)題了?
由此可見(jiàn),如果表中有邏輯刪除功能,是不方便創(chuàng)建唯一索引的。
7. 分布式鎖
接下來(lái),你想到的第二種解決數(shù)據(jù)重復(fù)問(wèn)題的辦法可能是:加分布式鎖。
目前最常用的性能最高的分布式鎖,可能是redis分布式鎖了。
使用redis分布式鎖的偽代碼如下:
try{ Stringresult=jedis.set(lockKey,requestId,"NX","PX",expireTime); if("OK".equals(result)){ doSamething(); returntrue; } returnfalse; }finally{ unlock(lockKey,requestId); }
不過(guò)需要在finally代碼塊中釋放鎖。
其中l(wèi)ockKey是由商品表中的name和model組合而成的,requestId是每次請(qǐng)求的唯一標(biāo)識(shí),以便于它每次都能正確得釋放鎖。還需要設(shè)置一個(gè)過(guò)期時(shí)間expireTime,防止釋放鎖失敗,鎖一直存在,導(dǎo)致后面的請(qǐng)求沒(méi)法獲取鎖。
如果只是單個(gè)商品,或者少量的商品需要復(fù)制添加,則加分布式鎖沒(méi)啥問(wèn)題。
主要流程如下:
可以在復(fù)制添加商品之前,先嘗試加鎖。如果加鎖成功,則在查詢商品是否存在,如果不存在,則添加商品。此外,在該流程中如果加鎖失敗,或者查詢商品時(shí)不存在,則直接返回。
加分布式鎖的目的是:保證查詢商品和添加商品的兩個(gè)操作是原子性的操作。
但現(xiàn)在的問(wèn)題是,我們這次需要復(fù)制添加的商品數(shù)量很多,如果每添加一個(gè)商品都要加分布式鎖的話,會(huì)非常影響性能。
顯然對(duì)于批量接口,加redis分布式鎖,不是一個(gè)理想的方案。
8. 統(tǒng)一mq異步處理
前面我們已經(jīng)聊過(guò),在批量復(fù)制商品的接口,我們是通過(guò)RocketMQ的順序消息,單線程異步復(fù)制添加商品的,可以暫時(shí)解決商品重復(fù)的問(wèn)題。
但那只改了一個(gè)添加商品的入口,還有其他添加商品的入口。
能不能把添加商品的底層邏輯統(tǒng)一一下,最終都調(diào)用同一段代碼。然后通過(guò)RocketMQ的順序消息,單線程異步添加商品。
主要流程如下圖所示:
這樣確實(shí)能夠解決重復(fù)商品的問(wèn)題。
但同時(shí)也帶來(lái)了另外兩個(gè)問(wèn)題:
現(xiàn)在所有的添加商品功能都改成異步了,之前同步添加商品的接口如何返回?cái)?shù)據(jù)呢?這就需要修改前端交互,否則會(huì)影響用戶體驗(yàn)。
之前不同的添加商品入口,是多線程添加商品的,現(xiàn)在改成只能由一個(gè)線程添加商品,這樣修改的結(jié)果導(dǎo)致添加商品的整體效率降低了。
由此,綜合考慮了一下各方面因素,這個(gè)方案最終被否定了。
9. insert on duplicate key update
其實(shí),在mysql中存在這樣的語(yǔ)法,即:insert on duplicate key update。
在添加數(shù)據(jù)時(shí),mysql發(fā)現(xiàn)數(shù)據(jù)不存在,則直接insert。如果發(fā)現(xiàn)數(shù)據(jù)已經(jīng)存在了,則做update操作。
不過(guò)要求表中存在唯一索引或PRIMARY KEY,這樣當(dāng)這兩個(gè)值相同時(shí),才會(huì)觸發(fā)更新操作,否則是插入。
現(xiàn)在的問(wèn)題是PRIMARY KEY是商品表的主鍵,是根據(jù)雪花算法提前生成的,不可能產(chǎn)生重復(fù)的數(shù)據(jù)。
但由于商品表有邏輯刪除功能,導(dǎo)致唯一索引在商品表中創(chuàng)建不了。
由此,insert on duplicate key update這套方案,暫時(shí)也沒(méi)法用。
此外,insert on duplicate key update在高并發(fā)的情況下,可能會(huì)產(chǎn)生死鎖問(wèn)題,需要特別注意一下。
10. insert ignore
在mysql中還存在這樣的語(yǔ)法,即:insert ... ignore。
在insert語(yǔ)句執(zhí)行的過(guò)程中:mysql發(fā)現(xiàn)如果數(shù)據(jù)重復(fù)了,就忽略,否則就會(huì)插入。
它主要是用來(lái)忽略,插入重復(fù)數(shù)據(jù)產(chǎn)生的Duplicate entry 'XXX' for key 'XXXX'異常的。
不過(guò)也要求表中存在唯一索引或PRIMARY KEY。
但由于商品表有邏輯刪除功能,導(dǎo)致唯一索引在商品表中創(chuàng)建不了。
由此可見(jiàn),這個(gè)方案也不行。
溫馨的提醒一下,使用insert ... ignore也有可能會(huì)導(dǎo)致死鎖。
11. 防重表
之前聊過(guò),因?yàn)橛羞壿媱h除功能,給商品表加唯一索引,行不通。
后面又說(shuō)了加分布式鎖,或者通過(guò)mq單線程異步添加商品,影響創(chuàng)建商品的性能。
那么,如何解決問(wèn)題呢?
我們能否換一種思路,加一張防重表,在防重表中增加商品表的name和model字段作為唯一索引。
例如:
CREATETABLE`product_unique`( `id`bigint(20)NOTNULLCOMMENT'id', `name`varchar(130)DEFAULTNULLCOMMENT'名稱', `model`varchar(255)NOTNULLCOMMENT'規(guī)格', `user_id`bigint(20)unsignedNOTNULLCOMMENT'創(chuàng)建用戶id', `user_name`varchar(30)NOTNULLCOMMENT'創(chuàng)建用戶名稱', `create_date`datetime(3)NOTNULLDEFAULTCURRENT_TIMESTAMP(3)COMMENT'創(chuàng)建時(shí)間', PRIMARYKEY(`id`), UNIQUEKEY`ux_name_model`(`name`,`model`) )ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='商品防重表';
其中表中的id可以用商品表的id,表中的name和model就是商品表的name和model,不過(guò)在這張防重表中增加了這兩個(gè)字段的唯一索引。
視野一下子被打開(kāi)了。
在添加商品數(shù)據(jù)之前,先添加防重表。如果添加成功,則說(shuō)明可以正常添加商品,如果添加失敗,則說(shuō)明有重復(fù)數(shù)據(jù)。
防重表添加失敗,后續(xù)的業(yè)務(wù)處理,要根據(jù)實(shí)際業(yè)務(wù)需求而定。
如果業(yè)務(wù)上允許添加一批商品時(shí),發(fā)現(xiàn)有重復(fù)的,直接拋異常,則可以提示用戶:系統(tǒng)檢測(cè)到重復(fù)的商品,請(qǐng)刷新頁(yè)面重試。
例如:
try{ transactionTemplate.execute((status)->{ productUniqueMapper.batchInsert(productUniqueList); productMapper.batchInsert(productList); returnBoolean.TRUE; }); }catch(DuplicateKeyExceptione){ thrownewBusinessException("系統(tǒng)檢測(cè)到重復(fù)的商品,請(qǐng)刷新頁(yè)面重試"); }
在批量插入數(shù)據(jù)時(shí),如果出現(xiàn)了重復(fù)數(shù)據(jù),捕獲DuplicateKeyException異常,轉(zhuǎn)換成BusinessException這樣運(yùn)行時(shí)的業(yè)務(wù)異常。
還有一種業(yè)務(wù)場(chǎng)景,要求即使出現(xiàn)了重復(fù)的商品,也不拋異常,讓業(yè)務(wù)流程也能夠正常走下去。
例如:
try{ transactionTemplate.execute((status)->{ productUniqueMapper.insert(productUnique); productMapper.insert(product); returnBoolean.TRUE; }); }catch(DuplicateKeyExceptione){ product=productMapper.query(product); }
在插入數(shù)據(jù)時(shí),如果出現(xiàn)了重復(fù)數(shù)據(jù),則捕獲DuplicateKeyException,在catch代碼塊中再查詢一次商品數(shù)據(jù),將數(shù)據(jù)庫(kù)已有的商品直接返回。
如果調(diào)用了同步添加商品的接口,這里非常關(guān)鍵的一點(diǎn),是要返回已有數(shù)據(jù)的id,業(yè)務(wù)系統(tǒng)做后續(xù)操作,要拿這個(gè)id操作。
當(dāng)然在執(zhí)行execute之前,還是需要先查一下商品數(shù)據(jù)是否存在,如果已經(jīng)存在,則直接返回已有數(shù)據(jù),如果不存在,才執(zhí)行execute方法。這一步千萬(wàn)不能少。
例如:
ProductoldProduct=productMapper.query(product); if(Objects.nonNull(oldProduct)){ returnoldProduct; } try{ transactionTemplate.execute((status)->{ productUniqueMapper.insert(productUnique); productMapper.insert(product); returnBoolean.TRUE; }); }catch(DuplicateKeyExceptione){ product=productMapper.query(product); } returnproduct;
千萬(wàn)注意:防重表和添加商品的操作必須要在同一個(gè)事務(wù)中,否則會(huì)出問(wèn)題。
順便說(shuō)一下,還需要對(duì)商品的刪除功能做特殊處理一下,在邏輯刪除商品表的同時(shí),要物理刪除防重表。用商品表id作為查詢條件即可。
說(shuō)實(shí)話,解決重復(fù)數(shù)據(jù)問(wèn)題的方案挺多的,沒(méi)有最好的方案,只有最適合業(yè)務(wù)場(chǎng)景的,最優(yōu)的方案。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7073瀏覽量
89138 -
代碼
+關(guān)注
關(guān)注
30文章
4797瀏覽量
68707 -
線程
+關(guān)注
關(guān)注
0文章
505瀏覽量
19703 -
Redis
+關(guān)注
關(guān)注
0文章
376瀏覽量
10887 -
并發(fā)
+關(guān)注
關(guān)注
0文章
7瀏覽量
2508
原文標(biāo)題:去阿里面試到第二輪就被虐慘:高并發(fā)下怎么防止數(shù)據(jù)重復(fù)?
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論