真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何使用Java高并發(fā)編程之Semaphore

本篇內(nèi)容主要講解“如何使用Java高并發(fā)編程之Semaphore”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“如何使用Java高并發(fā)編程之Semaphore”吧!

在安州等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè) 網(wǎng)站設(shè)計(jì)制作按需網(wǎng)站設(shè)計(jì),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),營銷型網(wǎng)站,成都外貿(mào)網(wǎng)站建設(shè),安州網(wǎng)站建設(shè)費(fèi)用合理。

共享鎖、獨(dú)占鎖

學(xué)習(xí)semaphore之前我們必須要先了解下什么是共享鎖。

共享鎖:它是允許多個(gè)線程同時(shí)獲取鎖,并發(fā)的訪問共享資源

獨(dú)占鎖:也有人把它叫做“獨(dú)享鎖”,它是是獨(dú)占的,排他的,只能被一個(gè)線程可持有,  當(dāng)獨(dú)占鎖已經(jīng)被某個(gè)線程持有時(shí),其他線程只能等待它被釋放后,才能去爭鎖,并且同一時(shí)刻只有一個(gè)線程能爭鎖成功。

什么是Semaphore

Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達(dá)的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時(shí)有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會(huì)看到綠燈,可以開進(jìn)這條馬路,后面的車會(huì)看到紅燈,不能駛?cè)隭X馬路,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路,那么后面就允許有5輛車駛?cè)腭R路,這個(gè)例子里說的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞,不能執(zhí)行?!?/p>

  • Semaphore機(jī)制是提供給線程搶占式獲取許可,所以他可以實(shí)現(xiàn)公平或者非公平,類似于ReentrantLock。說了這么多我們來個(gè)實(shí)際的例子看一看,比如我們?nèi)ネ\噲鐾\?,停車場總共只?個(gè)車位,但是現(xiàn)在有8輛汽車來停車,剩下的3輛汽車要么等其他汽車開走后進(jìn)行停車,或者去找別的停車位?

/**  * @author: 公眾號(hào)【Java金融】  */ public class SemaphoreTest {     public static void main(String[] args) throws InterruptedException {          // 初始化五個(gè)車位         Semaphore semaphore = new Semaphore(5);         // 等所有車子         final CountDownLatch latch = new CountDownLatch(8);         for (int i = 0; i < 8; i++) {             int finalI = i;             if (i == 5) {                 Thread.sleep(1000);                 new Thread(() -> {                     stopCarNotWait(semaphore, finalI);                     latch.countDown();                 }).start();                 continue;             }             new Thread(() -> {                 stopCarWait(semaphore, finalI);                 latch.countDown();             }).start();         }         latch.await();         log("總共還剩:" + semaphore.availablePermits() + "個(gè)車位");     }      private static void stopCarWait(Semaphore semaphore, int finalI) {         String format = String.format("車牌號(hào)%d", finalI);         try {             semaphore.acquire(1);             log(format + "找到車位了,去停車了");             Thread.sleep(10000);         } catch (Exception e) {             e.printStackTrace();         } finally {             semaphore.release(1);             log(format + "開走了");         }     }      private static void stopCarNotWait(Semaphore semaphore, int finalI) {          String format = String.format("車牌號(hào)%d", finalI);         try {             if (semaphore.tryAcquire()) {                 log(format + "找到車位了,去停車了");                 Thread.sleep(10000);                 log(format + "開走了");                 semaphore.release();             } else {                 log(format + "沒有停車位了,不在這里等了去其他地方停車去了");             }         } catch (Exception e) {             e.printStackTrace();         }      }      public static void log(String content) {         // 格式化         DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");         // 當(dāng)前時(shí)間         LocalDateTime now = LocalDateTime.now();         System.out.println(now.format(fmTime) + "  "+content);     } }
2021-03-01 18:54:57  車牌號(hào)0找到車位了,去停車了 2021-03-01 18:54:57  車牌號(hào)3找到車位了,去停車了 2021-03-01 18:54:57  車牌號(hào)2找到車位了,去停車了 2021-03-01 18:54:57  車牌號(hào)1找到車位了,去停車了 2021-03-01 18:54:57  車牌號(hào)4找到車位了,去停車了 2021-03-01 18:54:58  車牌號(hào)5沒有停車位了,不在這里等了去其他地方停車去了 2021-03-01 18:55:07  車牌號(hào)7找到車位了,去停車了 2021-03-01 18:55:07  車牌號(hào)6找到車位了,去停車了 2021-03-01 18:55:07  車牌號(hào)2開走了 2021-03-01 18:55:07  車牌號(hào)0開走了 2021-03-01 18:55:07  車牌號(hào)3開走了 2021-03-01 18:55:07  車牌號(hào)4開走了 2021-03-01 18:55:07  車牌號(hào)1開走了 2021-03-01 18:55:17  車牌號(hào)7開走了 2021-03-01 18:55:17  車牌號(hào)6開走了 2021-03-01 18:55:17  總共還剩:5個(gè)車位

從輸出結(jié)果我們可以看到車牌號(hào)5這輛車看見沒有車位了,就不在這個(gè)地方傻傻的等了,而是去其他地方了,但是車牌號(hào)6和車牌號(hào)7分別需要等到車庫開出兩輛車空出兩個(gè)車位后才停進(jìn)去。這就體現(xiàn)了Semaphore  的acquire 方法如果沒有獲取到憑證它就會(huì)阻塞,而tryAcquire方法如果沒有獲取到憑證不會(huì)阻塞的。

semaphore在dubbo中的應(yīng)用

在Dubbo中可以給Provider配置線程池大小來控制系統(tǒng)提供服務(wù)的最大并行度,默認(rèn)是200。

比如我現(xiàn)在這個(gè)訂單系統(tǒng)有三個(gè)接口,分別為創(chuàng)單、取消訂單、修改訂單。這三個(gè)接口加起來的并發(fā)是200但是創(chuàng)單接口是核心接口,我想讓它多分點(diǎn)線程來執(zhí)行  讓它可以有最大150個(gè)線程,取消訂單和修改訂單分別最大25個(gè)線程執(zhí)行就可以了。dubbo提供了executes這一屬性來實(shí)現(xiàn)這個(gè)功能

  

我們可以看看dubbo內(nèi)部是如何來executes的,具體實(shí)現(xiàn)是在ExecuteLimitFilter這個(gè)類我們可以

 public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {         URL url = invoker.getUrl();         String methodName = invocation.getMethodName();         Semaphore executesLimit = null;         boolean acquireResult = false;         int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);         if (max > 0) {             RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());             // 如果當(dāng)前使用的線程數(shù)量已經(jīng)大于等于設(shè)置的閾值,那么直接拋出異常 //            if (count.getActive() >= max) { // throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than  limited.");             /**              * http://manzhizhen.iteye.com/blog/2386408              * use semaphore for concurrency control (to limit thread number)              */                           executesLimit = count.getSemaphore(max);             if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {                 throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than  limited.");             }         }         long begin = System.currentTimeMillis();         boolean isSuccess = true;         // 計(jì)數(shù)器+1         RpcStatus.beginCount(url, methodName);         try {             Result result = invoker.invoke(invocation);             return result;         } catch (Throwable t) {             isSuccess = false;             if (t instanceof RuntimeException) {                 throw (RuntimeException) t;             } else {                 throw new RpcException("unexpected exception when ExecuteLimitFilter", t);             }         } finally {            // 計(jì)數(shù)器-1             RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);             if(acquireResult) {                 executesLimit.release();             }         }     }

從上述代碼我們也可以看出早期這個(gè)是沒有采用Semaphore來實(shí)現(xiàn)的,而是直接采用被注釋的 if (count.getActive() >=  max) 這個(gè)來來實(shí)現(xiàn)的,由于這個(gè)count.getActive() >= max  和這個(gè)計(jì)數(shù)加1不是原子性的,所以會(huì)有問題,具體bug號(hào)可以看https://github.com/apache/dubbo/pull/582后面才采用上述代碼用Semaphore來修復(fù)非原子性問題。具體更詳細(xì)的分析可以參見代碼的鏈接。不過現(xiàn)在最新版本(2.7.9)我看是采用采用自旋加上和CAS來實(shí)現(xiàn)的。

Semaphore

上面就是對(duì)Semaphore一個(gè)簡單的使用以及dubbo中用到的例子,說句實(shí)話Semaphore在工作中用的還是比較少的,不過面試又有可能會(huì)被問到,所以還是有必要來一起學(xué)習(xí)一下它。我們前面《Java高并發(fā)編程基礎(chǔ)之AQS》通過ReentrantLock  一起學(xué)習(xí)了下AQS,其實(shí)Semaphore同樣也是通過AQS來是實(shí)現(xiàn)的,我們可以一起來對(duì)照下獨(dú)占鎖的方法,基本上都是有方法一一相對(duì)應(yīng)的。圖片這里有兩點(diǎn)稍微需要注意的地方:

如何使用Java高并發(fā)編程之Semaphore

  • 在獨(dú)占鎖模式中,我們只有在獲取了獨(dú)占鎖的節(jié)點(diǎn)釋放鎖時(shí),才會(huì)喚醒后繼節(jié)點(diǎn),因?yàn)楠?dú)占鎖只能被一個(gè)線程持有,如果它還沒有被釋放,就沒有必要去喚醒它的后繼節(jié)點(diǎn)。

  • 在共享鎖模式下,當(dāng)一個(gè)節(jié)點(diǎn)獲取到了共享鎖,我們?cè)讷@取成功后就可以喚醒后繼節(jié)點(diǎn)了,而不需要等到該節(jié)點(diǎn)釋放鎖的時(shí)候,這是因?yàn)楣蚕礞i可以被多個(gè)線程同時(shí)持有,一個(gè)鎖獲取到了,則后繼的節(jié)點(diǎn)都可以直接來獲取。因此,在共享鎖模式下,在獲取鎖和釋放鎖結(jié)束時(shí),都會(huì)喚醒后繼節(jié)點(diǎn)。

獲取憑證

我們同樣還是通過非公平鎖的模式來獲取憑證 我們可以看下acquire的核心方法

public final void acquireSharedInterruptibly(int arg)           throws InterruptedException {       if (Thread.interrupted())           throw new InterruptedException();       if (tryAcquireShared(arg) < 0)           doAcquireSharedInterruptibly(arg);   }    protected int tryAcquireShared(int acquires) {            return nonfairTryAcquireShared(acquires);   }  // 主要看下這個(gè)方法,這個(gè)方法返回的值也就是tryAcquireShared返回的值,因?yàn)閠ryAcquireShared->nonfairTryAcquireShared    final int nonfairTryAcquireShared(int acquires) {          //自旋    for (;;) {         //Semaphore用AQS的state變量的值代表可用許可數(shù)         int available = getState();         //可用許可數(shù)減去本次需要獲取的許可數(shù)即為剩余許可數(shù)         int remaining = available - acquires;         //如果剩余許可數(shù)小于0或者CAS將當(dāng)前可用許可數(shù)設(shè)置為剩余許可數(shù)成功,則返回成功許可數(shù)         if (remaining < 0 ||             compareAndSetState(available, remaining))             return remaining;     }
  • 當(dāng)tryAcquireShared  獲取返回許可書小于0時(shí)說明獲取許可失敗需要進(jìn)入doAcquireSharedInterruptibly這個(gè)方法去休眠。

  • 當(dāng)tryAcquireShared 獲取返回許可書小于0時(shí)說明獲取許可成功直接結(jié)束。

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        // 獨(dú)占鎖的acquireQueued調(diào)用的是addWaiter(Node.EXCLUSIVE),        // 而共享鎖調(diào)用的是addWaiter(Node.SHARED),表明了該節(jié)點(diǎn)處于共享模式        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }

這個(gè)方法是不是跟我們上篇文章講的AQS的獨(dú)占鎖的acquireQueued很像,不過獨(dú)占鎖它是直接調(diào)用了用了setHead(node)方法,而共享鎖調(diào)用的是setHeadAndPropagate(node,  r)這個(gè)方法除了調(diào)用setHead 里面還調(diào)用了doReleaseShared(喚醒后繼節(jié)點(diǎn))

private void setHeadAndPropagate(Node node, int propagate) {       Node h = head; // Record old head for check below       setHead(node);       if (propagate > 0 || h == null || h.waitStatus < 0 ||           (h = head) == null || h.waitStatus < 0) {           Node s = node.next;           if (s == null || s.isShared())               doReleaseShared();       }   }

其他的方法基本上是和ReentrantLock來實(shí)現(xiàn)的獨(dú)占鎖差不多,我相信大家對(duì)源碼分析感興趣的應(yīng)該也不多,其他更多細(xì)節(jié)問題還是需要自己親自動(dòng)手去看源碼的。

總結(jié)

當(dāng)信號(hào)量Semaphore初始化設(shè)置許可證為1  時(shí),它也可以當(dāng)作互斥鎖使用。其中0、1就相當(dāng)于它的狀態(tài),當(dāng)=1時(shí)表示其他線程可以獲取,當(dāng)=0時(shí),排他,即其他線程必須要等待。

Semaphore是JUC包中的一個(gè)很簡單的工具類,用來實(shí)現(xiàn)多線程下對(duì)于資源的同一時(shí)刻的訪問線程數(shù)限制

Semaphore中存在一個(gè)【許可】的概念,即訪問資源之前,先要獲得許可,如果當(dāng)前許可數(shù)量為0,那么線程阻塞,直到獲得許可

Semaphore內(nèi)部使用AQS實(shí)現(xiàn),由抽象內(nèi)部類Sync繼承了AQS。因?yàn)镾emaphore天生就是共享的場景,所以其內(nèi)部實(shí)際上類似于共享鎖的實(shí)現(xiàn)

共享鎖的調(diào)用框架和獨(dú)占鎖很相似,它們最大的不同在于獲取鎖的邏輯——共享鎖可以被多個(gè)線程同時(shí)持有,而獨(dú)占鎖同一時(shí)刻只能被一個(gè)線程持有。

由于共享鎖同一時(shí)刻可以被多個(gè)線程持有,因此當(dāng)頭節(jié)點(diǎn)獲取到共享鎖時(shí),可以立即喚醒后繼節(jié)點(diǎn)來爭鎖,而不必等到釋放鎖的時(shí)候。因此,共享鎖觸發(fā)喚醒后繼節(jié)點(diǎn)的行為可能有兩處,一處在當(dāng)前節(jié)點(diǎn)成功獲得共享鎖后,一處在當(dāng)前節(jié)點(diǎn)釋放共享鎖后。

采用semaphore來進(jìn)行限流的話會(huì)產(chǎn)生突刺現(xiàn)象。

★指在一定時(shí)間內(nèi)的一小段時(shí)間內(nèi)就用完了所有資源,后大部分時(shí)間中無資源可用。比如在限流方法中的計(jì)算器算法,設(shè)置1s內(nèi)的最大請(qǐng)求數(shù)為100,在前100ms已經(jīng)有了100個(gè)請(qǐng)求,則后面900ms將無法處理請(qǐng)求,這就是突刺現(xiàn)象。

到此,相信大家對(duì)“如何使用Java高并發(fā)編程之Semaphore”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


文章名稱:如何使用Java高并發(fā)編程之Semaphore
本文鏈接:http://weahome.cn/article/gesijh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部