0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

關(guān)于Spark的從0實(shí)現(xiàn)30s內(nèi)實(shí)時(shí)監(jiān)控指標(biāo)計(jì)算

佳佳 ? 來(lái)源:jf_36786605 ? 作者:jf_36786605 ? 2024-06-14 15:52 ? 次閱讀

前言

說(shuō)起Spark,大家就會(huì)自然而然地想到Flink,而且會(huì)不自覺(jué)地將這兩種主流的大數(shù)據(jù)實(shí)時(shí)處理技術(shù)進(jìn)行比較。然后最終得出結(jié)論:Flink實(shí)時(shí)性大于Spark。

的確,F(xiàn)link中的數(shù)據(jù)計(jì)算是以事件為驅(qū)動(dòng)的,所以來(lái)一條數(shù)據(jù)就會(huì)觸發(fā)一次計(jì)算,而Spark基于數(shù)據(jù)集RDD計(jì)算,RDD最小生成間隔就是50毫秒,所以Spark就被定義為亞實(shí)時(shí)計(jì)算。

窗口Window

這里的RDD就是“天然的窗口”,將RDD生成的時(shí)間間隔設(shè)置成1min,那么這個(gè)RDD就可以理解為“1min窗口”。所以如果想要窗口計(jì)算,首選Spark。

但當(dāng)需要對(duì)即臨近時(shí)間窗口進(jìn)行計(jì)算時(shí),必須借助滑動(dòng)窗口的算子來(lái)實(shí)現(xiàn)。

臨近時(shí)間如何理解

例如“3分鐘內(nèi)”這種時(shí)間范圍描述。這種時(shí)間范圍的計(jì)算,需要計(jì)算歷史的數(shù)據(jù)。例如1 ~ 3是3min,2 ~ 4也是3min,這里就重復(fù)使用了2和3的數(shù)據(jù),依次類(lèi)推,3 ~ 5也是3min,同樣也重復(fù)使用了3和4。

如果使用普通窗口,就無(wú)法滿(mǎn)足“最近3分鐘內(nèi)”這種時(shí)間概念。

很多窗口都丟失了臨近時(shí)間,例如第3個(gè)RDD的臨近時(shí)間其實(shí)是第二個(gè)RDD,但是他們就沒(méi)法在一起計(jì)算,這就是為什么不用普通窗口的原因。

滑動(dòng)窗口

滑動(dòng)窗口三要素:RDD的生成時(shí)間、窗口的長(zhǎng)度、滑動(dòng)的步長(zhǎng)。

我在本次實(shí)踐中,將RDD的時(shí)間間隔設(shè)置為10s,窗口長(zhǎng)度為30s、滑動(dòng)步長(zhǎng)為10s。也就是說(shuō)每10s就會(huì)生成一個(gè)窗口,計(jì)算最近30s內(nèi)的數(shù)據(jù),每個(gè)窗口由3個(gè)RDD組成。

數(shù)據(jù)源構(gòu)建

1. 數(shù)據(jù)規(guī)范

假設(shè)我們采集了設(shè)備的指標(biāo)信息,這里我們只關(guān)注吞吐量和響應(yīng)時(shí)間,在采集之前定義數(shù)據(jù)字段和規(guī)范[throughput, response_time],這里都定義成int類(lèi)型,響應(yīng)時(shí)間單位這里定義成毫秒ms。

實(shí)際情況中,我們不可能只采集一臺(tái)設(shè)備,如果我們想要得出每臺(tái)或者每個(gè)種類(lèi)設(shè)備的指標(biāo)監(jiān)控,就要在采集數(shù)據(jù)的時(shí)候?qū)γ總€(gè)設(shè)備加上唯一ID或者TypeID。

我這里的想法是對(duì)每臺(tái)設(shè)備的指標(biāo)進(jìn)行分析,所以我給每個(gè)設(shè)備都增加了一個(gè)唯一ID,最終字段[id, throughput, response_time],所以我們就按照這個(gè)數(shù)據(jù)格式,在SparkStreaming中構(gòu)建數(shù)據(jù)源讀取部分。

2. 讀取kafka

代碼語(yǔ)言:scala

復(fù)制

val conf = new SparkConf().setAppName("aqi").setMaster("local[1]")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "121.91.168.193:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "aqi",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val topics = Array("evt_monitor")
val stream: DStream[String] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(_.value)

這里我們將一個(gè)RDD時(shí)間間隔設(shè)置為10S,因?yàn)槭褂玫氖枪P記本跑,所以這里要將Master設(shè)置為local,表示本地運(yùn)行模式,1代表使用1個(gè)線程。

我們使用Kafka作為數(shù)據(jù)源,在讀取時(shí)就要構(gòu)建Consumer的config,像bootstrap.servers這些基本配置沒(méi)有什么好說(shuō)的,關(guān)鍵的是auto.offset.reset和enable.auto.commit,

這兩個(gè)參數(shù)分表控制讀取topic消費(fèi)策略和是否提交offset。這里的earliest會(huì)從topic中現(xiàn)存最早的數(shù)據(jù)開(kāi)始消費(fèi),latest是最新的位置開(kāi)始消費(fèi)。

當(dāng)重啟程序時(shí),這兩種消費(fèi)模式又被enable.auto.commit控制,設(shè)置true提交offset時(shí),earliest和latest不再生效,都是從消費(fèi)組記錄的offset進(jìn)行消費(fèi)。設(shè)置為false不提交offset,offset不被提交記錄earliest還是從topic中現(xiàn)存最早的數(shù)據(jù)開(kāi)始消費(fèi),latest還是從最新的數(shù)據(jù)消費(fèi)。

最后就是設(shè)置要讀取的topic和創(chuàng)建Kafka的DStream數(shù)據(jù)流。至此,整個(gè)數(shù)據(jù)源的讀取就已經(jīng)完成了,下面就是對(duì)數(shù)據(jù)處理邏輯的開(kāi)發(fā)。

3. 指標(biāo)聚合計(jì)算

代碼語(yǔ)言:scala

復(fù)制

stream.map(x => {
      val s = x.split(",")
      (s(0), (s(2).toInt, 1))
    }).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .reduceByKeyAndWindow((x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2), Seconds(30), Seconds(10))
    .foreachRDD(rdd => {
      rdd.foreach(x => {
        val id = x._1
        val responseTimes = x._2._1
        val num = x._2._2
        val responseTime_avg = responseTimes / num
        println(id, responseTime_avg)
      })
    })

我們從自身需求出發(fā),來(lái)構(gòu)思程序邏輯的開(kāi)發(fā)。從需求看,關(guān)鍵字無(wú)非是最近一段時(shí)間內(nèi)、平均值。想要取一段時(shí)間內(nèi)的數(shù)據(jù),就要使用滑動(dòng)窗口,以當(dāng)前時(shí)間為基準(zhǔn),向前圈定時(shí)間范圍。

而平均值,無(wú)非就是將時(shí)間范圍內(nèi),即窗口所有的響應(yīng)時(shí)間加起來(lái),然后除以數(shù)據(jù)條數(shù)即可。想要把所有的響應(yīng)時(shí)間加起來(lái),這里使用reduceByKey() 將窗口內(nèi)相同ID的設(shè)備時(shí)間相加,將數(shù)據(jù)條數(shù)進(jìn)行相加。

所以我在第一步切分?jǐn)?shù)據(jù)的時(shí)候,就將數(shù)據(jù)切分成KV的元組形式,V有兩個(gè)字段,第一個(gè)是響應(yīng)時(shí)間,第二個(gè)1表示一條數(shù)據(jù)。reduceByKey一共分為兩步,第一是RDD內(nèi)的reduceByKey,這也算是數(shù)據(jù)的預(yù)處理,RDD的數(shù)據(jù)只會(huì)計(jì)算一次,當(dāng)這個(gè)RDD被多個(gè)窗口使用,就不會(huì)重復(fù)計(jì)算了。第二步是基于窗口的reduceByKey,將窗口所有RDD的數(shù)據(jù)再一次聚合,最后在foreachRDD中獲取輸出

4. 驗(yàn)證結(jié)果

我們向kafka的evt_monitor這個(gè)topic中寫(xiě)入數(shù)據(jù)。

備注:(最后11那個(gè)id是終端顯示問(wèn)題,其實(shí)是1),然后可以輸出平均值。

驗(yàn)證結(jié)果是沒(méi)有問(wèn)題的,換個(gè)角度,我們也可以從DAG來(lái)看。

這個(gè)窗口一共計(jì)算了3個(gè)RDD,其中左側(cè)的兩個(gè)是灰色的,上面是skipped標(biāo)識(shí),代表著這兩個(gè)RDD在上一個(gè)窗口已經(jīng)計(jì)算完成了,在這個(gè)窗口只需要計(jì)算當(dāng)前的RDD,然后再一起對(duì)RDD的結(jié)果數(shù)據(jù)進(jìn)行窗口計(jì)算。

結(jié)語(yǔ)

本篇文章主要是利用Spark的滑動(dòng)窗口,做了一個(gè)計(jì)算平均響應(yīng)時(shí)長(zhǎng)的應(yīng)用場(chǎng)景,以Kafka作為數(shù)據(jù)源、通過(guò)滑動(dòng)窗口和reduceByKey算子得以實(shí)現(xiàn)。同時(shí),開(kāi)發(fā)Spark還是強(qiáng)烈推薦scala,整個(gè)程序看起來(lái)沒(méi)有任何多余的部分。

最后對(duì)于Spark和Flink的選型看法,Spark的確是在實(shí)時(shí)性上比Flink差一些,但是Spark對(duì)于窗口計(jì)算還是有優(yōu)勢(shì)的。所以對(duì)于每種技術(shù),也不用人云亦云,適合自己的才是最好的。

審核編輯 黃宇

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • RDD
    RDD
    +關(guān)注

    關(guān)注

    0

    文章

    7

    瀏覽量

    8079
  • 實(shí)時(shí)監(jiān)控

    關(guān)注

    1

    文章

    114

    瀏覽量

    13916
  • SPARK
    +關(guān)注

    關(guān)注

    1

    文章

    106

    瀏覽量

    20419
收藏 人收藏

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    邊緣計(jì)算網(wǎng)關(guān)在水產(chǎn)養(yǎng)殖尾水處理中的實(shí)時(shí)監(jiān)控應(yīng)用

    ,某大型水產(chǎn)養(yǎng)殖企業(yè)決定引入先進(jìn)的 YC-GR90-S工業(yè)智能網(wǎng)關(guān) 技術(shù),對(duì)尾水處理過(guò)程進(jìn)行遠(yuǎn)程監(jiān)控和管理。 二、項(xiàng)目需求 設(shè)備遠(yuǎn)程監(jiān)控: 需要實(shí)時(shí)
    的頭像 發(fā)表于 06-06 14:36 ?55次閱讀
    邊緣<b class='flag-5'>計(jì)算</b>網(wǎng)關(guān)在水產(chǎn)養(yǎng)殖尾水處理中的<b class='flag-5'>實(shí)時(shí)</b><b class='flag-5'>監(jiān)控</b>應(yīng)用

    自媒體推廣實(shí)時(shí)監(jiān)控服務(wù)器帶寬到用戶(hù)行為解決方法

    自媒體推廣的實(shí)時(shí)監(jiān)控需要從底層基礎(chǔ)設(shè)施到前端用戶(hù)行為進(jìn)行全鏈路覆蓋,確保推廣活動(dòng)的穩(wěn)定性和效果可追蹤。以下是系統(tǒng)性解決方案,主機(jī)推薦小編為您整理發(fā)布自媒體推廣實(shí)時(shí)監(jiān)控
    的頭像 發(fā)表于 04-09 10:47 ?204次閱讀

    邊緣計(jì)算網(wǎng)關(guān)的實(shí)時(shí)監(jiān)控與預(yù)測(cè)性維護(hù)都有哪些方面?適合哪些行業(yè)使用?

    邊緣計(jì)算網(wǎng)關(guān)的實(shí)時(shí)監(jiān)控與預(yù)測(cè)性維護(hù)都有哪些方面?適合哪些行業(yè)使用? 有實(shí)施過(guò)得案例的介紹嗎? 深控技術(shù)的不需要點(diǎn)表的邊緣計(jì)算網(wǎng)關(guān)如何?
    發(fā)表于 04-01 09:44

    NVIDIA 宣布推出 DGX Spark 個(gè)人 AI 計(jì)算機(jī)

    的 DGX? 個(gè)人 AI 超級(jí)計(jì)算機(jī)。 ? DGX Spark(前身為 Project DIGITS)支持 AI 開(kāi)發(fā)者、研究人員、數(shù)據(jù)科學(xué)家和學(xué)生,在臺(tái)式電腦上對(duì)大模型進(jìn)行原型設(shè)計(jì)、微調(diào)和推理。用
    發(fā)表于 03-19 09:59 ?284次閱讀
       NVIDIA 宣布推出 DGX <b class='flag-5'>Spark</b> 個(gè)人 AI <b class='flag-5'>計(jì)算</b>機(jī)

    HarmonyOS NEXT 原生應(yīng)用/元服務(wù)-DevEco Profiler性能問(wèn)題定界實(shí)時(shí)監(jiān)控

    不同的圖像形式(直方圖、柱狀圖、折線圖等)來(lái)更加清晰的展示某一項(xiàng)資源在一段時(shí)間范圍內(nèi)的變化趨勢(shì),以幫助您快速判斷性能熱點(diǎn)區(qū)域。 整個(gè)實(shí)時(shí)監(jiān)控頁(yè)面從上到下,依次展示了系統(tǒng)事件、異常事件、前臺(tái)應(yīng)用、CPU
    發(fā)表于 02-21 14:35

    HarmonyOS NEXT 原生應(yīng)用/元服務(wù)-DevEco Profiler性能問(wèn)題定界實(shí)時(shí)監(jiān)控

    不同的圖像形式(直方圖、柱狀圖、折線圖等)來(lái)更加清晰的展示某一項(xiàng)資源在一段時(shí)間范圍內(nèi)的變化趨勢(shì),以幫助您快速判斷性能熱點(diǎn)區(qū)域。 整個(gè)實(shí)時(shí)監(jiān)控頁(yè)面從上到下,依次展示了系統(tǒng)事件、異常事件、前臺(tái)應(yīng)用、CPU
    發(fā)表于 02-20 10:14

    輸電線路防外破防異物實(shí)時(shí)監(jiān)控預(yù)警裝置|場(chǎng)景模型真實(shí)還原|測(cè)距誤差在0.25米內(nèi)

    輸電線路防外破防異物實(shí)時(shí)監(jiān)控預(yù)警裝置|場(chǎng)景模型真實(shí)還原|測(cè)距誤差在0.25米內(nèi) 輸電線路防外破防異物實(shí)時(shí)監(jiān)控預(yù)警裝置針對(duì)輸電線路通道走廊
    的頭像 發(fā)表于 01-21 09:48 ?331次閱讀

    可與MES系統(tǒng)集成的數(shù)據(jù)采集監(jiān)控平臺(tái)

    有用的指標(biāo)和報(bào)表。 提供實(shí)時(shí)監(jiān)控、故障預(yù)警、產(chǎn)能優(yōu)化等數(shù)據(jù)分析功能,為決策提供支持。 數(shù)據(jù)分發(fā)與集成: 將處理后的數(shù)據(jù)按需分發(fā)給各個(gè)業(yè)務(wù)系統(tǒng)或用戶(hù)。 實(shí)現(xiàn)與MES、ERP等系統(tǒng)的無(wú)縫集
    發(fā)表于 12-16 15:08

    DCS數(shù)據(jù)采集與實(shí)時(shí)監(jiān)控

    、降低成本、保障安全。 1. DCS系統(tǒng)概述 DCS(Distributed Control System,分布式控制系統(tǒng))是一種將控制功能分散到各個(gè)控制節(jié)點(diǎn)的計(jì)算機(jī)控制系統(tǒng)。它通過(guò)高速通信網(wǎng)絡(luò)連接各個(gè)控制節(jié)點(diǎn),實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)
    的頭像 發(fā)表于 11-13 09:15 ?2320次閱讀

    MES系統(tǒng)如何實(shí)現(xiàn)生產(chǎn)車(chē)間的實(shí)時(shí)監(jiān)控、精準(zhǔn)調(diào)度

    MES系統(tǒng)通過(guò)數(shù)據(jù)采集、傳輸、處理和展示等技術(shù)手段,實(shí)現(xiàn)了生產(chǎn)過(guò)程的實(shí)時(shí)監(jiān)控;同時(shí),通過(guò)生產(chǎn)計(jì)劃優(yōu)化、生產(chǎn)任務(wù)分配、動(dòng)態(tài)調(diào)度、資源優(yōu)化和協(xié)同管理等功能模塊,實(shí)現(xiàn)了生產(chǎn)過(guò)程的精準(zhǔn)調(diào)度。
    的頭像 發(fā)表于 10-07 12:51 ?1009次閱讀
    MES系統(tǒng)如何<b class='flag-5'>實(shí)現(xiàn)</b>生產(chǎn)車(chē)間的<b class='flag-5'>實(shí)時(shí)</b><b class='flag-5'>監(jiān)控</b>、精準(zhǔn)調(diào)度

    spark為什么比mapreduce快?

    spark為什么比mapreduce快? 首先澄清幾個(gè)誤區(qū): 1:兩者都是基于內(nèi)存計(jì)算的,任何計(jì)算框架都肯定是基于內(nèi)存的,所以網(wǎng)上說(shuō)的spark是基于內(nèi)存
    的頭像 發(fā)表于 09-06 09:45 ?476次閱讀

    監(jiān)控系統(tǒng)原理揭秘-數(shù)據(jù)運(yùn)算篇

    、數(shù)據(jù)計(jì)算、數(shù)據(jù)存儲(chǔ)、數(shù)據(jù)可視化及監(jiān)控預(yù)警等功能。本文主要介紹數(shù)據(jù)計(jì)算部分。 二、實(shí)時(shí)計(jì)算 流數(shù)據(jù)實(shí)時(shí)計(jì)算是一種處理和分析
    的頭像 發(fā)表于 08-06 10:30 ?1030次閱讀
    <b class='flag-5'>監(jiān)控</b>系統(tǒng)原理揭秘-數(shù)據(jù)運(yùn)算篇

    30元如何實(shí)現(xiàn)車(chē)輛防后撞

    HLK-LD2451是海凌科最新推出的一款專(zhuān)用于檢測(cè)車(chē)輛狀態(tài)的24G雷達(dá)模塊,以不到30元的價(jià)格,實(shí)現(xiàn)實(shí)時(shí)感知周?chē)?00米范圍內(nèi)的車(chē)輛靠近與遠(yuǎn)離。
    的頭像 發(fā)表于 07-29 09:43 ?1474次閱讀
    <b class='flag-5'>30</b>元如何<b class='flag-5'>實(shí)現(xiàn)</b>車(chē)輛防后撞

    spark運(yùn)行的基本流程

    記錄和分享下spark運(yùn)行的基本流程。 一、spark的基礎(chǔ)組件及其概念 1. ClusterManager 在Standalone模式中即為Master,控制整個(gè)集群,監(jiān)控Worker。在YARN
    的頭像 發(fā)表于 07-02 10:31 ?675次閱讀
    <b class='flag-5'>spark</b>運(yùn)行的基本流程

    Spark基于DPU的Native引擎算子卸載方案

    1.背景介紹 Apache Spark(以下簡(jiǎn)稱(chēng)Spark)是一個(gè)開(kāi)源的分布式計(jì)算框架,由UC Berkeley AMP Lab開(kāi)發(fā),可用于批處理、交互式查詢(xún)(Spark SQL)、
    的頭像 發(fā)表于 06-28 17:12 ?979次閱讀
    <b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸載方案

    電子發(fā)燒友

    中國(guó)電子工程師最喜歡的網(wǎng)站

    • 2931785位工程師會(huì)員交流學(xué)習(xí)
    • 獲取您個(gè)性化的科技前沿技術(shù)信息
    • 參加活動(dòng)獲取豐厚的禮品