這期內容當中小編將會給大家?guī)碛嘘Pspring cloud中Hystrix指標收集原理是什么,文章內容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
成都創(chuàng)新互聯公司是專業(yè)的惠陽網站建設公司,惠陽接單;提供網站設計、成都做網站,網頁設計,網站設計,建網站,PHP網站建設等專業(yè)做網站服務;采用PHP框架,可快速的進行惠陽網站開發(fā)網頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網站,專業(yè)的做網站團隊,希望更多企業(yè)前來合作!
上一篇介紹了hystrix熔斷降級的基本實現原理,著重點是從hystrix自身的能力方面著手,結合代碼,做了整體介紹,那么觸發(fā)熔斷的指標是如何計算的,可能前面會籠統(tǒng)的提到metrics,至于它的metrics實現原理是怎么樣的,在本章做重點介紹
官方圖示:
對于使用者先構造一個HystrixCommand對象或者HystrixObservalbeCommand
選擇queue或者execute,調用者決定是使用異步還是同步方式
根據commandKey看緩存中是否存在Observalbe,開啟緩存是為了提升性能,直接返回輸出
沒有緩存,那就開始走熔斷器的邏輯,先判斷熔斷器是不是開啟狀態(tài)
熔斷器開啟,觸發(fā)快速失敗,觸發(fā)降級,去執(zhí)行用戶提供的fallback()邏輯
判斷是不是并發(fā)超限,超限,觸發(fā)降級,則發(fā)出執(zhí)行拒絕的異常,去執(zhí)行用戶提供的fallback邏輯
執(zhí)行用戶實現的具體業(yè)務邏輯,是否出現執(zhí)行異?;蛘叱瑫r,異?;虺瑫r,則觸發(fā)降級去執(zhí)行用戶提供的fallback邏輯
執(zhí)行結束
無論是正常結束還是執(zhí)行異常,都會觸發(fā)metrics的收集,收集的結果經過計算后,提供給熔斷器,做開啟和關閉的決策
這部分我們需要從以下幾個方面做分析:指標上報、指標計算、指標使用,這期間會涉及多線程的并發(fā)寫入、消息的順序到達、滑動窗口的實現等等
指標上報
每一個請求線程,都會創(chuàng)建一個ExecutionResult實例,這個實例會關聯一些基礎事件比如開始時間、執(zhí)行延遲、事件統(tǒng)計等基礎信息,也就是在整個hystrix的生命周期里面,會通過指標上報的方式做數據的收集,下面看下數據上報的幾個事件:
1.1、executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());//判斷斷路器未開啟,并發(fā)未超限,記錄執(zhí)行的開始時間
1.2、executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);//執(zhí)行成功會增加success的事件和耗時
1.3、HystrixEventType.SHORT_CIRCUITED//斷路器打開,會收集快速熔斷的事件和耗時
1.4、HystrixEventType.SEMAPHORE_REJECTED//信號量方式并發(fā)數超限,會記錄該事件和耗時
1.5、HystrixEventType.THREAD_POOL_REJECTED//線程池不可用(并發(fā)超限),會記錄該事件和耗時
1.6、HystrixEventType.TIMEOUT//執(zhí)行超時,會收集該事件和耗時
1.7、HystrixEventType.BAD_REQUEST//參數或狀態(tài)異常,會收集該事件和耗時
以上整體的事件分為兩大類,成功和失敗,根據用戶邏輯代碼的執(zhí)行結果,如果是有異常,收集異常事件和耗時,執(zhí)行circuitBreaker.markNonSuccess(),否則執(zhí)行circuitBreaker.markNonSuccess()
另外觸發(fā)熔斷器開啟和關閉,有且只有兩個途徑,如下圖:
指標計算
這里簡單對各個步驟中涉及到多線程并發(fā)的情況以及滑動窗口的計算做一個簡單介紹:
2.1:并發(fā)(threadLocal&SerializedSubject)
同一個接口收到多個請求時候,也就是這些請求命中的都是同一個commandKey時(統(tǒng)計指標是按照KEY為維度),每個請求都是一個獨立的線程,每個線程內會產生多個各種各樣的事件,首先同一個線程內的event拼接封裝成HystrixCommandCompletion,上報的是一個HystrixCommandCompletion,流計算操作的也是一個個的HystrixCommandCompletion,不存在計算時候把各線程的事件混雜在一起的可能,如何保證的在下面會講到
2.1.1:上報者是通過threadLocal線程隔離
首先hystrix啟動后會創(chuàng)建一個threadLocal,當一個客戶端請求不管是正常結束還是異常結束,都要上報上報狀態(tài),也就是執(zhí)行handleCommandEnd,都會從threadLocal中返回一個當前線程的HystrixThreadEventStream,代碼如下:
private void handleCommandEnd(boolean commandExecutionStarted) { //省略部分代碼 if (executionResultAtTimeOfCancellation == null) { //上報metrics metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } }
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) { //threadLocal中放置的是HystrixThreadEventStream,因為改寫了init方法,所以無需set,直接可以獲取 HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey); if (executionStarted) { concurrentExecutionCount.decrementAndGet(); } } //從threadLocal中獲取事件流 public static HystrixThreadEventStream getInstance() { return threadLocalStreams.get(); } //threadLocal的定義,改寫了init方法,所以不用單獨調用set private static final ThreadLocalthreadLocalStreams = new ThreadLocal () { @Override protected HystrixThreadEventStream initialValue() { return new HystrixThreadEventStream(Thread.currentThread()); } }
2.1.2:限流隊列
每個線程會有唯一的HystrixThreadEventStream,因為是從theadLocal獲取,每個HystrixThreadEventStream都會關聯一個由Subject實現的隊列,也就是每一個線程都有一個私有的隊列,這里說它提供限流是因為采用了‘背壓’的原理,所謂的‘背壓’是指按需提供,根據消費者的能力去往隊列生產,代碼如下:
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { //把executionResult封裝成HystrixCommandCompletion,HystrixCommandCompletion是流計算操作的基本單位 HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); //writeOnlyCommandCompletionSubject就是一個通過RXjava實現的限流隊列 writeOnlyCommandCompletionSubject.onNext(event); } //省略代碼 writeOnlyCommandCompletionSubject .onBackpressureBuffer()//開啟'背壓功能' .doOnNext(writeCommandCompletionsToShardedStreams)//核心是這個action的call方法 .unsafeSubscribe(Subscribers.empty());
2.2:數據流串行化
每個放入隊列的HystrixCommandCompletion,都會執(zhí)利doOnNext的Action,通過他的call方法去調用HystrixCommandCompletionStream的write方法,相同的commandKey具有同一個HystrixCommandCompletionStream實例,具體是通過currentHashMap做的實例隔離,HystrixCommandCompletionStream內部是通過一個SerializedSubject實現多個HystrixCommandCompletion并行寫入的串行化,具體代碼邏輯如下:
//限流隊列收到數據后會執(zhí)行call方法,是通過觀察者注冊了doOnnext事件 private static final Action1writeCommandCompletionsToShardedStreams = new Action1 () { @Override public void call(HystrixCommandCompletion commandCompletion) { //同一個commandkey對應同一個串行隊列的實例,因為同一個commandKey必須要收集該key下所有線程的metrix事件做統(tǒng)計,才能準確 HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey()); commandStream.write(commandCompletion);//寫入串行隊列,這里是核心 if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) { HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey()); threadPoolStream.write(commandCompletion); } } }; //具體的write方法如下,需要重點關注writeOnlySubject的定義 public void write(HystrixCommandCompletion event) { writeOnlySubject.onNext(event); } //下面是writeOnlySubject的定義,是通過SerializedSubject將并行的寫入變?yōu)榇谢? HystrixCommandCompletionStream(final HystrixCommandKey commandKey) { this.commandKey = commandKey; this.writeOnlySubject = new SerializedSubject (PublishSubject. create()); this.readOnlyStream = writeOnlySubject.share(); }
2.3:消費訂閱
在hystrixCommand創(chuàng)建的時候,會對HystrixCommandCompletionStream進行訂閱,目前有:
healthCountsStream
rollingCommandEventCounterStream
cumulativeCommandEventCounterStream
rollingCommandLatencyDistributionStream
rollingCommandUserLatencyDistributionStream
rollingCommandMaxConcurrencyStream
這幾個消費者通過滾動窗口的形式,對數據做統(tǒng)計和指標計算,下面選取具有代表意義的healthCountsStream做講解:
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) { //統(tǒng)計計算指標的時間間隔-metricsHealthSnapshotIntervalInMilliseconds final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get(); if (healthCountBucketSizeInMs == 0) { throw new RuntimeException("You have set the bucket size to 0ms. Please set a positive number, so that the metric stream can be properly consumed"); } //熔斷窗口滑動周期,默認10秒,保留10秒內的統(tǒng)計數據,指定窗口期內,有效進行指標計算的次數=metricsRollingStatisticalWindowInMilliseconds/metricsHealthSnapshotIntervalInMilliseconds final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs; return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs); } //繼承關系HealthCountStream-》BucketedRollingCounterStream-》BucketedCounterStream //把各事件聚合成桶...省略代碼,在BucketedCounterStream完成 this.bucketedStream = Observable.defer(new Func0>() { @Override public Observable call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } } //聚合成桶的邏輯代碼 public static final Func2 appendEventToBucket = new Func2 () { @Override public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) { ExecutionResult.EventCounts eventCounts = execution.getEventCounts(); for (HystrixEventType eventType: ALL_EVENT_TYPES) { switch (eventType) { case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here default: initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);//對各類型的event做,分類匯總 break; } } return initialCountArray; } }; //生成計算指標,在BucketedRollingCounterStream完成,省略部分代碼 this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) //計算指標聚合實現,reduceWindowToSummary private static final Func2 healthCheckAccumulator = new Func2 () { @Override public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) { return healthCounts.plus(bucketEventCounts);//重點看該方法 } }; public HealthCounts plus(long[] eventTypeCounts) { long updatedTotalCount = totalCount; long updatedErrorCount = errorCount; long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()]; long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()]; long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()]; long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()]; long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()]; //多個線程的事件,被匯總計算以后,所有的事件相加得到總和 updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); //失敗的事件總和,注意只有FAIL+timeoutCount+THREAD_POOL_REJECTED+SEMAPHORE_REJECTED updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); return new HealthCounts(updatedTotalCount, updatedErrorCount); }
指標使用
指標使用比較簡單,用于控制熔斷器的關閉與開啟,邏輯如下:
public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { } else { if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } }
上述就是小編為大家分享的spring cloud中Hystrix指標收集原理是什么了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注創(chuàng)新互聯行業(yè)資訊頻道。