如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)數(shù)據(jù)源混合計(jì)算的實(shí)踐
推薦 + 挑錯(cuò) + 收藏(0) + 用戶評論(0)
本文主要介紹如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)多個(gè)數(shù)據(jù)源混合計(jì)算的實(shí)踐,那么這么做的意義何在,其主要?dú)w結(jié)于3個(gè)方面:
首先,我們身邊存在大量的數(shù)據(jù),結(jié)構(gòu)化、非結(jié)構(gòu)化,各種各樣的數(shù)據(jù)結(jié)構(gòu)、格局格式,這種數(shù)據(jù)的多樣性本身即是大數(shù)據(jù)的特性之一,從而也決定了一種存儲方式不可能通吃所有。因此,數(shù)據(jù)本身決定了多種數(shù)據(jù)源存在的必然性。 其次:從業(yè)務(wù)需求來看,因?yàn)槊刻鞎_發(fā)各種各樣的應(yīng)用系統(tǒng),應(yīng)用系統(tǒng)中所遇到的業(yè)務(wù)場景是互不相同的,各種各樣的需求決定了目前市面上不可能有一種軟件架構(gòu)同時(shí)能夠解決這么多種業(yè)務(wù)場景,所以在數(shù)據(jù)存儲包括數(shù)據(jù)查詢、計(jì)算這一塊也不可能只有一種技術(shù)就能解決所有問題。最后,從軟件的發(fā)展來看,現(xiàn)在市面上出現(xiàn)了越來越多面對某一個(gè)細(xì)分領(lǐng)域的軟件技術(shù),比如像數(shù)據(jù)存儲、查詢搜索引擎,MPP數(shù)據(jù)庫,以及各種各樣的查詢引擎。這么多不同的軟件中,每一個(gè)軟件都相對擅長處理某一個(gè)領(lǐng)域的業(yè)務(wù)場景,只是涉及的領(lǐng)域大小不相同。因此,越來越多軟件的產(chǎn)生也決定了我們所接受的數(shù)據(jù)會存儲到越來越多不同的數(shù)據(jù)源。
Apache Spark的多數(shù)據(jù)源方案
傳統(tǒng)方案中,實(shí)現(xiàn)多數(shù)據(jù)源通常有兩種方案:冗余存儲,一份業(yè)務(wù)數(shù)據(jù)有多個(gè)存儲,或者內(nèi)部互相引用;集中的計(jì)算,不同的數(shù)據(jù)使用不同存儲,但是會在統(tǒng)一的地方集中計(jì)算,算的時(shí)候把這些數(shù)據(jù)從不同位置讀取出來。下面一起討論這兩種解決方案中存在的問題:
圖1 多數(shù)據(jù)源方案
第一種方案中存在的一個(gè)問題是數(shù)據(jù)一致性,一樣的數(shù)據(jù)放在不同的存儲里面或多或少會有格式上的不兼容,或者查詢的差異,從而導(dǎo)致從不同位置查詢的數(shù)據(jù)可能出現(xiàn)不一致。比如有兩個(gè)報(bào)表相同的指標(biāo),但是因?yàn)槭欠旁诓煌鎯锊槌鰜淼慕Y(jié)果對不上,這點(diǎn)非常致命。第二個(gè)問題是存儲的成本,隨著存儲成本越來越低,這點(diǎn)倒是容易解決。
第二種方案也存在兩個(gè)問題,其一是不同存儲出來的數(shù)據(jù)類型不同,從而在計(jì)算時(shí)需求相互轉(zhuǎn)換,因此如何轉(zhuǎn)換至關(guān)重要。第二個(gè)問題是讀取效率,需要高性能的數(shù)據(jù)抽取機(jī)制,盡量避免從遠(yuǎn)端讀取不必要的數(shù)據(jù),并且需要保證一定的并發(fā)性。
Spark在1.2.0版本首次發(fā)布了一個(gè)新的DataSourceAPI,這個(gè)API提供了非常靈活的方案,讓Spark可以通過一個(gè)標(biāo)準(zhǔn)的接口訪問各種外部數(shù)據(jù)源,目標(biāo)是讓Spark各個(gè)組件以非常方便的通過SparkSQL訪問外部數(shù)據(jù)源。很顯然,Spark的DataSourceAPI其采用的是方案二,那么它是如何解決其中那個(gè)的問題的呢?
圖2 External Datasource API
首先,數(shù)據(jù)類型轉(zhuǎn)換,Spark中定義了一個(gè)統(tǒng)一的數(shù)據(jù)類型標(biāo)準(zhǔn),不同的數(shù)據(jù)源自己定義數(shù)據(jù)類型的轉(zhuǎn)換方法,這樣解決數(shù)據(jù)源之間相互類型轉(zhuǎn)換的問題;
關(guān)于數(shù)據(jù)處理效率的問題,Spark定義了一個(gè)比較簡單的API的接口,主要有3個(gè)方式:
1./* 全量數(shù)據(jù)抽取 */
3.def buildScan(): RDD[Row]
4.}
5.
6./* 列剪枝數(shù)據(jù)抽取 */
7.trait PrunedScan {
8.def buildScan(requiredColumns: Array[String]): RDD[Row]
9.}
10.
11./* 列剪枝+行過濾數(shù)據(jù)抽取 */
12.trait PrunedFilteredScan {
13.def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
14.}
TableScan。這種方式需要將1TB的數(shù)據(jù)從數(shù)據(jù)抽取,再把這些數(shù)據(jù)傳到Spark中。在把這1TB的數(shù)據(jù)穿過網(wǎng)絡(luò)IO傳給Spark端之后,Spark還要逐行的進(jìn)行過濾,從而消耗大量的計(jì)算資源,這是目前最低效的方式。
PrunedScan。這個(gè)方式有一個(gè)好處是數(shù)據(jù)源只需要從磁盤讀取1TB的數(shù)據(jù),并只返回一些列的數(shù)據(jù),Spark不需要計(jì)算就可以使用1GB的數(shù)據(jù),這個(gè)過程中節(jié)省了大量的網(wǎng)絡(luò)IO。
PrunedFilteredScan。它需要數(shù)據(jù)源既支持列過濾也支持行過濾,其好處是在磁盤IO這一層進(jìn)行數(shù)據(jù)過濾,因此如果需要1GB數(shù)據(jù),可能只抽出2GB大小,經(jīng)過列過濾的規(guī)則再抽出1GB的數(shù)據(jù),隨后傳給Spark,因此這種數(shù)據(jù)源接口最高效,這也是目前市面上實(shí)現(xiàn)的最高效的數(shù)據(jù)接口。
可直接使用的DataSource實(shí)現(xiàn)
目前市面上可以找到的Spark DataSource實(shí)現(xiàn)代碼有三大類:Spark自帶;Spark Packages(http://Spark-packages.org/)網(wǎng)站中存放的第三方軟件包;跟隨其他項(xiàng)目一同發(fā)布的內(nèi)置的Spark的實(shí)現(xiàn)。這里介紹其中幾個(gè):
1.JDBCRelation
1.private[sql] case class JDBCRelation(
2.url: String,
3.table: String,
4.parts: Array[Partition],
5.properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
6.extends BaseRelation
7.with PrunedFilteredScan
8.with InsertableRelation {
9…。
10.}
以JDBC方式連接外部數(shù)據(jù)源在國內(nèi)十分流行,Spark也內(nèi)置了最高效的PrunedFilteredScan接口,同時(shí)還實(shí)現(xiàn)了數(shù)據(jù)插入的接口,使用起來非常方便,可以方便地把數(shù)據(jù)庫中的表用到Spark。以Postgres為例:
1.sqlContext.read.jdbc(
2.“jdbc:postgresql://testhost:7531/testdb”,
3.“testTable”,
4.“idField”, ——-索引列
5.10000, ——-起始index
6.1000000, ——-結(jié)束index
7.10, ——-partition數(shù)量
8.new Properties
9.).registerTempTable(“testTable”)
實(shí)現(xiàn)機(jī)制:默認(rèn)使用單個(gè)Task從遠(yuǎn)端數(shù)據(jù)庫讀取數(shù)據(jù),如果設(shè)定了partitionColumn、lowerBound、upperBound、numPartitions這4個(gè)參數(shù),那么還可以控制Spark把針對這個(gè)數(shù)據(jù)源的訪問任務(wù)進(jìn)行拆分,得到numPartitions個(gè)任務(wù),每個(gè)Executor收到任務(wù)之后會并發(fā)的去連接數(shù)據(jù)庫的Server讀取數(shù)據(jù)。
具體類型:PostgreSQL, MySQL。
問題:在實(shí)際使用中需要注意一個(gè)問題,所有的Spark都會并發(fā)連接一個(gè)Server,并發(fā)過高時(shí)可能會對數(shù)據(jù)庫造成較大的沖擊(對于MPP等新型的關(guān)系型數(shù)據(jù)庫還好)。
建議:個(gè)人感覺,JDBC的數(shù)據(jù)源適合從MPP等分布式數(shù)據(jù)庫中讀取數(shù)據(jù),對于傳統(tǒng)意義上單機(jī)的數(shù)據(jù)庫建議只處理一些相對較小的數(shù)據(jù)。
2.HadoopFsRelation
第二個(gè)在Spark內(nèi)置的數(shù)據(jù)源實(shí)現(xiàn),HadoopFs,也是實(shí)現(xiàn)中最高效的PrunedFilteredScan接口,使用起來相對來說比JDBC更方便。
1.sqlContext
2..read
3..parquet(“hdfs://testFS/testPath”)
4..registerTempTable(“test”)
實(shí)現(xiàn)機(jī)制:執(zhí)行的時(shí)候Spark在Driver端會直接獲取列表,根據(jù)文件的格式類型和壓縮方式生成多個(gè)TASK,再把這些TASK分配下去。Executor端會根據(jù)文件列表訪問,這種方式訪問HDFS不會出現(xiàn)IO集中的地方,所以具備很好的擴(kuò)展性,可以處理相當(dāng)大規(guī)模的數(shù)據(jù)。
具體類型:ORC,Parquet,JSon。
問題:在實(shí)時(shí)場景下如果使用HDFS作為數(shù)據(jù)輸出的數(shù)據(jù)源,在寫數(shù)據(jù)就會產(chǎn)生非常大量零散的數(shù)據(jù),在HDFS上積累大量的零碎文件,就會帶來很大的壓力,后續(xù)處理這些小文件的時(shí)候也非常頭疼。
建議:這種方式適合離線數(shù)據(jù)處理程序輸入和輸出數(shù)據(jù),還有一些數(shù)據(jù)處理Pipeline中的臨時(shí)數(shù)據(jù),數(shù)據(jù)量比較大,可以臨時(shí)放在HDFS。實(shí)時(shí)場景下不推薦使用HDFS作為數(shù)據(jù)輸出。
3.ElasticSearch
越來越多的互聯(lián)網(wǎng)公司開始使用ELK(ElasticSearch+LogStash+Kibana)作為基礎(chǔ)數(shù)據(jù)分析查詢的工具,但是有多少人知道其實(shí)ElasticSearch也支持在Spark中掛載為一個(gè)DataSource進(jìn)行查詢呢?
1.EsSparkSQL
2..esDF(hc,indexName,esQuery)
3..registerTempTable(”testTable”)
實(shí)現(xiàn)機(jī)制:ES DataSource的實(shí)現(xiàn)機(jī)制是通過對esQuery進(jìn)行解析,將實(shí)際要發(fā)往多個(gè)ES Nodes的請求分為多個(gè)Task,在每個(gè)Executor上并行執(zhí)行。
非常好我支持^.^
(0) 0%
不好我反對
(0) 0%
下載地址
如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)數(shù)據(jù)源混合計(jì)算的實(shí)踐下載
相關(guān)電子資料下載
- DeepSpark 開源社區(qū)百大應(yīng)用開放平臺23.09版本正式發(fā)布 51
- RT-Thread SPARK CAN的通信內(nèi)核詳解 334
- Spark Connected與英飛凌面向市場推出Yeti 的500 W無線充電解決方案 315
- NVIDIA 攜手騰訊開發(fā)和優(yōu)化 Spark UCX 實(shí)現(xiàn)性能躍升 224
- 基于RT-SPARK 1的物聯(lián)網(wǎng)-溫濕度報(bào)警器設(shè)計(jì)方案 239
- 一種基于STM32F407-RT-SPARK開發(fā)板的智能花盆設(shè)計(jì)案例 1297
- ?DeepSpark 開源社區(qū)百大應(yīng)用開放平臺23.06版本正式發(fā)布 212
- 傳音移動互聯(lián)DataSparkle為非洲數(shù)字經(jīng)濟(jì)研究提供數(shù)據(jù)支撐 141
- 為Spark ML算法提供GPU加速度 337
- Spark 3.4用于分布式模型訓(xùn)練和大規(guī)模模型推理 349