這篇文章主要講解了“sentinel的SentinelGatewayFilter有什么作用”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“sentinel的SentinelGatewayFilter有什么作用”吧!
創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括四方臺網(wǎng)站建設(shè)、四方臺網(wǎng)站制作、四方臺網(wǎng)頁制作以及四方臺網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,四方臺網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到四方臺省份的部分城市,未來相信會繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
本文主要研究一下sentinel的SentinelGatewayFilter
Sentinel-1.6.2/sentinel-adapter/sentinel-spring-cloud-gateway-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/gateway/sc/SentinelGatewayFilter.java
public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter { private final GatewayParamParserparamParser = new GatewayParamParser<>( new ServerWebExchangeItemParser()); @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); Mono asyncResult = chain.filter(exchange); if (route != null) { String routeId = route.getId(); Object[] params = paramParser.parseParameterFor(routeId, exchange, r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID); String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser()) .map(f -> f.apply(exchange)) .orElse(""); asyncResult = asyncResult.transform( new SentinelReactorTransformer<>(new EntryConfig(routeId, EntryType.IN, 1, params, new ContextConfig(contextName(routeId), origin))) ); } Set matchingApis = pickMatchingApiDefinitions(exchange); for (String apiName : matchingApis) { Object[] params = paramParser.parseParameterFor(apiName, exchange, r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME); asyncResult = asyncResult.transform( new SentinelReactorTransformer<>(new EntryConfig(apiName, EntryType.IN, 1, params)) ); } return asyncResult; } private String contextName(String route) { return SentinelGatewayConstants.GATEWAY_CONTEXT_ROUTE_PREFIX + route; } Set pickMatchingApiDefinitions(ServerWebExchange exchange) { return GatewayApiMatcherManager.getApiMatcherMap().values() .stream() .filter(m -> m.test(exchange)) .map(WebExchangeApiMatcher::getApiName) .collect(Collectors.toSet()); } }
SentinelGatewayFilter實(shí)現(xiàn)了GatewayFilter、GlobalFilter接口;其filter方法主要是獲取route信息,然后對asyncResult進(jìn)行transform,這里使用的是SentinelReactorTransformer
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java
public class SentinelReactorTransformerimplements Function , Publisher > { private final EntryConfig entryConfig; public SentinelReactorTransformer(String resourceName) { this(new EntryConfig(resourceName)); } public SentinelReactorTransformer(EntryConfig entryConfig) { AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); this.entryConfig = entryConfig; } @Override public Publisher apply(Publisher publisher) { if (publisher instanceof Mono) { return new MonoSentinelOperator<>((Mono ) publisher, entryConfig); } if (publisher instanceof Flux) { return new FluxSentinelOperator<>((Flux ) publisher, entryConfig); } throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName()); } }
SentinelReactorTransformer使用entryConfig創(chuàng)建了MonoSentinelOperator或者M(jìn)onoSentinelOperator
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java
public class MonoSentinelOperatorextends MonoOperator { private final EntryConfig entryConfig; public MonoSentinelOperator(Mono extends T> source, EntryConfig entryConfig) { super(source); AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); this.entryConfig = entryConfig; } @Override public void subscribe(CoreSubscriber super T> actual) { source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true)); } }
MonoSentinelOperator在subscribe的時候,使用的是SentinelReactorSubscriber
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java
public class FluxSentinelOperatorextends FluxOperator { private final EntryConfig entryConfig; public FluxSentinelOperator(Flux extends T> source, EntryConfig entryConfig) { super(source); AssertUtil.notNull(entryConfig, "entryConfig cannot be null"); this.entryConfig = entryConfig; } @Override public void subscribe(CoreSubscriber super T> actual) { source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, false)); } }
FluxSentinelOperator在subscribe的時候,使用的是SentinelReactorSubscriber
Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java
public class SentinelReactorSubscriberextends InheritableBaseSubscriber { private final EntryConfig entryConfig; private final CoreSubscriber super T> actual; private final boolean unary; private volatile AsyncEntry currentEntry; private final AtomicBoolean entryExited = new AtomicBoolean(false); public SentinelReactorSubscriber(EntryConfig entryConfig, CoreSubscriber super T> actual, boolean unary) { checkEntryConfig(entryConfig); this.entryConfig = entryConfig; this.actual = actual; this.unary = unary; } private void checkEntryConfig(EntryConfig config) { AssertUtil.notNull(config, "entryConfig cannot be null"); } @Override public Context currentContext() { if (currentEntry == null || entryExited.get()) { return actual.currentContext(); } com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext(); if (sentinelContext == null) { return actual.currentContext(); } return actual.currentContext() .put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext()); } private void doWithContextOrCurrent(Supplier > contextSupplier, Runnable f) { Optional contextOpt = contextSupplier.get(); if (!contextOpt.isPresent()) { // Provided context is absent, use current context. f.run(); } else { // Run on provided context. ContextUtil.runOnContext(contextOpt.get(), f); } } private void entryWhenSubscribed() { ContextConfig sentinelContextConfig = entryConfig.getContextConfig(); if (sentinelContextConfig != null) { // If current we're already in a context, the context config won't work. ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin()); } try { AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName(), entryConfig.getEntryType(), entryConfig.getAcquireCount(), entryConfig.getArgs()); this.currentEntry = entry; actual.onSubscribe(this); } catch (BlockException ex) { // Mark as completed (exited) explicitly. entryExited.set(true); // Signal cancel and propagate the {@code BlockException}. cancel(); actual.onSubscribe(this); actual.onError(ex); } finally { if (sentinelContextConfig != null) { ContextUtil.exit(); } } } @Override protected void hookOnSubscribe(Subscription subscription) { doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY), this::entryWhenSubscribed); } @Override protected void hookOnNext(T value) { if (isDisposed()) { tryCompleteEntry(); return; } doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext), () -> actual.onNext(value)); if (unary) { // For some cases of unary operator (Mono), we have to do this during onNext hook. // e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete() // the onComplete hook will not be executed so we'll need to complete the entry in advance. tryCompleteEntry(); } } @Override protected void hookOnComplete() { tryCompleteEntry(); actual.onComplete(); } @Override protected boolean shouldCallErrorDropHook() { // When flow control triggered or stream terminated, the incoming // deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook. return !entryExited.get(); } @Override protected void hookOnError(Throwable t) { if (currentEntry != null && currentEntry.getAsyncContext() != null) { // Normal requests with non-BlockException will go through here. Tracer.traceContext(t, 1, currentEntry.getAsyncContext()); } tryCompleteEntry(); actual.onError(t); } @Override protected void hookOnCancel() { } private boolean tryCompleteEntry() { if (currentEntry != null && entryExited.compareAndSet(false, true)) { currentEntry.exit(1, entryConfig.getArgs()); return true; } return false; } }
SentinelReactorSubscriber繼承了InheritableBaseSubscriber(拷貝自reactor.core.publisher.BaseSubscriber,允許子類覆蓋onSubscribe、onNext、onError、onComplete方法
)
這里hookOnSubscribe調(diào)用了entryWhenSubscribed,它在sentinelContextConfig不為null的時候會先執(zhí)行ContextUtil.enter,然后使用SphU.asyncEntry創(chuàng)建了AsyncEntry,最后在finally里頭在sentinelContextConfig不為null的時候執(zhí)行ContextUtil.exit();
這里hookOnNext、hookOnComplete、hookOnError都調(diào)用了tryCompleteEntry方法,它主要是嘗試退出AsyncEntry
SentinelGatewayFilter實(shí)現(xiàn)了GatewayFilter、GlobalFilter接口;其filter方法主要是獲取route信息,然后對asyncResult進(jìn)行transform,這里使用的是SentinelReactorTransformer
SentinelReactorTransformer使用entryConfig創(chuàng)建了MonoSentinelOperator或者M(jìn)onoSentinelOperator;它們在subscribe的時候,使用的是SentinelReactorSubscriber
SentinelReactorSubscriber主要是在hookOnSubscribe的時候調(diào)用了entryWhenSubscribed方法創(chuàng)建AsyncEntry,在hookOnNext、hookOnComplete、hookOnError的時候調(diào)用了tryCompleteEntry方法,嘗試退出AsyncEntry
感謝各位的閱讀,以上就是“sentinel的SentinelGatewayFilter有什么作用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對sentinel的SentinelGatewayFilter有什么作用這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!