真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Java并發(fā)之Semaphore源碼的示例分析

這篇文章主要介紹了Java并發(fā)之Semaphore源碼的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

成都創(chuàng)新互聯(lián)專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、廊坊網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5頁面制作、商城建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為廊坊等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

Semaphore(信號量)是JUC包中比較常用到的一個(gè)類,它是AQS共享模式的一個(gè)應(yīng)用,可以允許多個(gè)線程同時(shí)對共享資源進(jìn)行操作,并且可以有效的控制并發(fā)數(shù),利用它可以很好的實(shí)現(xiàn)流量控制。Semaphore提供了一個(gè)許可證的概念,可以把這個(gè)許可證看作公共汽車車票,只有成功獲取車票的人才能夠上車,并且車票是有一定數(shù)量的,不可能毫無限制的發(fā)下去,這樣就會(huì)導(dǎo)致公交車超載。所以當(dāng)車票發(fā)完的時(shí)候(公交車以滿載),其他人就只能等下一趟車了。如果中途有人下車,那么他的位置將會(huì)空閑出來,因此如果這時(shí)其他人想要上車的話就又可以獲得車票了。利用Semaphore可以實(shí)現(xiàn)各種池,我們在本篇末尾將會(huì)動(dòng)手寫一個(gè)簡易的數(shù)據(jù)庫連接池。首先我們來看一下Semaphore的構(gòu)造器。

//構(gòu)造器1
public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}

//構(gòu)造器2
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore提供了兩個(gè)帶參構(gòu)造器,沒有提供無參構(gòu)造器。這兩個(gè)構(gòu)造器都必須傳入一個(gè)初始的許可證數(shù)量,使用構(gòu)造器1構(gòu)造出來的信號量在獲取許可證時(shí)會(huì)采用非公平方式獲取,使用構(gòu)造器2可以通過參數(shù)指定獲取許可證的方式(公平or非公平)。Semaphore主要對外提供了兩類API,獲取許可證和釋放許可證,默認(rèn)的是獲取和釋放一個(gè)許可證,也可以傳入?yún)?shù)來同時(shí)獲取和釋放多個(gè)許可證。在本篇中我們只講每次獲取和釋放一個(gè)許可證的情況。

1.獲取許可證

//獲取一個(gè)許可證(響應(yīng)中斷)
public void acquire() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

//獲取一個(gè)許可證(不響應(yīng)中斷)
public void acquireUninterruptibly() {
  sync.acquireShared(1);
}

//嘗試獲取許可證(非公平獲取)
public boolean tryAcquire() {
  return sync.nonfairTryAcquireShared(1) >= 0;
}

//嘗試獲取許可證(定時(shí)獲取)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
  return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

上面的API是Semaphore提供的默認(rèn)獲取許可證操作。每次只獲取一個(gè)許可證,這也是現(xiàn)實(shí)生活中較常遇到的情況。除了直接獲取還提供了嘗試獲取,直接獲取操作在失敗之后可能會(huì)阻塞線程,而嘗試獲取則不會(huì)。另外還需注意的是tryAcquire方法是使用非公平方式嘗試獲取的。在平時(shí)我們比較常用到的是acquire方法去獲取許可證。下面我們就來看看它是怎樣獲取的。可以看到acquire方法里面直接就是調(diào)用sync.acquireSharedInterruptibly(1),這個(gè)方法是AQS里面的方法,我們在講AQS源碼系列文章的時(shí)候曾經(jīng)講過,現(xiàn)在我們再來回顧一下。

//以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  //首先判斷線程是否中斷, 如果是則拋出異常
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  //1.嘗試去獲取鎖
  if (tryAcquireShared(arg) < 0) {
    //2. 如果獲取失敗則進(jìn)人該方法
    doAcquireSharedInterruptibly(arg);
  }
}

acquireSharedInterruptibly方法首先就是去調(diào)用tryAcquireShared方法去嘗試獲取,tryAcquireShared在AQS里面是抽象方法,F(xiàn)airSync和NonfairSync這兩個(gè)派生類實(shí)現(xiàn)了該方法的邏輯。FairSync實(shí)現(xiàn)的是公平獲取的邏輯,而NonfairSync實(shí)現(xiàn)的非公平獲取的邏輯。

abstract static class Sync extends AbstractQueuedSynchronizer {
  //非公平方式嘗試獲取
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      //獲取可用許可證
      int available = getState();
      //獲取剩余許可證
      int remaining = available - acquires;
      //1.如果remaining小于0則直接返回remaining
      //2.如果remaining大于0則先更新同步狀態(tài)再返回remaining
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}

//非公平同步器
static final class NonfairSync extends Sync {
  private static final long serialVersionUID = -2694183684443567898L;

  NonfairSync(int permits) {
    super(permits);
  }

  //嘗試獲取許可證
  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
  }
}

//公平同步器
static final class FairSync extends Sync {
  private static final long serialVersionUID = 2014338818796000944L;

  FairSync(int permits) {
    super(permits);
  }

  //嘗試獲取許可證
  protected int tryAcquireShared(int acquires) {
    for (;;) {
      //判斷同步隊(duì)列前面有沒有人排隊(duì)
      if (hasQueuedPredecessors()) {
        //如果有的話就直接返回-1,表示嘗試獲取失敗
        return -1;
      }
      //獲取可用許可證
      int available = getState();
      //獲取剩余許可證
      int remaining = available - acquires;
      //1.如果remaining小于0則直接返回remaining
      //2.如果remaining大于0則先更新同步狀態(tài)再返回remaining
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}

這里需要注意的是NonfairSync的tryAcquireShared方法直接調(diào)用的是nonfairTryAcquireShared方法,這個(gè)方法是在父類Sync里面的。非公平獲取鎖的邏輯是先取出當(dāng)前同步狀態(tài)(同步狀態(tài)表示許可證個(gè)數(shù)),將當(dāng)前同步狀態(tài)減去參入的參數(shù),如果結(jié)果不小于0的話證明還有可用的許可證,那么就直接使用CAS操作更新同步狀態(tài)的值,最后不管結(jié)果是否小于0都會(huì)返回該結(jié)果值。這里我們要了解tryAcquireShared方法返回值的含義,返回負(fù)數(shù)表示獲取失敗,零表示當(dāng)前線程獲取成功但后續(xù)線程不能再獲取,正數(shù)表示當(dāng)前線程獲取成功并且后續(xù)線程也能夠獲取。我們再來看acquireSharedInterruptibly方法的代碼。

//以可中斷模式獲取鎖(共享模式)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  //首先判斷線程是否中斷, 如果是則拋出異常
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  //1.嘗試去獲取鎖
  //負(fù)數(shù):表示獲取失敗
  //零值:表示當(dāng)前線程獲取成功, 但是后繼線程不能再獲取了
  //正數(shù):表示當(dāng)前線程獲取成功, 并且后繼線程同樣可以獲取成功
  if (tryAcquireShared(arg) < 0) {
    //2. 如果獲取失敗則進(jìn)人該方法
    doAcquireSharedInterruptibly(arg);
  }
}

如果返回的remaining小于0的話就代表獲取失敗,因此tryAcquireShared(arg) < 0就為true,所以接下來就會(huì)調(diào)用doAcquireSharedInterruptibly方法,這個(gè)方法我們在講AQS的時(shí)候講過,它會(huì)將當(dāng)前線程包裝成結(jié)點(diǎn)放入同步隊(duì)列尾部,并且有可能掛起線程。這也是當(dāng)remaining小于0時(shí)線程會(huì)排隊(duì)阻塞的原因。而如果返回的remaining>=0的話就代表當(dāng)前線程獲取成功,因此tryAcquireShared(arg) < 0就為flase,所以就不會(huì)再去調(diào)用doAcquireSharedInterruptibly方法阻塞當(dāng)前線程了。以上是非公平獲取的整個(gè)邏輯,而公平獲取時(shí)僅僅是在此之前先去調(diào)用hasQueuedPredecessors方法判斷同步隊(duì)列是否有人在排隊(duì),如果有的話就直接return -1表示獲取失敗,否則才繼續(xù)執(zhí)行下面和非公平獲取一樣的步驟。

2.釋放許可證

//釋放一個(gè)許可證
public void release() {
  sync.releaseShared(1);
}

調(diào)用release方法是釋放一個(gè)許可證,它的操作很簡單,就調(diào)用了AQS的releaseShared方法,我們來看看這個(gè)方法。

//釋放鎖的操作(共享模式)
public final boolean releaseShared(int arg) {
  //1.嘗試去釋放鎖
  if (tryReleaseShared(arg)) {
    //2.如果釋放成功就喚醒其他線程
    doReleaseShared();
    return true;
  }
  return false;
}

AQS的releaseShared方法首先調(diào)用tryReleaseShared方法嘗試釋放鎖,這個(gè)方法的實(shí)現(xiàn)邏輯在子類Sync里面。

abstract static class Sync extends AbstractQueuedSynchronizer {
  ...
  //嘗試釋放操作
  protected final boolean tryReleaseShared(int releases) {
    for (;;) {
      //獲取當(dāng)前同步狀態(tài)
      int current = getState();
      //將當(dāng)前同步狀態(tài)加上傳入的參數(shù)
      int next = current + releases;
      //如果相加結(jié)果小于當(dāng)前同步狀態(tài)的話就報(bào)錯(cuò)
      if (next < current) {
        throw new Error("Maximum permit count exceeded");
      }
      //以CAS方式更新同步狀態(tài)的值, 更新成功則返回true, 否則繼續(xù)循環(huán)
      if (compareAndSetState(current, next)) {
        return true;
      }
    }
  }
  ...
}

可以看到tryReleaseShared方法里面采用for循環(huán)進(jìn)行自旋,首先獲取同步狀態(tài),將同步狀態(tài)加上傳入的參數(shù),然后以CAS方式更新同步狀態(tài),更新成功就返回true并跳出方法,否則就繼續(xù)循環(huán)直到成功為止,這就是Semaphore釋放許可證的流程。

3.動(dòng)手寫個(gè)連接池

Semaphore代碼并沒有很復(fù)雜,常用的操作就是獲取和釋放一個(gè)許可證,這些操作的實(shí)現(xiàn)邏輯也都比較簡單,但這并不妨礙Semaphore的廣泛應(yīng)用。下面我們就來利用Semaphore實(shí)現(xiàn)一個(gè)簡單的數(shù)據(jù)庫連接池,通過這個(gè)例子希望讀者們能更加深入的掌握Semaphore的運(yùn)用。

public class ConnectPool {
  
  //連接池大小
  private int size;
  //數(shù)據(jù)庫連接集合
  private Connect[] connects;
  //連接狀態(tài)標(biāo)志
  private boolean[] connectFlag;
  //剩余可用連接數(shù)
  private volatile int available;
  //信號量
  private Semaphore semaphore;
  
  //構(gòu)造器
  public ConnectPool(int size) { 
    this.size = size;
    this.available = size;
    semaphore = new Semaphore(size, true);
    connects = new Connect[size];
    connectFlag = new boolean[size];
    initConnects();
  }
  
  //初始化連接
  private void initConnects() {
    //生成指定數(shù)量的數(shù)據(jù)庫連接
    for(int i = 0; i < this.size; i++) {
      connects[i] = new Connect();
    }
  }
  
  //獲取數(shù)據(jù)庫連接
  private synchronized Connect getConnect(){ 
    for(int i = 0; i < connectFlag.length; i++) {
      //遍歷集合找到未使用的連接
      if(!connectFlag[i]) {
        //將連接設(shè)置為使用中
        connectFlag[i] = true;
        //可用連接數(shù)減1
        available--;
        System.out.println("【"+Thread.currentThread().getName()+"】以獲取連接   剩余連接數(shù):" + available);
        //返回連接引用
        return connects[i];
      }
    }
    return null;
  }
  
  //獲取一個(gè)連接
  public Connect openConnect() throws InterruptedException {
    //獲取許可證
    semaphore.acquire();
    //獲取數(shù)據(jù)庫連接
    return getConnect();
  }
  
  //釋放一個(gè)連接
  public synchronized void release(Connect connect) { 
    for(int i = 0; i < this.size; i++) {
      if(connect == connects[i]){
        //將連接設(shè)置為未使用
        connectFlag[i] = false;
        //可用連接數(shù)加1
        available++;
        System.out.println("【"+Thread.currentThread().getName()+"】以釋放連接   剩余連接數(shù):" + available);
        //釋放許可證
        semaphore.release();
      }
    }
  }
  
  //剩余可用連接數(shù)
  public int available() {
    return available;
  }
  
}

測試代碼:

public class TestThread extends Thread {
  
  private static ConnectPool pool = new ConnectPool(3);
  
  @Override
  public void run() {
    try {
      Connect connect = pool.openConnect();
      Thread.sleep(100); //休息一下
      pool.release(connect);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  
  public static void main(String[] args) {
    for(int i = 0; i < 10; i++) {
      new TestThread().start();
    }
  }

}

測試結(jié)果:

Java并發(fā)之Semaphore源碼的示例分析

我們使用一個(gè)數(shù)組來存放數(shù)據(jù)庫連接的引用,在初始化連接池的時(shí)候會(huì)調(diào)用initConnects方法創(chuàng)建指定數(shù)量的數(shù)據(jù)庫連接,并將它們的引用存放到數(shù)組中,此外還有一個(gè)相同大小的數(shù)組來記錄連接是否可用。每當(dāng)外部線程請求獲取一個(gè)連接時(shí),首先調(diào)用semaphore.acquire()方法獲取一個(gè)許可證,然后將連接狀態(tài)設(shè)置為使用中,最后返回該連接的引用。許可證的數(shù)量由構(gòu)造時(shí)傳入的參數(shù)決定,每調(diào)用一次semaphore.acquire()方法許可證數(shù)量減1,當(dāng)數(shù)量減為0時(shí)說明已經(jīng)沒有連接可以使用了,這時(shí)如果其他線程再來獲取就會(huì)被阻塞。每當(dāng)線程釋放一個(gè)連接的時(shí)候會(huì)調(diào)用semaphore.release()將許可證釋放,此時(shí)許可證的總量又會(huì)增加,代表可用的連接數(shù)增加了,那么之前被阻塞的線程將會(huì)醒來繼續(xù)獲取連接,這時(shí)再次獲取就能夠成功獲取連接了。測試示例中初始化了一個(gè)3個(gè)連接的連接池,我們從測試結(jié)果中可以看到,每當(dāng)線程獲取一個(gè)連接剩余的連接數(shù)將會(huì)減1,等到減為0時(shí)其他線程就不能再獲取了,此時(shí)必須等待一個(gè)線程將連接釋放之后才能繼續(xù)獲取。可以看到剩余連接數(shù)總是在0到3之間變動(dòng),說明我們這次的測試是成功的。

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Java并發(fā)之Semaphore源碼的示例分析”這篇文章對大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!


本文名稱:Java并發(fā)之Semaphore源碼的示例分析
標(biāo)題鏈接:http://weahome.cn/article/gsheei.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部