真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎么在ASP.NETCore3.x中實現(xiàn)并發(fā)限制-創(chuàng)新互聯(lián)

怎么在ASP.NET Core 3.x 中實現(xiàn)并發(fā)限制?針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

創(chuàng)新互聯(lián)公司主營三亞網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app開發(fā)定制,三亞h5成都小程序開發(fā)搭建,三亞網(wǎng)站營銷推廣歡迎三亞等地區(qū)企業(yè)咨詢

Queue策略


添加Nuget


Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

public void ConfigureServices(IServiceCollection services)
    {
      services.AddQueuePolicy(options =>
      {
        //較大并發(fā)請求數(shù)
        options.MaxConcurrentRequests = 2;
        //請求隊列長度限制
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
      //添加并發(fā)限制中間件
      app.UseConcurrencyLimiter();
      app.Run(async context =>
      {
        Task.Delay(100).Wait(); // 100ms sync-over-async

        await context.Response.WriteAsync("Hello World!");
      });
      if (env.IsDevelopment())
      {
        app.UseDeveloperExceptionPage();
      }

      app.UseHttpsRedirection();

      app.UseRouting();

      app.UseAuthorization();

      app.UseEndpoints(endpoints =>
      {
        endpoints.MapControllers();
      });
    }

通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊列的長度;那么問題來了,他是怎么實現(xiàn)的呢?

 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action configure)
{
    services.Configure(configure);
    services.AddSingleton();
    return services;
}

QueuePolicy采用的是SemaphoreSlim信號量設(shè)計,SemaphoreSlim、Semaphore(信號量)支持并發(fā)多線程進(jìn)入被保護(hù)代碼,對象在初始化時會指定 較大任務(wù)數(shù)量,當(dāng)線程請求訪問資源,信號量遞減,而當(dāng)他們釋放時,信號量計數(shù)又遞增。

   /// 
    ///   構(gòu)造方法(初始化Queue策略)
    /// 
    /// 
    public QueuePolicy(IOptions options)
    {
      _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
      if (_maxConcurrentRequests <= 0)
      {
        throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
      }

      _requestQueueLimit = options.Value.RequestQueueLimit;
      if (_requestQueueLimit < 0)
      {
        throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
      }
      //使用SemaphoreSlim來限制任務(wù)較大個數(shù)
      _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
    }

ConcurrencyLimiterMiddleware中間件

/// 
    /// Invokes the logic of the middleware.
    /// 
    /// The .
    ///  that completes when the request leaves.
    public async Task Invoke(HttpContext context)
    {
      var waitInQueueTask = _queuePolicy.TryEnterAsync();

      // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
      bool result;

      if (waitInQueueTask.IsCompleted)
      {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
      }
      else
      {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
          result = await waitInQueueTask;
        }
      }

      if (result)
      {
        try
        {
          await _next(context);
        }
        finally
        {
          _queuePolicy.OnExit();
        }
      }
      else
      {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
      }
    }

每次當(dāng)我們請求的時候首先會調(diào)用_queuePolicy.TryEnterAsync(),進(jìn)入該方法后先開啟一個私有l(wèi)ock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+較大并發(fā)請求數(shù)),如果當(dāng)前數(shù)量超出了,那么我直接拋出,送你個503狀態(tài);

 if (result)
 {
     try
     {
       await _next(context);
     }
     finally
    {
      _queuePolicy.OnExit();
    }
    }
    else
    {
      ConcurrencyLimiterEventSource.Log.RequestRejected();
      ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
      context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
      await _onRejected(context);
    }

問題來了,我這邊如果說還沒到你設(shè)置的大小呢,我這個請求沒有給你服務(wù)器造不成壓力,那么你給我處理一下吧.


await _serverSemaphore.WaitAsync();異步等待進(jìn)入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號量被釋放為止

 lock (_totalRequestsLock)
  {
    if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
    {
       return false;
    }
      TotalRequests++;
    }
    //異步等待進(jìn)入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號量被釋放為止
    await _serverSemaphore.WaitAsync();
    return true;
  }

返回成功后那么中間件這邊再進(jìn)行處理,_queuePolicy.OnExit();通過該調(diào)用進(jìn)行調(diào)用_serverSemaphore.Release();釋放信號燈,再對總請求數(shù)遞減

Stack策略


再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.

   public void ConfigureServices(IServiceCollection services)
    {
      services.AddStackPolicy(options =>
      {
        //較大并發(fā)請求數(shù)
        options.MaxConcurrentRequests = 2;
        //請求隊列長度限制
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }

通過上面的配置,我們便可以對我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來看看他是怎么實現(xiàn)的呢

 public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action configure)
    {
      services.Configure(configure);
      services.AddSingleton();
      return services;
    }

可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法

/// 
    ///   構(gòu)造方法(初始化參數(shù))
    /// 
    /// 
    public StackPolicy(IOptions options)
    {
      //棧分配
      _buffer = new List();
      //隊列大小
      _maxQueueCapacity = options.Value.RequestQueueLimit;
      //較大并發(fā)請求數(shù)
      _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
      //剩余可用空間
      _freeServerSpots = options.Value.MaxConcurrentRequests;
    }

當(dāng)我們通過中間件請求調(diào)用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當(dāng)前隊列=我們設(shè)置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留后面的請求;

  public ValueTask TryEnterAsync()
    {
      lock (_bufferLock)
      {
        if (_freeServerSpots > 0)
        {
          _freeServerSpots--;
          return _trueTask;
        }
        // 如果隊列滿了,取消先前的請求
        if (_queueLength == _maxQueueCapacity)
        {
          _hasReachedCapacity = true;
          _buffer[_head].Complete(false);
          _queueLength--;
        }
        var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
        _cachedResettableTCS = null;
        if (_hasReachedCapacity || _queueLength < _buffer.Count)
        {
          _buffer[_head] = tcs;
        }
        else
        {
          _buffer.Add(tcs);
        }
        _queueLength++;
        // increment _head for next time
        _head++;
        if (_head == _maxQueueCapacity)
        {
          _head = 0;
        }
        return tcs.GetValueTask();
      }
    }

當(dāng)我們請求后調(diào)用_queuePolicy.OnExit();出棧,再將請求長度遞減

  public void OnExit()
    {
      lock (_bufferLock)
      {
        if (_queueLength == 0)
        {
          _freeServerSpots++;

          if (_freeServerSpots > _maxConcurrentRequests)
          {
            _freeServerSpots--;
            throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
          }

          return;
        }

        // step backwards and launch a new task
        if (_head == 0)
        {
          _head = _maxQueueCapacity - 1;
        }
        else
        {
          _head--;
        }
        //退出,出棧
        _buffer[_head].Complete(true);
        _queueLength--;
      }
    }

關(guān)于怎么在ASP.NET Core 3.x 中實現(xiàn)并發(fā)限制問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。


網(wǎng)站欄目:怎么在ASP.NETCore3.x中實現(xiàn)并發(fā)限制-創(chuàng)新互聯(lián)
網(wǎng)站鏈接:http://weahome.cn/article/hdeoi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部