0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

redis數(shù)據(jù)傾斜的原因以及應對方案 JD開源hotkey的源碼解析

我快閉嘴 ? 來源:京東云開發(fā)者 ? 作者:h1654155202.6723 ? 2022-09-29 10:35 ? 次閱讀

1 前言

之前旁邊的小伙伴問我熱點數(shù)據(jù)相關(guān)問題,在給他粗略的講解一波redis數(shù)據(jù)傾斜的案例之后,自己也順道回顧了一些關(guān)于熱點數(shù)據(jù)處理的方法論,同時也想起去年所學習JD開源項目hotkey——專門用來解決熱點數(shù)據(jù)問題的框架。在這里結(jié)合兩者所關(guān)聯(lián)到的知識點,通過幾個小圖和部分粗略的講解,來讓大家了解相關(guān)方法論以及hotkey的源碼解析。

2 Redis數(shù)據(jù)傾斜

2.1 定義與危害

先說說數(shù)據(jù)傾斜的定義,借用百度詞條的解釋:
對于集群系統(tǒng),一般緩存是分布式的,即不同節(jié)點負責一定范圍的緩存數(shù)據(jù)。我們把緩存數(shù)據(jù)分散度不夠,導致大量的緩存數(shù)據(jù)集中到了一臺或者幾臺服務節(jié)點上,稱為數(shù)據(jù)傾斜。一般來說數(shù)據(jù)傾斜是由于負載均衡實施的效果不好引起的。
從上面的定義中可以得知,數(shù)據(jù)傾斜的原因一般是因為LB的效果不好,導致部分節(jié)點數(shù)據(jù)量非常集中。

那這又會有什么危害呢?
如果發(fā)生了數(shù)據(jù)傾斜,那么保存了大量數(shù)據(jù),或者是保存了熱點數(shù)據(jù)的實例的處理壓力就會增大,速度變慢,甚至還可能會引起這個實例的內(nèi)存資源耗盡,從而崩潰。這是我們在應用切片集群時要避免的。

2.2 數(shù)據(jù)傾斜的分類

2.2.1 數(shù)據(jù)量傾斜(寫入傾斜)

1.圖示

52a07fce-3f2c-11ed-9e49-dac502259ad0.png

如圖,在某些情況下,實例上的數(shù)據(jù)分布不均衡,某個實例上的數(shù)據(jù)特別多。

2.bigkey導致傾斜

某個實例上正好保存了 bigkey。bigkey 的 value 值很大(String 類型),或者是 bigkey 保存了大量集合元素(集合類型),會導致這個實例的數(shù)據(jù)量增加,內(nèi)存資源消耗也相應增加。

應對方法

在業(yè)務層生成數(shù)據(jù)時,要盡量避免把過多的數(shù)據(jù)保存在同一個鍵值對中。

如果 bigkey 正好是集合類型,還有一個方法,就是把 bigkey 拆分成很多個小的集合類型數(shù)據(jù),分散保存在不同的實例上。

3.Slot分配不均導致傾斜

先簡單的介紹一下slot的概念,slot其實全名是Hash Slot(哈希槽),在Redis Cluster切片集群中一共有16384 個 Slot,這些哈希槽類似于數(shù)據(jù)分區(qū),每個鍵值對都會根據(jù)它的 key,被映射到一個哈希槽中。Redis Cluster 方案采用哈希槽來處理數(shù)據(jù)和實例之間的映射關(guān)系。

一張圖來解釋,數(shù)據(jù)、哈希槽、實例這三者的映射分布情況。

52c6cbde-3f2c-11ed-9e49-dac502259ad0.png

這里的CRC16(city)%16384可以簡單的理解為將key1根據(jù)CRC16算法取hash值然后對slot個數(shù)取模,得到的就是slot位置為14484,他所對應的實例節(jié)點是第三個。
運維在構(gòu)建切片集群時候,需要手動分配哈希槽,并且把16384 個槽都分配完,否則 Redis 集群無法正常工作。由于是手動分配,則可能會導致部分實例所分配的slot過多,導致數(shù)據(jù)傾斜。

應對方法
使用CLUSTER SLOTS 命令來查看slot分配情況,使用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE這三個命令來進行slot數(shù)據(jù)的遷移,具體內(nèi)容不再這里細說,感興趣的同學可以自行學習一下。

4.Hash Tag導致傾斜

Hash Tag 定義 :指當一個key包含 {} 的時候,就不對整個key做hash,而僅對 {} 包括的字符串做hash。

假設hash算法為sha1。對user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。

Hash Tag 優(yōu)勢 :如果不同 key 的 Hash Tag 內(nèi)容都是一樣的,那么,這些 key 對應的數(shù)據(jù)會被映射到同一個 Slot 中,同時會被分配到同一個實例上。

Hash Tag 劣勢 :如果不合理使用,會導致大量的數(shù)據(jù)可能被集中到一個實例上發(fā)生數(shù)據(jù)傾斜,集群中的負載不均衡。

2.2.2 數(shù)據(jù)訪問傾斜(讀取傾斜-熱key問題)

一般來說數(shù)據(jù)訪問傾斜就是熱key問題導致的,如何處理redis熱key問題也是面試中常會問到的。所以了解相關(guān)概念及方法論也是不可或缺的一環(huán)。

1.圖示

52f2d51c-3f2c-11ed-9e49-dac502259ad0.png

如圖,雖然每個集群實例上的數(shù)據(jù)量相差不大,但是某個實例上的數(shù)據(jù)是熱點數(shù)據(jù),被訪問得非常頻繁。
但是為啥會有熱點數(shù)據(jù)的產(chǎn)生呢?

2.產(chǎn)生熱key的原因及危害

1)用戶消費的數(shù)據(jù)遠大于生產(chǎn)的數(shù)據(jù)(熱賣商品、熱點新聞、熱點評論、明星直播)。
在日常工作生活中一些突發(fā)的的事件,例如:雙十一期間某些熱門商品的降價促銷,當這其中的某一件商品被數(shù)萬次點擊瀏覽或者購買時,會形成一個較大的需求量,這種情況下就會造成熱點問題。
同理,被大量刊發(fā)、瀏覽的熱點新聞、熱點評論、明星直播等,這些典型的讀多寫少的場景也會產(chǎn)生熱點問題。
2)請求分片集中,超過單 Server 的性能極限。
在服務端讀數(shù)據(jù)進行訪問時,往往會對數(shù)據(jù)進行分片切分,此過程中會在某一主機 Server 上對相應的 Key 進行訪問,當訪問超過 Server 極限時,就會導致熱點 Key 問題的產(chǎn)生。

如果熱點過于集中,熱點 Key 的緩存過多,超過目前的緩存容量時,就會導致緩存分片服務被打垮現(xiàn)象的產(chǎn)生。當緩存服務崩潰后,此時再有請求產(chǎn)生,會緩存到后臺 DB 上,由于DB 本身性能較弱,在面臨大請求時很容易發(fā)生請求穿透現(xiàn)象,會進一步導致雪崩現(xiàn)象,嚴重影響設備的性能。

3.常用的熱key問題解決辦法:

解決方案一: 備份熱key
可以把熱點數(shù)據(jù)復制多份,在每一個數(shù)據(jù)副本的 key 中增加一個隨機后綴,讓它和其它副本數(shù)據(jù)不會被映射到同一個 Slot 中。
這里相當于把一份數(shù)據(jù)復制到其他實例上,這樣在訪問的時候也增加隨機前綴,將對一個實例的訪問壓力,均攤到其他實例上
例如:
我們在放入緩存時就將對應業(yè)務的緩存key拆分成多個不同的key。如下圖所示,我們首先在更新緩存的一側(cè),將key拆成N份,比如一個key名字叫做”good_100”,那我們就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增時都需要去改動這N個key,這一步就是拆key。

530cb9be-3f2c-11ed-9e49-dac502259ad0.png

對于service端來講,我們就需要想辦法盡量將自己訪問的流量足夠的均勻。
如何給自己即將訪問的熱key上加入后綴?幾種辦法,根據(jù)本機的ip或mac地址做hash,之后的值與拆key的數(shù)量做取余,最終決定拼接成什么樣的key后綴,從而打到哪臺機器上;服務啟動時的一個隨機數(shù)對拆key的數(shù)量做取余。
偽代碼如下:


const M = N * 2

//生成隨機數(shù)

random = GenRandom(0, M)

//構(gòu)造備份新key

bakHotKey = hotKey + “_” + random

data = redis.GET(bakHotKey)

if data == NULL {

data = GetFromDB()

redis.SET(bakHotKey, expireTime + GenRandom(0,5))

}

解決方案二: 本地緩存+動態(tài)計算自動發(fā)現(xiàn)熱點緩存 基本流程圖

534720e0-3f2c-11ed-9e49-dac502259ad0.png

該方案通過主動發(fā)現(xiàn)熱點并對其進行存儲來解決熱點 Key 的問題。首先 Client 也會訪問 SLB,并且通過 SLB 將各種請求分發(fā)至 Proxy 中,Proxy 會按照基于路由的方式將請求轉(zhuǎn)發(fā)至后端的 Redis 中。 在熱點 key 的解決上是采用在服務端增加緩存的方式進行。具體來說就是在 Proxy 上增加本地緩存,本地緩存采用 LRU 算法來緩存熱點數(shù)據(jù),后端節(jié)點增加熱點數(shù)據(jù)計算模塊來返回熱點數(shù)據(jù)。

Proxy 架構(gòu)的主要有以下優(yōu)點:

Proxy 本地緩存熱點,讀能力可水平擴展

DB 節(jié)點定時計算熱點數(shù)據(jù)集合

DB 反饋 Proxy 熱點數(shù)據(jù)

對客戶端完全透明,不需做任何兼容

熱點數(shù)據(jù)的發(fā)現(xiàn)與存儲

53771bec-3f2c-11ed-9e49-dac502259ad0.png

對于熱點數(shù)據(jù)的發(fā)現(xiàn),首先會在一個周期內(nèi)對 Key 進行請求統(tǒng)計,在達到請求量級后會對熱點 Key 進行熱點定位,并將所有的熱點 Key 放入一個小的 LRU 鏈表內(nèi),在通過 Proxy 請求進行訪問時,若 Redis 發(fā)現(xiàn)待訪點是一個熱點,就會進入一個反饋階段,同時對該數(shù)據(jù)進行標記。 可以使用一個etcd或者zk集群來存儲反饋的熱點數(shù)據(jù),然后本地所有節(jié)點監(jiān)聽該熱點數(shù)據(jù),進而加載到本地JVM緩存中。

熱點數(shù)據(jù)的獲取

54763c26-3f2c-11ed-9e49-dac502259ad0.png

在熱點 Key 的處理上主要分為寫入跟讀取兩種形式,在數(shù)據(jù)寫入過程當 SLB 收到數(shù)據(jù) K1 并將其通過某一個 Proxy 寫入一個 Redis,完成數(shù)據(jù)的寫入。 假若經(jīng)過后端熱點模塊計算發(fā)現(xiàn) K1 成為熱點 key 后, Proxy 會將該熱點進行緩存,當下次客戶端再進行訪問 K1 時,可以不經(jīng) Redis。 最后由于 proxy 是可以水平擴充的,因此可以任意增強熱點數(shù)據(jù)的訪問能力。

最佳成熟方案: JD開源hotKey 這是目前較為成熟的自動探測熱key、分布式一致性緩存解決方案。原理就是在client端做洞察,然后上報對應hotkey,server端檢測到后,將對應hotkey下發(fā)到對應服務端做本地緩存,并且能保證本地緩存和遠程緩存的一致性。

在這里咱們就不細談了,這篇文章的第三部分:JD開源hotkey源碼解析里面會帶領(lǐng)大家了解其整體原理。

3 JD開源hotkey—自動探測熱key、分布式一致性緩存解決方案

3.1 解決痛點

從上面可知,熱點key問題在并發(fā)量比較高的系統(tǒng)中(特別是做秒殺活動)出現(xiàn)的頻率會比較高,對系統(tǒng)帶來的危害也很大。 那么針對此,hotkey誕生的目的是什么?需要解決的痛點是什么?以及它的實現(xiàn)原理。

在這里引用項目上的一段話來概述: 對任意突發(fā)性的無法預先感知的熱點數(shù)據(jù),包括并不限于熱點數(shù)據(jù)(如突發(fā)大量請求同一個商品)、熱用戶(如惡意爬蟲刷子)、熱接口(突發(fā)海量請求同一個接口)等,進行毫秒級精準探測到。然后對這些熱數(shù)據(jù)、熱用戶等,推送到所有服務端JVM內(nèi)存中,以大幅減輕對后端數(shù)據(jù)存儲層的沖擊,并可以由使用者決定如何分配、使用這些熱key(譬如對熱商品做本地緩存、對熱用戶進行拒絕訪問、對熱接口進行熔斷或返回默認值)。這些熱數(shù)據(jù)在整個服務端集群內(nèi)保持一致性,并且業(yè)務隔離。

核心功能:熱數(shù)據(jù)探測并推送至集群各個服務器

3.2 集成方式

集成方式在這里就不詳述了,感興趣的同學可以自行搜索。

3.3 源碼解析

3.3.1 架構(gòu)簡介

1.全景圖一覽

549957d8-3f2c-11ed-9e49-dac502259ad0.png

流程介紹:

客戶端通過引用hotkey的client包,在啟動的時候上報自己的信息給worker,同時和worker之間建立長連接。定時拉取配置中心上面的規(guī)則信息和worker集群信息。

客戶端調(diào)用hotkey的ishot()的方法來首先匹配規(guī)則,然后統(tǒng)計是不是熱key。

通過定時任務把熱key數(shù)據(jù)上傳到worker節(jié)點。

worker集群在收取到所有關(guān)于這個key的數(shù)據(jù)以后(因為通過hash來決定key 上傳到哪個worker的,所以同一個key只會在同一個worker節(jié)點上),在和定義的規(guī)則進行匹配后判斷是不是熱key,如果是則推送給客戶端,完成本地緩存。

2.角色構(gòu)成

這里直接借用作者的描述: 1)etcd集群 etcd作為一個高性能的配置中心,可以以極小的資源占用,提供高效的監(jiān)聽訂閱服務。主要用于存放規(guī)則配置,各worker的ip地址,以及探測出的熱key、手工添加的熱key等。

2)client端jar包 就是在服務中添加的引用jar,引入后,就可以以便捷的方式去判斷某key是否熱key。同時,該jar完成了key上報、監(jiān)聽etcd里的rule變化、worker信息變化、熱key變化,對熱key進行本地caffeine緩存等。

3) worker端集群 worker端是一個獨立部署的Java程序,啟動后會連接etcd,并定期上報自己的ip信息,供client端獲取地址并進行長連接。之后,主要就是對各個client發(fā)來的待測key進行累加計算,當達到etcd里設定的rule閾值后,將熱key推送到各個client。

4) dashboard控制臺 控制臺是一個帶可視化界面的Java程序,也是連接到etcd,之后在控制臺設置各個APP的key規(guī)則,譬如2秒20次算熱。然后當worker探測出來熱key后,會將key發(fā)往etcd,dashboard也會監(jiān)聽熱key信息,進行入庫保存記錄。同時,dashboard也可以手工添加、刪除熱key,供各個client端監(jiān)聽。

3.hotkey工程結(jié)構(gòu)

54ed3b50-3f2c-11ed-9e49-dac502259ad0.png

3.3.2 client端

主要從下面三個方面來解析源碼:

5522bc4e-3f2c-11ed-9e49-dac502259ad0.png

4.客戶端啟動器

1)啟動方式


@PostConstruct

public void init() {

ClientStarter.Builder builder = new ClientStarter.Builder();

ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();

starter.startPipeline();

}

appName:是這個應用的名稱,一般為${spring.application.name}的值,后續(xù)所有的配置都以此為開頭 etcd:是etcd集群的地址,用逗號分隔,配置中心。 還可以看到ClientStarter實現(xiàn)了建造者模式,使代碼更為簡介。

2)核心入口 com.jd.platform.hotkey.client.ClientStarter#startPipeline


/**

* 啟動監(jiān)聽etcd

*/

public void startPipeline() {

JdLogger.info(getClass(), "etcdServer:" + etcdServer);

//設置caffeine的最大容量

Context.CAFFEINE_SIZE = caffeineSize;

//設置etcd地址

EtcdConfigFactory.buildConfigCenter(etcdServer);

//開始定時推送

PushSchedulerStarter.startPusher(pushPeriod);

PushSchedulerStarter.startCountPusher(10);

//開啟worker重連器

WorkerRetryConnector.retryConnectWorkers();

registEventBus();

EtcdStarter starter = new EtcdStarter();

//與etcd相關(guān)的監(jiān)聽都開啟

starter.start();

}

該方法主要有五個功能:

55590a92-3f2c-11ed-9e49-dac502259ad0.png

① 設置本地緩存(caffeine)的最大值,并創(chuàng)建etcd實例


//設置caffeine的最大容量

Context.CAFFEINE_SIZE = caffeineSize;

//設置etcd地址

EtcdConfigFactory.buildConfigCenter(etcdServer);

caffeineSize是本地緩存的最大值,在啟動的時候可以設置,不設置默認為200000。 etcdServer是上面說的etcd集群地址。

Context可以理解為一個配置類,里面就包含兩個字段:


public class Context {

public static String APP_NAME;

public static int CAFFEINE_SIZE;

}

EtcdConfigFactory是ectd配置中心的工廠類


public class EtcdConfigFactory {

private static IConfigCenter configCenter;

private EtcdConfigFactory() {}

public static IConfigCenter configCenter() {

return configCenter;

}

public static void buildConfigCenter(String etcdServer) {

//連接多個時,逗號分隔

configCenter = JdEtcdBuilder.build(etcdServer);

}

}

通過其configCenter()方法獲取創(chuàng)建etcd實例對象,IConfigCenter接口封裝了etcd實例對象的行為(包括基本的crud、監(jiān)控、續(xù)約等)

55ab6f08-3f2c-11ed-9e49-dac502259ad0.png

② 創(chuàng)建并啟動定時任務:PushSchedulerStarter


//開始定時推送

PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待測key

PushSchedulerStarter.startCountPusher(10);//每10秒推送一次數(shù)量統(tǒng)計,不可配置

pushPeriod是推送的間隔時間,可以再啟動的時候設置,最小為0.05s,推送越快,探測的越密集,會越快探測出來,但對client資源消耗相應增大

PushSchedulerStarter類


/**

* 每0.5秒推送一次待測key

*/

public static void startPusher(Long period) {

if (period == null || period <= 0) {

period = 500L;

}

@SuppressWarnings("PMD.ThreadPoolCreationRule")

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));

scheduledExecutorService.scheduleAtFixedRate(() -> {

//熱key的收集器

IKeyCollector collectHK = KeyHandlerFactory.getCollector();

//這里相當于每0.5秒,通過netty來給worker來推送收集到的熱key的信息,主要是一些熱key的元數(shù)據(jù)信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報次數(shù))

//這里面還有就是該熱key在每次上報的時候都會生成一個全局的唯一id,還有該熱key每次上報的創(chuàng)建時間是在netty發(fā)送的時候來生成,同一批次的熱key時間是相同的

List hotKeyModels = collectHK.lockAndGetResult();

if(CollectionUtil.isNotEmpty(hotKeyModels)){

//積攢了半秒的key集合,按照hash分發(fā)到不同的worker

KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);

collectHK.finishOnce();

}

},0, period, TimeUnit.MILLISECONDS);

}

/**

* 每10秒推送一次數(shù)量統(tǒng)計

*/

public static void startCountPusher(Integer period) {

if (period == null || period <= 0) {

period = 10;

}

@SuppressWarnings("PMD.ThreadPoolCreationRule")

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));

scheduledExecutorService.scheduleAtFixedRate(() -> {

IKeyCollector collectHK = KeyHandlerFactory.getCounter();

List keyCountModels = collectHK.lockAndGetResult();

if(CollectionUtil.isNotEmpty(keyCountModels)){

//積攢了10秒的數(shù)量,按照hash分發(fā)到不同的worker

KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);

collectHK.finishOnce();

}

},0, period, TimeUnit.SECONDS);

}

從上面兩個方法可知,都是通過定時線程池來實現(xiàn)定時任務的,都是守護線程。

咱們重點關(guān)注一下KeyHandlerFactory類,它是client端設計的一個比較巧妙的地方,從類名上直譯為key處理工廠。具體的實例對象是DefaultKeyHandler:


public class DefaultKeyHandler {

//推送HotKeyMsg消息到Netty的推送者

private IKeyPusher iKeyPusher = new NettyKeyPusher();

//待測key的收集器,這里面包含兩個map,key主要是熱key的名字,value主要是熱key的元數(shù)據(jù)信息(比如:熱key來源的app和key的類型和是否是刪除事件)

private IKeyCollector iKeyCollector = new TurnKeyCollector();

//數(shù)量收集器,這里面包含兩個map,這里面key是相應的規(guī)則,HitCount里面是這個規(guī)則的總訪問次數(shù)和熱后訪問次數(shù)

private IKeyCollector iKeyCounter = new TurnCountCollector();

public IKeyPusher keyPusher() {

return iKeyPusher;

}

public IKeyCollector keyCollector() {

return iKeyCollector;

}

public IKeyCollector keyCounter() {

return iKeyCounter;

}

}

這里面有三個成員對象,分別是封裝推送消息到netty的NettyKeyPusher、待測key收集器TurnKeyCollector、數(shù)量收集器TurnCountCollector,其中后兩者都實現(xiàn)了接口IKeyCollector,能對hotkey的處理起到有效的聚合,充分體現(xiàn)了代碼的高內(nèi)聚。 先來看看封裝推送消息到netty的NettyKeyPusher:


/**

* 將msg推送到netty的pusher

* @author wuweifeng wrote on 2020-01-06

* @version 1.0

*/

public class NettyKeyPusher implements IKeyPusher {

@Override

public void send(String appName, List list) {

//積攢了半秒的key集合,按照hash分發(fā)到不同的worker

long now = System.currentTimeMillis();

Map> map = new HashMap<>();

for(HotKeyModel model : list) {

model.setCreateTime(now);

Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());

if (channel == null) {

continue;

}

List newList = map.computeIfAbsent(channel, k -> new ArrayList<>());

newList.add(model);

}

for (Channel channel : map.keySet()) {

try {

List batch = map.get(channel);

HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);

hotKeyMsg.setHotKeyModels(batch);

channel.writeAndFlush(hotKeyMsg).sync();

} catch (Exception e) {

try {

InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();

JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());

} catch (Exception ex) {

JdLogger.error(getClass(),"flush error");

}

}

}

}

@Override

public void sendCount(String appName, List list) {

//積攢了10秒的數(shù)量,按照hash分發(fā)到不同的worker

long now = System.currentTimeMillis();

Map> map = new HashMap<>();

for(KeyCountModel model : list) {

model.setCreateTime(now);

Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());

if (channel == null) {

continue;

}

List newList = map.computeIfAbsent(channel, k -> new ArrayList<>());

newList.add(model);

}

for (Channel channel : map.keySet()) {

try {

List batch = map.get(channel);

HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);

hotKeyMsg.setKeyCountModels(batch);

channel.writeAndFlush(hotKeyMsg).sync();

} catch (Exception e) {

try {

InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();

JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());

} catch (Exception ex) {

JdLogger.error(getClass(),"flush error");

}

}

}

}

}

send(String appName, Listlist) 主要是將TurnKeyCollector收集的待測key通過netty推送給worker,HotKeyModel對象主要是一些熱key的元數(shù)據(jù)信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報次數(shù)) sendCount(String appName, Listlist) 主要是將TurnCountCollector收集的規(guī)則所對應的key通過netty推送給worker,KeyCountModel對象主要是一些key所對應的規(guī)則信息以及訪問次數(shù)等 WorkerInfoHolder.chooseChannel(model.getRuleKey()) 根據(jù)hash算法獲取key對應的服務器,分發(fā)到對應服務器相應的Channel 連接,所以服務端可以水平無限擴容,毫無壓力問題。

再來分析一下key收集器:TurnKeyCollector與TurnCountCollector: 實現(xiàn)IKeyCollector接口:


/**

* 對hotkey進行聚合

* @author wuweifeng wrote on 2020-01-06

* @version 1.0

*/

public interface IKeyCollector {

/**

* 鎖定后的返回值

*/

List lockAndGetResult();

/**

* 輸入的參數(shù)

*/

void collect(T t);

void finishOnce();

}

lockAndGetResult() 主要是獲取返回collect方法收集的信息,并將本地暫存的信息清空,方便下個統(tǒng)計周期積攢數(shù)據(jù)。 collect(T t) 顧名思義他是收集api調(diào)用的時候,將收集的到key信息放到本地存儲。 finishOnce() 該方法目前實現(xiàn)都是空,無需關(guān)注。

待測key收集器:TurnKeyCollector


public class TurnKeyCollector implements IKeyCollector {

//這map里面的key主要是熱key的名字,value主要是熱key的元數(shù)據(jù)信息(比如:熱key來源的app和key的類型和是否是刪除事件)

private ConcurrentHashMap map0 = new ConcurrentHashMap<>();

private ConcurrentHashMap map1 = new ConcurrentHashMap<>();

private AtomicLong atomicLong = new AtomicLong(0);

@Override

public List lockAndGetResult() {

//自增后,對應的map就會停止被寫入,等待被讀取

atomicLong.addAndGet(1);

List list;

//可以觀察這里與collect方法里面的相同位置,會發(fā)現(xiàn)一個是操作map0一個是操作map1,這樣保證在讀map的時候,不會阻塞寫map,

//兩個map同時提供輪流提供讀寫能力,設計的很巧妙,值得學習

if (atomicLong.get() % 2 == 0) {

list = get(map1);

map1.clear();

} else {

list = get(map0);

map0.clear();

}

return list;

}

private List get(ConcurrentHashMap map) {

return CollectionUtil.list(false, map.values());

}

@Override

public void collect(HotKeyModel hotKeyModel) {

String key = hotKeyModel.getKey();

if (StrUtil.isEmpty(key)) {

return;

}

if (atomicLong.get() % 2 == 0) {

//不存在時返回null并將key-value放入,已有相同key時,返回該key對應的value,并且不覆蓋

HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);

if (model != null) {

//增加該hotMey上報的次數(shù)

model.add(hotKeyModel.getCount());

}

} else {

HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);

if (model != null) {

model.add(hotKeyModel.getCount());

}

}

}

@Override

public void finishOnce() {}

}

可以看到該類中有兩個ConcurrentHashMap和一個AtomicLong,通過對AtomicLong來自增,然后對2取模,來分別控制兩個map的讀寫能力,保證每個map都能做讀寫,并且同一個map不能同時讀寫,這樣可以避免并發(fā)集合讀寫不阻塞,這一塊無鎖化的設計還是非常巧妙的,極大的提高了收集的吞吐量。 key數(shù)量收集器:TurnCountCollector 這里的設計與TurnKeyCollector大同小異,咱們就不細談了。值得一提的是它里面有個并行處理的機制,當收集的數(shù)量超過DATA_CONVERT_SWITCH_THRESHOLD=5000的閾值時,lockAndGetResult處理是使用java Stream并行流處理,提升處理的效率。

③ 開啟worker重連器


//開啟worker重連器

WorkerRetryConnector.retryConnectWorkers();

public class WorkerRetryConnector {

/**

* 定時去重連沒連上的workers

*/

public static void retryConnectWorkers() {

@SuppressWarnings("PMD.ThreadPoolCreationRule")

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));

//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續(xù)拉取

scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);

}

private static void reConnectWorkers() {

List nonList = WorkerInfoHolder.getNonConnectedWorkers();

if (nonList.size() == 0) {

return;

}

JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);

NettyClient.getInstance().connect(nonList);//這里會觸發(fā)netty連接方法channelActive

}

}

也是通過定時線程來執(zhí)行,默認時間間隔是30s,不可設置。 通過WorkerInfoHolder來控制client的worker連接信息,連接信息是個List,用的CopyOnWriteArrayList,畢竟是一個讀多寫少的場景,類似與元數(shù)據(jù)信息。


/**

* 保存worker的ip地址和Channel的映射關(guān)系,這是有序的。每次client發(fā)送消息時,都會根據(jù)該map的size進行hash

* 如key-1就發(fā)送到workerHolder的第1個Channel去,key-2就發(fā)到第2個Channel去

*/

private static final List WORKER_HOLDER = new CopyOnWriteArrayList<>();

④ 注冊EventBus事件訂閱者


private void registEventBus() {

//netty連接器會關(guān)注WorkerInfoChangeEvent事件

EventBusCenter.register(new WorkerChangeSubscriber());

//熱key探測回調(diào)關(guān)注熱key事件

EventBusCenter.register(new ReceiveNewKeySubscribe());

//Rule的變化的事件

EventBusCenter.register(new KeyRuleHolder());

}

使用guava的EventBus事件消息總線,利用發(fā)布/訂閱者模式來對項目進行解耦。它可以利用很少的代碼,來實現(xiàn)多組件間通信。 基本原理圖如下: 560606c0-3f2c-11ed-9e49-dac502259ad0.png

監(jiān)聽worker信息變動:WorkerChangeSubscriber


/**

* 監(jiān)聽worker信息變動

*/

@Subscribe

public void connectAll(WorkerInfoChangeEvent event) {

List addresses = event.getAddresses();

if (addresses == null) {

addresses = new ArrayList<>();

}

WorkerInfoHolder.mergeAndConnectNew(addresses);

}

/**

* 當client與worker的連接斷開后,刪除

*/

@Subscribe

public void channelInactive(ChannelInactiveEvent inactiveEvent) {

//獲取斷線的channel

Channel channel = inactiveEvent.getChannel();

InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();

String address = socketAddress.getHostName() + ":" + socketAddress.getPort();

JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");

WorkerInfoHolder.dealChannelInactive(address);

}

562af9b2-3f2c-11ed-9e49-dac502259ad0.png

監(jiān)聽熱key回調(diào)事件:ReceiveNewKeySubscribe


private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();

@Subscribe

public void newKeyComing(ReceiveNewKeyEvent event) {

HotKeyModel hotKeyModel = event.getModel();

if (hotKeyModel == null) {

return;

}

//收到新key推送

if (receiveNewKeyListener != null) {

receiveNewKeyListener.newKey(hotKeyModel);

}

}

該方法會收到新的熱key訂閱事件之后,會將其加入到KeyHandlerFactory的收集器里面處理。

核心處理邏輯


@Override

public void newKey(HotKeyModel hotKeyModel) {

long now = System.currentTimeMillis();

//如果key到達時已經(jīng)過去1秒了,記錄一下。手工刪除key時,沒有CreateTime

if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {

JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +

+now + " keyCreateAt " + hotKeyModel.getCreateTime());

}

if (hotKeyModel.isRemove()) {

//如果是刪除事件,就直接刪除

deleteKey(hotKeyModel.getKey());

return;

}

//已經(jīng)是熱key了,又推過來同樣的熱key,做個日志記錄,并刷新一下

if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {

JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);

}

addKey(hotKeyModel.getKey());

}

private void deleteKey(String key) {

CacheFactory.getNonNullCache(key).delete(key);

}

private void addKey(String key) {

ValueModel valueModel = ValueModel.defaultValue(key);

if (valueModel == null) {

//不符合任何規(guī)則

deleteKey(key);

return;

}

//如果原來該key已經(jīng)存在了,那么value就被重置,過期時間也會被重置。如果原來不存在,就新增的熱key

JdHotKeyStore.setValueDirectly(key, valueModel);

}

如果該HotKeyModel里面是刪除事件,則獲取RULE_CACHE_MAP里面該key超時時間對應的caffeine,然后從中刪除該key緩存,然后返回(這里相當于刪除了本地緩存)。

如果不是刪除事件,則在RULE_CACHE_MAP對應的caffeine緩存中添加該key的緩存。

這里有個注意點,如果不為刪除事件,調(diào)用addKey()方法在caffeine增加緩存的時候,value是一個魔術(shù)值0x12fcf76,這個值只代表加了這個緩存,但是這個緩存在查詢的時候相當于為null。

監(jiān)聽Rule的變化事件:KeyRuleHolder

56584688-3f2c-11ed-9e49-dac502259ad0.png

可以看到里面有兩個成員屬性:RULE_CACHE_MAP,KEY_RULES


/**

* 保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]

*/

private static final ConcurrentHashMap RULE_CACHE_MAP = new ConcurrentHashMap<>();

/**

* 這里KEY_RULES是保存etcd里面該appName所對應的所有rule

*/

private static final List KEY_RULES = new ArrayList<>();

ConcurrentHashMapRULE_CACHE_MAP:

保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]。

巧妙的設計:這里將key的過期時間作為分桶策略,這樣同一個過期時間的key就會在一個桶(caffeine)里面,這里面每一個caffeine都是client的本地緩存,也就是說hotKey的本地緩存的KV實際上是存儲在這里面的。

ListKEY_RULES:

這里KEY_RULES是保存etcd里面該appName所對應的所有rule。

具體監(jiān)聽KeyRuleInfoChangeEvent事件方法:


@Subscribe

public void ruleChange(KeyRuleInfoChangeEvent event) {

JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());

List ruleList = event.getKeyRules();

if (ruleList == null) {

return;

}

putRules(ruleList);

}

核心處理邏輯


/**

* 所有的規(guī)則,如果規(guī)則的超時時間變化了,會重建caffeine

*/

public static void putRules(List keyRules) {

synchronized (KEY_RULES) {

//如果規(guī)則為空,清空規(guī)則表

if (CollectionUtil.isEmpty(keyRules)) {

KEY_RULES.clear();

RULE_CACHE_MAP.clear();

return;

}

KEY_RULES.clear();

KEY_RULES.addAll(keyRules);

Set durationSet = keyRules.stream().map(KeyRule::getDuration).collect(Collectors.toSet());

for (Integer duration : RULE_CACHE_MAP.keySet()) {

//先清除掉那些在RULE_CACHE_MAP里存的,但是rule里已沒有的

if (!durationSet.contains(duration)) {

RULE_CACHE_MAP.remove(duration);

}

}

//遍歷所有的規(guī)則

for (KeyRule keyRule : keyRules) {

int duration = keyRule.getDuration();

//這里如果RULE_CACHE_MAP里面沒有超時時間為duration的value,則新建一個放入到RULE_CACHE_MAP里面

//比如RULE_CACHE_MAP本來就是空的,則在這里來構(gòu)建RULE_CACHE_MAP的映射關(guān)系

//TODO 如果keyRules里面包含相同duration的keyRule,則也只會建一個key為duration,value為caffeine,其中caffeine是(string,object)

if (RULE_CACHE_MAP.get(duration) == null) {

LocalCache cache = CacheFactory.build(duration);

RULE_CACHE_MAP.put(duration, cache);

}

}

}

}

使用synchronized關(guān)鍵字來保證線程安全;

如果規(guī)則為空,清空規(guī)則表(RULE_CACHE_MAP、KEY_RULES);

使用傳遞進來的keyRules來覆蓋KEY_RULES;

清除掉RULE_CACHE_MAP里面在keyRules沒有的映射關(guān)系;

遍歷所有的keyRules,如果RULE_CACHE_MAP里面沒有相關(guān)的超時時間key,則在里面賦值;

⑤ 啟動EtcdStarter(etcd連接管理器)


EtcdStarter starter = new EtcdStarter();

//與etcd相關(guān)的監(jiān)聽都開啟

starter.start();

public void start() {

fetchWorkerInfo();

fetchRule();

startWatchRule();

//監(jiān)聽熱key事件,只監(jiān)聽手工添加、刪除的key

startWatchHotKey();

}

fetchWorkerInfo() 從etcd里面拉取worker集群地址信息allAddress,并更新WorkerInfoHolder里面的WORKER_HOLDER


/**

* 每隔30秒拉取worker信息

*/

private void fetchWorkerInfo() {

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續(xù)拉取

scheduledExecutorService.scheduleAtFixedRate(() -> {

JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");

fetch();

}, 0, 30, TimeUnit.SECONDS);

}

使用定時線程池來執(zhí)行,單線程。

定時從etcd里面獲取,地址/jd/workers/+$appName或default,時間間隔不可設置,默認30秒,這里面存儲的是worker地址的ip+port。

發(fā)布WorkerInfoChangeEvent事件。

備注:地址有$appName或default,在worker里面配置,如果把worker放到某個appName下,則該worker只會參與該app的計算。

fetchRule() 定時線程來執(zhí)行,單線程,時間間隔不可設置,默認是5秒,當拉取規(guī)則配置和手動配置的hotKey成功后,該線程被終止(也就是說只會成功執(zhí)行一次),執(zhí)行失敗繼續(xù)執(zhí)行


private void fetchRule() {

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續(xù)拉取

scheduledExecutorService.scheduleAtFixedRate(() -> {

JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");

boolean success = fetchRuleFromEtcd();

if (success) {

//拉取已存在的熱key

fetchExistHotKey();

//這里如果拉取規(guī)則和拉取手動配置的hotKey成功之后,則該定時執(zhí)行線程停止

scheduledExecutorService.shutdown();

}

}, 0, 5, TimeUnit.SECONDS);

}

fetchRuleFromEtcd()

從etcd里面獲取該appName配置的rule規(guī)則,地址/jd/rules/+$appName。

如果查出來規(guī)則rules為空,會通過發(fā)布KeyRuleInfoChangeEvent事件來清空本地的rule配置緩存和所有的規(guī)則key緩存。

發(fā)布KeyRuleInfoChangeEvent事件。

fetchExistHotKey()

從etcd里面獲取該appName手動配置的熱key,地址/jd/hotkeys/+$appName。

發(fā)布ReceiveNewKeyEvent事件,并且內(nèi)容HotKeyModel不是刪除事件。

startWatchRule()


/**

* 異步監(jiān)聽rule規(guī)則變化

*/

private void startWatchRule() {

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.submit(() -> {

JdLogger.info(getClass(), "--- begin watch rule change ----");

try {

IConfigCenter configCenter = EtcdConfigFactory.configCenter();

KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);

//如果有新事件,即rule的變更,就重新拉取所有的信息

while (watchIterator.hasNext()) {

//這句必須寫,next會讓他卡住,除非真的有新rule變更

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);

//全量拉取rule信息

fetchRuleFromEtcd();

}

} catch (Exception e) {

JdLogger.error(getClass(), "watch err");

}

});

}

異步監(jiān)聽rule規(guī)則變化,使用etcd監(jiān)聽地址為/jd/rules/+$appName的節(jié)點變化。

使用線程池,單線程,異步監(jiān)聽rule規(guī)則變化,如果有事件變化,則調(diào)用fetchRuleFromEtcd()方法。

startWatchHotKey() 異步開始監(jiān)聽熱key變化信息,使用etcd監(jiān)聽地址前綴為/jd/hotkeys/+$appName


/**

* 異步開始監(jiān)聽熱key變化信息,該目錄里只有手工添加的key信息

*/

private void startWatchHotKey() {

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.submit(() -> {

JdLogger.info(getClass(), "--- begin watch hotKey change ----");

IConfigCenter configCenter = EtcdConfigFactory.configCenter();

try {

KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);

//如果有新事件,即新key產(chǎn)生或刪除

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

KeyValue keyValue = eventList.get(0).getKv();

Event.EventType eventType = eventList.get(0).getType();

try {

//從這個地方可以看出,etcd給的返回是節(jié)點的全路徑,而我們需要的key要去掉前綴

String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");

//如果是刪除key,就立刻刪除

if (Event.EventType.DELETE == eventType) {

HotKeyModel model = new HotKeyModel();

model.setRemove(true);

model.setKey(key);

EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

} else {

HotKeyModel model = new HotKeyModel();

model.setRemove(false);

String value = keyValue.getValue().toStringUtf8();

//新增熱key

JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);

//如果這是一個刪除指令,就什么也不干

//TODO 這里有個疑問,監(jiān)聽到worker自動探測發(fā)出的惰性刪除指令,這里之間跳過了,但是本地緩存沒有更新吧?

//TODO 所以我猜測在客戶端使用判斷緩存是否存在的api里面,應該會判斷相關(guān)緩存的value值是否為"#[DELETE]#"刪除標記

//解疑:這里確實只監(jiān)聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的

if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {

continue;

}

//手工創(chuàng)建的value是時間戳

model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));

model.setKey(key);

EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

}

} catch (Exception e) {

JdLogger.error(getClass(), "new key err :" + keyValue);

}

}

} catch (Exception e) {

JdLogger.error(getClass(), "watch err");

}

});

}

使用線程池,單線程,異步監(jiān)聽熱key變化

使用etcd監(jiān)聽前綴地址的當前節(jié)點以及子節(jié)點的所有變化值

刪除節(jié)點動作

發(fā)布ReceiveNewKeyEvent事件,并且內(nèi)容HotKeyModel是刪除事件

新增or更新節(jié)點動作

事件變化的value值為刪除標記#[DELETE]#

如果是刪除標記的話,代表是worker自動探測或者client需要刪除的指令。

如果是刪除標記則什么也不做,直接跳過(這里從HotKeyPusher#push方法可以看到,做刪除事件的操作時候,他會給/jd/hotkeys/+$appName的節(jié)點里面增加一個值為刪除標記的節(jié)點,然后再刪除相同路徑的節(jié)點,這樣就可以觸發(fā)上面的刪除節(jié)點事件,所以這里判斷如果是刪除標記直接跳過)。

不為刪除標記

發(fā)布ReceiveNewKeyEvent事件,事件內(nèi)容HotKeyModel里面的createTime是kv對應的時間戳

疑問: 這里代碼注釋里面說只監(jiān)聽手工添加或者刪除的hotKey,難道說/jd/hotkeys/+$appName地址只是手工配置的地址嗎? 解疑: 這里確實只監(jiān)聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的

5.API解析

1)流程圖示 ① 查詢流程

5676cdb0-3f2c-11ed-9e49-dac502259ad0.png

② 刪除流程:

56a3ae0c-3f2c-11ed-9e49-dac502259ad0.png

從上面的流程圖中,大家應該知道該熱點key在代碼中是如何扭轉(zhuǎn)的,這里再給大家講解一下核心API的源碼解析,限于篇幅的原因,咱們不一個個貼相關(guān)源碼了,只是單純的告訴你它的內(nèi)部邏輯是怎么樣的。 2)核心類:JdHotKeyStore

56c91976-3f2c-11ed-9e49-dac502259ad0.png

JdHotKeyStore是封裝client調(diào)用的api核心類,包含上面10個公共方法,咱們重點解析其中6個重要方法: ① isHotKey(String key) 判斷是否在規(guī)則內(nèi),如果不在返回false 判斷是否是熱key,如果不是或者是且過期時間在2s內(nèi),則給TurnKeyCollector#collect收集 最后給TurnCountCollector#collect做統(tǒng)計收集 ② get(String key) 從本地caffeine取值 如果取到的value是個魔術(shù)值,只代表加入到caffeine緩存里面了,查詢的話為null ③ smartSet(String key, Object value) 判斷是否是熱key,這里不管它在不在規(guī)則內(nèi),如果是熱key,則給value賦值,如果不為熱key什么也不做 ④ forceSet(String key, Object value) 強制給value賦值 如果該key不在規(guī)則配置內(nèi),則傳遞的value不生效,本地緩存的賦值value會被變?yōu)閚ull ⑤ getValue(String key, KeyType keyType) 獲取value,如果value不存在則調(diào)用HotKeyPusher#push方法發(fā)往netty 如果沒有為該key配置規(guī)則,就不用上報key,直接返回null 如果取到的value是個魔術(shù)值,只代表加入到caffeine緩存里面了,查詢的話為null ⑥ remove(String key) 刪除某key(本地的caffeine緩存),會通知整個集群刪除(通過etcd來通知集群刪除) 3)client上傳熱key入口調(diào)用類:HotKeyPusher 核心方法:


public static void push(String key, KeyType keyType, int count, boolean remove) {

if (count <= 0) {

count = 1;

}

if (keyType == null) {

keyType = KeyType.REDIS_KEY;

}

if (key == null) {

return;

}

//這里之所以用LongAdder是為了保證多線程計數(shù)的線程安全性,雖然這里是在方法內(nèi)調(diào)用的,但是在TurnKeyCollector的兩個map里面,

//存儲了HotKeyModel的實例對象,這樣在多個線程同時修改count的計數(shù)屬性時,會存在線程安全計數(shù)不準確問題

LongAdder adderCnt = new LongAdder();

adderCnt.add(count);

HotKeyModel hotKeyModel = new HotKeyModel();

hotKeyModel.setAppName(Context.APP_NAME);

hotKeyModel.setKeyType(keyType);

hotKeyModel.setCount(adderCnt);

hotKeyModel.setRemove(remove);

hotKeyModel.setKey(key);

if (remove) {

//如果是刪除key,就直接發(fā)到etcd去,不用做聚合。但是有點問題現(xiàn)在,這個刪除只能刪手工添加的key,不能刪worker探測出來的

//因為各個client都在監(jiān)聽手工添加的那個path,沒監(jiān)聽自動探測的path。所以如果手工的那個path下,沒有該key,那么是刪除不了的。

//刪不了,就達不到集群監(jiān)聽刪除事件的效果,怎么辦呢?可以通過新增的方式,新增一個熱key,然后刪除它

//TODO 這里為啥不直接刪除該節(jié)點,難道worker自動探測處理的hotKey不會往該節(jié)點增加新增事件嗎?

//釋疑:worker根據(jù)探測配置的規(guī)則,當判斷出某個key為hotKey后,確實不會往keyPath里面加入節(jié)點,他只是單純的往本地緩存里面加入一個空值,代表是熱點key

EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);

EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 這里很巧妙待補充描述

//也刪worker探測的目錄

EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));

} else {

//如果key是規(guī)則內(nèi)的要被探測的key,就積累等待傳送

if (KeyRuleHolder.isKeyInRule(key)) {

//積攢起來,等待每半秒發(fā)送一次

KeyHandlerFactory.getCollector().collect(hotKeyModel);

}

}

}

從上面的源碼中可知:

這里之所以用LongAdder是為了保證多線程計數(shù)的線程安全性,雖然這里是在方法內(nèi)調(diào)用的,但是在TurnKeyCollector的兩個map里面,存儲了HotKeyModel的實例對象,這樣在多個線程同時修改count的計數(shù)屬性時,會存在線程安全計數(shù)不準確問題。

如果是remove刪除類型,在刪除手動配置的熱key配置路徑的同時,還會刪除dashboard展示熱key的配置路徑。

只有在規(guī)則配置的key,才會被積攢探測發(fā)送到worker內(nèi)進行計算。

6.通訊機制(與worker交互)

1)NettyClient:netty連接器


public class NettyClient {

private static final NettyClient nettyClient = new NettyClient();

private Bootstrap bootstrap;

public static NettyClient getInstance() {

return nettyClient;

}

private NettyClient() {

if (bootstrap == null) {

bootstrap = initBootstrap();

}

}

private Bootstrap initBootstrap() {

//少線程

EventLoopGroup group = new NioEventLoopGroup(2);

Bootstrap bootstrap = new Bootstrap();

NettyClientHandler nettyClientHandler = new NettyClientHandler();

bootstrap.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.SO_KEEPALIVE, true)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) {

ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());

ch.pipeline()

.addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//這里就是定義TCP多個包之間的分隔符,為了更好的做拆包

.addLast(new MsgDecoder())

.addLast(new MsgEncoder())

//30秒沒消息時,就發(fā)心跳包過去

.addLast(new IdleStateHandler(0, 0, 30))

.addLast(nettyClientHandler);

}

});

return bootstrap;

}

}

使用Reactor線程模型,只有2個工作線程,沒有單獨設置主線程

長連接,開啟TCP_NODELAY

netty的分隔符”$()$”,類似TCP報文分段的標準,方便拆包

Protobuf序列化與反序列化

30s沒有消息發(fā)給對端的時候,發(fā)送一個心跳包判活

工作線程處理器NettyClientHandler

JDhotkey的tcp協(xié)議設計就是收發(fā)字符串,每個tcp消息包使用特殊字符$()$來分割 優(yōu)點:這樣實現(xiàn)非常簡單。 獲得消息包后進行json或者protobuf反序列化。 缺點:是需要,從字節(jié)流-》反序列化成字符串-》反序列化成消息對象,兩層序列化損耗了一部分性能。 protobuf還好序列化很快,但是json序列化的速度只有幾十萬每秒,會損耗一部分性能。 2)NettyClientHandler:工作線程處理器


@ChannelHandler.Sharable

public class NettyClientHandler extends SimpleChannelInboundHandler {

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent) {

IdleStateEvent idleStateEvent = (IdleStateEvent) evt;

//這里表示如果讀寫都掛了

if (idleStateEvent.state() == IdleState.ALL_IDLE) {

//向服務端發(fā)送消息

ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));

}

}

super.userEventTriggered(ctx, evt);

}

//在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時候都有可能會觸發(fā)ChannelInboundHandler的channelActive方法的調(diào)用

//類似TCP三次握手成功之后觸發(fā)

@Override

public void channelActive(ChannelHandlerContext ctx) {

JdLogger.info(getClass(), "channelActive:" + ctx.name());

ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));

}

//類似TCP四次揮手之后,等待2MSL時間之后觸發(fā)(大概180s),比如channel通道關(guān)閉會觸發(fā)(channel.close())

//客戶端channel主動關(guān)閉連接時,會向服務端發(fā)送一個寫請求,然后服務端channel所在的selector會監(jiān)聽到一個OP_READ事件,然后

//執(zhí)行數(shù)據(jù)讀取操作,而讀取時發(fā)現(xiàn)客戶端channel已經(jīng)關(guān)閉了,則讀取數(shù)據(jù)字節(jié)個數(shù)返回-1,然后執(zhí)行close操作,關(guān)閉該channel對應的底層socket,

//并在pipeline中,從head開始,往下將InboundHandler,并觸發(fā)handler的channelInactive和channelUnregistered方法的執(zhí)行,以及移除pipeline中的handlers一系列操作。

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

super.channelInactive(ctx);

//斷線了,可能只是client和server斷了,但都和etcd沒斷。也可能是client自己斷網(wǎng)了,也可能是server斷了

//發(fā)布斷線事件。后續(xù)10秒后進行重連,根據(jù)etcd里的worker信息來決定是否重連,如果etcd里沒了,就不重連。如果etcd里有,就重連

notifyWorkerChange(ctx.channel());

}

private void notifyWorkerChange(Channel channel) {

EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));

}

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {

if (MessageType.PONG == msg.getMessageType()) {

JdLogger.info(getClass(), "heart beat");

return;

}

if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {

JdLogger.info(getClass(), "receive new key : " + msg);

if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {

return;

}

for (HotKeyModel model : msg.getHotKeyModels()) {

EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));

}

}

}

}

userEventTriggered

收到對端發(fā)來的心跳包,返回new HotKeyMsg(MessageType.PING, Context.APP_NAME)

channelActive

在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時候都有可能會觸發(fā)ChannelInboundHandler的channelActive方法的調(diào)用

類似TCP三次握手成功之后觸發(fā),給對端發(fā)送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)

channelInactive

類似TCP四次揮手之后,等待2MSL時間之后觸發(fā)(大概180s),比如channel通道關(guān)閉會觸發(fā)(channel.close())該方法,發(fā)布ChannelInactiveEvent事件,來10s后重連

channelRead0

接收PONG消息類型時,打個日志返回

接收RESPONSE_NEW_KEY消息類型時,發(fā)布ReceiveNewKeyEvent事件

3.3.3 worker端

1.入口啟動加載:7個@PostConstruct

1)worker端對etcd相關(guān)的處理:EtcdStarter ① 第一個@PostConstruct:watchLog()


@PostConstruct

public void watchLog() {

AsyncPool.asyncDo(() -> {

try {

//取etcd的是否開啟日志配置,地址/jd/logOn

String loggerOn = configCenter.get(ConfigConstant.logToggle);

LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);

} catch (StatusRuntimeException ex) {

logger.error(ETCD_DOWN);

}

//監(jiān)聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關(guān)

KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

KeyValue keyValue = eventList.get(0).getKv();

logger.info("log toggle changed : " + keyValue);

String value = keyValue.getValue().toStringUtf8();

LOGGER_ON = "true".equals(value) || "1".equals(value);

}

});

}

放到線程池里面異步執(zhí)行

取etcd的是否開啟日志配置,地址/jd/logOn,默認true

監(jiān)聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關(guān)

由于有etcd的監(jiān)聽,所以會一直執(zhí)行,而不是執(zhí)行一次結(jié)束

② 第二個@PostConstruct:watch()


/**

* 啟動回調(diào)監(jiān)聽器,監(jiān)聽rule變化

*/

@PostConstruct

public void watch() {

AsyncPool.asyncDo(() -> {

KvClient.WatchIterator watchIterator;

if (isForSingle()) {

watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);

} else {

watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);

}

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

List eventList = watchUpdate.getEvents();

KeyValue keyValue = eventList.get(0).getKv();

logger.info("rule changed : " + keyValue);

try {

ruleChange(keyValue);

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

/**

* rule發(fā)生變化時,更新緩存的rule

*/

private synchronized void ruleChange(KeyValue keyValue) {

String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");

if (StrUtil.isEmpty(appName)) {

return;

}

String ruleJson = keyValue.getValue().toStringUtf8();

List keyRules = FastJsonUtils.toList(ruleJson, KeyRule.class);

KeyRuleHolder.put(appName, keyRules);

}

通過etcd.workerPath配置,來判斷該worker是否為某個app單獨服務的,默認為”default”,如果是默認值,代表該worker參與在etcd上所有app client的計算,否則只為某個app來服務計算 使用etcd來監(jiān)聽rule規(guī)則變化,如果是共享的worker,監(jiān)聽地址前綴為”/jd/rules/“,如果為某個app獨享,監(jiān)聽地址為”/jd/rules/“+$etcd.workerPath 如果規(guī)則變化,則修改對應app在本地存儲的rule緩存,同時清理該app在本地存儲的KV緩存

KeyRuleHolder:rule緩存本地存儲

Map,>

相對于client的KeyRuleHolder的區(qū)別:worker是存儲所有app規(guī)則,每個app對應一個規(guī)則桶,所以用map

CaffeineCacheHolder:key緩存本地存儲

Map,>

相對于client的caffeine,第一是worker沒有做緩存接口比如LocalCache,第二是client的map的kv分別是超時時間、以及相同超時時間所對應key的緩存桶

放到線程池里面異步執(zhí)行,由于有etcd的監(jiān)聽,所以會一直執(zhí)行,而不是執(zhí)行一次結(jié)束

③ 第三個@PostConstruct:watchWhiteList()


/**

* 啟動回調(diào)監(jiān)聽器,監(jiān)聽白名單變化,只監(jiān)聽自己所在的app,白名單key不參與熱key計算,直接忽略

*/

@PostConstruct

public void watchWhiteList() {

AsyncPool.asyncDo(() -> {

//從etcd配置中獲取所有白名單

fetchWhite();

KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);

while (watchIterator.hasNext()) {

WatchUpdate watchUpdate = watchIterator.next();

logger.info("whiteList changed ");

try {

fetchWhite();

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

拉取并監(jiān)聽etcd白名單key配置,地址為/jd/whiteList/+$etcd.workerPath

在白名單的key,不參與熱key計算,直接忽略

放到線程池里面異步執(zhí)行,由于有etcd的監(jiān)聽,所以會一直執(zhí)行,而不是執(zhí)行一次結(jié)束 ④ 第四個@PostConstruct:makeSureSelfOn()


/**

* 每隔一會去check一下,自己還在不在etcd里

*/

@PostConstruct

public void makeSureSelfOn() {

//開啟上傳worker信息

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

scheduledExecutorService.scheduleAtFixedRate(() -> {

try {

if (canUpload) {

uploadSelfInfo();

}

} catch (Exception e) {

//do nothing

}

}, 0, 5, TimeUnit.SECONDS);

}

在線程池里面異步執(zhí)行,定時執(zhí)行,時間間隔為5s

將本機woker的hostName,ip+port以kv的形式定時上報給etcd,地址為/jd/workers/+$etcd.workPath+”/“+$hostName,續(xù)期時間為8s

有一個canUpload的開關(guān)來控制worker是否向etcd來定時續(xù)期,如果這個開關(guān)關(guān)閉了,代表worker不向etcd來續(xù)期,這樣當上面地址的kv到期之后,etcd會刪除該節(jié)點,這樣client循環(huán)判斷worker信息變化了

2)將熱key推送到dashboard供入庫:DashboardPusher ① 第五個@PostConstruct:uploadToDashboard()


@Component

public class DashboardPusher implements IPusher {

/**

* 熱key集中營

*/

private static LinkedBlockingQueue hotKeyStoreQueue = new LinkedBlockingQueue<>();

@PostConstruct

public void uploadToDashboard() {

AsyncPool.asyncDo(() -> {

while (true) {

try {

//要么key達到1千個,要么達到1秒,就匯總上報給etcd一次

List tempModels = new ArrayList<>();

Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);

if (CollectionUtil.isEmpty(tempModels)) {

continue;

}

//將熱key推到dashboard

DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

}

當熱key的數(shù)量達到1000或者每隔1s,把熱key的數(shù)據(jù)通過與dashboard的netty通道來發(fā)送給dashboard,數(shù)據(jù)類型為REQUEST_HOT_KEY

LinkedBlockingQueue

hotKeyStoreQueue:worker計算的給dashboard熱key的集中營,所有給dashboard推送熱key存儲在里面 3)推送到各客戶端服務器:AppServerPusher ① 第六個@PostConstruct:batchPushToClient()


public class AppServerPusher implements IPusher {

/**

* 熱key集中營

*/

private static LinkedBlockingQueue hotKeyStoreQueue = new LinkedBlockingQueue<>();

/**

* 和dashboard那邊的推送主要區(qū)別在于,給app推送每10ms一次,dashboard那邊1s一次

*/

@PostConstruct

public void batchPushToClient() {

AsyncPool.asyncDo(() -> {

while (true) {

try {

List tempModels = new ArrayList<>();

//每10ms推送一次

Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);

if (CollectionUtil.isEmpty(tempModels)) {

continue;

}

Map> allAppHotKeyModels = new HashMap<>();

//拆分出每個app的熱key集合,按app分堆

for (HotKeyModel hotKeyModel : tempModels) {

List oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());

oneAppModels.add(hotKeyModel);

}

//遍歷所有app,進行推送

for (AppInfo appInfo : ClientInfoHolder.apps) {

List list = allAppHotKeyModels.get(appInfo.getAppName());

if (CollectionUtil.isEmpty(list)) {

continue;

}

HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);

hotKeyMsg.setHotKeyModels(list);

//整個app全部發(fā)送

appInfo.groupPush(hotKeyMsg);

}

//推送完,及時清理不使用內(nèi)存

allAppHotKeyModels = null;

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

}

會按照key的appName來進行分組,然后通過對應app的channelGroup來推送

當熱key的數(shù)量達到10或者每隔10ms,把熱key的數(shù)據(jù)通過與app的netty通道來發(fā)送給app,數(shù)據(jù)類型為RESPONSE_NEW_KEY

LinkedBlockingQueue

hotKeyStoreQueue:worker計算的給client熱key的集中營,所有給client推送熱key存儲在里面 4)client實例節(jié)點處理:NodesServerStarter ① 第七個@PostConstruct:start()


public class NodesServerStarter {

@Value("${netty.port}")

private int port;

private Logger logger = LoggerFactory.getLogger(getClass());

@Resource

private IClientChangeListener iClientChangeListener;

@Resource

private List messageFilters;

@PostConstruct

public void start() {

AsyncPool.asyncDo(() -> {

logger.info("netty server is starting");

NodesServer nodesServer = new NodesServer();

nodesServer.setClientChangeListener(iClientChangeListener);

nodesServer.setMessageFilters(messageFilters);

try {

nodesServer.startNettyServer(port);

} catch (Exception e) {

e.printStackTrace();

}

});

}

}

線程池里面異步執(zhí)行,啟動client端的nettyServer

iClientChangeListener和messageFilters這兩個依賴最終會被傳遞到netty消息處理器里面,iClientChangeListener會作為channel下線處理來刪除ClientInfoHolder下線或者超時的通道,messageFilters會作為netty收到事件消息的處理過濾器(責任鏈模式) ② 依賴的bean:IClientChangeListener iClientChangeListener


public interface IClientChangeListener {

/**

* 發(fā)現(xiàn)新連接

*/

void newClient(String appName, String channelId, ChannelHandlerContext ctx);

/**

* 客戶端掉線

*/

void loseClient(ChannelHandlerContext ctx);

}

對客戶端的管理,新來(newClient)(會觸發(fā)netty的連接方法channelActive)、斷線(loseClient)(會觸發(fā)netty的斷連方法channelInactive())的管理 client的連接信息主要是在ClientInfoHolder里面

List

apps,這里面的AppInfo主要是appName和對應的channelGroup

對apps的add和remove主要是通過新來(newClient)、斷線(loseClient) ③ 依賴的bean:List

messageFilters


/**

* 對netty來的消息,進行過濾處理

* @author wuweifeng wrote on 2019-12-11

* @version 1.0

*/

public interface INettyMsgFilter {

boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);

}

對client發(fā)給worker的netty消息,進行過濾處理,共有四個實現(xiàn)類,也就是說底下四個過濾器都是收到client發(fā)送的netty消息來做處理 ④ 各個消息處理的類型:MessageType


APP_NAME((byte) 1),

REQUEST_NEW_KEY((byte) 2),

RESPONSE_NEW_KEY((byte) 3),

REQUEST_HIT_COUNT((byte) 7), //命中率

REQUEST_HOT_KEY((byte) 8), //熱key,worker->dashboard

PING((byte) 4), PONG((byte) 5),

EMPTY((byte) 6);

順序1:HeartBeatFilter

當消息類型為PING,則給對應的client示例返回PONG

順序2:AppNameFilter

當消息類型為APP_NAME,代表client與worker建立連接成功,然后調(diào)用iClientChangeListener的newClient方法增加apps元數(shù)據(jù)信息

順序3:HotKeyFilter

處理接收消息類型為REQUEST_NEW_KEY

先給HotKeyFilter.totalReceiveKeyCount原子類增1,該原子類代表worker實例接收到的key的總數(shù)

publishMsg方法,將消息通過自建的生產(chǎn)者消費者模型(KeyProducer,KeyConsumer),來把消息給發(fā)到生產(chǎn)者中分發(fā)消費

接收到的消息HotKeyMsg里面List

首先判斷HotKeyModel里面的key是否在白名單內(nèi),如果在則跳過,否則將HotKeyModel通過KeyProducer發(fā)送

順序4:KeyCounterFilter

處理接收類型為REQUEST_HIT_COUNT

這個過濾器是專門給dashboard來匯算key的,所以這個appName直接設置為該worker配置的appName

該過濾器的數(shù)據(jù)來源都是client的NettyKeyPusher#sendCount(String appName, List

list),這里面的數(shù)據(jù)都是默認積攢10s的,這個10s是可以配置的,這一點在client里面有講

將構(gòu)造的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞隊列LinkedBlockingQueue

COUNTER_QUEUE中,然后讓CounterConsumer來消費處理,消費邏輯是單線程的

CounterConsumer:熱key統(tǒng)計消費者

放在公共線程池中,來單線程執(zhí)行

從阻塞隊列COUNTER_QUEUE里面取數(shù)據(jù),然后將里面的key的統(tǒng)計數(shù)據(jù)發(fā)布到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()里面,該路徑是worker服務的client集群或者default,用來存放客戶端hotKey訪問次數(shù)和總訪問次數(shù)的path,然后讓dashboard來訂閱統(tǒng)計展示

2.三個定時任務:3個@Scheduled

1)定時任務1:EtcdStarter#pullRules()


/**

* 每隔1分鐘拉取一次,所有的app的rule

*/

@Scheduled(fixedRate = 60000)

public void pullRules() {

try {

if (isForSingle()) {

String value = configCenter.get(ConfigConstant.rulePath + workerPath);

if (!StrUtil.isEmpty(value)) {

List keyRules = FastJsonUtils.toList(value, KeyRule.class);

KeyRuleHolder.put(workerPath, keyRules);

}

} else {

List keyValues = configCenter.getPrefix(ConfigConstant.rulePath);

for (KeyValue keyValue : keyValues) {

ruleChange(keyValue);

}

}

} catch (StatusRuntimeException ex) {

logger.error(ETCD_DOWN);

}

}

每隔1分鐘拉取一次etcd地址為/jd/rules/的規(guī)則變化,如果worker所服務的app或者default的rule有變化,則更新規(guī)則的緩存,并清空該appName所對應的本地key緩存 2)定時任務2:EtcdStarter#uploadClientCount()


/**

* 每隔10秒上傳一下client的數(shù)量到etcd中

*/

@Scheduled(fixedRate = 10000)

public void uploadClientCount() {

try {

String ip = IpUtils.getIp();

for (AppInfo appInfo : ClientInfoHolder.apps) {

String appName = appInfo.getAppName();

int count = appInfo.size();

//即便是full gc也不能超過3秒,因為這里給的過期時間是13s,由于該定時任務每隔10s執(zhí)行一次,如果full gc或者說上報給etcd的時間超過3s,

//則在dashboard查詢不到client的數(shù)量

configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);

}

configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);

//上報每秒QPS(接收key數(shù)量、處理key數(shù)量)

String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));

configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);

logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);

//如果是穩(wěn)定一直有key發(fā)送的應用,建議開啟該監(jiān)控,以避免可能發(fā)生的網(wǎng)絡故障

if (openMonitor) {

checkReceiveKeyCount();

}

// configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);

} catch (Exception ex) {

logger.error(ETCD_DOWN);

}

}

每個10s將worker計算存儲的client信息上報給etcd,來方便dashboard來查詢展示,比如/jd/count/對應client數(shù)量,/jd/caffeineSize/對應caffeine緩存的大小,/jd/totalKeyCount/對應該worker接收的key總量和處理的key總量

可以從代碼中看到,上面所有etcd的節(jié)點租期時間都是13s,而該定時任務是每10s執(zhí)行一次,意味著如果full gc或者說上報給etcd的時間超過3s,則在dashboard查詢不到client的相關(guān)匯算信息

長時間不收到key,判斷網(wǎng)絡狀態(tài)不好,斷開worker給etcd地址為/jd/workers/+$workerPath節(jié)點的續(xù)租,因為client會循環(huán)判斷該地址的節(jié)點是否變化,使得client重新連接worker或者斷開失聯(lián)的worker 3)定時任務3:EtcdStarter#fetchDashboardIp()


/**

* 每隔30秒去獲取一下dashboard的地址

*/

@Scheduled(fixedRate = 30000)

public void fetchDashboardIp() {

try {

//獲取DashboardIp

List keyValues = configCenter.getPrefix(ConfigConstant.dashboardPath);

//是空,給個警告

if (CollectionUtil.isEmpty(keyValues)) {

logger.warn("very important warn !!! Dashboard ip is null!!!");

return;

}

String dashboardIp = keyValues.get(0).getValue().toStringUtf8();

NettyClient.getInstance().connect(dashboardIp);

} catch (Exception e) {

e.printStackTrace();

}

}

每隔30s拉取一次etcd前綴為/jd/dashboard/的dashboard連接ip的值,并且判斷DashboardHolder.hasConnected里面是否為未連接狀態(tài),如果是則重新連接worker與dashboard的netty通道

3.自建的生產(chǎn)者消費者模型(KeyProducer,KeyConsumer)

一般生產(chǎn)者消費者模型包含三大元素:生產(chǎn)者、消費者、消息存儲隊列 這里消息存儲隊列是DispatcherConfig里面的QUEUE,使用LinkedBlockingQueue,默認大小為200W 1)KeyProducer


@Component

public class KeyProducer {

public void push(HotKeyModel model, long now) {

if (model == null || model.getKey() == null) {

return;

}

//5秒前的過時消息就不處理了

if (now - model.getCreateTime() > InitConstant.timeOut) {

expireTotalCount.increment();

return;

}

try {

QUEUE.put(model);

totalOfferCount.increment();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

判斷接收到的HotKeyModel是否超出”netty.timeOut”配置的時間,如果是將expireTotalCount紀錄過期總數(shù)給自增,然后返回 2)KeyConsumer


public class KeyConsumer {

private IKeyListener iKeyListener;

public void setKeyListener(IKeyListener iKeyListener) {

this.iKeyListener = iKeyListener;

}

public void beginConsume() {

while (true) {

try {

//從這里可以看出,這里的生產(chǎn)者消費者模型,本質(zhì)上還是拉模式,之所以不使用EventBus,是因為需要隊列來做緩沖

HotKeyModel model = QUEUE.take();

if (model.isRemove()) {

iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);

} else {

iKeyListener.newKey(model, KeyEventOriginal.CLIENT);

}

//處理完畢,將數(shù)量加1

totalDealCount.increment();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

@Override

public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {

//cache里的key,appName+keyType+key

String key = buildKey(hotKeyModel);

hotCache.invalidate(key);

CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

//推送所有client刪除

hotKeyModel.setCreateTime(SystemClock.now());

logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());

for (IPusher pusher : iPushers) {

//這里可以看到,刪除熱key的netty消息只給client端發(fā)了過去,沒有給dashboard發(fā)過去(DashboardPusher里面的remove是個空方法)

pusher.remove(hotKeyModel);

}

}

@Override

public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {

//cache里的key

String key = buildKey(hotKeyModel);

//判斷是不是剛熱不久

//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內(nèi)不重復處理相同的hotKey。

//畢竟hotKey都是瞬時流量,可以避免在這5s內(nèi)重復推送給client和dashboard,避免無效的網(wǎng)絡開銷

Object o = hotCache.getIfPresent(key);

if (o != null) {

return;

}

//********** watch here ************//

//該方法會被InitConstant.threadCount個線程同時調(diào)用,存在多線程問題

//下面的那句addCount是加了鎖的,代表給Key累加數(shù)量時是原子性的,不會發(fā)生多加、少加的情況,到了設定的閾值一定會hot

//譬如閾值是2,如果多個線程累加,在沒hot前,hot的狀態(tài)肯定是對的,譬如thread1 加1,thread2加1,那么thread2會hot返回true,開啟推送

//但是極端情況下,譬如閾值是10,當前是9,thread1走到這里時,加1,返回true,thread2也走到這里,加1,此時是11,返回true,問題來了

//該key會走下面的else兩次,也就是2次推送。

//所以出現(xiàn)問題的原因是hotCache.getIfPresent(key)這一句在并發(fā)情況下,沒return掉,放了兩個key+1到addCount這一步時,會有問題

//測試代碼在TestBlockQueue類,直接運行可以看到會同時hot

//那么該問題用解決嗎,NO,不需要解決,1 首先要發(fā)生的條件極其苛刻,很難觸發(fā),以京東這樣高的并發(fā)量,線上我也沒見過觸發(fā)連續(xù)2次推送同一個key的

//2 即便觸發(fā)了,后果也是可以接受的,2次推送而已,毫無影響,客戶端無感知。但是如果非要解決,就要對slidingWindow實例加鎖了,必然有一些開銷

//所以只要保證key數(shù)量不多計算就可以,少計算了沒事。因為熱key必然頻率高,漏計幾次沒事。但非熱key,多計算了,被干成了熱key就不對了

SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//從這里可知,每個app的每個key都會對應一個滑動窗口

//看看hot沒

boolean hot = slidingWindow.addCount(hotKeyModel.getCount());

if (!hot) {

//如果沒hot,重新put,cache會自動刷新過期時間

CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);

} else {

//這里之所以放入的value為1,是因為hotCache是用來專門存儲剛生成的hotKey

//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內(nèi)不重復處理相同的hotKey。

//畢竟hotKey都是瞬時流量,可以避免在這5s內(nèi)重復推送給client和dashboard,避免無效的網(wǎng)絡開銷

hotCache.put(key, 1);

//刪掉該key

//這個key從實際上是專門針對slidingWindow的key,他的組合邏輯是appName+keyType+key,而不是給client和dashboard推送的hotKey

CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);

//開啟推送

hotKeyModel.setCreateTime(SystemClock.now());

//當開關(guān)打開時,打印日志。大促時關(guān)閉日志,就不打印了

if (EtcdStarter.LOGGER_ON) {

logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());

}

//分別推送到各client和etcd

for (IPusher pusher : iPushers) {

pusher.push(hotKeyModel);

}

}

}

“thread.count”配置即為消費者個數(shù),多個消費者共同消費一個QUEUE隊列 生產(chǎn)者消費者模型,本質(zhì)上還是拉模式,之所以不使用EventBus,是因為需要隊列來做緩沖 根據(jù)HotKeyModel里面是否是刪除消息類型

刪除CaffeineCacheHolder里面對應newkey的滑動窗口緩存。

向該hotKeyModel對應的app的client推送netty消息,表示新產(chǎn)生hotKey,使得client本地緩存,但是推送的netty消息只代表為熱key,client本地緩存不會存儲key對應的value值,需要調(diào)用JdHotKeyStore里面的api來給本地緩存的value賦值

向dashboard推送hotKeyModel,表示新產(chǎn)生hotKey

刪除消息類型

根據(jù)HotKeyModel里面的appName+keyType+key的名字,來構(gòu)建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應

刪除hotCache里面newkey的緩存,放入的緩存kv分別是newKey和1,hotCache作用是用來存儲該生成的熱key,hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內(nèi)不重復處理相同的hotKey。畢竟hotKey都是瞬時流量,可以避免在這5s內(nèi)重復推送給client和dashboard,避免無效的網(wǎng)絡開銷

刪除CaffeineCacheHolder里面對應appName的caffeine里面的newKey,這里面存儲的是slidingWindow滑動窗口

推送給該HotKeyModel對應的所有client實例,用來讓client刪除該HotKeyModel

非刪除消息類型

根據(jù)HotKeyModel里面的appName+keyType+key的名字,來構(gòu)建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應

通過hotCache來判斷該newkey是否剛熱不久,如果是則返回

根據(jù)滑動時間窗口來計算判斷該key是否為hotKey(這里可以學習一下滑動時間窗口的設計),并返回或者生成該newKey對應的滑動窗口

如果沒有達到熱key的標準

通過CaffeineCacheHolder重新put,cache會自動刷新過期時間

如果達到了熱key標準

向hotCache里面增加newkey對應的緩存,value為1表示剛為熱key。

3)計算熱key滑動窗口的設計 限于篇幅的原因,這里就不細談了,直接貼出項目作者對其寫的說明文章:Java簡單實現(xiàn)滑動窗口

3.3.4 dashboard端

這個沒啥可說的了,就是連接etcd、mysql,增刪改查,不過京東的前端框架很方便,直接返回list就可以成列表。

4 總結(jié)

文章第二部分為大家講解了redis數(shù)據(jù)傾斜的原因以及應對方案,并對熱點問題進行了深入,從發(fā)現(xiàn)熱key到解決熱key的兩個關(guān)鍵問題的總結(jié)。 文章第三部分是熱key問題解決方案——JD開源hotkey的源碼解析,分別從client端、worker端、dashboard端來進行全方位講解,包括其設計、使用及相關(guān)原理。 希望通過這篇文章,能夠使大家不僅學習到相關(guān)方法論,也能明白其方法論具體的落地方案,一起學習,一起成長。

審核編輯:湯梓紅

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 開源
    +關(guān)注

    關(guān)注

    3

    文章

    3381

    瀏覽量

    42604
  • Redis
    +關(guān)注

    關(guān)注

    0

    文章

    376

    瀏覽量

    10898

原文標題:Redis數(shù)據(jù)傾斜與JD開源hotkey源碼分析揭秘

文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    快充EOS 應對方案OVP和TVS

    快充EOS 應對方案OVP和TVSEOS是英文Electrical Over Stress的首字母縮寫,其意為電氣過應力 ,意思就是有過高的電壓和/或電流信號加到了受試對象上,它受不了了,被損壞
    發(fā)表于 07-28 13:46

    企業(yè)打開Redis的正確方式,來自阿里云云數(shù)據(jù)庫團隊的解讀

    的情況下,每個業(yè)務都需要各自特定的數(shù)據(jù)庫架構(gòu)和優(yōu)化方案,需要加入OLAP 、離線分析任務,并且考慮高速擴展、高性能、高可靠等問題。Redis開源的基于內(nèi)存且可以持久化的分布式 Key
    發(fā)表于 02-07 14:06

    Redis Stream應用案例

    IRC頻道(channel1),就可以接收所有用戶發(fā)出的消息了。發(fā)出消息時,只需使用發(fā)布命令(publish)命令即可。整個業(yè)務邏輯非常的清晰簡單,這也是Redis強大和流行的重要原因——提供的功能和數(shù)據(jù)
    發(fā)表于 06-26 17:15

    走近源碼Redis如何執(zhí)行命令的

    走近源碼Redis如何執(zhí)行命令
    發(fā)表于 06-09 16:31

    APT給AVER的困擾點有哪些?對于APT有什么應對方案?

    APT與傳統(tǒng)的病毒時代有什么不同?APT給AVER的困擾點有哪些?對于APT有什么應對方案?
    發(fā)表于 07-05 06:32

    如何使得redis中的數(shù)據(jù)不再有

    ,原因redis的持久化功能導致的,所謂的持久化就是redis在系統(tǒng)關(guān)閉的時候把數(shù)據(jù)存儲到硬盤中,在下一次啟動的時候,在從硬盤恢復到redis
    發(fā)表于 11-05 08:50

    【昉·星光 2 高性能RISC-V單板計算機體驗】Redis源碼編譯和性能測試以及與樹莓派4B對比

    本文首先介紹Redis是什么,然后介紹如何在VisionFive2上編譯Redis源碼以及源碼安裝R
    發(fā)表于 12-10 21:27

    【愛芯派 Pro 開發(fā)板試用體驗】Redis源碼編譯和基準測試

    庫、緩存、流式處理引擎和消息代理的開源內(nèi)存數(shù)據(jù)存儲。 二、源碼編譯Redis 2.1 安裝git和編譯工具鏈 # 安裝 git 和編譯工具鏈 sudo aptinstall git
    發(fā)表于 12-10 22:18

    舵機失靈的主要原因_舵機失靈的應對方

    本文首先介紹了舵機失靈的主要原因,其次介紹了舵機失靈的應對方法,最后介紹了舵機日常維護的重點以及提高船舶應急應變能力的對策,具體的跟隨小編一起來了解一下。
    的頭像 發(fā)表于 05-30 14:28 ?4.3w次閱讀

    氨水罐滲漏的快速應對方案

    氨水罐滲漏的快速應對方案
    發(fā)表于 02-28 10:04 ?7次下載

    運放輸出失調(diào)電壓的影響以及應對方法說明

    運放輸出失調(diào)電壓的影響以及應對方
    的頭像 發(fā)表于 03-17 16:58 ?1.3w次閱讀
    運放輸出失調(diào)電壓的影響<b class='flag-5'>以及</b><b class='flag-5'>應對方</b>法說明

    ECG子系統(tǒng)設計主要挑戰(zhàn)及應對方案

    電子發(fā)燒友網(wǎng)站提供《ECG子系統(tǒng)設計主要挑戰(zhàn)及應對方案.pdf》資料免費下載
    發(fā)表于 11-23 10:43 ?0次下載
    ECG子系統(tǒng)設計主要挑戰(zhàn)及<b class='flag-5'>應對方案</b>

    電源電壓變化對晶振性能的影響以及應對方

    電源電壓變化對晶振性能的影響以及應對方法? 電源電壓的變化是指電源輸入電壓的波動或變化,它可能產(chǎn)生一系列的問題,對晶振的性能和工作穩(wěn)定性產(chǎn)生影響。本文將詳細討論電源電壓變化對晶振的影響,并提供應對方
    的頭像 發(fā)表于 12-18 14:09 ?1509次閱讀

    新版 Redis 不再“開源”,對使用者都有哪些影響?

    2024 年 3 月 20 日,Redis Labs 宣布從 Redis 7.4 開始,將原先比較寬松的 BSD 源碼使用協(xié)議修改為 RSAv2和 SSPLv1協(xié)議。該變化意味著 Redis
    的頭像 發(fā)表于 03-27 22:30 ?524次閱讀
    新版 <b class='flag-5'>Redis</b> 不再“<b class='flag-5'>開源</b>”,對使用者都有哪些影響?

    Redis開源版與Redis企業(yè)版,怎么選用?

    點擊“藍字”關(guān)注我們數(shù)以千計的企業(yè)和數(shù)以百萬計的開發(fā)人員Redis開源版來構(gòu)建應用程序。但隨著用戶數(shù)量、數(shù)據(jù)量和地區(qū)性的增加,成本、可擴展性、運營和可用性等問題也隨之而來。Redis
    的頭像 發(fā)表于 04-04 08:04 ?1153次閱讀
    <b class='flag-5'>Redis</b><b class='flag-5'>開源</b>版與<b class='flag-5'>Redis</b>企業(yè)版,怎么選用?