怎么在ASP.NET Core 3.x 中實現(xiàn)并發(fā)限制?針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
創(chuàng)新互聯(lián)公司主營三亞網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app開發(fā)定制,三亞h5成都小程序開發(fā)搭建,三亞網(wǎng)站營銷推廣歡迎三亞等地區(qū)企業(yè)咨詢Queue策略
添加Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services) { services.AddQueuePolicy(options => { //較大并發(fā)請求數(shù) options.MaxConcurrentRequests = 2; //請求隊列長度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //添加并發(fā)限制中間件 app.UseConcurrencyLimiter(); app.Run(async context => { Task.Delay(100).Wait(); // 100ms sync-over-async await context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); }
通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊列的長度;那么問題來了,他是怎么實現(xiàn)的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Actionconfigure) { services.Configure(configure); services.AddSingleton (); return services; }
QueuePolicy采用的是SemaphoreSlim信號量設(shè)計,SemaphoreSlim、Semaphore(信號量)支持并發(fā)多線程進(jìn)入被保護(hù)代碼,對象在初始化時會指定 較大任務(wù)數(shù)量,當(dāng)線程請求訪問資源,信號量遞減,而當(dāng)他們釋放時,信號量計數(shù)又遞增。
////// 構(gòu)造方法(初始化Queue策略) /// /// public QueuePolicy(IOptionsoptions) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim來限制任務(wù)較大個數(shù) _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); }
ConcurrencyLimiterMiddleware中間件
////// Invokes the logic of the middleware. /// /// The. /// A public async Task Invoke(HttpContext context) { var waitInQueueTask = _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets. bool result; if (waitInQueueTask.IsCompleted) { ConcurrencyLimiterEventSource.Log.QueueSkipped(); result = waitInQueueTask.Result; } else { using (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await waitInQueueTask; } } if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); } }that completes when the request leaves.
每次當(dāng)我們請求的時候首先會調(diào)用_queuePolicy.TryEnterAsync(),進(jìn)入該方法后先開啟一個私有l(wèi)ock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+較大并發(fā)請求數(shù)),如果當(dāng)前數(shù)量超出了,那么我直接拋出,送你個503狀態(tài);
if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); }
問題來了,我這邊如果說還沒到你設(shè)置的大小呢,我這個請求沒有給你服務(wù)器造不成壓力,那么你給我處理一下吧.
await _serverSemaphore.WaitAsync();
異步等待進(jìn)入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號量被釋放為止
lock (_totalRequestsLock) { if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests) { return false; } TotalRequests++; } //異步等待進(jìn)入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號量被釋放為止 await _serverSemaphore.WaitAsync(); return true; }
返回成功后那么中間件這邊再進(jìn)行處理,_queuePolicy.OnExit();通過該調(diào)用進(jìn)行調(diào)用_serverSemaphore.Release();釋放信號燈,再對總請求數(shù)遞減
Stack策略
再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.
public void ConfigureServices(IServiceCollection services) { services.AddStackPolicy(options => { //較大并發(fā)請求數(shù) options.MaxConcurrentRequests = 2; //請求隊列長度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); }
通過上面的配置,我們便可以對我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來看看他是怎么實現(xiàn)的呢
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Actionconfigure) { services.Configure(configure); services.AddSingleton (); return services; }
可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法
////// 構(gòu)造方法(初始化參數(shù)) /// /// public StackPolicy(IOptionsoptions) { //棧分配 _buffer = new List (); //隊列大小 _maxQueueCapacity = options.Value.RequestQueueLimit; //較大并發(fā)請求數(shù) _maxConcurrentRequests = options.Value.MaxConcurrentRequests; //剩余可用空間 _freeServerSpots = options.Value.MaxConcurrentRequests; }
當(dāng)我們通過中間件請求調(diào)用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當(dāng)前隊列=我們設(shè)置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留后面的請求;
public ValueTaskTryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果隊列滿了,取消先前的請求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count) { _buffer[_head] = tcs; } else { _buffer.Add(tcs); } _queueLength++; // increment _head for next time _head++; if (_head == _maxQueueCapacity) { _head = 0; } return tcs.GetValueTask(); } }
當(dāng)我們請求后調(diào)用_queuePolicy.OnExit();出棧,再將請求長度遞減
public void OnExit() { lock (_bufferLock) { if (_queueLength == 0) { _freeServerSpots++; if (_freeServerSpots > _maxConcurrentRequests) { _freeServerSpots--; throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync"); } return; } // step backwards and launch a new task if (_head == 0) { _head = _maxQueueCapacity - 1; } else { _head--; } //退出,出棧 _buffer[_head].Complete(true); _queueLength--; } }
關(guān)于怎么在ASP.NET Core 3.x 中實現(xiàn)并發(fā)限制問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。