一、數(shù)據(jù)傾斜的基本概念
01 什么是數(shù)據(jù)傾斜?
用最通俗易懂的話來說,數(shù)據(jù)傾斜無非就是大量的相同key被partition分配到一個(gè)分區(qū)里,造成了'一個(gè)人累死,其他人閑死'的情況,這種情況是我們不能接受的,這也違背了并行計(jì)算的初衷,首先一個(gè)節(jié)點(diǎn)要承受著巨大的壓力,而其他節(jié)點(diǎn)計(jì)算完畢后要一直等待這個(gè)忙碌的節(jié)點(diǎn),也拖累了整體的計(jì)算時(shí)間,可以說效率是十分低下的。
02? 數(shù)據(jù)傾斜發(fā)生時(shí)的現(xiàn)象?
(1)絕大多數(shù)task執(zhí)行得都非常快,但個(gè)別task執(zhí)行的極慢。
(2)原本能正常執(zhí)行的Spark作業(yè),某天突然爆出OOM(內(nèi)存溢出)異常。觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的。
03 通用的常規(guī)解決方案
(1)增加jvm內(nèi)存,這適用于第一種情況(唯一值非常少,極少數(shù)值有非常多的記錄值(唯一值少于幾千)),這種情況下,往往只能通過硬件的手段來進(jìn)行調(diào)優(yōu),增加jvm內(nèi)存可以顯著的提高運(yùn)行效率。
(2)增加reduce的個(gè)數(shù),這適用于第二種情況(唯一值比較多,這個(gè)字段的某些值有遠(yuǎn)遠(yuǎn)多于其他值的記錄數(shù),但是它的占比也小于百分之一或千分之一),我們知道,這種情況下,最容易造成的結(jié)果就是大量相同key被partition到一個(gè)分區(qū),從而一個(gè)reduce執(zhí)行了大量的工作,而如果我們?cè)黾恿藃educe的個(gè)數(shù),這種情況相對(duì)來說會(huì)減輕很多,畢竟計(jì)算的節(jié)點(diǎn)多了,就算工作量還是不均勻的,那也要小很多。
(3)自定義分區(qū),這需要用戶自己繼承partition類,指定分區(qū)策略,這種方式效果比較顯著。
(4)重新設(shè)計(jì)key,有一種方案是在map階段時(shí)給key加上一個(gè)隨機(jī)數(shù),有了隨機(jī)數(shù)的key就不會(huì)被大量的分配到同一節(jié)點(diǎn)(小幾率),待到reduce后再把隨機(jī)數(shù)去掉即可。
(5)使用combinner合并,combinner是在map階段,reduce之前的一個(gè)中間階段,在這個(gè)階段可以選擇性的把大量的相同key數(shù)據(jù)先進(jìn)行一個(gè)合并,可以看做是local reduce,然后再交給reduce來處理,這樣做的好。
04 通用定位發(fā)生數(shù)據(jù)傾斜的代碼
(1)數(shù)據(jù)傾斜只會(huì)發(fā)生在shuffle中,下面是常用的可能會(huì)觸發(fā)shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現(xiàn)數(shù)據(jù)傾斜時(shí),可能就是代碼中使用了這些算子的原因。
(2)通過觀察spark UI,定位數(shù)據(jù)傾斜發(fā)生在第幾個(gè)stage中,如果是用yarn-client模式提交,那么本地是可以直接看到log的,可以在log中找到當(dāng)前運(yùn)行到了第幾個(gè)stage;如果用yarn-cluster模式提交,可以通過Spark Web UI 來查看當(dāng)前運(yùn)行到了第幾個(gè)stage。此外,無論是使用了yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI 上深入看一下當(dāng)前這個(gè)stage各個(gè)task分配的數(shù)據(jù)量,從而進(jìn)一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。
二、 Hive數(shù)據(jù)傾斜
1、Hive的執(zhí)行是分階段的,map處理數(shù)據(jù)量的差異取決于上一個(gè)stage的reduce輸出,所以如何將數(shù)據(jù)均勻的分配到各個(gè)reduce中,就是解決數(shù)據(jù)傾斜的根本所在。
2 、造成數(shù)據(jù)傾斜的原因
1)、key分布不均勻
2)、業(yè)務(wù)數(shù)據(jù)本身的特性
3)、建表時(shí)考慮不周
4)、某些SQL語句本身就有數(shù)據(jù)傾斜
3 、數(shù)據(jù)傾斜的表現(xiàn):
數(shù)據(jù)傾斜出現(xiàn)在SQL算子中包含join/group by/等聚合操作時(shí),大量的相同KEY被分配到少量的reduce去處理。導(dǎo)致絕大多數(shù)TASK執(zhí)行得都非???,但個(gè)別TASK執(zhí)行的極慢,原本能正常執(zhí)行的作業(yè),某天突然爆出OOM(內(nèi)存溢出)異常。任務(wù)進(jìn)度長時(shí)間維持在99%(或100%)。任務(wù)監(jiān)控頁面,發(fā)現(xiàn)只有少量(1個(gè)或幾個(gè))reduce子任務(wù)未完成。因?yàn)槠涮幚淼臄?shù)據(jù)量和其他reduce差異過大。單一reduce的記錄數(shù)與平均記錄數(shù)差異過大,通??赡苓_(dá)到3倍甚至更多。 最長時(shí)長遠(yuǎn)大于平均時(shí)長??梢圆榭淳唧wjob的reducer counter計(jì)數(shù)器協(xié)助定位。
4、數(shù)據(jù)傾斜的解決方案:
1)參數(shù)調(diào)節(jié):
hive.map.aggr=true(是否在Map端進(jìn)行聚合,默認(rèn)為true),這個(gè)設(shè)置可以將頂層的聚合操作放在Map階段執(zhí)行,從而減輕清洗階段數(shù)據(jù)傳輸和Reduce階段的執(zhí)行時(shí)間,提升總體性能 Set hive.groupby.skewindata=true(hive自動(dòng)進(jìn)行負(fù)載均衡)
2)SQL語句調(diào)節(jié)
a、如何Join: 關(guān)于驅(qū)動(dòng)表的選取,選用join key分布最均勻的表作為驅(qū)動(dòng)表。 做好列裁剪和filter操作,以達(dá)到兩表做join的時(shí)候,數(shù)據(jù)量相對(duì)變小的效果,避免笛卡爾積。 Hive中進(jìn)行表的關(guān)聯(lián)查詢時(shí),盡可能將較大的表放在Join之后。
b、大小表Join,開啟mapjoin
mapjoin的原理: MapJoin 會(huì)把小表全部讀入內(nèi)存中,在map階段直接拿另外一個(gè)表的數(shù)據(jù)和內(nèi)存中表數(shù)據(jù)做匹配,由于在map是進(jìn)行了join操作,省去了reduce 階段,運(yùn)行的效率就會(huì)高很多。參與連接的小表的行數(shù),以不超過2萬條為宜,大小不超過25M。
設(shè)置參數(shù)
set hive.auto.convert.join=true; hive.mapjoin.smalltable.filesize=25000000( 即25M)?手動(dòng)指定
-- a 表是大表,數(shù)據(jù)量是百萬級(jí)別
-- b 表是小表,數(shù)據(jù)量在百級(jí)別,mapjion括號(hào)中的b就是指定哪張表為小表
select /*+mapjoin(b)*/ a.field1asfield1, b.field2asfield2, b.field3asfield3 fromaleftjoinb on a.field1 = b.field1;c、大表Join大表:
null值不參與連接,簡單舉例
select field1,field2,field3… fromlogaleftjoinuserbona.useridisnotnullanda.userid=b.userid unionselectfield1,field2,field3fromlogwhereuseridisnull;
將熱點(diǎn)key打散,但是需要注意,盡量不要在join時(shí),對(duì)關(guān)聯(lián)key使用rand()函數(shù)。因?yàn)樵趆ive中當(dāng)遇到map失敗重算時(shí),就會(huì)出現(xiàn)數(shù)據(jù)重復(fù)(數(shù)據(jù)丟失)的問題,spark引擎使用rand容易導(dǎo)致task失敗重新計(jì)算的時(shí)候偶發(fā)不一致的問題??梢允褂胢d5加密唯一維度值的方式替代rand(), 比如: md5(concat(coalesce(sku_id, 0), '_', coalesce(dim_store_num, 0), '_', coalesce(store_id, 0), '_',coalesce(delv_center_id, 0))),其中concat的字段是表的唯一粒度;也可以使用hash。
d、count distinct大量相同特殊值,使用sum...group by代替count(distinct ) 例如
selecta,count(distinctb)fromtgroupbya 可以寫成selecta,sum(1)from(selecta,bfromtgroupbya,b)groupbya;
select count (distinct key) from a 可以寫成 Select sum(1) from (Select key from a group by key) t特殊情況特殊處理:在業(yè)務(wù)邏輯優(yōu)化效果的不大情況下,有些時(shí)候是可以將傾斜的數(shù)據(jù)單獨(dú)拿出來處理。最后union回去
e、 不管是join還是groupby 請(qǐng)先在內(nèi)層先進(jìn)行數(shù)據(jù)過濾,建議只保留需要的key值
f、 取最大最小值盡量使用min/max;不要采用row_number
g、 不要直接select * ;在內(nèi)層做好數(shù)據(jù)過濾
h、 盡量使用sort by替換order by
i、 明確數(shù)據(jù)源,有上層匯總的就不要使用基礎(chǔ)fdm或明細(xì)表
J、join避免多對(duì)多關(guān)聯(lián)
在join鏈接查詢時(shí),確認(rèn)是否存在多對(duì)多的關(guān)聯(lián),起碼保證有一個(gè)表的結(jié)果集的關(guān)聯(lián)字段不重復(fù)。
5、典型的業(yè)務(wù)場景舉例
(1)空值產(chǎn)生的數(shù)據(jù)傾斜
場景:如日志中,常會(huì)有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關(guān)聯(lián),會(huì)碰到數(shù)據(jù)傾斜的問題。
解決方法1: user_id為空的不參與關(guān)聯(lián)
select * from log a join users b on a.user_id is not null and a.user_id = b.user_idunion allselect * from log a where a.user_id is null;(2)不同數(shù)據(jù)類型關(guān)聯(lián)產(chǎn)生數(shù)據(jù)傾斜
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當(dāng)按照user_id進(jìn)行兩個(gè)表的Join操作時(shí),默認(rèn)的Hash操作會(huì)按int型的id來進(jìn)行分配,這樣會(huì)導(dǎo)致所有string類型id的記錄都分配到一個(gè)Reducer中。
解決方法:把數(shù)字類型轉(zhuǎn)換成字符串類型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)(3)小表不小不大,怎么用 map join 解決傾斜問題
使用 map join 解決小表(記錄數(shù)少)關(guān)聯(lián)大表的數(shù)據(jù)傾斜問題,這個(gè)方法使用的頻率非常高,但如果小表很大,大到map join會(huì)出現(xiàn)bug或異常,這時(shí)就需要特別的處理 。
select * from log a left outer join users b on a.user_id = b.user_id;users 表有 600w+ 的記錄,把 users 分發(fā)到所有的 map 上也是個(gè)不小的開銷,而且 map join 不支持這么大的小表。如果用普通的 join,又會(huì)碰到數(shù)據(jù)傾斜的問題。 解決方法:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;log里user_id有上百萬個(gè),這就又回到原來map join問題。所幸,每日的會(huì)員uv不會(huì)太多,有交易的會(huì)員不會(huì)太多,有點(diǎn)擊的會(huì)員不會(huì)太多,有傭金的會(huì)員不會(huì)太多等等。所以這個(gè)方法能解決很多場景下的數(shù)據(jù)傾斜問題。
(4)業(yè)務(wù)邏輯突發(fā)熱key的處理(真實(shí)線上問題) 業(yè)務(wù)場景舉例:
流量數(shù)據(jù)多個(gè)設(shè)備號(hào)對(duì)應(yīng)了一個(gè)安裝id,突發(fā)某幾個(gè)安裝id數(shù)量級(jí)特別大。在歸一環(huán)節(jié)中,按照安裝id進(jìn)行分發(fā)reduce,再進(jìn)行處理,異常熱key會(huì)造成單一節(jié)點(diǎn)處理數(shù)據(jù)量大,由于數(shù)據(jù)傾斜從而導(dǎo)致任務(wù)卡死的情況。
解決方案:基于小時(shí)任務(wù),提前設(shè)置一個(gè)異常范圍,把異常安裝id和對(duì)應(yīng)的aid撈出來,寫到維表里面。按照歸一邏輯,優(yōu)先使用aid值作為歸一結(jié)果,所以在歸一任務(wù)中,讀取異常值,隨機(jī)分發(fā)到reduce中,并將aid賦值給歸一字段,這樣就避免了熱點(diǎn)處理。
總結(jié):
1、對(duì)于join,在判斷小表不大于1G的情況下,使用map join
2、對(duì)于group by或distinct,設(shè)定 hive.groupby.skewindata=true
3、盡量使用上述的SQL語句調(diào)節(jié)進(jìn)行優(yōu)化
6、數(shù)據(jù)傾斜的監(jiān)控預(yù)防
(1)測(cè)試的時(shí)候需要關(guān)注數(shù)據(jù)分布,針對(duì)不同日期、關(guān)鍵指標(biāo)、重點(diǎn)key、枚舉值等
(2)增加數(shù)據(jù)質(zhì)量監(jiān)控,數(shù)據(jù)計(jì)算的每層任務(wù)增加數(shù)據(jù)質(zhì)量監(jiān)控。
(3)L0任務(wù),大數(shù)據(jù)平臺(tái)需要有健康度巡檢,對(duì)資源、參數(shù)配置,數(shù)據(jù)傾斜、穩(wěn)定性等做任務(wù)健康度打分,從而發(fā)現(xiàn)數(shù)據(jù)傾斜的趨勢(shì),及早檢查任務(wù)
三、spark數(shù)據(jù)傾斜
Spark優(yōu)化數(shù)據(jù)傾斜的思路,join方式從SMJ方式改成BMJ的方式,但是只適合大小表的情況。優(yōu)化思路一般是: 改join方式,開啟spark自適應(yīng)框架,優(yōu)化sql。
1、開啟sparksql的數(shù)據(jù)傾斜時(shí)的自適應(yīng)關(guān)聯(lián)優(yōu)化
spark.shuffle.statistics.verbose=true打開后MapStatus會(huì)采集每個(gè)partition條數(shù)的信息,用于傾斜處理。
2 、Sortmergejoin 改成 BroadcastHashJoin。調(diào)大BroadcastHashJoin的閾值。
在某些場景下可以把SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin而避免shuffle產(chǎn)生的數(shù)據(jù)傾斜。 增加參數(shù):
spark.sql.autoBroadcastJoinThreshold=524288000將BHJ的閾值提高到500M
3、優(yōu)化sql同hive
4、傾斜KEY查找
需要結(jié)合實(shí)際業(yè)務(wù)代碼,查找到引起Shuffle的算子,并按照以下兩種方式查找大KEY。?
方式一:通過SQL抽樣傾斜KEY
適用場景:如果數(shù)據(jù)量比較小的情況下,通過SQL的方式驗(yàn)證比較便捷 。
操作步驟:
1、針對(duì)KEY進(jìn)行數(shù)量統(tǒng)計(jì)
2、按照數(shù)量從大到小進(jìn)行排序
3、直接取 limit N 即可?
方式二:通過sample抽樣傾斜KEY
適用場景:如果數(shù)據(jù)量很大,可以通過抽樣進(jìn)行抽取大KEY。能否抽取到大KEY一般和抽取數(shù)據(jù)比例有關(guān)系。
操作步驟:
1、對(duì)KEY賦值為1,便于下一步進(jìn)行計(jì)數(shù)
2、對(duì)KEY進(jìn)行累計(jì)
3、對(duì)KEY和VALUE交換
4、針對(duì)KEY按照字典進(jìn)行倒排
5、將KEY和VAlUE位置交換,還原到真實(shí)的
6、從已排序的RDD中,直接取前N條
數(shù)據(jù)傾斜一般由Shuffle時(shí)數(shù)據(jù)不均勻?qū)е?,一般有三類算子?huì)產(chǎn)生Shuffle:Aggregation (groupBy)、Join、Window。 01 Aggregation
建議打散key進(jìn)行二次聚合:采用對(duì) 非constant值、與key無關(guān) 的列進(jìn)行hash取模,不要使用rand類函數(shù)。
dataframe .groupBy(col("key"),pmod(hash(col("some_col")),100)).agg(max("value").as("partial_max")) .groupBy(col("key")).agg(max("partial_max").as("max"))02? Window
目前支持該模式下的傾斜window,(僅支持3.0)
select (... row_number() over(partition by ... order by ...) as rn) wherern[==|<=|<]?k?and?other?conditionsspark.sql.rankLimit.enabled=true?(目前支持基于row_number的topK計(jì)算邏輯)03? Shuffled Join
Spark 2.4開啟參數(shù)
spark.sql.adaptive.enabled=true spark.shuffle.statistics.verbose=true spark.sql.adaptive.skewedJoin.enabled=true spark.sql.adaptive.allowAdditionalShuffle=true如果不能處理,建議用戶自行定位熱點(diǎn)數(shù)據(jù)進(jìn)行處理 Spark 3.0
spark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true spark.sql.adaptive.skewJoin.enhance.enabled=true (通用傾斜算法,可處理更多場景) spark.sql.adaptive.forceOptimizeSkewedJoin=true(允許插入額外shuffle,可處理更多場景)
其他參數(shù):
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (默認(rèn)為256MB,分區(qū)大小超過該閾值才可被識(shí)別為傾斜分區(qū),如果希望調(diào)整的傾斜分區(qū)小于該閾值,可以酌情調(diào)?。?
spark.sql.adaptive.skewJoin.skewedPartitionFactor (默認(rèn)為5,分區(qū)大小超過中位數(shù)Xfactor才可被識(shí)別為傾斜分區(qū),一般不需要調(diào)整)? spark.sql.adaptive.skewJoin.enhance.maxJoins (默認(rèn)5,通用傾斜算法中,如果shuffled join超過此閾值則不處理,一般不需要調(diào)整)? spark.sql.adaptive.skewJoin.enhance.maxSplitsPerPartition (默認(rèn)1000,通用傾斜算法中,盡量使得每個(gè)傾斜分區(qū)的劃分不超過該閾值,一般不需要調(diào)整)?
04 數(shù)據(jù)膨脹(Join)
spark.sql.adaptive.skewJoin.inflation.enabled=true(默認(rèn)false,由于采樣計(jì)算會(huì)導(dǎo)致性能回歸,正常任務(wù)不要開啟) spark.sql.adaptive.skewJoin.inflation.factor=50(默認(rèn)為100,預(yù)估的分區(qū)輸出大小超過中位數(shù)Xfactor才可被識(shí)別為膨脹分區(qū),由于預(yù)估算法存在誤差,一般不要低于50) spark.sql.adaptive.shuffle.sampleSizePerPartition=500(默認(rèn)100,每個(gè)Task中的采樣數(shù),基于該采樣數(shù)據(jù)預(yù)估Join之后的分區(qū)大小,如果Task數(shù)量不大,可以酌情調(diào)大)05 傾斜key檢測(cè)(Join)
由于Join語義限制,對(duì)于A left join skewed B之類的場景,無法對(duì)B進(jìn)行劃分處理,否則會(huì)導(dǎo)致數(shù)據(jù)正確性問題,這也是Spark項(xiàng)目所面臨的難題。如果開啟以上功能依然不能處理數(shù)據(jù)傾斜,可以通過開啟傾斜key檢測(cè)功能來定位是哪些key導(dǎo)致了傾斜或膨脹,繼而進(jìn)行過濾等處理。
spark.sql.adaptive.shuffle.detectSkewness=true(默認(rèn)false,由于采樣計(jì)算會(huì)導(dǎo)致性能回歸,正常任務(wù)不要開啟)其他參數(shù):
spark.sql.adaptive.shuffle.sampleSizePerPartition=100(默認(rèn)100,每個(gè)Task中的采樣數(shù),如果Task數(shù)量不大,可以酌情調(diào)大)
審核編輯機(jī):劉清
-
計(jì)數(shù)器
+關(guān)注
關(guān)注
32文章
2256瀏覽量
94679 -
SQL
+關(guān)注
關(guān)注
1文章
766瀏覽量
44169 -
RDD
+關(guān)注
關(guān)注
0文章
7瀏覽量
7984 -
JVM
+關(guān)注
關(guān)注
0文章
158瀏覽量
12238
原文標(biāo)題:淺談離線數(shù)據(jù)傾斜
文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論