0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內(nèi)不再提示

實時數(shù)據(jù)體系建設的總體方案的三部分

西西 ? 來源:CRM咨詢 ? 作者:CRM咨詢 ? 2020-05-31 11:02 ? 次閱讀

隨著互聯(lián)網(wǎng)的發(fā)展進入下半場,數(shù)據(jù)的時效性對企業(yè)的精細化運營越來越重要, 商場如戰(zhàn)場,在每天產(chǎn)生的海量數(shù)據(jù)中,如何能實時有效的挖掘出有價值的信息, 對企業(yè)的決策運營策略調(diào)整有很大幫助。

此外,隨著 5G 技術的成熟、廣泛應用, 對于工業(yè)互聯(lián)網(wǎng)、物聯(lián)網(wǎng)等數(shù)據(jù)時效性要求非常高的行業(yè),企業(yè)就更需要一套完整成熟的實時數(shù)據(jù)體系來提高自身的行業(yè)競爭力。

本文從上述現(xiàn)狀及實時數(shù)據(jù)需求出發(fā),結合工業(yè)界案例、筆者的實時數(shù)據(jù)開發(fā)經(jīng)驗, 梳理總結了實時數(shù)據(jù)體系建設的總體方案,本文主要分為三個部分:

第一部分主要介紹了當下在工業(yè)界比較火熱的實時計算引擎 Flink 在實時數(shù)據(jù)體系建設過程中主要的應用場景及對應解決方案;

第二部分從實時數(shù)據(jù)體系架構、實時數(shù)據(jù)模型分層、實時數(shù)據(jù)體系建設方式、流批一體實時數(shù)據(jù)架構發(fā)展等四個方面思考了實時數(shù)據(jù)體系的建設方案;

第三部分則以一個具體案例介紹如何使用 Flink SQL 完成實時數(shù)據(jù)統(tǒng)計類需求。

一、Flink 實時應用場景

目前看來,F(xiàn)link 在實時計算領域內(nèi)的主要應用場景主要可分為四類場景, 分別是實時數(shù)據(jù)同步、流式 ETL、實時數(shù)據(jù)分析和復雜事件處理,具體的業(yè)務場景和對應的解決方案可詳細研究下圖, 文字層面不再詳述。

二、實時數(shù)據(jù)體系架構

實時數(shù)據(jù)體系大致分為三類場景:流量類、業(yè)務類和特征類,這三種場景各有不同。

在數(shù)據(jù)模型上,流量類是扁平化的寬表,業(yè)務數(shù)倉更多是基于范式的建模,特征數(shù)據(jù)是 KV 存儲;

從數(shù)據(jù)來源區(qū)分,流量數(shù)倉的數(shù)據(jù)來源一般是日志數(shù)據(jù),業(yè)務數(shù)倉的數(shù)據(jù)來源是業(yè)務 binlog 數(shù)據(jù),特征數(shù)倉的數(shù)據(jù)來源則多種多樣;

從數(shù)據(jù)量而言,流量和特征數(shù)倉都是海量數(shù)據(jù),每天十億級以上,而業(yè)務數(shù)倉的數(shù)據(jù)量一般每天百萬到千萬級;

從數(shù)據(jù)更新頻率而言,流量數(shù)據(jù)極少更新,則業(yè)務和特征數(shù)據(jù)更新較多,流量數(shù)據(jù)一般關注時序和趨勢,業(yè)務數(shù)據(jù)和特征數(shù)據(jù)關注狀態(tài)變更;

在數(shù)據(jù)準確性上,流量數(shù)據(jù)要求較低,而業(yè)務數(shù)據(jù)和特征數(shù)據(jù)要求較高。

1、實時數(shù)據(jù)體系整體架構

整個實時數(shù)據(jù)體系架構分為五層,分別是接入層,存儲層,計算層、平臺層和應用層,上圖只是整體架構的概要圖,每一層具體要做的事情,接下來通過文字來詳述。

1)接入層:該層利用各種數(shù)據(jù)接入工具收集各個系統(tǒng)的數(shù)據(jù),包括 binlog 日志、埋點日志、以及后端服務日志,數(shù)據(jù)會被收集到 Kafka 中;這些數(shù)據(jù)不只是參與實時計算,也會參與離線計算,保證實時和離線的原始數(shù)據(jù)是統(tǒng)一的;

2)存儲層:該層對原始數(shù)據(jù)、清洗關聯(lián)后的明細數(shù)據(jù)進行存儲,基于統(tǒng)一的實時數(shù)據(jù)模型分層理念,將不同應用場景的數(shù)據(jù)分別存儲在 Kafka、HDFS、Kudu、 Clickhouse、Hbase、Redis、Mysql 等存儲引擎中,各種存儲引擎存放的具體的數(shù)據(jù)類型在實時數(shù)據(jù)模型分層部分會詳細介紹;

3)計算層:計算層主要使用 Flink、Spark、Presto 以及 ClickHouse 自帶的計算能力等四種計算引擎,F(xiàn)link 計算引擎主要用于實時數(shù)據(jù)同步、 流式 ETL、關鍵系統(tǒng)秒級實時指標計算場景,Spark SQL 主要用于復雜多維分析的準實時指標計算需求場景,Presto 和 ClickHouse 主要滿足多維自助分析、對查詢響應時間要求不太高的場景;

4)平臺層:在平臺層主要做三個方面的工作,分別是對外提供統(tǒng)一查詢服務、元數(shù)據(jù)及指標管理、數(shù)據(jù)質(zhì)量及血緣;

5)應用層:以統(tǒng)一查詢服務對各個業(yè)務線數(shù)據(jù)場景進行支持,業(yè)務主要包括實時大屏、實時數(shù)據(jù)產(chǎn)品、實時 OLAP、實時特征等。

其中,平臺層詳細工作如下:

統(tǒng)一查詢服務支持從底層明細數(shù)據(jù)到聚合層數(shù)據(jù)的查詢,支持以SQL化方式查詢Redis、Hbase等KV存儲中的數(shù)據(jù);

元數(shù)據(jù)及指標管理:主要對實時的Kafka表、Kudu表、Clickhouse表、Hive表等進行統(tǒng)一管理,以數(shù)倉模型中表的命名方式規(guī)范表的命名,明確每張表的字段含義、使用方,指標管理則是盡量通過指標管理系統(tǒng)將所有的實時指標統(tǒng)一管理起來,明確計算口徑,提供給不同的業(yè)務方使用;

數(shù)據(jù)質(zhì)量及血緣分析:數(shù)據(jù)質(zhì)量分為平臺監(jiān)控和數(shù)據(jù)監(jiān)控兩個部分,血緣分析則主要是對實時數(shù)據(jù)依賴關系、實時任務的依賴關系進行分析。

平臺監(jiān)控部分一是對任務運行狀態(tài)進行監(jiān)控,對異常的任務進行報警并根據(jù)設定的參數(shù)對任務進行自動拉起與恢復,二是針對 Flink 任務要對 Kafka 消費處理延遲進行監(jiān)控并實時報警。

數(shù)據(jù)監(jiān)控則分為兩個部分:

首先流式 ETL 是整個實時數(shù)據(jù)流轉(zhuǎn)過程中重要的一環(huán),ETL 的過程中會關聯(lián)各種維表,實時關聯(lián)時,定時對沒有關聯(lián)上的記錄上報異常日志到監(jiān)控平臺,當數(shù)量達到一定閾值時觸發(fā)報警;

其次,部分關鍵實時指標采用了 lambda 架構,因此需要對歷史的實時指標與離線 hive 計算的數(shù)據(jù)定時做對比,提供實時數(shù)據(jù)的數(shù)據(jù)質(zhì)量監(jiān)控,對超過閾值的指標數(shù)據(jù)進行報警。

為了配合數(shù)據(jù)監(jiān)控,需要做實時數(shù)據(jù)血緣,主要是梳理實時數(shù)據(jù)體系中數(shù)據(jù)依賴關系,以及實時任務的依賴關系,從底層ODS 到 DW 再到 DM,以及 DM 層被哪些模型用到, 將整個鏈條串聯(lián)起來,這樣做在數(shù)據(jù)/任務主動調(diào)整時可以通知關聯(lián)的下游,指標異常時借助血緣定位問題,同時基于血緣關系的分析,我們也能評估數(shù)據(jù)的應用價值,核算數(shù)據(jù)的計算成本。

2、實時數(shù)據(jù)模型分層

離線數(shù)倉考慮到效率問題,一般會采取空間換時間的方式,層級劃分會比較多;實時數(shù)倉考慮到實時性問題,分層則越少越好,另外也減少了中間流程出錯的可能性,因此將其分為四層。

1)ODS 層

操作數(shù)據(jù)層,保存原始數(shù)據(jù),對非結構化的數(shù)據(jù)進行結構化處理,輕度清洗,幾乎不刪除原始數(shù)據(jù)。

該層的數(shù)據(jù)主要來自業(yè)務數(shù)據(jù)庫的 binlog 日志、埋點日志和應用程序日志。

對于 binlog 日志通過 canal 監(jiān)聽,寫到消息隊列 Kafka 中,對應于埋點和應用程序日志,則通過 Filebeat 采集 nginx 和 tomcat 日志,上報到Kafka 中。

除了存儲在 Kafka 中,同時也會對業(yè)務數(shù)據(jù)庫的 binlog 日志通過 Flink 寫入 HDFS、Kudu 等存儲引擎,落地到 5min Hive 表,供查詢明細數(shù)據(jù),同時也提供給離線數(shù)倉,做為其原始數(shù)據(jù);另外,對于埋點日志數(shù)據(jù),由于 ODS 層是非結構化的,則沒有必要落地。

2)DWD 層

實時明細數(shù)據(jù)層,以業(yè)務過程作為建模驅(qū)動,基于每個具體的業(yè)務過程特點,構建最細粒度的明細層事實表;可以結合企業(yè)的數(shù)據(jù)使用特點,將明細事實表的某些重要維度屬性字段做適當冗余,也即寬表化處理。

該層的數(shù)據(jù)來源于 ODS 層,通過簡單的 Streaming ETL 后得到,對于 binlog 日志的處理主要進行簡單的數(shù)據(jù)清洗、處理數(shù)據(jù)漂移,以及可能對多個 ODS 層的表進行 Streaming Join,對流量日志主要是做一些通用ETL 處理,將非結構化的數(shù)據(jù)結構化,關聯(lián)通用的維度字段。

該層的數(shù)據(jù)存儲在消息隊列 Kafka 中,同時也會用 Flink 實時寫入 Hive 5min 表,供查詢明細數(shù)據(jù),同時要提供給離線數(shù)倉,做為其原始數(shù)據(jù)。

3)DIM 層

公共維度層,基于維度建模理念思想,建立整個業(yè)務過程的一致性維度,降低數(shù)據(jù)計算口徑和算法不統(tǒng)一風險。

DIM 層數(shù)據(jù)來源于兩部分:一部分是Flink程序?qū)崟r處理ODS層數(shù)據(jù)得到,另外一部分是通過離線任務出倉得到。

DIM 層維度數(shù)據(jù)主要使用 MySQL、Hbase、Redis 三種存儲引擎,對于維表數(shù)據(jù)比較少的情況可以使用 MySQL,對于單條數(shù)據(jù)大小比較小,查詢 QPS 比較高的情況,可以使用 Redis 存儲,降低機器內(nèi)存資源占用,對于數(shù)據(jù)量比較大,對維表數(shù)據(jù)變化不是特別敏感的場景,可以使用HBase 存儲。

4)DM 層

①數(shù)據(jù)集市層

以數(shù)據(jù)域+業(yè)務域的理念建設公共匯總層,對于DM層比較復雜,需要綜合考慮對于數(shù)據(jù)落地的要求以及具體的查詢引擎來選擇不同的存儲方式,分為輕度匯總層和高度匯總層,同時產(chǎn)出,高度匯總層數(shù)據(jù)用于前端比較簡單的KV查詢, 提升查詢性能,比如實時大屏,實時報表等,數(shù)據(jù)的時效性要求為秒級,輕度匯總層Kafka中寬表實時寫入OLAP存儲引擎,用于前端產(chǎn)品復雜的OLAP查詢場景,滿足自助分析和產(chǎn)出復雜報表的需求,對數(shù)據(jù)的時效性要求可容忍到分鐘級;

②輕度匯總層

輕度匯總層由明細層通過Streaming ETL得到,主要以寬表的形式存在,業(yè)務明細匯總是由業(yè)務事實明細表和維度表join得到,流量明細匯總是由流量日志按業(yè)務線拆分和維度表join得到。

輕度匯總層數(shù)據(jù)存儲比較多樣化,首先利用Flink實時消費DWD層Kafka中明細數(shù)據(jù)join業(yè)務過程需要的維表,實時打?qū)捄髮懭朐搶拥腒afka中,以Json或PB格式存儲。

同時對多維業(yè)務明細匯總數(shù)據(jù)通過Flink實時寫入Kudu,用于查詢明細數(shù)據(jù)和更復雜的多維數(shù)據(jù)分析需求,對于流量數(shù)據(jù)通過Flink分別寫入HDFS和ClickHouse用于復雜的多維數(shù)據(jù)分析, 實時特征數(shù)據(jù)則通過Flink join維表后實時寫入HDFS,用于下游的離線ETL消費。

對于落地Kudu和HDFS的寬表數(shù)據(jù),可用Spark SQL做分鐘級的預計算,滿足業(yè)務方復雜數(shù)據(jù)分析需求,提供分鐘級延遲的數(shù)據(jù),從而加速離線ETL過程的延遲, 另外隨著Flink SQL與Hive生態(tài)集成的不斷完善,可嘗試用Flink SQL做離線ETL和OLAP計算任務(Flink流計算基于內(nèi)存計算的特性,和presto非常類似,這使其也可以成為一個OLAP計算引擎),用一套計算引擎解決實時離線需求,從而實現(xiàn)批流統(tǒng)一。

對于Kudu中的業(yè)務明細數(shù)據(jù)、ClickHouse中的流量明細數(shù)據(jù),也可以滿足業(yè)務方的個性化數(shù)據(jù)分析需求,利用強大的OLAP計算引擎,實時查詢明細數(shù)據(jù),在10s量級的響應時間內(nèi)給出結果,這類需求也即是實時OLAP需求,靈活性比較高。

③高度匯總層

高度匯總層由明細數(shù)據(jù)層或輕度匯總層通過聚合計算后寫入到存儲引擎中,產(chǎn)出一部分實時數(shù)據(jù)指標需求,靈活性比較差。

計算引擎使用Flink Datastream API和Flink SQL,指標存儲引擎根據(jù)不同的需求,對于常見的簡單指標匯總模型可直接放在MySQL里面,維度比較多的、寫入更新比較大的模型會放在HBase里面, 還有一種是需要做排序、對查詢QPS、響應時間要求非常高、且不需要持久化存儲如大促活動期間在線TopN商品等直接存儲在Redis里面。

在秒級指標需求中,需要混用Lambda和Kappa架構,大部分實時指標使用Kappa架構完成計算,少量關鍵指標(如金額相關)使用Lambda架構用批處理重新處理計算,增加一次校對過程。

總體來說 DM 層對外提供三種時效性的數(shù)據(jù):

首先是 Flink 等實時計算引擎預計算好的秒級實時指標,這種需求對數(shù)據(jù)的時效性要求非常高,用于實時大屏、計算維度不復雜的實時報表需求。

其次是 Spark SQL 預計算的延遲在分鐘級的準實時指標, 該類指標滿足一些比較復雜但對數(shù)據(jù)時效性要求不太高的數(shù)據(jù)分析場景,可能會涉及到多個事實表的join,如銷售歸因等需求。

最后一種則是不需要預計算,ad-hoc查詢的復雜多維數(shù)據(jù)分析場景,此類需求比較個性化,靈活性比較高,如果 OLAP 計算引擎性能足夠強大,也可完全滿足秒級計算需求的場景; 對外提供的秒級實時數(shù)據(jù)和另外兩種準實時數(shù)據(jù)的比例大致為 3:7,絕大多數(shù)的業(yè)務需求都優(yōu)先考慮準實時計算或 ad-hoc 方式,可以降低資源使用、提升數(shù)據(jù)準確性,以更靈活的方式滿足復雜的業(yè)務場景。

3、實時數(shù)據(jù)體系建設方式

整個實時數(shù)據(jù)體系分為兩種建設方式,即實時和準實時(它們的實現(xiàn)方式分別是基于流計算引擎和 ETL、OLAP 引擎,數(shù)據(jù)時效性則分別是秒級和分鐘級。

1)在調(diào)度開銷方面,準實時數(shù)據(jù)是批處理過程,因此仍然需要調(diào)度系統(tǒng)支持,調(diào)度頻率較高,而實時數(shù)據(jù)卻沒有調(diào)度開銷。

2)在業(yè)務靈活性方面,因為準實時數(shù)據(jù)是基于 ETL 或 OLAP 引擎實現(xiàn),靈活性優(yōu)于基于流計算的方式。

3)在對數(shù)據(jù)晚到的容忍度方面,因為準實時數(shù)據(jù)可以基于一個周期內(nèi)的數(shù)據(jù)進行全量計算,因此對于數(shù)據(jù)晚到的容忍度也是比較高的,而實時數(shù)據(jù)使用的是增量計算,對于數(shù)據(jù)晚到的容忍度更低一些。

4)在適用場景方面,準實時數(shù)據(jù)主要用于有實時性要求但不太高、涉及多表關聯(lián)和業(yè)務變更頻繁的場景,如交易類型的實時分析,實時數(shù)據(jù)則更適用于實時性要求高、數(shù)據(jù)量大的場景,如實時特征、流量類型實時分析等場景。

4、流批一體實時數(shù)據(jù)架構發(fā)展

從1990年 Inmon 提出數(shù)據(jù)倉庫概念到今天,大數(shù)據(jù)架構經(jīng)歷了從最初的離線大數(shù)據(jù)架構、Lambda 架構、Kappa 架構以及 Flink 的火熱帶出的流批一體架構,數(shù)據(jù)架構技術不斷演進,本質(zhì)是在往流批一體的方向發(fā)展,讓用戶能以最自然、最小的成本完成實時計算。

1)離線大數(shù)據(jù)架構:數(shù)據(jù)源通過離線的方式導入到離線數(shù)倉中,下游應用根據(jù)業(yè)務需求選擇直接讀取 DM 或加一層數(shù)據(jù)服務,比如 MySQL 或 Redis,數(shù)據(jù)存儲引擎是 HDFS/Hive,ETL 工具可以是 MapReduce 腳本或 HiveSQL。數(shù)據(jù)倉庫從模型層面分為操作數(shù)據(jù)層 ODS、數(shù)據(jù)倉庫明細層 DWD、數(shù)據(jù)集市層 DM。

2)Lambda 架構:隨著大數(shù)據(jù)應用的發(fā)展,人們逐漸對系統(tǒng)的實時性提出了要求,為了計算一些實時指標,就在原來離線數(shù)倉的基礎上增加了一個實時計算的鏈路,并對數(shù)據(jù)源做流式改造(即把數(shù)據(jù)發(fā)送到消息隊列),實時計算去訂閱消息隊列,直接完成指標增量的計算,推送到下游的數(shù)據(jù)服務中去,由數(shù)據(jù)服務層完成離線&實時結果的合并。

3)Kappa 架構:Lambda 架構雖然滿足了實時的需求,但帶來了更多的開發(fā)與運維工作,其架構背景是流處理引擎還不完善,流處理的結果只作為臨時的、近似的值提供參考。后來隨著 Flink 等流處理引擎的出現(xiàn),流處理技術成熟起來,這時為了解決兩套代碼的問題,LickedIn 的 Jay Kreps 提出了 Kappa 架構。

4)流批一體架構:流批一體架構比較完美的實現(xiàn)方式是采用流計算 + 交互式分析雙引擎架構,在這個架構中,流計算負責的是基礎數(shù)據(jù),而交互式分析引擎是中心,流計算引擎對數(shù)據(jù)進行實時 ETL 工作,與離線相比,降低了 ETL 過程的 latency,交互式分析引擎則自帶存儲,通過計算存儲的協(xié)同優(yōu)化, 實現(xiàn)高寫入 TPS、高查詢 QPS 和低查詢 latency ,從而做到全鏈路的實時化和 SQL 化,這樣就可以用批的方式實現(xiàn)實時分析和按需分析,并能快速的響應業(yè)務的變化,兩者配合,實現(xiàn) 1 + 1 > 2 的效果;該架構對交互式分析引擎的要求非常高,也許是未來大數(shù)據(jù)庫技術發(fā)展的一個重點和方向。

為了應對業(yè)務方更復雜的多維實時數(shù)據(jù)分析需求,筆者目前在數(shù)據(jù)開發(fā)中引入 Kudu這個 OLAP 存儲引擎,對訂單等業(yè)務數(shù)據(jù)使用 Presto + Kudu 的計算方案也是在探索流批一體架構在實時數(shù)據(jù)分析領域的可行性。此外,目前比較熱的數(shù)據(jù)湖技術,如 Delta lake、Hudi 等支持在 HDFS 上進行 upsert 更新,隨著其流式寫入、SQL 引擎支持的成熟,未來可以用一套存儲引擎解決實時、離線數(shù)據(jù)需求,從而減少多引擎運維開發(fā)成本。

三、Flink SQL 實時計算 UV 指標

上一部分從宏觀層面介紹了如何建設實時數(shù)據(jù)體系,非常不接地氣,可能大家需要的只是一個具體的 case 來了解一下該怎么做,那么接下來用一個接地氣的案例來介紹如何實時計算 UV 數(shù)據(jù)。

大家都知道,在 ToC 的互聯(lián)網(wǎng)公司,UV 是一個很重要的指標,對于老板、商務、運營的及時決策會產(chǎn)生很大的影響,筆者在電商公司,目前主要的工作就是計算 UV、銷售等各類實時數(shù)據(jù),體驗就特別深刻, 因此就用一個簡單demo 演示如何用 Flink SQL 消費 Kafka 中的 PV 數(shù)據(jù),實時計算出 UV 指標后寫入 Hbase。

1、Kafka 源數(shù)據(jù)解析

PV 數(shù)據(jù)來源于埋點數(shù)據(jù)經(jīng) FileBeat 上報清洗后,以 ProtoBuffer 格式寫入下游 Kafka,消費時第一步要先反序列化 PB 格式的數(shù)據(jù)為 Flink 能識別的 Row 類型,因此也就需要自定義實現(xiàn) DeserializationSchema 接口,具體如下代碼, 這里只抽取計算用到的 PV 的 mid、事件時間 time_local,并從其解析得到 log_date 字段:

publicclassPageViewDeserializationSchemaimplementsDeserializationSchema{

publicstaticfinalLoggerLOG=LoggerFactory.getLogger(PageViewDeserializationSchema.class);

protectedSimpleDateFormatdayFormatter;

privatefinalRowTypeInforowTypeInfo;

publicPageViewDeserializationSchema(RowTypeInforowTypeInfo){

dayFormatter=newSimpleDateFormat("yyyyMMdd",Locale.UK);

this.rowTypeInfo=rowTypeInfo;

}

@Override

publicRowdeserialize(byte[]message)throwsIOException{

Rowrow=newRow(rowTypeInfo.getArity());

MobilePagemobilePage=null;

try{

mobilePage=MobilePage.parseFrom(message);

Stringmid=mobilePage.getMid();

row.setField(0,mid);

LongtimeLocal=mobilePage.getTimeLocal();

StringlogDate=dayFormatter.format(timeLocal);

row.setField(1,logDate);

row.setField(2,timeLocal);

}catch(Exceptione){

StringmobilePageError=(mobilePage!=null)?mobilePage.toString():"";

LOG.error("errorparsebytespayloadis{},pageviewerroris{}",message.toString(),mobilePageError,e);

}

returnnull;

}

2、編寫 Flink Job 主程序

將 PV 數(shù)據(jù)解析為 Flink 的 Row 類型后,接下來就很簡單了,編寫主函數(shù),寫 SQL 就能統(tǒng)計 UV 指標了,代碼如下:

publicclassRealtimeUV{

publicstaticvoidmain(String[]args)throwsException{

//step1從properties配置文件中解析出需要的Kakfa、Hbase配置信息、checkpoint參數(shù)信息

Mapconfig=PropertiesUtil.loadConfFromFile(args[0]);

Stringtopic=config.get("source.kafka.topic");

StringgroupId=config.get("source.group.id");

StringsourceBootStrapServers=config.get("source.bootstrap.servers");

StringhbaseTable=config.get("hbase.table.name");

StringhbaseZkQuorum=config.get("hbase.zk.quorum");

StringhbaseZkParent=config.get("hbase.zk.parent");

intcheckPointPeriod=Integer.parseInt(config.get("checkpoint.period"));

intcheckPointTimeout=Integer.parseInt(config.get("checkpoint.timeout"));

StreamExecutionEnvironmentsEnv=StreamExecutionEnvironment.getExecutionEnvironment();

//step2設置Checkpoint相關參數(shù),用于Failover容錯

sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,

ProtobufSerializer.class);

sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);

sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

sEnv.enableCheckpointing(checkPointPeriod,CheckpointingMode.EXACTLY_ONCE);

sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);

sEnv.getCheckpointConfig().enableExternalizedCheckpoints(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

//step3使用Blinkplanner、創(chuàng)建TableEnvironment,并且設置狀態(tài)過期時間,避免JobOOM

EnvironmentSettingsenvironmentSettings=EnvironmentSettings.newInstance()

.useBlinkPlanner()

.inStreamingMode()

.build();

StreamTableEnvironmenttEnv=StreamTableEnvironment.create(sEnv,environmentSettings);

tEnv.getConfig().setIdleStateRetentionTime(Time.days(1),Time.days(2));

PropertiessourceProperties=newProperties();

sourceProperties.setProperty("bootstrap.servers",sourceBootStrapServers);

sourceProperties.setProperty("auto.commit.interval.ms","3000");

sourceProperties.setProperty("group.id",groupId);

//step4初始化KafkaTableSource的Schema信息,筆者這里使用registerTableSource的方式將源表注冊到Flink中,而沒有用registerDataStream方式,也是因為想熟悉一下如何注冊KafkaTableSource到Flink中

TableSchemaschema=TableSchemaUtil.getAppPageViewTableSchema();

OptionalproctimeAttribute=Optional.empty();

ListrowtimeAttributeDescriptors=Collections.emptyList();

MapfieldMapping=newHashMap<>();

ListcolumnNames=newArrayList<>();

RowTypeInforowTypeInfo=newRowTypeInfo(schema.getFieldTypes(),schema.getFieldNames());

columnNames.addAll(Arrays.asList(schema.getFieldNames()));

columnNames.forEach(name->fieldMapping.put(name,name));

PageViewDeserializationSchemadeserializationSchema=new

PageViewDeserializationSchema(

rowTypeInfo);

MapspecificOffsets=newHashMap<>();

Kafka011TableSourcekafkaTableSource=newKafka011TableSource(

schema,

proctimeAttribute,

rowtimeAttributeDescriptors,

Optional.of(fieldMapping),

topic,

sourceProperties,

deserializationSchema,

StartupMode.EARLIEST,

specificOffsets);

tEnv.registerTableSource("pageview",kafkaTableSource);

//step5初始化HbaseTableSchema、寫入?yún)?shù),并將其注冊到Flink中

HBaseTableSchemahBaseTableSchema=newHBaseTableSchema();

hBaseTableSchema.setRowKey("log_date",String.class);

hBaseTableSchema.addColumn("f","UV",Long.class);

HBaseOptionshBaseOptions=HBaseOptions.builder()

.setTableName(hbaseTable)

.setZkQuorum(hbaseZkQuorum)

.setZkNodeParent(hbaseZkParent)

.build();

HBaseWriteOptionshBaseWriteOptions=HBaseWriteOptions.builder()

.setBufferFlushMaxRows(1000)

.setBufferFlushIntervalMillis(1000)

.build();

HBaseUpsertTableSinkhBaseSink=newHBaseUpsertTableSink(hBaseTableSchema,hBaseOptions,hBaseWriteOptions);

tEnv.registerTableSink("uv_index",hBaseSink);

//step6實時計算當天UV指標sql,這里使用最簡單的groupbyagg,沒有使用minibatch或窗口,在大數(shù)據(jù)量優(yōu)化時最好使用后兩種方式

StringuvQuery="insertintouv_index"

+"selectlog_date, "

+"ROW(count(distinctmid)asUV) "

+"frompageview "

+"groupbylog_date";

tEnv.sqlUpdate(uvQuery);

//step7執(zhí)行Job

sEnv.execute("UVJob");

}

}

以上就是一個簡單的使用 Flink SQL 統(tǒng)計 UV 的 case, 代碼非常簡單,只需要理清楚如何解析 Kafka 中數(shù)據(jù),如何初始化 Table Schema,以及如何將表注冊到 Flink中,即可使用 Flink SQL 完成各種復雜的實時數(shù)據(jù)統(tǒng)計類的業(yè)務需求,學習成本比API 的方式低很多。

說明一下,筆者這個 demo 是基于目前業(yè)務場景而開發(fā)的,在生產(chǎn)環(huán)境中可以真實運行起來,可能不能拆箱即用,你需要結合自己的業(yè)務場景自定義相應的 kafka 數(shù)據(jù)解析類。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 互聯(lián)網(wǎng)

    關注

    54

    文章

    11171

    瀏覽量

    103534
  • 5G
    5G
    +關注

    關注

    1355

    文章

    48481

    瀏覽量

    564983
  • 大數(shù)據(jù)

    關注

    64

    文章

    8899

    瀏覽量

    137571
收藏 人收藏

    評論

    相關推薦

    實時數(shù)據(jù)求差

    請問在Labview中如何對實時數(shù)據(jù)求差呢?具體過程是什么,請賜教?。?/div>
    發(fā)表于 09-10 21:45

    單電壓基準與雙電壓基準的對決-第三部分

    在這個三部分系列博文中,我已經(jīng)討論了生成兩個良好匹配、低漂移電壓基準的解決方案。我們在第一部分個拓撲結構開始,在第二部分比較和對比了性能
    發(fā)表于 09-12 11:36

    基于多DSP互聯(lián)技術的頻譜監(jiān)測分析儀總體方案

    本文設計了一個頻譜監(jiān)測分析儀的總體方案,即由超外差信號接收,強大的中頻信號采集處理系統(tǒng)以及內(nèi)嵌計算機系統(tǒng)這大主要部分組成。在設計總體方案的同時,給出了實現(xiàn)此
    發(fā)表于 02-19 06:44

    在STM32中執(zhí)行中斷主要三部分

    在STM32中執(zhí)行中斷主要三部分:1.配置NVIC_Config()函數(shù)2.配置EXTI_Config()函數(shù)3.編寫中斷服務函數(shù)(注:本文章所用代碼為中斷按鍵代碼,實現(xiàn)了按鍵進入中斷從而控制
    發(fā)表于 08-13 08:10

    在STM32中執(zhí)行中斷主要三部分

    在STM32中執(zhí)行中斷主要三部分:1.配置NVIC_Config()函數(shù)2.配置EXTI_Config()函數(shù)3.編寫中斷服務函數(shù)(注:本文章所用代碼為中斷按鍵代碼,實現(xiàn)了按鍵進入中斷從而控制
    發(fā)表于 08-20 07:53

    TD-LTE規(guī)模試驗網(wǎng)總體方案介紹

    本文是關于TD-LTE規(guī)模試驗網(wǎng)總體方案的介紹 。。
    發(fā)表于 11-01 17:50 ?53次下載
    TD-LTE規(guī)模試驗網(wǎng)<b class='flag-5'>總體方案</b>介紹

    開關電源設計(第3版)第三部分

    電子發(fā)燒友網(wǎng)站提供《開關電源設計(第3版)第三部分.txt》資料免費下載
    發(fā)表于 09-12 15:04 ?0次下載

    2012年PSoC數(shù)模混合設計培訓_第三部分

    2012年PSoC數(shù)?;旌显O計培訓_第三部分
    發(fā)表于 10-27 09:30 ?8次下載
    2012年PSoC數(shù)?;旌显O計培訓_第<b class='flag-5'>三部分</b>

    移動通信行業(yè)的生態(tài)系統(tǒng)大致可以分成三部分

    當前,5G已經(jīng)成為電信業(yè)的新熱點。王建宙指出,移動通信行業(yè)的生態(tài)系統(tǒng)大致可分成三部分:第一部分是網(wǎng)絡連接,包括運營商提供的網(wǎng)絡運營和制造商提供的網(wǎng)絡設備,也是最基礎的部分;第二部分是終
    發(fā)表于 11-30 09:33 ?3073次閱讀

    三部門印發(fā)國家車聯(lián)網(wǎng)產(chǎn)業(yè)標準體系建設指南

    近日,工業(yè)和信息化、交通運輸、國家標準化管理委員會三部門印發(fā)《國家車聯(lián)網(wǎng)產(chǎn)業(yè)標準體系建設指南(智能交通相關)》(下稱“
    的頭像 發(fā)表于 03-25 09:36 ?2050次閱讀
    <b class='flag-5'>三部</b>門印發(fā)國家車聯(lián)網(wǎng)產(chǎn)業(yè)標準<b class='flag-5'>體系</b><b class='flag-5'>建設</b>指南

    LTC2387驅(qū)動程序第三部分

    LTC2387驅(qū)動程序第三部分
    發(fā)表于 05-16 15:23 ?5次下載
    LTC2387驅(qū)動程序第<b class='flag-5'>三部分</b>

    用于激活設備的可編程定時器-第三部分

    電子發(fā)燒友網(wǎng)站提供《用于激活設備的可編程定時器-第三部分.zip》資料免費下載
    發(fā)表于 12-16 10:28 ?0次下載
    用于激活設備的可編程定時器-第<b class='flag-5'>三部分</b>

    硬件即代碼第三部分:空間與時間

    電子發(fā)燒友網(wǎng)站提供《硬件即代碼第三部分:空間與時間.zip》資料免費下載
    發(fā)表于 06-14 15:12 ?0次下載
    硬件即代碼第<b class='flag-5'>三部分</b>:空間與時間

    SensorTile.box第三部分:編程模式(Pro mode)介紹

    電子發(fā)燒友網(wǎng)站提供《SensorTile.box第三部分:編程模式(Pro mode)介紹.pdf》資料免費下載
    發(fā)表于 07-29 16:19 ?0次下載
    SensorTile.box第<b class='flag-5'>三部分</b>:編程模式(Pro mode)介紹

    光伏并網(wǎng)逆變器總體方案

    2KW級光伏并網(wǎng)逆變器(以下簡稱逆變器)的總體方案包括DC/AC逆變電路部分、相應的控制電路部分和顯示界面。逆變器主要功能是將光伏電池組件發(fā)出的直流功率轉(zhuǎn)化成交流功率,并輸送到電網(wǎng)上。
    發(fā)表于 10-11 15:24 ?6次下載