前言
SQL 中 Group By
語句大家都很熟悉, 根據(jù)指定的規(guī)則對數(shù)據(jù)進行分組 ,常常和聚合函數(shù)一起使用。
比如,考慮有表 dealer
,表中數(shù)據(jù)如下:
id (Int) | city (String) | car_model (String) | quantity (Int) |
---|---|---|---|
100 | Fremont | Honda Civic | 10 |
100 | Fremont | Honda Accord | 15 |
100 | Fremont | Honda CRV | 7 |
200 | Dublin | Honda Civic | 20 |
200 | Dublin | Honda Accord | 10 |
200 | Dublin | Honda CRV | 3 |
300 | San Jose | Honda Civic | 5 |
300 | San Jose | Honda Accord | 8 |
如果執(zhí)行 SQL 語句 SELECT id, sum(quantity) FROM dealer GROUP BY id ORDER BY id
,會得到如下結(jié)果:
+---+-------------+
| id|sum(quantity)|
+---+-------------+
|100| 32|
|200| 33|
|300| 13|
+---+-------------+
上述 SQL 語句的意思就是對數(shù)據(jù)按 id
列進行分組,然后在每個分組內(nèi)對 quantity
列進行求和。
Group By
語句除了上面的簡單用法之外,還有更高級的用法,常見的是 Grouping Sets
、RollUp
和 Cube
,它們在 OLAP 時比較常用。其中,RollUp
和 Cube
都是以 Grouping Sets
為基礎(chǔ)實現(xiàn)的,因此,弄懂了 Grouping Sets
,也就理解了 RollUp
和 Cube
。
本文首先簡單介紹 Grouping Sets
的用法,然后以 Spark SQL 作為切入點,深入解析 Grouping Sets
的實現(xiàn)機制。
Spark SQL 是 Apache Spark 大數(shù)據(jù)處理框架的一個子模塊,用來處理結(jié)構(gòu)化信息。它可以將 SQL 語句翻譯多個任務(wù)在 Spark 集群上執(zhí)行, 允許用戶直接通過 SQL 來處理數(shù)據(jù) ,大大提升了易用性。
Grouping Sets 簡介
Spark SQL 官方文檔中 SQL Syntax 一節(jié)對 Grouping Sets
語句的描述如下:
Groups the rows for each grouping set specified after GROUPING SETS. (... 一些舉例) This clause is a shorthand for a
UNION ALL
where each leg of theUNION ALL
operator performs aggregation of each grouping set specified in theGROUPING SETS
clause. (... 一些舉例)
也即,Grouping Sets
語句的作用是指定幾個 grouping set 作為 Group By
的分組規(guī)則,然后再將結(jié)果聯(lián)合在一起。它的效果和, 先分別對這些 grouping set 進行 Group By
分組之后,再通過 Union All 將結(jié)果聯(lián)合起來 ,是一樣的。
比如,對于 dealer
表,Group By Grouping Sets ((city, car_model), (city), (car_model), ())
和 Union All((Group By city, car_model), (Group By city), (Group By car_model), 全局聚合)
的效果是相同的:
先看 Grouping Sets 版的執(zhí)行結(jié)果:
spark-sql> SELECT city, car_model, sum(quantity) AS sum FROM dealer
> GROUP BY GROUPING SETS ((city, car_model), (city), (car_model), ())
> ORDER BY city, car_model;
+--------+------------+---+
| city| car_model|sum|
+--------+------------+---+
| null| null| 78|
| null|Honda Accord| 33|
| null| Honda CRV| 10|
| null| Honda Civic| 35|
| Dublin| null| 33|
| Dublin|Honda Accord| 10|
| Dublin| Honda CRV| 3|
| Dublin| Honda Civic| 20|
| Fremont| null| 32|
| Fremont|Honda Accord| 15|
| Fremont| Honda CRV| 7|
| Fremont| Honda Civic| 10|
|San Jose| null| 13|
|San Jose|Honda Accord| 8|
|San Jose| Honda Civic| 5|
+--------+------------+---+
再看 Union All 版的執(zhí)行結(jié)果:
spark-sql> (SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY city, car_model) UNION ALL
> (SELECT city, NULL as car_model, sum(quantity) AS sum FROM dealer GROUP BY city) UNION ALL
> (SELECT NULL as city, car_model, sum(quantity) AS sum FROM dealer GROUP BY car_model) UNION ALL
> (SELECT NULL as city, NULL as car_model, sum(quantity) AS sum FROM dealer)
> ORDER BY city, car_model;
+--------+------------+---+
| city| car_model|sum|
+--------+------------+---+
| null| null| 78|
| null|Honda Accord| 33|
| null| Honda CRV| 10|
| null| Honda Civic| 35|
| Dublin| null| 33|
| Dublin|Honda Accord| 10|
| Dublin| Honda CRV| 3|
| Dublin| Honda Civic| 20|
| Fremont| null| 32|
| Fremont|Honda Accord| 15|
| Fremont| Honda CRV| 7|
| Fremont| Honda Civic| 10|
|San Jose| null| 13|
|San Jose|Honda Accord| 8|
|San Jose| Honda Civic| 5|
+--------+------------+---+
兩版的查詢結(jié)果完全一樣。
Grouping Sets 的執(zhí)行計劃
從執(zhí)行結(jié)果上看,Grouping Sets 版本和 Union All 版本的 SQL 是等價的,但 Grouping Sets 版本更加簡潔。
那么,Grouping Sets
僅僅只是 Union All
的一個縮寫,或者語法糖嗎 ?
為了進一步探究 Grouping Sets
的底層實現(xiàn)是否和 Union All
是一致的,我們可以來看下兩者的執(zhí)行計劃。
首先,我們通過 explain extended
來查看 Union All 版本的 Optimized Logical Plan :
spark-sql> explain extended (SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY city, car_model) UNION ALL (SELECT city, NULL as car_model, sum(quantity) AS sum FROM dealer GROUP BY city) UNION ALL (SELECT NULL as city, car_model, sum(quantity) AS sum FROM dealer GROUP BY car_model) UNION ALL (SELECT NULL as city, NULL as car_model, sum(quantity) AS sum FROM dealer) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#93 ASC NULLS FIRST, car_model#94 ASC NULLS FIRST], true
+- Union false, false
:- Aggregate [city#93, car_model#94], [city#93, car_model#94, sum(quantity#95) AS sum#79L]
: +- Project [city#93, car_model#94, quantity#95]
: +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#92, city#93, car_model#94, quantity#95], Partition Cols: []]
:- Aggregate [city#97], [city#97, null AS car_model#112, sum(quantity#99) AS sum#81L]
: +- Project [city#97, quantity#99]
: +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#96, city#97, car_model#98, quantity#99], Partition Cols: []]
:- Aggregate [car_model#102], [null AS city#113, car_model#102, sum(quantity#103) AS sum#83L]
: +- Project [car_model#102, quantity#103]
: +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#100, city#101, car_model#102, quantity#103], Partition Cols: []]
+- Aggregate [null AS city#114, null AS car_model#115, sum(quantity#107) AS sum#86L]
+- Project [quantity#107]
+- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#104, city#105, car_model#106, quantity#107], Partition Cols: []]
== Physical Plan ==
...
從上述的 Optimized Logical Plan 可以清晰地看出 Union All 版本的執(zhí)行邏輯:
- 執(zhí)行每個子查詢語句,計算得出查詢結(jié)果。其中,每個查詢語句的邏輯是這樣的:
- 在 HiveTableRelation 節(jié)點對
dealer
表進行全表掃描。 - 在 Project 節(jié)點選出與查詢語句結(jié)果相關(guān)的列,比如對于子查詢語句
SELECT NULL as city, NULL as car_model, sum(quantity) AS sum FROM dealer
,只需保留quantity
列即可。 - 在 Aggregate 節(jié)點完成
quantity
列對聚合運算。在上述的 Plan 中,Aggregate 后面緊跟的就是用來分組的列,比如Aggregate [city#902]
就表示根據(jù)city
列來進行分組。
- 在 HiveTableRelation 節(jié)點對
- 在 Union 節(jié)點完成對每個子查詢結(jié)果的聯(lián)合。
- 最后,在 Sort 節(jié)點完成對數(shù)據(jù)的排序,上述 Plan 中
Sort [city#93 ASC NULLS FIRST, car_model#94 ASC NULLS FIRST]
就表示根據(jù)city
和car_model
列進行升序排序。
接下來,我們通過 explain extended
來查看 Grouping Sets 版本的 Optimized Logical Plan:
spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY GROUPING SETS ((city, car_model), (city), (car_model), ()) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#138 ASC NULLS FIRST, car_model#139 ASC NULLS FIRST], true
+- Aggregate [city#138, car_model#139, spark_grouping_id#137L], [city#138, car_model#139, sum(quantity#133) AS sum#124L]
+- Expand [[quantity#133, city#131, car_model#132, 0], [quantity#133, city#131, null, 1], [quantity#133, null, car_model#132, 2], [quantity#133, null, null, 3]], [quantity#133, city#138, car_model#139, spark_grouping_id#137L]
+- Project [quantity#133, city#131, car_model#132]
+- HiveTableRelation [`default`.`dealer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#130, city#131, car_model#132, quantity#133], Partition Cols: []]
== Physical Plan ==
...
從 Optimized Logical Plan 來看,Grouping Sets 版本要簡潔很多!具體的執(zhí)行邏輯是這樣的:
- 在 HiveTableRelation 節(jié)點對
dealer
表進行全表掃描。 - 在 Project 節(jié)點選出與查詢語句結(jié)果相關(guān)的列。
- 接下來的 Expand 節(jié)點是關(guān)鍵,數(shù)據(jù)經(jīng)過該節(jié)點后,多出了
spark_grouping_id
列。從 Plan 中可以看出來,Expand 節(jié)點包含了Grouping Sets
里的各個 grouping set 信息,比如[quantity#133, city#131, null, 1]
對應(yīng)的就是(city)
這一 grouping set。而且,每個 grouping set 對應(yīng)的spark_grouping_id
列的值都是固定的,比如(city)
對應(yīng)的spark_grouping_id
為1
。 - 在 Aggregate 節(jié)點完成
quantity
列對聚合運算,其中分組的規(guī)則為city, car_model, spark_grouping_id
。注意,數(shù)據(jù)經(jīng)過 Aggregate 節(jié)點后,spark_grouping_id
列被刪除了! - 最后,在 Sort 節(jié)點完成對數(shù)據(jù)的排序。
從 Optimized Logical Plan 來看,雖然 Union All 版本和 Grouping Sets 版本的效果一致,但它們的底層實現(xiàn)有著巨大的差別。
其中,Grouping Sets 版本的 Plan 中最關(guān)鍵的是 Expand 節(jié)點,目前,我們只知道數(shù)據(jù)經(jīng)過它之后,多出了 spark_grouping_id
列。而且從最終結(jié)果來看,spark_grouping_id
只是 Spark SQL 的內(nèi)部實現(xiàn)細(xì)節(jié),對用戶并不體現(xiàn)。那么:
- Expand 的實現(xiàn)邏輯是怎樣的,為什么能達到
Union All
的效果? - Expand 節(jié)點的輸出數(shù)據(jù)是怎樣的 ?
spark_grouping_id
列的作用是什么 ?
通過 Physical Plan,我們發(fā)現(xiàn) Expand 節(jié)點對應(yīng)的算子名稱也是 Expand
:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [city#138 ASC NULLS FIRST, car_model#139 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(city#138 ASC NULLS FIRST, car_model#139 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=422]
+- HashAggregate(keys=[city#138, car_model#139, spark_grouping_id#137L], functions=[sum(quantity#133)], output=[city#138, car_model#139, sum#124L])
+- Exchange hashpartitioning(city#138, car_model#139, spark_grouping_id#137L, 200), ENSURE_REQUIREMENTS, [plan_id=419]
+- HashAggregate(keys=[city#138, car_model#139, spark_grouping_id#137L], functions=[partial_sum(quantity#133)], output=[city#138, car_model#139, spark_grouping_id#137L, sum#141L])
+- Expand [[quantity#133, city#131, car_model#132, 0], [quantity#133, city#131, null, 1], [quantity#133, null, car_model#132, 2], [quantity#133, null, null, 3]], [quantity#133, city#138, car_model#139, spark_grouping_id#137L]
+- Scan hive default.dealer [quantity#133, city#131, car_model#132], HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#130, city#131, car_model#132, quantity#133], Partition Cols: []]
帶著前面的幾個問題,接下來我們深入 Spark SQL 的 Expand
算子源碼尋找答案。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7133瀏覽量
89369 -
SQL
+關(guān)注
關(guān)注
1文章
773瀏覽量
44209 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4345瀏覽量
62864
發(fā)布評論請先 登錄
相關(guān)推薦
評論