一、Sentinel簡(jiǎn)介
Sentinel 以流量為切入點(diǎn),從流量控制、熔斷降級(jí)、系統(tǒng)負(fù)載保護(hù)等多個(gè)維度保護(hù)服務(wù)的穩(wěn)定性。
Sentinel 具有以下特征:
?豐富的應(yīng)用場(chǎng)景:Sentinel 承接了阿里巴巴近 10 年的雙十一大促流量的核心場(chǎng)景,例如秒殺(即突發(fā)流量控制在系統(tǒng)容量可以承受的范圍)、消息削峰填谷、集群流量控制、實(shí)時(shí)熔斷下游不可用應(yīng)用等。
?完備的實(shí)時(shí)監(jiān)控:Sentinel 同時(shí)提供實(shí)時(shí)的監(jiān)控功能。您可以在控制臺(tái)中看到接入應(yīng)用的單臺(tái)機(jī)器秒級(jí)數(shù)據(jù),甚至 500 臺(tái)以下規(guī)模的集群的匯總運(yùn)行情況。
?廣泛的開(kāi)源生態(tài):Sentinel 提供開(kāi)箱即用的與其它開(kāi)源框架/庫(kù)的整合模塊,例如與 Spring Cloud、Apache Dubbo、gRPC、Quarkus 的整合。您只需要引入相應(yīng)的依賴(lài)并進(jìn)行簡(jiǎn)單的配置即可快速地接入 Sentinel。同時(shí) Sentinel 提供 Java/Go/C++ 等多語(yǔ)言的原生實(shí)現(xiàn)。
?完善的 SPI 擴(kuò)展機(jī)制:Sentinel 提供簡(jiǎn)單易用、完善的 SPI 擴(kuò)展接口。您可以通過(guò)實(shí)現(xiàn)擴(kuò)展接口來(lái)快速地定制邏輯。例如定制規(guī)則管理、適配動(dòng)態(tài)數(shù)據(jù)源等。
有關(guān)Sentinel的詳細(xì)介紹以及和Hystrix的區(qū)別可以自行網(wǎng)上檢索,推薦一篇文章:https://mp.weixin.qq.com/s/Q7Xv8cypQFrrOQhbd9BOXw
本次主要使用了Sentinel的降級(jí)、限流、系統(tǒng)負(fù)載保護(hù)功能
二、Sentinel關(guān)鍵技術(shù)源碼解析
無(wú)論是限流、降級(jí)、負(fù)載等控制手段,大致流程如下:
?StatisticSlot 則用于記錄、統(tǒng)計(jì)不同維度的 runtime 指標(biāo)監(jiān)控信息
?責(zé)任鏈依次觸發(fā)后續(xù) slot 的 entry 方法,如 SystemSlot、FlowSlot、DegradeSlot 等的規(guī)則校驗(yàn);
?當(dāng)后續(xù)的 slot 通過(guò),沒(méi)有拋出 BlockException 異常,說(shuō)明該資源被成功調(diào)用,則增加執(zhí)行線(xiàn)程數(shù)和通過(guò)的請(qǐng)求數(shù)等信息。
關(guān)于數(shù)據(jù)統(tǒng)計(jì),主要會(huì)牽扯到 ArrayMetric、BucketLeapArray、MetricBucket、WindowWrap 等類(lèi)。
項(xiàng)目結(jié)構(gòu)
以下主要分析core包里的內(nèi)容
2.1注解入口
2.1.1 Entry、Context、Node
SphU門(mén)面類(lèi)的方法出參都是Entry,Entry可以理解為每次進(jìn)入資源的一個(gè)憑證,如果調(diào)用SphO.entry()或者SphU.entry()能獲取Entry對(duì)象,代表獲取了憑證,沒(méi)有被限流,否則拋出一個(gè)BlockException。
Entry中持有本次對(duì)資源調(diào)用的相關(guān)信息:
?createTime:創(chuàng)建該Entry的時(shí)間戳。
?curNode:Entry當(dāng)前是在哪個(gè)節(jié)點(diǎn)。
?orginNode:Entry的調(diào)用源節(jié)點(diǎn)。
?resourceWrapper:Entry關(guān)聯(lián)的資源信息。
?
Entry是一個(gè)抽象類(lèi),CtEntry是Entry的實(shí)現(xiàn),CtEntry持有Context和調(diào)用鏈的信息
Context的源碼注釋如下,
This class holds metadata of current invocation
Node的源碼注釋
Holds real-time statistics for resources
Node中保存了對(duì)資源的實(shí)時(shí)數(shù)據(jù)的統(tǒng)計(jì),Sentinel中的限流或者降級(jí)等功能就是通過(guò)Node中的數(shù)據(jù)進(jìn)行判斷的。Node是一個(gè)接口,里面定義了各種操作request、exception、rt、qps、thread的方法。
在細(xì)看Node實(shí)現(xiàn)時(shí),不難發(fā)現(xiàn)LongAddr的使用,關(guān)于LongAddr和DoubleAddr都是java8 java.util.concurrent.atomic里的內(nèi)容,感興趣的小伙伴可以再深入研究一下,這兩個(gè)是高并發(fā)下計(jì)數(shù)功能非常優(yōu)秀的數(shù)據(jù)結(jié)構(gòu),實(shí)際應(yīng)用場(chǎng)景里需要計(jì)數(shù)時(shí)可以考慮使用。
關(guān)于Node的介紹后續(xù)還會(huì)深入,此處大致先提一下這個(gè)概念。
2.2 初始化
2.2.1 Context初始化
在初始化slot責(zé)任鏈部分前,還執(zhí)行了context的初始化,里面涉及幾個(gè)重要概念,需要解釋一下:
可以發(fā)現(xiàn)在Context初始化的過(guò)程中,會(huì)把EntranceNode加入到Root子節(jié)點(diǎn)中(實(shí)際Root本身是一個(gè)特殊的EntranceNode),并把EntranceNode放到contextNameNodeMap中。
之前簡(jiǎn)單提到過(guò)Node,是用來(lái)統(tǒng)計(jì)數(shù)據(jù)用的,不同Node功能如下:
?Node:用于完成數(shù)據(jù)統(tǒng)計(jì)的接口
?StatisticNode:統(tǒng)計(jì)節(jié)點(diǎn),是Node接口的實(shí)現(xiàn)類(lèi),用于完成數(shù)據(jù)統(tǒng)計(jì)
?EntranceNode:入口節(jié)點(diǎn),一個(gè)Context會(huì)有一個(gè)入口節(jié)點(diǎn),用于統(tǒng)計(jì)當(dāng)前Context的總體流量數(shù)據(jù)
?DefaultNode:默認(rèn)節(jié)點(diǎn),用于統(tǒng)計(jì)一個(gè)資源在當(dāng)前Context中的流量數(shù)據(jù)
?ClusterNode:集群節(jié)點(diǎn),用于統(tǒng)計(jì)一個(gè)資源在所有Context中的總體流量數(shù)據(jù)
protected static Context trueEnter(String name, String origin) { Context context = contextHolder.get(); if (context == null) { Map localCacheNameMap = contextNameNodeMap; DefaultNode node = localCacheNameMap.get(name); if (node == null) { if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. Constants.ROOT.addChild(node); Map newMap = new HashMap?>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } context = new Context(node, name); context.setOrigin(origin); contextHolder.set(context); } return context; }
2.2.2 通過(guò)SpiLoader默認(rèn)初始化8個(gè)slot
每個(gè)slot的主要職責(zé)如下:
?NodeSelectorSlot 負(fù)責(zé)收集資源的路徑,并將這些資源的調(diào)用路徑,以樹(shù)狀結(jié)構(gòu)存儲(chǔ)起來(lái),用于根據(jù)調(diào)用路徑來(lái)限流降級(jí);
?ClusterBuilderSlot 則用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級(jí)的依據(jù);
?StatisticSlot 則用于記錄、統(tǒng)計(jì)不同緯度的 runtime 指標(biāo)監(jiān)控信息;
?FlowSlot 則用于根據(jù)預(yù)設(shè)的限流規(guī)則以及前面 slot 統(tǒng)計(jì)的狀態(tài),來(lái)進(jìn)行流量控制;
?AuthoritySlot 則根據(jù)配置的黑白名單和調(diào)用來(lái)源信息,來(lái)做黑白名單控制;
?DegradeSlot 則通過(guò)統(tǒng)計(jì)信息以及預(yù)設(shè)的規(guī)則,來(lái)做熔斷降級(jí);
?SystemSlot 則通過(guò)系統(tǒng)的狀態(tài),例如 集群QPS、線(xiàn)程數(shù)、RT、負(fù)載 等,來(lái)控制總的入口流量;
2.3 StatisticSlot
2.3.1 Node
深入看一下Node,因?yàn)榻y(tǒng)計(jì)信息都在里面,后面不論是限流、熔斷、負(fù)載保護(hù)等都是結(jié)合規(guī)則+統(tǒng)計(jì)信息判斷是否要執(zhí)行
從Node的源碼注釋看,它會(huì)持有資源維度的實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù),以下是接口里的方法定義,可以看到totalRequest、totalPass、totalSuccess、blockRequest、totalException、passQps等很多request、qps、thread的相關(guān)方法:
/** * Holds real-time statistics for resources. * * @author qinan.qn * @author leyou * @author Eric Zhao */ public interface Node extends OccupySupport, DebugSupport { long totalRequest(); long totalPass(); long totalSuccess(); long blockRequest(); long totalException(); double passQps(); double blockQps(); double totalQps(); double successQps(); …… }
2.3.2 StatisticNode
我們先從最基礎(chǔ)的StatisticNode開(kāi)始看,源碼給出的定位是:
The statistic node keep three kinds of real-time statistics metrics: metrics in second level ({@code rollingCounterInSecond}) metrics in minute level ({@code rollingCounterInMinute}) thread count
StatisticNode只有四個(gè)屬性,除了之前提到過(guò)的LongAddr類(lèi)型的curThreadNum外,還有兩個(gè)屬性是Metric對(duì)象,通過(guò)入?yún)⒁呀?jīng)屬性命名可以看出,一個(gè)用于秒級(jí),一個(gè)用于分鐘級(jí)統(tǒng)計(jì)。接下來(lái)我們就要看看Metric
// StatisticNode持有兩個(gè)Metric,一個(gè)秒級(jí)一個(gè)分鐘級(jí),由入?yún)⒖芍?,秒?jí)統(tǒng)計(jì)劃分了兩個(gè)時(shí)間窗口,窗口程度是500ms private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); // 分鐘級(jí)統(tǒng)計(jì)劃分了60個(gè)時(shí)間窗口,窗口長(zhǎng)度是1000ms private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); /** * The counter for thread count. */ private LongAdder curThreadNum = new LongAdder(); /** * The last timestamp when metrics were fetched. */ private long lastFetchTime = -1;
ArrayMetric只有一個(gè)屬性L(fǎng)eapArray,其余都是用于統(tǒng)計(jì)的方法,LeapArray是sentinel中統(tǒng)計(jì)最基本的數(shù)據(jù)結(jié)構(gòu),這里有必要詳細(xì)看一下,總體就是根據(jù)timeMillis去獲取一個(gè)bucket,分為:沒(méi)有創(chuàng)建、有直接返回、被廢棄后的reset三種場(chǎng)景。
//以分鐘級(jí)的統(tǒng)計(jì)屬性為例,看一下時(shí)間窗口初始化過(guò)程 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); public LeapArray(int sampleCount, int intervalInMs) { AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive"); AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); // windowLengthInMs = 60*1000 / 60 = 1000 滑動(dòng)窗口時(shí)間長(zhǎng)度,可見(jiàn)sentinel默認(rèn)將單位時(shí)間分為了60個(gè)滑動(dòng)窗口進(jìn)行數(shù)據(jù)統(tǒng)計(jì) this.windowLengthInMs = intervalInMs / sampleCount; // 60*1000 this.intervalInMs = intervalInMs; // 60 this.intervalInSecond = intervalInMs / 1000.0; // 60 this.sampleCount = sampleCount; // 數(shù)組長(zhǎng)度60 this.array = new AtomicReferenceArray?>(sampleCount); } /** * Get bucket item at provided timestamp. * * @param timeMillis a valid timestamp in milliseconds * @return current bucket item at provided timestamp if the time is valid; null if time is invalid */ public WindowWrap currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 根據(jù)當(dāng)前時(shí)間戳算一個(gè)數(shù)組索引 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. // timeMillis % 1000 long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket. */ while (true) { WindowWrap old = array.get(idx); if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ // newEmptyBucket 方法重寫(xiě),秒級(jí)和分鐘級(jí)統(tǒng)計(jì)對(duì)象實(shí)現(xiàn)不同 WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } // 持有一個(gè)時(shí)間窗口對(duì)象的數(shù)據(jù),會(huì)根據(jù)當(dāng)前時(shí)間戳除以時(shí)間窗口長(zhǎng)度然后散列到數(shù)組中 private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); }
WindowWrap持有了windowLengthInMs, windowStart和LeapArray(分鐘統(tǒng)計(jì)實(shí)現(xiàn)是BucketLeapArray,秒級(jí)統(tǒng)計(jì)實(shí)現(xiàn)是OccupiableBucketLeapArray),對(duì)于分鐘級(jí)別的統(tǒng)計(jì),MetricBucket維護(hù)了一個(gè)longAddr數(shù)組和一個(gè)配置的minRT
/** * The fundamental data structure for metric statistics in a time span. * * @author jialiang.linjl * @author Eric Zhao * @see LeapArray */ public class BucketLeapArray extends LeapArray { public BucketLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); } @Override public MetricBucket newEmptyBucket(long time) { return new MetricBucket(); } @Override protected WindowWrap resetWindowTo(WindowWrap w, long startTime) { // Update the start time and reset value. w.resetTo(startTime); w.value().reset(); return w; } }
對(duì)于秒級(jí)統(tǒng)計(jì),QPS=20場(chǎng)景下,如何準(zhǔn)確統(tǒng)計(jì)的問(wèn)題,此處用到了另外一個(gè)LeapArry實(shí)現(xiàn)FutureBucketLeapArray,至于秒級(jí)統(tǒng)計(jì)如何保證沒(méi)有統(tǒng)計(jì)誤差,讀者可以再研究一下FutureBucketLeapArray的上下文就好。
2.4 FlowSlot
2.4.1 常見(jiàn)限流算法
介紹sentinel限流實(shí)現(xiàn)前,先介紹一下常見(jiàn)限流算法,基本分為三種:計(jì)數(shù)器、漏斗、令牌桶。
計(jì)數(shù)器算法
顧名思義,計(jì)數(shù)器算法就是統(tǒng)計(jì)某個(gè)時(shí)間段內(nèi)的請(qǐng)求,每單位時(shí)間加1,然后與配置的限流值(最大QPS)進(jìn)行比較,如果超出則觸發(fā)限流。但是這種算法不能做到“平滑限流”,以1s為單位時(shí)間,100QPS為限流值為例,如下圖,會(huì)出現(xiàn)某時(shí)段超出限流值的情況
因此在單純計(jì)數(shù)器算法上,又出現(xiàn)了滑動(dòng)窗口計(jì)數(shù)器算法,我們將統(tǒng)計(jì)時(shí)間細(xì)分,比如將1s統(tǒng)計(jì)時(shí)長(zhǎng)分為5個(gè)時(shí)間窗口,通過(guò)滾動(dòng)統(tǒng)計(jì)所有時(shí)間窗口的QPS作為系統(tǒng)實(shí)際的QPS的方式,就能解決上述臨界統(tǒng)計(jì)問(wèn)題,后續(xù)我們看sentinel源碼時(shí)也能看到類(lèi)似操作。
漏斗算法
不論流量有多大都會(huì)先到漏桶中,然后以均勻的速度流出。如何在代碼中實(shí)現(xiàn)這個(gè)勻速呢?比如我們想讓勻速為100q/s,那么我們可以得到每流出一個(gè)流量需要消耗10ms,類(lèi)似一個(gè)隊(duì)列,每隔10ms從隊(duì)列頭部取出流量進(jìn)行放行,而我們的隊(duì)列也就是漏桶,當(dāng)流量大于隊(duì)列的長(zhǎng)度的時(shí)候,我們就可以拒絕超出的部分。
漏斗算法同樣的也有一定的缺點(diǎn):無(wú)法應(yīng)對(duì)突發(fā)流量。比如一瞬間來(lái)了100個(gè)請(qǐng)求,在漏桶算法中只能一個(gè)一個(gè)的過(guò)去,當(dāng)最后一個(gè)請(qǐng)求流出的時(shí)候時(shí)間已經(jīng)過(guò)了一秒了,所以漏斗算法比較適合請(qǐng)求到達(dá)比較均勻,需要嚴(yán)格控制請(qǐng)求速率的場(chǎng)景。
令牌桶算法
令牌桶算法和漏斗算法比較類(lèi)似,區(qū)別是令牌桶存放的是令牌數(shù)量不是請(qǐng)求數(shù)量,令牌桶可以根據(jù)自身需求多樣性得管理令牌的生產(chǎn)和消耗,可以解決突發(fā)流量的問(wèn)題。
2.4.2 單機(jī)限流模式
接下來(lái)我們看一下Sentinel中的限流實(shí)現(xiàn),相比上述基本限流算法,Sentinel限流的第一個(gè)特性就是引入“資源”的概念,可以細(xì)粒度多樣性的支持特定資源、關(guān)聯(lián)資源、指定鏈路的限流。
FlowSlot的主要邏輯都在FlowRuleChecker里,介紹之前,我們先看一下Sentinel關(guān)于規(guī)則的模型描述,下圖分別是限流、訪(fǎng)問(wèn)控制規(guī)則、系統(tǒng)保護(hù)規(guī)則(Linux負(fù)載)、降級(jí)規(guī)則
/** * 流量控制兩種模式 * 0: thread count(當(dāng)調(diào)用該api的線(xiàn)程數(shù)達(dá)到閾值的時(shí)候,進(jìn)行限流) * 1: QPS(當(dāng)調(diào)用該api的QPS達(dá)到閾值的時(shí)候,進(jìn)行限流) */ private int grade = RuleConstant.FLOW_GRADE_QPS; /** * 流量控制閾值,值含義與grade有關(guān) */ private double count; /** * 調(diào)用關(guān)系限流策略(可以支持關(guān)聯(lián)資源或指定鏈路的多樣性限流需求) * 直接(api 達(dá)到限流條件時(shí),直接限流) * 關(guān)聯(lián)(當(dāng)關(guān)聯(lián)的資源達(dá)到限流閾值時(shí),就限流自己) * 鏈路(只記錄指定鏈路上的流量) * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin); * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource); * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource). */ private int strategy = RuleConstant.STRATEGY_DIRECT; /** * Reference resource in flow control with relevant resource or context. */ private String refResource; /** * 流控效果: * 0. default(reject directly),直接拒絕,拋異常FlowException * 1. warm up, 慢啟動(dòng)模式(根據(jù)coldFactor(冷加載因子,默認(rèn)3)的值,從閾值/coldFactor,經(jīng)過(guò)預(yù)熱時(shí)長(zhǎng),才達(dá)到設(shè)置的QPS閾值) * 2. rate limiter 排隊(duì)等待 * 3. warm up + rate limiter */ private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; private int warmUpPeriodSec = 10; /** * Max queueing time in rate limiter behavior. */ private int maxQueueingTimeMs = 500; /** * 是否集群限流,默認(rèn)為否 */ private boolean clusterMode; /** * Flow rule config for cluster mode. */ private ClusterFlowConfig clusterConfig; /** * The traffic shaping (throttling) controller. */ private TrafficShapingController controller;
接著我們繼續(xù)分析FlowRuleChecker
canPassCheck第一步會(huì)好看limitApp,這個(gè)是結(jié)合訪(fǎng)問(wèn)授權(quán)限制規(guī)則使用的,默認(rèn)是所有。
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 根據(jù)策略選擇Node來(lái)進(jìn)行統(tǒng)計(jì)(可以是本身Node、關(guān)聯(lián)的Node、指定的鏈路) Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } return rule.getRater().canPass(selectedNode, acquireCount, prioritized); } static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) { // limitApp是訪(fǎng)問(wèn)控制使用的,默認(rèn)是default,不限制來(lái)源 String limitApp = rule.getLimitApp(); // 拿到限流策略 int strategy = rule.getStrategy(); String origin = context.getOrigin(); // 基于調(diào)用來(lái)源做鑒權(quán) if (limitApp.equals(origin) && filterOrigin(origin)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // Matches limit origin, return origin statistic node. return context.getOriginNode(); } // return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // Return the cluster node. return node.getClusterNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } return null; } static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); if (StringUtil.isEmpty(refResource)) { return null; } if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null; } return node; } // No node. return null; } // 此代碼是load限流規(guī)則時(shí)根據(jù)規(guī)則初始化流量整形控制器的邏輯,rule.getRater()返回TrafficShapingController private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { switch (rule.getControlBehavior()) { // 預(yù)熱模式返回WarmUpController case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); // 排隊(duì)模式返回ThrottlingController case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount()); // 預(yù)熱+排隊(duì)模式返回WarmUpRateLimiterController case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: default: // Default mode or unknown mode: default traffic shaping controller (fast-reject). } } // 默認(rèn)是DefaultController return new DefaultController(rule.getCount(), rule.getGrade()); }
Sentinel單機(jī)限流算法
上面我們看到根據(jù)限流規(guī)則controlBehavior屬性(流控效果),會(huì)初始化以下實(shí)現(xiàn):
?DefaultController:是一個(gè)非常典型的滑動(dòng)窗口計(jì)數(shù)器算法實(shí)現(xiàn),將當(dāng)前統(tǒng)計(jì)的qps和請(qǐng)求進(jìn)來(lái)的qps進(jìn)行求和,小于限流值則通過(guò),大于則計(jì)算一個(gè)等待時(shí)間,稍后再試
?ThrottlingController:是漏斗算法的實(shí)現(xiàn),實(shí)現(xiàn)思路已經(jīng)在源碼片段中加了備注
?WarmUpController:實(shí)現(xiàn)參考了Guava的帶預(yù)熱的RateLimiter,區(qū)別是Guava側(cè)重于請(qǐng)求間隔,類(lèi)似前面提到的令牌桶,而Sentinel更關(guān)注于請(qǐng)求數(shù),和令牌桶算法有點(diǎn)類(lèi)似
?WarmUpRateLimiterController:低水位使用預(yù)熱算法,高水位使用滑動(dòng)窗口計(jì)數(shù)器算法排隊(duì)。
DefaultController
@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }
ThrottlingController
public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) { this(queueingTimeoutMs, maxCountPerStat, 1000); } public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) { AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive"); AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0"); AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0"); this.maxQueueingTimeMs = queueingTimeoutMs; this.count = maxCountPerStat; this.statDurationMs = statDurationMs; // Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate) // 可見(jiàn)配置限流值count大于1000時(shí)useNanoSeconds會(huì)是true否則是false if (maxCountPerStat > 0) { this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1; } else { this.useNanoSeconds = false; } } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) { final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET; long currentTime = System.nanoTime(); // Calculate the interval between every two requests. final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat); // Expected pass time of this request. long expectedTime = costTimeNs + latestPassedTime.get(); if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. latestPassedTime.set(currentTime); return true; } else { final long curNanos = System.nanoTime(); // Calculate the time to wait. long waitTime = costTimeNs + latestPassedTime.get() - curNanos; if (waitTime > maxQueueingTimeNs) { return false; } long oldTime = latestPassedTime.addAndGet(costTimeNs); waitTime = oldTime - curNanos; if (waitTime > maxQueueingTimeNs) { latestPassedTime.addAndGet(-costTimeNs); return false; } // in race condition waitTime may <= 0 if (waitTime > 0) { sleepNanos(waitTime); } return true; } } // 漏斗算法具體實(shí)現(xiàn) private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) { long currentTime = TimeUtil.currentTimeMillis(); // 計(jì)算兩次請(qǐng)求的間隔(分為秒級(jí)和納秒級(jí)) long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat); // 請(qǐng)求的期望的時(shí)間 long expectedTime = costTime + latestPassedTime.get(); if (expectedTime <= currentTime) { // latestPassedTime是AtomicLong類(lèi)型,支持volatile語(yǔ)義 latestPassedTime.set(currentTime); return true; } else { // 計(jì)算等待時(shí)間 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); // 如果大于最大排隊(duì)時(shí)間,則觸發(fā)限流 if (waitTime > maxQueueingTimeMs) { return false; } long oldTime = latestPassedTime.addAndGet(costTime); waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } // in race condition waitTime may <= 0 if (waitTime > 0) { sleepMs(waitTime); } return true; } } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // Reject when count is less or equal than 0. // Otherwise, the costTime will be max of long and waitTime will overflow in some cases. if (count <= 0) { return false; } if (useNanoSeconds) { return checkPassUsingNanoSeconds(acquireCount, this.count); } else { return checkPassUsingCachedMs(acquireCount, this.count); } } private void sleepMs(long ms) { try { Thread.sleep(ms); } catch (InterruptedException e) { } } private void sleepNanos(long ns) { LockSupport.parkNanos(ns); } long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);
由上述計(jì)算兩次請(qǐng)求間隔的公式我們可以發(fā)現(xiàn),當(dāng)maxCountPerStat(規(guī)則配置的限流值QPS)超過(guò)1000后,就無(wú)法準(zhǔn)確計(jì)算出勻速排隊(duì)模式下的請(qǐng)求間隔時(shí)長(zhǎng),因此對(duì)應(yīng)前面介紹的,當(dāng)規(guī)則配置限流值超過(guò)1000QPS后,會(huì)采用checkPassUsingNanoSeconds,小于1000QPS會(huì)采用checkPassUsingCachedMs,對(duì)比一下checkPassUsingNanoSeconds和checkPassUsingCachedMs,可以發(fā)現(xiàn)主體思路沒(méi)變,只是統(tǒng)計(jì)維度從毫秒換算成了納秒,因此只看checkPassUsingCachedMs實(shí)現(xiàn)就可以
WarmUpController
@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = (long) node.passQps(); long previousQps = (long) node.previousPassQps(); syncToken(previousQps); // 開(kāi)始計(jì)算它的斜率 // 如果進(jìn)入了警戒線(xiàn),開(kāi)始調(diào)整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope+1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true; } } else { if (passQps + acquireCount <= count) { return true; } } return false; } protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } long oldValue = storedTokens.get(); long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } } private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的判斷前提條件: // 當(dāng)令牌的消耗程度遠(yuǎn)遠(yuǎn)低于警戒線(xiàn)的時(shí)候 if (oldValue < warningToken) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { if (passQps < (int)count / coldFactor) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); }
2.4.3 集群限流
passClusterCheck方法(因?yàn)閏lusterService找不到會(huì)降級(jí)到非集群限流)
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { try { // 獲取當(dāng)前節(jié)點(diǎn)是Token Client還是Token Server TokenService clusterService = pickClusterService(); if (clusterService == null) { return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); } long flowId = rule.getClusterConfig().getFlowId(); // 根據(jù)獲取的flowId通過(guò)TokenService進(jìn)行申請(qǐng)token。從上面可知,它可能是TokenClient調(diào)用的,也可能是ToeknServer調(diào)用的。分別對(duì)應(yīng)的類(lèi)是DefaultClusterTokenClient和DefaultTokenService TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); return applyTokenResult(result, rule, context, node, acquireCount, prioritized); // If client is absent, then fallback to local mode. } catch (Throwable ex) { RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); } // Fallback to local flow control when token client or server for this rule is not available. // If fallback is not enabled, then directly pass. return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); } //獲取當(dāng)前節(jié)點(diǎn)是Token Client還是Token Server。 //1) 如果當(dāng)前節(jié)點(diǎn)的角色是Client,返回的TokenService為DefaultClusterTokenClient; //2)如果當(dāng)前節(jié)點(diǎn)的角色是Server,則默認(rèn)返回的TokenService為DefaultTokenService。 private static TokenService pickClusterService() { if (ClusterStateManager.isClient()) { return TokenClientProvider.getClient(); } if (ClusterStateManager.isServer()) { return EmbeddedClusterTokenServerProvider.getServer(); } return null; }
集群限流模式
Sentinel 集群限流服務(wù)端有兩種啟動(dòng)方式:
?嵌入模式(Embedded)適合應(yīng)用級(jí)別的限流,部署簡(jiǎn)單,但對(duì)應(yīng)用性能有影響
?獨(dú)立模式(Alone)適合全局限流,需要獨(dú)立部署
考慮到文章篇幅,集群限流有機(jī)會(huì)再展開(kāi)詳細(xì)介紹。
集群限流模式降級(jí)
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { try { TokenService clusterService = pickClusterService(); if (clusterService == null) { return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); } long flowId = rule.getClusterConfig().getFlowId(); TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized); return applyTokenResult(result, rule, context, node, acquireCount, prioritized); // If client is absent, then fallback to local mode. } catch (Throwable ex) { RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex); } // Fallback to local flow control when token client or server for this rule is not available. // If fallback is not enabled, then directly pass. // 可以看到如果集群限流有異常,會(huì)降級(jí)到單機(jī)限流模式,如果配置不允許降級(jí),那么直接會(huì)跳過(guò)此次校驗(yàn) return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized); }
2.5 DegradeSlot
CircuitBreaker
大神對(duì)斷路器的解釋?zhuān)篽ttps://martinfowler.com/bliki/CircuitBreaker.html
首先就看到了根據(jù)資源名稱(chēng)獲取斷路器列表,Sentinel的斷路器有兩個(gè)實(shí)現(xiàn):RT模式使用ResponseTimeCircuitBreaker、異常模式使用ExceptionCircuitBreaker
public interface CircuitBreaker { /** * Get the associated circuit breaking rule. * * @return associated circuit breaking rule */ DegradeRule getRule(); /** * Acquires permission of an invocation only if it is available at the time of invoking. * * @param context context of current invocation * @return {@code true} if permission was acquired and {@code false} otherwise */ boolean tryPass(Context context); /** * Get current state of the circuit breaker. * * @return current state of the circuit breaker */ State currentState(); /** * Record a completed request with the context and handle state transformation of the circuit breaker./p?> * Called when a passed/strong?> invocation finished./p?> * * @param context context of current invocation */ void onRequestComplete(Context context); /** * Circuit breaker state. */ enum State { /** * In {@code OPEN} state, all requests will be rejected until the next recovery time point. */ OPEN, /** * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation. * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker * will re-transform to the {@code OPEN} state and wait for the next recovery time point; * otherwise the resource will be regarded as "recovered" and the circuit breaker * will cease cutting off requests and transform to {@code CLOSED} state. */ HALF_OPEN, /** * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold, * the circuit breaker will transform to {@code OPEN} state. */ CLOSED } }
以ExceptionCircuitBreaker為例看一下具體實(shí)現(xiàn)
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { // 異常模式有兩種,異常率和異常數(shù) private final int strategy; // 最小請(qǐng)求數(shù) private final int minRequestAmount; // 閾值 private final double threshold; // LeapArray是sentinel統(tǒng)計(jì)數(shù)據(jù)非常重要的一個(gè)結(jié)構(gòu),主要封裝了時(shí)間窗口相關(guān)的操作 private final LeapArray stat; public ExceptionCircuitBreaker(DegradeRule rule) { this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs())); } ExceptionCircuitBreaker(DegradeRule rule, LeapArray stat) { super(rule); this.strategy = rule.getGrade(); boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT; AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count"); AssertUtil.notNull(stat, "stat cannot be null"); this.minRequestAmount = rule.getMinRequestAmount(); this.threshold = rule.getCount(); this.stat = stat; } @Override protected void resetStat() { // Reset current bucket (bucket count = 1). stat.currentWindow().value().reset(); } @Override public void onRequestComplete(Context context) { Entry entry = context.getCurEntry(); if (entry == null) { return; } Throwable error = entry.getError(); SimpleErrorCounter counter = stat.currentWindow().value(); if (error != null) { counter.getErrorCount().add(1); } counter.getTotalCount().add(1); handleStateChangeWhenThresholdExceeded(error); } private void handleStateChangeWhenThresholdExceeded(Throwable error) { if (currentState.get() == State.OPEN) { return; } if (currentState.get() == State.HALF_OPEN) { // In detecting request if (error == null) { fromHalfOpenToClose(); } else { fromHalfOpenToOpen(1.0d); } return; } List counters = stat.values(); long errCount = 0; long totalCount = 0; for (SimpleErrorCounter counter : counters) { += counter.errorCount.sum(); totalCount += counter.totalCount.sum(); } if (totalCount < minRequestAmount) { return; } double curCount = errCount; if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) { // Use errorRatio curCount = errCount * 1.0d / totalCount; } if (curCount > threshold) { transformToOpen(curCount); } } static class SimpleErrorCounter { private LongAdder errorCount; private LongAdder totalCount; public SimpleErrorCounter() { this.errorCount = new LongAdder(); this.totalCount = new LongAdder(); } public LongAdder getErrorCount() { return errorCount; } public LongAdder getTotalCount() { return totalCount; } public SimpleErrorCounter reset() { errorCount.reset(); totalCount.reset(); return this; } @Override public String toString() { return "SimpleErrorCounter{" + "errorCount=" + errorCount + ", totalCount=" + totalCount + '}'; } } static class SimpleErrorCounterLeapArray extends LeapArray { public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); } @Override public SimpleErrorCounter newEmptyBucket(long timeMillis) { return new SimpleErrorCounter(); } @Override protected WindowWrap resetWindowTo(WindowWrap w, long startTime) { // Update the start time and reset value. w.resetTo(startTime); w.value().reset(); return w; } } }
2.6 SystemSlot
校驗(yàn)邏輯主要集中在com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem,以下是片段,可以看到,作為負(fù)載保護(hù)規(guī)則校驗(yàn),實(shí)現(xiàn)了集群的QPS、線(xiàn)程、RT(響應(yīng)時(shí)間)、系統(tǒng)負(fù)載的控制,除系統(tǒng)負(fù)載以外,其余統(tǒng)計(jì)都是依賴(lài)StatisticSlot實(shí)現(xiàn),系統(tǒng)負(fù)載是通過(guò)SystemRuleManager定時(shí)調(diào)度SystemStatusListener,通過(guò)OperatingSystemMXBean去獲取
/** * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked. * * @param resourceWrapper the resource. * @throws BlockException when any system rule's threshold is exceeded. */ public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException { if (resourceWrapper == null) { return; } // Ensure the checking switch is on. if (!checkSystemStatus.get()) { return; } // for inbound traffic only if (resourceWrapper.getEntryType() != EntryType.IN) { return; } // total qps 此處是拿到某個(gè)資源在集群中的QPS總和,相關(guān)概念可以會(huì)看初始化關(guān)于Node的介紹 double currentQps = Constants.ENTRY_NODE.passQps(); if (currentQps + count > qps) { throw new SystemBlockException(resourceWrapper.getName(), "qps"); } // total thread int currentThread = Constants.ENTRY_NODE.curThreadNum(); if (currentThread > maxThread) { throw new SystemBlockException(resourceWrapper.getName(), "thread"); } double rt = Constants.ENTRY_NODE.avgRt(); if (rt > maxRt) { throw new SystemBlockException(resourceWrapper.getName(), "rt"); } // load. BBR algorithm. if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) { if (!checkBbr(currentThread)) { throw new SystemBlockException(resourceWrapper.getName(), "load"); } } // cpu usage if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) { throw new SystemBlockException(resourceWrapper.getName(), "cpu"); } } private static boolean checkBbr(int currentThread) { if (currentThread > 1 && currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) { return false; } return true; } public static double getCurrentSystemAvgLoad() { return statusListener.getSystemAverageLoad(); } public static double getCurrentCpuUsage() { return statusListener.getCpuUsage(); } public class SystemStatusListener implements Runnable { volatile double currentLoad = -1; volatile double currentCpuUsage = -1; volatile String reason = StringUtil.EMPTY; volatile long processCpuTime = 0; volatile long processUpTime = 0; public double getSystemAverageLoad() { return currentLoad; } public double getCpuUsage() { return currentCpuUsage; } @Override public void run() { try { OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class); currentLoad = osBean.getSystemLoadAverage(); /* * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:/br?> * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval. * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the * system. If the system recent cpu usage is not available, the method returns a negative value. */ double systemCpuUsage = osBean.getSystemCpuLoad(); // calculate process cpu usage to support application running in container environment RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class); long newProcessCpuTime = osBean.getProcessCpuTime(); long newProcessUpTime = runtimeBean.getUptime(); int cpuCores = osBean.getAvailableProcessors(); long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS .toMillis(newProcessCpuTime - processCpuTime); long processUpTimeDiffInMs = newProcessUpTime - processUpTime; double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores; processCpuTime = newProcessCpuTime; processUpTime = newProcessUpTime; currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage); if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) { writeSystemStatusLog(); } } catch (Throwable e) { RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e); } } private void writeSystemStatusLog() { StringBuilder sb = new StringBuilder(); sb.append("Load exceeds the threshold: "); sb.append("load:").append(String.format("%.4f", currentLoad)).append("; "); sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; "); sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; "); sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; "); sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; "); sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; "); sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; "); sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; "); RecordLog.info(sb.toString()); } }
三、京東版最佳實(shí)踐
3.1 使用方式
Sentinel使用方式本身非常簡(jiǎn)單,就是一個(gè)注解,但是要考慮規(guī)則加載和規(guī)則持久化的方式,現(xiàn)有的方式有:
?使用Sentinel-dashboard功能:使用面板接入需要維護(hù)一個(gè)配置規(guī)則的管理端,考慮到偏后端的系統(tǒng)需要額外維護(hù)一個(gè)面板成本較大,如果是像RPC框架這種本身有管理端的接入可以考慮次方案。
?中間件(如:zookepper、nacos、eureka、redis等):Sentinel源碼extension包里提供了類(lèi)似的實(shí)現(xiàn),如下圖
結(jié)合京東實(shí)際,我實(shí)現(xiàn)了一個(gè)規(guī)則熱部署的Sentinel組件,實(shí)現(xiàn)方式類(lèi)似zookeeper的方式,將規(guī)則記錄到ducc的一個(gè)key上,在spring容器啟動(dòng)時(shí)做第一次規(guī)則加載和監(jiān)聽(tīng)器注冊(cè),組件也做一了一些規(guī)則讀取,校驗(yàn)、實(shí)例化不同規(guī)則對(duì)象的工作
插件使用方式:注解+配置
第一步 引入組件
com.jd.ldop.tools/groupId?> sentinel-tools/artifactId?> 1.0.0-SNAPSHOT/version?> /dependency?>
第二步 初始化sentinelProcess
支持ducc、本地文件讀取、直接寫(xiě)入三種方式規(guī)則寫(xiě)入方式
目前支持限流規(guī)則、熔斷降級(jí)規(guī)則兩種模式,系統(tǒng)負(fù)載保護(hù)模式待開(kāi)發(fā)和驗(yàn)證
!-- 基于sentinel的降級(jí)、限流、熔斷組件 --?> /list?> /property?> /bean?> !-- 降級(jí)或限流規(guī)則配置 --?> /bean?>
ducc上配置如下:
第三步 定義資源和關(guān)聯(lián)類(lèi)型
通過(guò)@SentinelResource可以直接在任意位置定義資源名以及對(duì)應(yīng)的熔斷降級(jí)或者限流方式、回調(diào)方法等,同時(shí)也可以指定關(guān)聯(lián)類(lèi)型,支持直接、關(guān)聯(lián)、指定鏈路三種
@Override @SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade") public ExecutionResult> execute(@NotNull Model imodel) { // 業(yè)務(wù)邏輯處理 } public ExecutionResult> executeDegrade(@NotNull Model imodel) { // 降級(jí)業(yè)務(wù)邏輯處理 }
3.2 應(yīng)用場(chǎng)景
組件支持任意的業(yè)務(wù)降級(jí)、限流、負(fù)載保護(hù)
四、Sentinel壓測(cè)數(shù)據(jù)
4.1 壓測(cè)目標(biāo)
調(diào)用量:1.2W/m
應(yīng)用機(jī)器內(nèi)存穩(wěn)定在50%以?xún)?nèi)
機(jī)器規(guī)格: 8C16G50G磁盤(pán)*2
Sentinel降級(jí)規(guī)則:
count=350-------慢調(diào)用臨界閾值350ms
timeWindow=180------熔斷時(shí)間窗口180s
grade=0-----降級(jí)模式 慢調(diào)用
statIntervalMs=60000------統(tǒng)計(jì)時(shí)長(zhǎng)1min
4.2 壓測(cè)結(jié)果
應(yīng)用機(jī)器監(jiān)控:
壓測(cè)分為了兩個(gè)階段,分別是組件開(kāi)啟和組件關(guān)閉兩次,前半部分是組件開(kāi)啟的情況,后半部分是組件關(guān)閉的情況
應(yīng)用進(jìn)程內(nèi)存分析,和sentinel有關(guān)的前三對(duì)象是
com.alibaba.csp.sentinel.node.metric.MetricNode
com.alibaba.csp.sentinel.CtEntry
com.alibaba.csp.sentinel.context.Context
4.3 壓測(cè)結(jié)論
使Sentinel組件實(shí)現(xiàn)系統(tǒng)服務(wù)自動(dòng)降級(jí)或限流,由于sentinel會(huì)按照滑動(dòng)窗口周期性統(tǒng)計(jì)數(shù)據(jù),因此會(huì)占用一定的機(jī)器內(nèi)存,使用時(shí)應(yīng)設(shè)置合理的規(guī)則,如:合理的統(tǒng)計(jì)時(shí)長(zhǎng)、避免過(guò)多的Sentinel資源創(chuàng)建等。
總體來(lái)說(shuō),使用sentinel組件對(duì)應(yīng)用cpu和內(nèi)存影響不大。
審核編輯 黃宇
-
負(fù)載
+關(guān)注
關(guān)注
2文章
566瀏覽量
34348 -
Sentinel
+關(guān)注
關(guān)注
0文章
10瀏覽量
7151
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論