常見的數(shù)據(jù)傾斜是怎么造成的?
Shuffle的時(shí)候,將各個(gè)節(jié)點(diǎn)上相同的key拉取到某個(gè)節(jié)點(diǎn)的一個(gè)task進(jìn)行處理,比如按照key進(jìn)行聚合或join等操作,如果某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別大的話,就會(huì)發(fā)生數(shù)據(jù)傾斜現(xiàn)象。數(shù)據(jù)傾斜就成為了整個(gè)task運(yùn)行時(shí)間的短板。
觸發(fā)shuffle的常見算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
要解決數(shù)據(jù)傾斜的問題,首先要定位數(shù)據(jù)傾斜發(fā)生在什么地方。
首先是哪個(gè)stage,直接在Web UI上看就可以,一般出現(xiàn)傾斜都是耗時(shí)特別長(zhǎng)的Stage,然后查看運(yùn)行耗時(shí)的task,一般是其中的某幾個(gè)Task一直拖著,其他的Task早已經(jīng)完成了,根據(jù)這個(gè)task,根據(jù)stage劃分原理,推算出數(shù)據(jù)傾斜發(fā)生在哪個(gè)shuffle類算子上。
如何查看發(fā)生傾斜的RDD呢?
如果是Spark RDD執(zhí)行shuffle算子導(dǎo)致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看key分布的代碼,比如RDD.countByKey()。然后對(duì)統(tǒng)計(jì)出來(lái)各個(gè)key出現(xiàn)的次數(shù),collect、take到客戶端打印一下,就可以看到key的分布情況。
以下方法可以大概看出哪個(gè)key出現(xiàn)了傾斜:
JavaPairRDDhssData = getHssData(fs, sc, hssPath);
JavaPairRDDsample = hssData.sample(false, 0.1);
MapcountByKey = sample.countByKey();
出現(xiàn)傾斜的key有兩種情況:
1、某個(gè)可以出現(xiàn)傾斜
2、多個(gè)key出現(xiàn)傾斜
某個(gè)Key出現(xiàn)傾斜解決辦法:
通過(guò)上述方法可以知道是哪個(gè)Key出現(xiàn)了傾斜,所以可以先通過(guò)filter方法過(guò)濾掉傾斜的Key,把傾斜的Key和沒有傾斜的Key分開處理,由于Spark運(yùn)行機(jī)制,所以單獨(dú)處理傾斜Key的時(shí)候就不會(huì)再出現(xiàn)傾斜現(xiàn)象。
上述方法只能處理特定的數(shù)據(jù)傾斜,對(duì)于實(shí)際的生產(chǎn)環(huán)境可能并不怎么適用,這事是解決傾斜的其中一個(gè)方法。
多個(gè)Key出現(xiàn)傾斜的解決辦法:
原理:在傾斜Shuffle之前給每一個(gè)Key都加上一個(gè)隨機(jī)前綴,然后再給加了前綴的Key進(jìn)行一個(gè)Shuffle操作,在Shuffle操作后再把Key的前綴去掉。在這個(gè)過(guò)程中由于前綴的加入,會(huì)把傾斜的Key隨機(jī)的分配到不同的Task。然后去掉前綴從而解決數(shù)據(jù)傾斜的問題。
private static JavaPairRDDrepar(
JavaPairRDD。Cdr) {
JavaPairRDDmapToPair;
try {
mapToPair = 。Cdr
.mapToPair(new PairFunctiontuple2, String, agg() {
@Override
public Tuple2call(Tuple2t)
throws Exception {
//產(chǎn)生隨機(jī)前綴,隨機(jī)數(shù)大小看情況決定
long i = (long) (Math.random() * 150);
//添加隨機(jī)數(shù)前綴
return new Tuple2(i + _ + t._1, t._2);
}
}).sortByKey()//進(jìn)行一個(gè)Shuffle操作打亂Key
//去掉隨機(jī)數(shù)前綴
.mapToPair(new PairFunctiontuple2, String, agg() {
@Override
public Tuple2call(Tuple2t)
throws Exception {
String str = t.1.split()[0];
return new Tuple2(str, t._2);
}
});
} catch (Exception e) {
return null;
}
return mapToPair;
}
以上是解決RDD數(shù)據(jù)傾斜簡(jiǎn)單方法。
-
RDD
+關(guān)注
關(guān)注
0文章
7瀏覽量
7986 -
大數(shù)據(jù)
+關(guān)注
關(guān)注
64文章
8903瀏覽量
137616
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論