您好,歡迎來電子發(fā)燒友網(wǎng)! ,新用戶?[免費(fèi)注冊]

您的位置:電子發(fā)燒友網(wǎng)>源碼下載>數(shù)值算法/人工智能>

如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)數(shù)據(jù)源混合計(jì)算的實(shí)踐

大?。?/span>0.6 MB 人氣: 2017-10-10 需要積分:1

  本文主要介紹如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)多個(gè)數(shù)據(jù)源混合計(jì)算的實(shí)踐,那么這么做的意義何在,其主要?dú)w結(jié)于3個(gè)方面:

  首先,我們身邊存在大量的數(shù)據(jù),結(jié)構(gòu)化、非結(jié)構(gòu)化,各種各樣的數(shù)據(jù)結(jié)構(gòu)、格局格式,這種數(shù)據(jù)的多樣性本身即是大數(shù)據(jù)的特性之一,從而也決定了一種存儲方式不可能通吃所有。因此,數(shù)據(jù)本身決定了多種數(shù)據(jù)源存在的必然性。 其次:從業(yè)務(wù)需求來看,因?yàn)槊刻鞎_發(fā)各種各樣的應(yīng)用系統(tǒng),應(yīng)用系統(tǒng)中所遇到的業(yè)務(wù)場景是互不相同的,各種各樣的需求決定了目前市面上不可能有一種軟件架構(gòu)同時(shí)能夠解決這么多種業(yè)務(wù)場景,所以在數(shù)據(jù)存儲包括數(shù)據(jù)查詢、計(jì)算這一塊也不可能只有一種技術(shù)就能解決所有問題。最后,從軟件的發(fā)展來看,現(xiàn)在市面上出現(xiàn)了越來越多面對某一個(gè)細(xì)分領(lǐng)域的軟件技術(shù),比如像數(shù)據(jù)存儲、查詢搜索引擎,MPP數(shù)據(jù)庫,以及各種各樣的查詢引擎。這么多不同的軟件中,每一個(gè)軟件都相對擅長處理某一個(gè)領(lǐng)域的業(yè)務(wù)場景,只是涉及的領(lǐng)域大小不相同。因此,越來越多軟件的產(chǎn)生也決定了我們所接受的數(shù)據(jù)會存儲到越來越多不同的數(shù)據(jù)源。

  Apache Spark的多數(shù)據(jù)源方案

  傳統(tǒng)方案中,實(shí)現(xiàn)多數(shù)據(jù)源通常有兩種方案:冗余存儲,一份業(yè)務(wù)數(shù)據(jù)有多個(gè)存儲,或者內(nèi)部互相引用;集中的計(jì)算,不同的數(shù)據(jù)使用不同存儲,但是會在統(tǒng)一的地方集中計(jì)算,算的時(shí)候把這些數(shù)據(jù)從不同位置讀取出來。下面一起討論這兩種解決方案中存在的問題:

  如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)數(shù)據(jù)源混合計(jì)算的實(shí)踐

  圖1 多數(shù)據(jù)源方案

  第一種方案中存在的一個(gè)問題是數(shù)據(jù)一致性,一樣的數(shù)據(jù)放在不同的存儲里面或多或少會有格式上的不兼容,或者查詢的差異,從而導(dǎo)致從不同位置查詢的數(shù)據(jù)可能出現(xiàn)不一致。比如有兩個(gè)報(bào)表相同的指標(biāo),但是因?yàn)槭欠旁诓煌鎯锊槌鰜淼慕Y(jié)果對不上,這點(diǎn)非常致命。第二個(gè)問題是存儲的成本,隨著存儲成本越來越低,這點(diǎn)倒是容易解決。

  第二種方案也存在兩個(gè)問題,其一是不同存儲出來的數(shù)據(jù)類型不同,從而在計(jì)算時(shí)需求相互轉(zhuǎn)換,因此如何轉(zhuǎn)換至關(guān)重要。第二個(gè)問題是讀取效率,需要高性能的數(shù)據(jù)抽取機(jī)制,盡量避免從遠(yuǎn)端讀取不必要的數(shù)據(jù),并且需要保證一定的并發(fā)性。

  Spark在1.2.0版本首次發(fā)布了一個(gè)新的DataSourceAPI,這個(gè)API提供了非常靈活的方案,讓Spark可以通過一個(gè)標(biāo)準(zhǔn)的接口訪問各種外部數(shù)據(jù)源,目標(biāo)是讓Spark各個(gè)組件以非常方便的通過SparkSQL訪問外部數(shù)據(jù)源。很顯然,Spark的DataSourceAPI其采用的是方案二,那么它是如何解決其中那個(gè)的問題的呢?

  如何使用Apache Spark中的DataSource API以實(shí)現(xiàn)數(shù)據(jù)源混合計(jì)算的實(shí)踐

  圖2 External Datasource API

  首先,數(shù)據(jù)類型轉(zhuǎn)換,Spark中定義了一個(gè)統(tǒng)一的數(shù)據(jù)類型標(biāo)準(zhǔn),不同的數(shù)據(jù)源自己定義數(shù)據(jù)類型的轉(zhuǎn)換方法,這樣解決數(shù)據(jù)源之間相互類型轉(zhuǎn)換的問題;

  關(guān)于數(shù)據(jù)處理效率的問題,Spark定義了一個(gè)比較簡單的API的接口,主要有3個(gè)方式:

  1./* 全量數(shù)據(jù)抽取 */

  2.trait TableScan {

  3.def buildScan(): RDD[Row]

  4.}

  5.

  6./* 列剪枝數(shù)據(jù)抽取 */

  7.trait PrunedScan {

  8.def buildScan(requiredColumns: Array[String]): RDD[Row]

  9.}

  10.

  11./* 列剪枝+行過濾數(shù)據(jù)抽取 */

  12.trait PrunedFilteredScan {

  13.def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

  14.}

  TableScan。這種方式需要將1TB的數(shù)據(jù)從數(shù)據(jù)抽取,再把這些數(shù)據(jù)傳到Spark中。在把這1TB的數(shù)據(jù)穿過網(wǎng)絡(luò)IO傳給Spark端之后,Spark還要逐行的進(jìn)行過濾,從而消耗大量的計(jì)算資源,這是目前最低效的方式。

  PrunedScan。這個(gè)方式有一個(gè)好處是數(shù)據(jù)源只需要從磁盤讀取1TB的數(shù)據(jù),并只返回一些列的數(shù)據(jù),Spark不需要計(jì)算就可以使用1GB的數(shù)據(jù),這個(gè)過程中節(jié)省了大量的網(wǎng)絡(luò)IO。

  PrunedFilteredScan。它需要數(shù)據(jù)源既支持列過濾也支持行過濾,其好處是在磁盤IO這一層進(jìn)行數(shù)據(jù)過濾,因此如果需要1GB數(shù)據(jù),可能只抽出2GB大小,經(jīng)過列過濾的規(guī)則再抽出1GB的數(shù)據(jù),隨后傳給Spark,因此這種數(shù)據(jù)源接口最高效,這也是目前市面上實(shí)現(xiàn)的最高效的數(shù)據(jù)接口。

  可直接使用的DataSource實(shí)現(xiàn)

  目前市面上可以找到的Spark DataSource實(shí)現(xiàn)代碼有三大類:Spark自帶;Spark Packages(http://Spark-packages.org/)網(wǎng)站中存放的第三方軟件包;跟隨其他項(xiàng)目一同發(fā)布的內(nèi)置的Spark的實(shí)現(xiàn)。這里介紹其中幾個(gè):

  1.JDBCRelation

  1.private[sql] case class JDBCRelation(

  2.url: String,

  3.table: String,

  4.parts: Array[Partition],

  5.properties: Properties = new Properties())(@transient val sqlContext: SQLContext)

  6.extends BaseRelation

  7.with PrunedFilteredScan

  8.with InsertableRelation {

  9…。

  10.}

  以JDBC方式連接外部數(shù)據(jù)源在國內(nèi)十分流行,Spark也內(nèi)置了最高效的PrunedFilteredScan接口,同時(shí)還實(shí)現(xiàn)了數(shù)據(jù)插入的接口,使用起來非常方便,可以方便地把數(shù)據(jù)庫中的表用到Spark。以Postgres為例:

  1.sqlContext.read.jdbc(

  2.“jdbc:postgresql://testhost:7531/testdb”,

  3.“testTable”,

  4.“idField”, ——-索引列

  5.10000, ——-起始index

  6.1000000, ——-結(jié)束index

  7.10, ——-partition數(shù)量

  8.new Properties

  9.).registerTempTable(“testTable”)

  實(shí)現(xiàn)機(jī)制:默認(rèn)使用單個(gè)Task從遠(yuǎn)端數(shù)據(jù)庫讀取數(shù)據(jù),如果設(shè)定了partitionColumn、lowerBound、upperBound、numPartitions這4個(gè)參數(shù),那么還可以控制Spark把針對這個(gè)數(shù)據(jù)源的訪問任務(wù)進(jìn)行拆分,得到numPartitions個(gè)任務(wù),每個(gè)Executor收到任務(wù)之后會并發(fā)的去連接數(shù)據(jù)庫的Server讀取數(shù)據(jù)。

  具體類型:PostgreSQL, MySQL。

  問題:在實(shí)際使用中需要注意一個(gè)問題,所有的Spark都會并發(fā)連接一個(gè)Server,并發(fā)過高時(shí)可能會對數(shù)據(jù)庫造成較大的沖擊(對于MPP等新型的關(guān)系型數(shù)據(jù)庫還好)。

  建議:個(gè)人感覺,JDBC的數(shù)據(jù)源適合從MPP等分布式數(shù)據(jù)庫中讀取數(shù)據(jù),對于傳統(tǒng)意義上單機(jī)的數(shù)據(jù)庫建議只處理一些相對較小的數(shù)據(jù)。

  2.HadoopFsRelation

  第二個(gè)在Spark內(nèi)置的數(shù)據(jù)源實(shí)現(xiàn),HadoopFs,也是實(shí)現(xiàn)中最高效的PrunedFilteredScan接口,使用起來相對來說比JDBC更方便。

  1.sqlContext

  2..read

  3..parquet(“hdfs://testFS/testPath”)

  4..registerTempTable(“test”)

  實(shí)現(xiàn)機(jī)制:執(zhí)行的時(shí)候Spark在Driver端會直接獲取列表,根據(jù)文件的格式類型和壓縮方式生成多個(gè)TASK,再把這些TASK分配下去。Executor端會根據(jù)文件列表訪問,這種方式訪問HDFS不會出現(xiàn)IO集中的地方,所以具備很好的擴(kuò)展性,可以處理相當(dāng)大規(guī)模的數(shù)據(jù)。

  具體類型:ORC,Parquet,JSon。

  問題:在實(shí)時(shí)場景下如果使用HDFS作為數(shù)據(jù)輸出的數(shù)據(jù)源,在寫數(shù)據(jù)就會產(chǎn)生非常大量零散的數(shù)據(jù),在HDFS上積累大量的零碎文件,就會帶來很大的壓力,后續(xù)處理這些小文件的時(shí)候也非常頭疼。

  建議:這種方式適合離線數(shù)據(jù)處理程序輸入和輸出數(shù)據(jù),還有一些數(shù)據(jù)處理Pipeline中的臨時(shí)數(shù)據(jù),數(shù)據(jù)量比較大,可以臨時(shí)放在HDFS。實(shí)時(shí)場景下不推薦使用HDFS作為數(shù)據(jù)輸出。

  3.ElasticSearch

  越來越多的互聯(lián)網(wǎng)公司開始使用ELK(ElasticSearch+LogStash+Kibana)作為基礎(chǔ)數(shù)據(jù)分析查詢的工具,但是有多少人知道其實(shí)ElasticSearch也支持在Spark中掛載為一個(gè)DataSource進(jìn)行查詢呢?

  1.EsSparkSQL

  2..esDF(hc,indexName,esQuery)

  3..registerTempTable(”testTable”)

  實(shí)現(xiàn)機(jī)制:ES DataSource的實(shí)現(xiàn)機(jī)制是通過對esQuery進(jìn)行解析,將實(shí)際要發(fā)往多個(gè)ES Nodes的請求分為多個(gè)Task,在每個(gè)Executor上并行執(zhí)行。

非常好我支持^.^

(0) 0%

不好我反對

(0) 0%

      發(fā)表評論

      用戶評論
      評價(jià):好評中評差評

      發(fā)表評論,獲取積分! 請遵守相關(guān)規(guī)定!

      ?