在 MapReduce 框架中, Shuffle 階段是連接 Map 與 Reduce 之間的橋梁, Map 階段通過 Shuffle 過程將數(shù)據(jù)輸出到 Reduce 階段中。由于 Shuffle 涉及磁盤的讀寫和網(wǎng)絡(luò) I/O,因此 Shuffle 性能的高低直接影響整個程序的性能。Spark 也有 Map 階段和 Reduce 階段,因此也會出現(xiàn) Shuffle 。
Spark Shuffle
Spark Shuffle 分為兩種:一種是基于 Hash 的 Shuffle;另一種是基于 Sort 的 Shuffle。先介紹下它們的發(fā)展歷程,有助于我們更好的理解 Shuffle:
在 Spark 1.1 之前, Spark 中只實(shí)現(xiàn)了一種 Shuffle 方式,即基于 Hash 的 Shuffle 。在 Spark 1.1 版本中引入了基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,并且 Spark 1.2 版本之后,默認(rèn)的實(shí)現(xiàn)方式從基于 Hash 的 Shuffle 修改為基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,即使用的 ShuffleManager 從默認(rèn)的 hash 修改為 sort。在 Spark 2.0 版本中, Hash Shuffle 方式己經(jīng)不再使用。
Spark 之所以一開始就提供基于 Hash 的 Shuffle 實(shí)現(xiàn)機(jī)制,其主要目的之一就是為了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是將 sort 作為固定步驟,有許多并不需要排序的任務(wù),MapReduce 也會對其進(jìn)行排序,造成了許多不必要的開銷。
在基于 Hash 的 Shuffle 實(shí)現(xiàn)方式中,每個 Mapper 階段的 Task 會為每個 Reduce 階段的 Task生成一個文件,通常會產(chǎn)生大量的文件(即對應(yīng)為 M*R 個中間文件,其中, M 表示 Mapper階段的 Task 個數(shù), R 表示 Reduce 階段的 Task 個數(shù)) 伴隨大量的隨機(jī)磁盤 I/O 操作與大量的內(nèi)存開銷。
為了緩解上述問題,在 Spark 0.8.1 版本中為基于 Hash 的 Shuffle 實(shí)現(xiàn)引入了 ShuffleConsolidate 機(jī)制(即文件合并機(jī)制),將 Mapper 端生成的中間文件進(jìn)行合并的處理機(jī)制。通過配置屬性 spark.shuffie.consolidateFiles=true,減少中間生成的文件數(shù)量。通過文件合并,可以將中間文件的生成方式修改為每個執(zhí)行單位為每個 Reduce階段的 Task 生成一個文件。
執(zhí)行單位對應(yīng)為:每個 Mapper 端的 Cores 數(shù)/每個 Task分配的 Cores 數(shù)(默認(rèn)為 1) 。最終可以將文件個數(shù)從 MR 修改為 EC/T*R,其中,E 表示 Executors 個數(shù), C 表示可用 Cores 個數(shù), T 表示 Task 分配的 Cores 數(shù)。
Spark1.1 版本引入了 Sort Shuffle:
基于 Hash 的 Shuffle 的實(shí)現(xiàn)方式中,生成的中間結(jié)果文件的個數(shù)都會依賴于 Reduce 階段的 Task 個數(shù),即 Reduce 端的并行度,因此文件數(shù)仍然不可控,無法真正解決問題。為了更好地解決問題,在 Spark1.1 版本引入了基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,并且在 Spark 1.2 版本之后,默認(rèn)的實(shí)現(xiàn)方式也從基于 Hash 的 Shuffle,修改為基于 Sort 的 Shuffle 實(shí)現(xiàn)方式,即使用的 ShuffleManager 從默認(rèn)的 hash 修改為 sort。
在基于 Sort 的 Shuffle 中,每個 Mapper 階段的 Task 不會為每 Reduce 階段的 Task 生成一個單獨(dú)的文件,而是全部寫到一個數(shù)據(jù)(Data)文件中,同時生成一個索引(Index)文件, Reduce 階段的各個 Task 可以通過該索引文件獲取相關(guān)的數(shù)據(jù)。
避免產(chǎn)生大量文件的直接收益就是降低隨機(jī)磁盤 I/0 與內(nèi)存的開銷。最終生成的文件個數(shù)減少到 2M ,其中 M 表示 Mapper 階段的 Task 個數(shù),每個 Mapper 階段的 Task 分別生成兩個文件(1 個數(shù)據(jù)文件、 1 個索引文件),最終的文件個數(shù)為 M 個數(shù)據(jù)文件與 M 個索引文件。因此,最終文件個數(shù)是 2M 個。
從 Spark 1.4 版本開始,在 Shuffle 過程中也引入了基于 Tungsten-Sort 的 Shuffie 實(shí)現(xiàn)方式,通 Tungsten 項(xiàng)目所做的優(yōu)化,可以極大提高 Spark 在數(shù)據(jù)處理上的性能。(Tungsten 翻譯為中文是鎢絲)
注:在一些特定的應(yīng)用場景下,采用基于 Hash 實(shí)現(xiàn) Shuffle 機(jī)制的性能會超過基于 Sort 的 Shuffle 實(shí)現(xiàn)機(jī)制。
一張圖了解下 Spark Shuffle 的迭代歷史:
Spark Shuffle 迭代歷史
為什么 Spark 最終還是放棄了 HashShuffle ,使用了 Sorted-Based Shuffle?
我們可以從 Spark 最根本要優(yōu)化和迫切要解決的問題中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 時產(chǎn)生大量的文件。當(dāng)數(shù)據(jù)量越來越多時,產(chǎn)生的文件量是不可控的,這嚴(yán)重制約了 Spark 的性能及擴(kuò)展能力,所以 Spark 必須要解決這個問題,減少 Mapper 端 ShuffleWriter 產(chǎn)生的文件數(shù)量,這樣便可以讓 Spark 從幾百臺集群的規(guī)模瞬間變成可以支持幾千臺,甚至幾萬臺集群的規(guī)模。
但使用 Sorted-Based Shuffle 就完美了嗎,答案是否定的,Sorted-Based Shuffle 也有缺點(diǎn),其缺點(diǎn)反而是它排序的特性,它強(qiáng)制要求數(shù)據(jù)在 Mapper 端必須先進(jìn)行排序,所以導(dǎo)致它排序的速度有點(diǎn)慢。好在出現(xiàn)了 Tungsten-Sort Shuffle ,它對排序算法進(jìn)行了改進(jìn),優(yōu)化了排序的速度。Tungsten-SortShuffle 已經(jīng)并入了 Sorted-Based Shuffle,Spark 的引擎會自動識別程序需要的是 Sorted-BasedShuffle,還是 Tungsten-Sort Shuffle。
下面詳細(xì)剖析每個 Shuffle 的底層執(zhí)行原理:
一、Hash Shuffle 解析
以下的討論都假設(shè)每個 Executor 有 1 個 cpu core。
1. HashShuffleManager
shuffle write 階段,主要就是在一個 stage 結(jié)束計算之后,為了下一個 stage 可以執(zhí)行 shuffle 類的算子(比如 reduceByKey),而將每個 task 處理的數(shù)據(jù)按 key 進(jìn)行“劃分”。所謂“劃分”,就是對相同的 key 執(zhí)行 hash 算法,從而將相同 key 都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游 stage 的一個 task。在將數(shù)據(jù)寫入磁盤之前,會先將數(shù)據(jù)寫入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿之后,才會溢寫到磁盤文件中去。
下一個 stage 的 task 有多少個,當(dāng)前 stage 的每個 task 就要創(chuàng)建多少份磁盤文件。比如下一個 stage 總共有 100 個 task,那么當(dāng)前 stage 的每個 task 都要創(chuàng)建 100 份磁盤文件。如果當(dāng)前 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執(zhí)行 5 個 task,那么每個 Executor 上總共就要創(chuàng)建 500 個磁盤文件,所有 Executor 上會創(chuàng)建 5000 個磁盤文件。由此可見,未經(jīng)優(yōu)化的 shuffle write 操作所產(chǎn)生的磁盤文件的數(shù)量是極其驚人的。
shuffle read 階段,通常就是一個 stage 剛開始時要做的事情。此時該 stage 的每一個 task 就需要將上一個 stage 的計算結(jié)果中的所有相同 key,從各個節(jié)點(diǎn)上通過網(wǎng)絡(luò)都拉取到自己所在的節(jié)點(diǎn)上,然后進(jìn)行 key 的聚合或連接等操作。
由于 shuffle write 的過程中,map task 給下游 stage 的每個 reduce task 都創(chuàng)建了一個磁盤文件,因此 shuffle read 的過程中,每個 reduce task 只要從上游 stage 的所有 map task 所在節(jié)點(diǎn)上,拉取屬于自己的那一個磁盤文件即可。
shuffle read 的拉取過程是一邊拉取一邊進(jìn)行聚合的。每個 shuffle read task 都會有一個自己的 buffer 緩沖,每次都只能拉取與 buffer 緩沖相同大小的數(shù)據(jù),然后通過內(nèi)存中的一個 Map 進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到 buffer 緩沖中進(jìn)行聚合操作。以此類推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。
HashShuffleManager 工作原理如下圖所示:
未優(yōu)化的HashShuffleManager工作原理
2. 優(yōu)化的 HashShuffleManager
為了優(yōu)化 HashShuffleManager 我們可以設(shè)置一個參數(shù):spark.shuffle.consolidateFiles,該參數(shù)默認(rèn)值為 false,將其設(shè)置為 true 即可開啟優(yōu)化機(jī)制,通常來說,如果我們使用 HashShuffleManager,那么都建議開啟這個選項(xiàng)。
開啟 consolidate 機(jī)制之后,在 shuffle write 過程中,task 就不是為下游 stage 的每個 task 創(chuàng)建一個磁盤文件了,此時會出現(xiàn)shuffleFileGroup的概念,每個 shuffleFileGroup 會對應(yīng)一批磁盤文件,磁盤文件的數(shù)量與下游 stage 的 task 數(shù)量是相同的。
一個 Executor 上有多少個 cpu core,就可以并行執(zhí)行多少個 task。而第一批并行執(zhí)行的每個 task 都會創(chuàng)建一個 shuffleFileGroup,并將數(shù)據(jù)寫入對應(yīng)的磁盤文件內(nèi)。
當(dāng) Executor 的 cpu core 執(zhí)行完一批 task,接著執(zhí)行下一批 task 時,下一批 task 就會復(fù)用之前已有的 shuffleFileGroup,包括其中的磁盤文件,也就是說,此時 task 會將數(shù)據(jù)寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。
因此,consolidate 機(jī)制允許不同的 task 復(fù)用同一批磁盤文件,這樣就可以有效將多個 task 的磁盤文件進(jìn)行一定程度上的合并,從而大幅度減少磁盤文件的數(shù)量,進(jìn)而提升 shuffle write 的性能。
假設(shè)第二個 stage 有 100 個 task,第一個 stage 有 50 個 task,總共還是有 10 個 Executor(Executor CPU 個數(shù)為 1),每個 Executor 執(zhí)行 5 個 task。那么原本使用未經(jīng)優(yōu)化的 HashShuffleManager 時,每個 Executor 會產(chǎn)生 500 個磁盤文件,所有 Executor 會產(chǎn)生 5000 個磁盤文件的。但是此時經(jīng)過優(yōu)化之后,每個 Executor 創(chuàng)建的磁盤文件的數(shù)量的計算公式為:cpu core的數(shù)量 * 下一個stage的task數(shù)量,也就是說,每個 Executor 此時只會創(chuàng)建 100 個磁盤文件,所有 Executor 只會創(chuàng)建 1000 個磁盤文件。
這個功能優(yōu)點(diǎn)明顯,但為什么 Spark 一直沒有在基于 Hash Shuffle 的實(shí)現(xiàn)中將功能設(shè)置為默認(rèn)選項(xiàng)呢,官方給出的說法是這個功能還欠穩(wěn)定。
優(yōu)化后的 HashShuffleManager 工作原理如下圖所示:
優(yōu)化后的HashShuffleManager工作原理
基于 Hash 的 Shuffle 機(jī)制的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
可以省略不必要的排序開銷。
避免了排序所需的內(nèi)存開銷。
缺點(diǎn):
生產(chǎn)的文件過多,會對文件系統(tǒng)造成壓力。
大量小文件的隨機(jī)讀寫帶來一定的磁盤開銷。
數(shù)據(jù)塊寫入時所需的緩存空間也會隨之增加,對內(nèi)存造成壓力。
二、SortShuffle 解析
SortShuffleManager 的運(yùn)行機(jī)制主要分成三種:
普通運(yùn)行機(jī)制;
bypass 運(yùn)行機(jī)制,當(dāng) shuffle read task 的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(默認(rèn)為 200),就會啟用 bypass 機(jī)制;
Tungsten Sort 運(yùn)行機(jī)制,開啟此運(yùn)行機(jī)制需設(shè)置配置項(xiàng) spark.shuffle.manager=tungsten-sort。開啟此項(xiàng)配置也不能保證就一定采用此運(yùn)行機(jī)制(后面會解釋)。
1. 普通運(yùn)行機(jī)制
在該模式下,數(shù)據(jù)會先寫入一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,此時根據(jù)不同的 shuffle 算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是 reduceByKey 這種聚合類的 shuffle 算子,那么會選用 Map 數(shù)據(jù)結(jié)構(gòu),一邊通過 Map 進(jìn)行聚合,一邊寫入內(nèi)存;
如果是 join 這種普通的 shuffle 算子,那么會選用 Array 數(shù)據(jù)結(jié)構(gòu),直接寫入內(nèi)存。接著,每寫一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會判斷一下,是否達(dá)到了某個臨界閾值。如果達(dá)到臨界閾值的話,那么就會嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。
在溢寫到磁盤文件之前,會先根據(jù) key 對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序。排序過后,會分批將數(shù)據(jù)寫入磁盤文件。默認(rèn)的 batch 數(shù)量是 10000 條,也就是說,排序好的數(shù)據(jù),會以每批 1 萬條數(shù)據(jù)的形式分批寫入磁盤文件。
寫入磁盤文件是通過 Java 的 BufferedOutputStream 實(shí)現(xiàn)的。BufferedOutputStream 是 Java 的緩沖輸出流,首先會將數(shù)據(jù)緩沖在內(nèi)存中,當(dāng)內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數(shù),提升性能。
一個 task 將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會發(fā)生多次磁盤溢寫操作,也就會產(chǎn)生多個臨時文件。最后會將之前所有的臨時磁盤文件都進(jìn)行合并,這就是merge 過程,此時會將之前所有臨時磁盤文件中的數(shù)據(jù)讀取出來,然后依次寫入最終的磁盤文件之中。
此外,由于一個 task 就只對應(yīng)一個磁盤文件,也就意味著該 task 為下游 stage 的 task 準(zhǔn)備的數(shù)據(jù)都在這一個文件中,因此還會單獨(dú)寫一份索引文件,其中標(biāo)識了下游各個 task 的數(shù)據(jù)在文件中的 start offset 與 end offset。
SortShuffleManager 由于有一個磁盤文件 merge 的過程,因此大大減少了文件數(shù)量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執(zhí)行 5 個 task,而第二個 stage 有 100 個 task。由于每個 task 最終只有一個磁盤文件,因此此時每個 Executor 上只有 5 個磁盤文件,所有 Executor 只有 50 個磁盤文件。
普通運(yùn)行機(jī)制的 SortShuffleManager 工作原理如下圖所示:
普通運(yùn)行機(jī)制的SortShuffleManager工作原理
2. bypass 運(yùn)行機(jī)制
Reducer 端任務(wù)數(shù)比較少的情況下,基于 Hash Shuffle 實(shí)現(xiàn)機(jī)制明顯比基于 Sort Shuffle 實(shí)現(xiàn)機(jī)制要快,因此基于 Sort Shuffle 實(shí)現(xiàn)機(jī)制提供了一個帶 Hash 風(fēng)格的回退方案,就是 bypass 運(yùn)行機(jī)制。對于 Reducer 端任務(wù)數(shù)少于配置屬性spark.shuffle.sort.bypassMergeThreshold設(shè)置的個數(shù)時,使用帶 Hash 風(fēng)格的回退計劃。
bypass 運(yùn)行機(jī)制的觸發(fā)條件如下:
shuffle map task 數(shù)量小于spark.shuffle.sort.bypassMergeThreshold=200參數(shù)的值。
不是聚合類的 shuffle 算子。
此時,每個 task 會為每個下游 task 都創(chuàng)建一個臨時磁盤文件,并將數(shù)據(jù)按 key 進(jìn)行 hash 然后根據(jù) key 的 hash 值,將 key 寫入對應(yīng)的磁盤文件之中。當(dāng)然,寫入磁盤文件時也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創(chuàng)建一個單獨(dú)的索引文件。
該過程的磁盤寫機(jī)制其實(shí)跟未經(jīng)優(yōu)化的 HashShuffleManager 是一模一樣的,因?yàn)槎家獎?chuàng)建數(shù)量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機(jī)制相對未經(jīng)優(yōu)化的 HashShuffleManager 來說,shuffle read 的性能會更好。
而該機(jī)制與普通 SortShuffleManager 運(yùn)行機(jī)制的不同在于:第一,磁盤寫機(jī)制不同;第二,不會進(jìn)行排序。也就是說,啟用該機(jī)制的最大好處在于,shuffle write 過程中,不需要進(jìn)行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開銷。
bypass 運(yùn)行機(jī)制的 SortShuffleManager 工作原理如下圖所示:
bypass運(yùn)行機(jī)制的SortShuffleManager工作原理
3. Tungsten Sort Shuffle 運(yùn)行機(jī)制
Tungsten Sort 是對普通 Sort 的一種優(yōu)化,Tungsten Sort 會進(jìn)行排序,但排序的不是內(nèi)容本身,而是內(nèi)容序列化后字節(jié)數(shù)組的指針(元數(shù)據(jù)),把數(shù)據(jù)的排序轉(zhuǎn)變?yōu)榱酥羔様?shù)組的排序,實(shí)現(xiàn)了直接對序列化后的二進(jìn)制數(shù)據(jù)進(jìn)行排序。由于直接基于二進(jìn)制數(shù)據(jù)進(jìn)行操作,所以在這里面沒有序列化和反序列化的過程。內(nèi)存的消耗大大降低,相應(yīng)的,會極大的減少的 GC 的開銷。
Spark 提供了配置屬性,用于選擇具體的 Shuffle 實(shí)現(xiàn)機(jī)制,但需要說明的是,雖然默認(rèn)情況下 Spark 默認(rèn)開啟的是基于 SortShuffle 實(shí)現(xiàn)機(jī)制,但實(shí)際上,參考 Shuffle 的框架內(nèi)核部分可知基于 SortShuffle 的實(shí)現(xiàn)機(jī)制與基于 Tungsten Sort Shuffle 實(shí)現(xiàn)機(jī)制都是使用 SortShuffleManager,而內(nèi)部使用的具體的實(shí)現(xiàn)機(jī)制,是通過提供的兩個方法進(jìn)行判斷的:
對應(yīng)非基于 Tungsten Sort 時,通過 SortShuffleWriter.shouldBypassMergeSort 方法判斷是否需要回退到 Hash 風(fēng)格的 Shuffle 實(shí)現(xiàn)機(jī)制,當(dāng)該方法返回的條件不滿足時,則通過 SortShuffleManager.canUseSerializedShuffle 方法判斷是否需要采用基于 Tungsten Sort Shuffle 實(shí)現(xiàn)機(jī)制,而當(dāng)這兩個方法返回都為 false,即都不滿足對應(yīng)的條件時,會自動采用普通運(yùn)行機(jī)制。
因此,當(dāng)設(shè)置了 spark.shuffle.manager=tungsten-sort 時,也不能保證就一定采用基于 Tungsten Sort 的 Shuffle 實(shí)現(xiàn)機(jī)制。
要實(shí)現(xiàn) Tungsten Sort Shuffle 機(jī)制需要滿足以下條件:
Shuffle 依賴中不帶聚合操作或沒有對輸出進(jìn)行排序的要求。
Shuffle 的序列化器支持序列化值的重定位(當(dāng)前僅支持 KryoSerializer Spark SQL 框架自定義的序列化器)。
Shuffle 過程中的輸出分區(qū)個數(shù)少于 16777216 個。
實(shí)際上,使用過程中還有其他一些限制,如引入 Page 形式的內(nèi)存管理模型后,內(nèi)部單條記錄的長度不能超過 128 MB (具體內(nèi)存模型可以參考 PackedRecordPointer 類)。另外,分區(qū)個數(shù)的限制也是該內(nèi)存模型導(dǎo)致的。
所以,目前使用基于 Tungsten Sort Shuffle 實(shí)現(xiàn)機(jī)制條件還是比較苛刻的。
編輯:jq
-
cpu
+關(guān)注
關(guān)注
68文章
10870瀏覽量
211901 -
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7048瀏覽量
89078 -
SPARK
+關(guān)注
關(guān)注
1文章
105瀏覽量
19920 -
Shuffle
+關(guān)注
關(guān)注
0文章
5瀏覽量
1703
原文標(biāo)題:Spark 的兩種核心 Shuffle 詳解(建議收藏)
文章出處:【微信號:DBDevs,微信公眾號:數(shù)據(jù)分析與開發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論