0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Join在Spark中是如何組織運(yùn)行的

人工智能與大數(shù)據(jù)技術(shù) ? 來(lái)源:人工智能與大數(shù)據(jù)技術(shù) ? 作者:人工智能與大數(shù)據(jù) ? 2020-09-25 11:35 ? 次閱讀

Join作為SQL中一個(gè)重要語(yǔ)法特性,幾乎所有稍微復(fù)雜一點(diǎn)的數(shù)據(jù)分析場(chǎng)景都離不開(kāi)Join,如今Spark SQL(Dataset/DataFrame)已經(jīng)成為Spark應(yīng)用程序開(kāi)發(fā)的主流,作為開(kāi)發(fā)者,我們有必要了解Join在Spark中是如何組織運(yùn)行的。

SparkSQL總體流程介紹

在闡述Join實(shí)現(xiàn)之前,我們首先簡(jiǎn)單介紹SparkSQL的總體流程,一般地,我們有兩種方式使用SparkSQL,一種是直接寫(xiě)sql語(yǔ)句,這個(gè)需要有元數(shù)據(jù)庫(kù)支持,例如Hive等,另一種是通過(guò)Dataset/DataFrame編寫(xiě)Spark應(yīng)用程序。如下圖所示,sql語(yǔ)句被語(yǔ)法解析(SQL AST)成查詢(xún)計(jì)劃,或者我們通過(guò)Dataset/DataFrame提供的APIs組織成查詢(xún)計(jì)劃,查詢(xún)計(jì)劃分為兩大類(lèi):邏輯計(jì)劃和物理計(jì)劃,這個(gè)階段通常叫做邏輯計(jì)劃,經(jīng)過(guò)語(yǔ)法分析(Analyzer)、一系列查詢(xún)優(yōu)化(Optimizer)后得到優(yōu)化后的邏輯計(jì)劃,最后被映射成物理計(jì)劃,轉(zhuǎn)換成RDD執(zhí)行。

對(duì)于語(yǔ)法解析、語(yǔ)法分析以及查詢(xún)優(yōu)化,本文不做詳細(xì)闡述,本文重點(diǎn)介紹Join的物理執(zhí)行過(guò)程。

Join基本要素

如下圖所示,Join大致包括三個(gè)要素:Join方式、Join條件以及過(guò)濾條件。其中過(guò)濾條件也可以通過(guò)AND語(yǔ)句放在Join條件中。

Spark支持所有類(lèi)型的Join,包括:

inner join

left outer join

right outer join

full outer join

left semi join

left anti join

下面分別闡述這幾種Join的實(shí)現(xiàn)。

Join基本實(shí)現(xiàn)流程

總體上來(lái)說(shuō),Join的基本實(shí)現(xiàn)流程如下圖所示,Spark將參與Join的兩張表抽象為流式遍歷表(streamIter)和查找表(buildIter),通常streamIter為大表,buildIter為小表,我們不用擔(dān)心哪個(gè)表為streamIter,哪個(gè)表為buildIter,這個(gè)spark會(huì)根據(jù)join語(yǔ)句自動(dòng)幫我們完成。

在實(shí)際計(jì)算時(shí),spark會(huì)基于streamIter來(lái)遍歷,每次取出streamIter中的一條記錄rowA,根據(jù)Join條件計(jì)算keyA,然后根據(jù)該keyA去buildIter中查找所有滿足Join條件(keyB==keyA)的記錄rowBs,并將rowBs中每條記錄分別與rowAjoin得到j(luò)oin后的記錄,最后根據(jù)過(guò)濾條件得到最終join的記錄。

從上述計(jì)算過(guò)程中不難發(fā)現(xiàn),對(duì)于每條來(lái)自streamIter的記錄,都要去buildIter中查找匹配的記錄,所以buildIter一定要是查找性能較優(yōu)的數(shù)據(jù)結(jié)構(gòu)。spark提供了三種join實(shí)現(xiàn):sort merge join、broadcast join以及hash join。

sort merge join實(shí)現(xiàn)

要讓兩條記錄能join到一起,首先需要將具有相同key的記錄在同一個(gè)分區(qū),所以通常來(lái)說(shuō),需要做一次shuffle,map階段根據(jù)join條件確定每條記錄的key,基于該key做shuffle write,將可能join到一起的記錄分到同一個(gè)分區(qū)中,這樣在shuffle read階段就可以將兩個(gè)表中具有相同key的記錄拉到同一個(gè)分區(qū)處理。前面我們也提到,對(duì)于buildIter一定要是查找性能較優(yōu)的數(shù)據(jù)結(jié)構(gòu),通常我們能想到hash表,但是對(duì)于一張較大的表來(lái)說(shuō),不可能將所有記錄全部放到hash表中,另外也可以對(duì)buildIter先排序,查找時(shí)按順序查找,查找代價(jià)也是可以接受的,我們知道,spark shuffle階段天然就支持排序,這個(gè)是非常好實(shí)現(xiàn)的,下面是sort merge join示意圖。

在shuffle read階段,分別對(duì)streamIter和buildIter進(jìn)行merge sort,在遍歷streamIter時(shí),對(duì)于每條記錄,都采用順序查找的方式從buildIter查找對(duì)應(yīng)的記錄,由于兩個(gè)表都是排序的,每次處理完streamIter的一條記錄后,對(duì)于streamIter的下一條記錄,只需從buildIter中上一次查找結(jié)束的位置開(kāi)始查找,所以說(shuō)每次在buildIter中查找不必重頭開(kāi)始,整體上來(lái)說(shuō),查找性能還是較優(yōu)的。

broadcast join實(shí)現(xiàn)

為了能具有相同key的記錄分到同一個(gè)分區(qū),我們通常是做shuffle,那么如果buildIter是一個(gè)非常小的表,那么其實(shí)就沒(méi)有必要大動(dòng)干戈做shuffle了,直接將buildIter廣播到每個(gè)計(jì)算節(jié)點(diǎn),然后將buildIter放到hash表中,如下圖所示。

從上圖可以看到,不用做shuffle,可以直接在一個(gè)map中完成,通常這種join也稱(chēng)之為map join。那么問(wèn)題來(lái)了,什么時(shí)候會(huì)用broadcast join實(shí)現(xiàn)呢?這個(gè)不用我們擔(dān)心,spark sql自動(dòng)幫我們完成,當(dāng)buildIter的估計(jì)大小不超過(guò)參數(shù)spark.sql.autoBroadcastJoinThreshold設(shè)定的值(默認(rèn)10M),那么就會(huì)自動(dòng)采用broadcast join,否則采用sort merge join。

hash join實(shí)現(xiàn)

除了上面兩種join實(shí)現(xiàn)方式外,spark還提供了hash join實(shí)現(xiàn)方式,在shuffle read階段不對(duì)記錄排序,反正來(lái)自?xún)筛癖淼木哂邢嗤琸ey的記錄會(huì)在同一個(gè)分區(qū),只是在分區(qū)內(nèi)不排序,將來(lái)自buildIter的記錄放到hash表中,以便查找,如下圖所示。

不難發(fā)現(xiàn),要將來(lái)自buildIter的記錄放到hash表中,那么每個(gè)分區(qū)來(lái)自buildIter的記錄不能太大,否則就存不下,默認(rèn)情況下hash join的實(shí)現(xiàn)是關(guān)閉狀態(tài),如果要使用hash join,必須滿足以下四個(gè)條件:

buildIter總體估計(jì)大小超過(guò)spark.sql.autoBroadcastJoinThreshold設(shè)定的值,即不滿足broadcast join條件

開(kāi)啟嘗試使用hash join的開(kāi)關(guān),spark.sql.join.preferSortMergeJoin=false

每個(gè)分區(qū)的平均大小不超過(guò)spark.sql.autoBroadcastJoinThreshold設(shè)定的值,即shuffle read階段每個(gè)分區(qū)來(lái)自buildIter的記錄要能放到內(nèi)存中

streamIter的大小是buildIter三倍以上

所以說(shuō),使用hash join的條件其實(shí)是很苛刻的,在大多數(shù)實(shí)際場(chǎng)景中,即使能使用hash join,但是使用sort merge join也不會(huì)比hash join差很多,所以盡量使用hash

下面我們分別闡述不同Join方式的實(shí)現(xiàn)流程。

inner join

inner join是一定要找到左右表中滿足join條件的記錄,我們?cè)趯?xiě)sql語(yǔ)句或者使用DataFrame時(shí),可以不用關(guān)心哪個(gè)是左表,哪個(gè)是右表,在spark sql查詢(xún)優(yōu)化階段,spark會(huì)自動(dòng)將大表設(shè)為左表,即streamIter,將小表設(shè)為右表,即buildIter。這樣對(duì)小表的查找相對(duì)更優(yōu)。其基本實(shí)現(xiàn)流程如下圖所示,在查找階段,如果右表不存在滿足join條件的記錄,則跳過(guò)。

left outer join

left outer join是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找失敗,則返回一個(gè)所有字段都為null的記錄。我們?cè)趯?xiě)sql語(yǔ)句或者使用DataFrmae時(shí),一般讓大表在左邊,小表在右邊。其基本實(shí)現(xiàn)流程如下圖所示。

right outer join

right outer join是以右表為準(zhǔn),在左表中查找匹配的記錄,如果查找失敗,則返回一個(gè)所有字段都為null的記錄。所以說(shuō),右表是streamIter,左表是buildIter,我們?cè)趯?xiě)sql語(yǔ)句或者使用DataFrame時(shí),一般讓大表在右邊,小表在左邊。其基本實(shí)現(xiàn)流程如下圖所示。

full outer join

full outer join相對(duì)來(lái)說(shuō)要復(fù)雜一點(diǎn),總體上來(lái)看既要做left outer join,又要做right outer join,但是又不能簡(jiǎn)單地先left outer join,再right outer join,最后union得到最終結(jié)果,因?yàn)檫@樣最終結(jié)果中就存在兩份inner join的結(jié)果了。因?yàn)榧热煌瓿蒷eft outer join又要完成right outer join,所以full outer join僅采用sort merge join實(shí)現(xiàn),左邊和右表既要作為streamIter,又要作為buildIter,其基本實(shí)現(xiàn)流程如下圖所示。

由于左表和右表已經(jīng)排好序,首先分別順序取出左表和右表中的一條記錄,比較key,如果key相等,則joinrowA和rowB,并將rowA和rowB分別更新到左表和右表的下一條記錄;如果keyAkeyB,則說(shuō)明左表中沒(méi)有與右表rowB對(duì)應(yīng)的記錄,那么joinnullRow與rowB,緊接著,rowB更新到右表的下一條記錄。如此循環(huán)遍歷直到左表和右表的記錄全部處理完。

left semi join

left semi join是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找成功,則僅返回左邊的記錄,否則返回null,其基本實(shí)現(xiàn)流程如下圖所示。

left anti join

left anti join與left semi join相反,是以左表為準(zhǔn),在右表中查找匹配的記錄,如果查找成功,則返回null,否則僅返回左邊的記錄,其基本實(shí)現(xiàn)流程如下圖所示。

總結(jié)

Join是數(shù)據(jù)庫(kù)查詢(xún)中一個(gè)非常重要的語(yǔ)法特性,在數(shù)據(jù)庫(kù)領(lǐng)域可以說(shuō)是“得join者得天下”,SparkSQL作為一種分布式數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng),給我們提供了全面的join支持,并在內(nèi)部實(shí)現(xiàn)上無(wú)聲無(wú)息地做了很多優(yōu)化,了解join的實(shí)現(xiàn)將有助于我們更深刻的了解我們的應(yīng)用程序的運(yùn)行軌跡。

責(zé)任編輯:xj

原文標(biāo)題:面試必知的 Spark SQL 幾種 Join 實(shí)現(xiàn)

文章出處:【微信公眾號(hào):人工智能與大數(shù)據(jù)技術(shù)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • SQL
    SQL
    +關(guān)注

    關(guān)注

    1

    文章

    768

    瀏覽量

    44175
  • SPARK
    +關(guān)注

    關(guān)注

    1

    文章

    105

    瀏覽量

    19927

原文標(biāo)題:面試必知的 Spark SQL 幾種 Join 實(shí)現(xiàn)

文章出處:【微信號(hào):TheBigData1024,微信公眾號(hào):人工智能與大數(shù)據(jù)技術(shù)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    spark為什么比mapreduce快?

    spark為什么比mapreduce快? 首先澄清幾個(gè)誤區(qū): 1:兩者都是基于內(nèi)存計(jì)算的,任何計(jì)算框架都肯定是基于內(nèi)存的,所以網(wǎng)上說(shuō)的spark是基于內(nèi)存計(jì)算所以快,顯然是錯(cuò)誤的 2;DAG計(jì)算模型
    的頭像 發(fā)表于 09-06 09:45 ?288次閱讀

    spark運(yùn)行的基本流程

    記錄和分享下spark運(yùn)行的基本流程。 一、spark的基礎(chǔ)組件及其概念 1. ClusterManager Standalone模式
    的頭像 發(fā)表于 07-02 10:31 ?423次閱讀
    <b class='flag-5'>spark</b><b class='flag-5'>運(yùn)行</b>的基本流程

    Spark基于DPU的Native引擎算子卸載方案

    ?和 R?等多種高級(jí)編程語(yǔ)言,這使得Spark可以應(yīng)對(duì)各種復(fù)雜的大數(shù)據(jù)應(yīng)用場(chǎng)景,例如金融、電商、社交媒體等。 Spark 經(jīng)過(guò)多年發(fā)展,作為基礎(chǔ)的計(jì)算框架,不管是
    的頭像 發(fā)表于 06-28 17:12 ?710次閱讀
    <b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸載方案

    關(guān)于Spark的從0實(shí)現(xiàn)30s內(nèi)實(shí)時(shí)監(jiān)控指標(biāo)計(jì)算

    前言 說(shuō)起Spark,大家就會(huì)自然而然地想到Flink,而且會(huì)不自覺(jué)地將這兩種主流的大數(shù)據(jù)實(shí)時(shí)處理技術(shù)進(jìn)行比較。然后最終得出結(jié)論:Flink實(shí)時(shí)性大于Spark。 的確,F(xiàn)link的數(shù)據(jù)計(jì)算
    的頭像 發(fā)表于 06-14 15:52 ?475次閱讀

    Spark+Hive”DPU環(huán)境下的性能測(cè)評(píng) | OLAP數(shù)據(jù)庫(kù)引擎選型白皮書(shū)(24版)DPU部分節(jié)選

    奇點(diǎn)云2024年版《OLAP數(shù)據(jù)庫(kù)引擎選型白皮書(shū)》,中科馭數(shù)聯(lián)合奇點(diǎn)云針對(duì)Spark+Hive這類(lèi)大數(shù)據(jù)計(jì)算場(chǎng)景下的主力引擎,測(cè)評(píng)DPU環(huán)境下對(duì)比CPU環(huán)境下的性能提升效果。特此節(jié)選該章節(jié)內(nèi)容,與大家共享。
    的頭像 發(fā)表于 05-30 16:09 ?551次閱讀
    “<b class='flag-5'>Spark</b>+Hive”<b class='flag-5'>在</b>DPU環(huán)境下的性能測(cè)評(píng) | OLAP數(shù)據(jù)庫(kù)引擎選型白皮書(shū)(24版)DPU部分節(jié)選

    移植Nucleo745ziq+cyw43439的wifi_join_wpa3時(shí),得到Wifi_join 33555456錯(cuò)誤信息如何解決?

    當(dāng)我移植我的電路板(Nucleo745ziq+cyw43439)的 wifi_join_wpa3 時(shí) 但我得到的錯(cuò)誤代碼是 33555456 OTL.... 如何解決我的項(xiàng)目?
    發(fā)表于 05-24 06:49

    STM8RAM運(yùn)行遇到的疑問(wèn)求解

    系統(tǒng)函數(shù),而這個(gè)系統(tǒng)函數(shù)flash里面。這個(gè)時(shí)候我把flash 已經(jīng)關(guān)了。程序就執(zhí)行不動(dòng)了。 RAM的地址域是from 0x0000 to 0x07FF flash的地址域是from 0x8000 to 0xFFFF RAM
    發(fā)表于 05-07 07:32

    淺談變電所運(yùn)行平臺(tái)安全管理的應(yīng)用

    淺談變電所運(yùn)行平臺(tái)安全管理的應(yīng)用 張穎姣 安科瑞電氣股份有限公司 上海嘉定 201801 摘要:電氣安全管理是企業(yè)生產(chǎn)管理中比較薄弱的環(huán)節(jié),它是一項(xiàng)綜合性的工作,有工程技術(shù)的一面,也有組織
    的頭像 發(fā)表于 04-15 16:26 ?323次閱讀
    淺談變電所<b class='flag-5'>運(yùn)行</b>平臺(tái)<b class='flag-5'>在</b>安全管理<b class='flag-5'>中</b>的應(yīng)用

    stm32的運(yùn)行程序,初始函數(shù)明明沒(méi)有while函數(shù)里面,為什么能反復(fù)運(yùn)行?

    stm32的運(yùn)行程序,好多初始函數(shù)明明沒(méi)有while函數(shù)里面,但是,他卻能反復(fù)的,不斷地去運(yùn)行,這是為什么呢? 就像是這個(gè)程序,對(duì)于設(shè)
    發(fā)表于 04-08 08:15

    如何利用DPU加速Spark大數(shù)據(jù)處理? | 總結(jié)篇

    SSD速度通過(guò)NVMe接口得到了大幅提升,并且網(wǎng)絡(luò)傳輸速率也進(jìn)入了新的高度,但CPU主頻發(fā)展并未保持同等步調(diào),3GHz左右的核心頻率已成為常態(tài)。 在當(dāng)前背景下Apache Spark等大數(shù)據(jù)處理工具,盡管存儲(chǔ)和網(wǎng)絡(luò)性能的提升極大地減少了數(shù)據(jù)讀取和傳輸?shù)臅r(shí)間消耗,但
    的頭像 發(fā)表于 04-02 13:45 ?1086次閱讀
    如何利用DPU加速<b class='flag-5'>Spark</b>大數(shù)據(jù)處理? | 總結(jié)篇

    STM32HIAR如何實(shí)現(xiàn)從FLASH加載到SRAM運(yùn)行程序?

    如題,STM32H IAR如何實(shí)現(xiàn)從FLASH加載到SRAM運(yùn)行程序 有沒(méi)有相關(guān)的例程可供參考
    發(fā)表于 03-28 07:46

    Spark基于DPU Snappy壓縮算法的異構(gòu)加速方案

    Spark 某些工作負(fù)載方面表現(xiàn)得更加優(yōu)越。換句話說(shuō),Spark 啟用了內(nèi)存分布數(shù)據(jù)集,除了能夠提供交互式查詢(xún)外,它還可以?xún)?yōu)化迭代工作負(fù)載。Spark SQL是
    的頭像 發(fā)表于 03-26 17:06 ?837次閱讀
    <b class='flag-5'>Spark</b>基于DPU Snappy壓縮算法的異構(gòu)加速方案

    RDMA技術(shù)Apache Spark的應(yīng)用

    、電信、零售、醫(yī)療保健還是物聯(lián)網(wǎng),Spark的應(yīng)用幾乎遍及所有需要處理海量數(shù)據(jù)和復(fù)雜計(jì)算的領(lǐng)域。它的快速、易用和通用性,使得數(shù)據(jù)科學(xué)家和工程師能夠輕松實(shí)現(xiàn)數(shù)據(jù)挖掘、數(shù)據(jù)分析、實(shí)時(shí)處理等任務(wù)。 然而,Spark的燦爛光環(huán)背后,一
    的頭像 發(fā)表于 03-25 18:13 ?1558次閱讀
    RDMA技術(shù)<b class='flag-5'>在</b>Apache <b class='flag-5'>Spark</b><b class='flag-5'>中</b>的應(yīng)用

    基于DPU和HADOS-RACE加速Spark 3.x

    、Python、Java、Scala、R)等特性大數(shù)據(jù)計(jì)算領(lǐng)域被廣泛使用。其中,Spark SQL 是 Spark 生態(tài)系統(tǒng)的一個(gè)重要組件,它允許用戶以結(jié)構(gòu)化數(shù)據(jù)的方式進(jìn)行數(shù)據(jù)處理
    的頭像 發(fā)表于 03-25 18:12 ?1392次閱讀
    基于DPU和HADOS-RACE加速<b class='flag-5'>Spark</b> 3.x

    淺談變電所運(yùn)行平臺(tái)安全管理的應(yīng)用

    電氣安全管理是企業(yè)生產(chǎn)管理中比較薄弱的環(huán)節(jié),它是一項(xiàng)綜合性的工作,有工程技術(shù)的一面,也有組織管理的一面。詳細(xì)闡述了保障變電所安全運(yùn)行的基本管理制度,著重探討了實(shí)踐中廣泛遇到的倒閘操作、電氣作業(yè)安全管理問(wèn)題。
    的頭像 發(fā)表于 02-05 15:40 ?426次閱讀
    淺談變電所<b class='flag-5'>運(yùn)行</b>平臺(tái)<b class='flag-5'>在</b>安全管理<b class='flag-5'>中</b>的應(yīng)用