真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何在Java中使用AbstractQueuedSynchronizer同步框架

這篇文章將為大家詳細(xì)講解有關(guān)如何在Java中使用AbstractQueuedSynchronizer同步框架,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對相關(guān)知識(shí)有一定的了解。

創(chuàng)新互聯(lián)公司是一家專業(yè)提供廣河企業(yè)網(wǎng)站建設(shè),專注與成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站、成都外貿(mào)網(wǎng)站建設(shè)公司、H5建站、小程序制作等業(yè)務(wù)。10年已為廣河眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進(jìn)行中。

AbstractQueuedSynchronizer概述

AbstractQueuedSynchronizer是java中非常重要的一個(gè)框架類,它實(shí)現(xiàn)了最核心的多線程同步的語義,我們只要繼承AbstractQueuedSynchronizer就可以非常方便的實(shí)現(xiàn)我們自己的線程同步器,java中的鎖Lock就是基于AbstractQueuedSynchronizer來實(shí)現(xiàn)的。下面首先展示了AbstractQueuedSynchronizer類提供的一些方法:

如何在Java中使用AbstractQueuedSynchronizer同步框架

AbstractQueuedSynchronizer類方法

在類結(jié)構(gòu)上,AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,AbstractOwnableSynchronizer僅有的兩個(gè)方法是提供當(dāng)前獨(dú)占模式的線程設(shè)置:

  /**
   * The current owner of exclusive mode synchronization.
   */
  private transient Thread exclusiveOwnerThread;

  /**
   * Sets the thread that currently owns exclusive access.
   * A {@code null} argument indicates that no thread owns access.
   * This method does not otherwise impose any synchronization or
   * {@code volatile} field accesses.
   * @param thread the owner thread
   */
  protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
  }

  /**
   * Returns the thread last set by {@code setExclusiveOwnerThread},
   * or {@code null} if never set. This method does not otherwise
   * impose any synchronization or {@code volatile} field accesses.
   * @return the owner thread
   */
  protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
  }

exclusiveOwnerThread代表的是當(dāng)前獲得同步的線程,因?yàn)槭仟?dú)占模式,在exclusiveOwnerThread持有同步的過程中其他的線程的任何同步獲取請求將不能得到滿足。

至此,需要說明的是,AbstractQueuedSynchronizer不僅支持獨(dú)占模式下的同步實(shí)現(xiàn),還支持共享模式下的同步實(shí)現(xiàn)。在java的鎖的實(shí)現(xiàn)上就有共享鎖和獨(dú)占鎖的區(qū)別,而這些實(shí)現(xiàn)都是基于AbstractQueuedSynchronizer對于共享同步和獨(dú)占同步的支持。從上面展示的AbstractQueuedSynchronizer提供的方法中,我們可以發(fā)現(xiàn)AbstractQueuedSynchronizer的API大概分為三類:

  • 類似acquire(int)的一類是最基本的一類,不可中斷

  • 類似acquireInterruptibly(int)的一類可以被中斷

  • 類似tryAcquireNanos(int, long)的一類不僅可以被中斷,而且可以設(shè)置阻塞時(shí)間

上面的三種類型的API分為獨(dú)占和共享兩套,我們可以根據(jù)我們的需求來使用合適的API來做多線程同步。

下面是一個(gè)繼承AbstractQueuedSynchronizer來實(shí)現(xiàn)自己的同步器的一個(gè)示例:

class Mutex implements Lock, java.io.Serializable {
 
  // Our internal helper class
  private static class Sync extends AbstractQueuedSynchronizer {
   // Reports whether in locked state
   protected boolean isHeldExclusively() {
    return getState() == 1;
   }
 
   // Acquires the lock if state is zero
   public boolean tryAcquire(int acquires) {
    assert acquires == 1; // Otherwise unused
    if (compareAndSetState(0, 1)) {
     setExclusiveOwnerThread(Thread.currentThread());
     return true;
    }
    return false;
   }
 
   // Releases the lock by setting state to zero
   protected boolean tryRelease(int releases) {
    assert releases == 1; // Otherwise unused
    if (getState() == 0) throw new IllegalMonitorStateException();
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
   }
 
   // Provides a Condition
   Condition newCondition() { return new ConditionObject(); }
 
   // Deserializes properly
   private void readObject(ObjectInputStream s)
     throws IOException, ClassNotFoundException {
    s.defaultReadObject();
    setState(0); // reset to unlocked state
   }
  }
 
  // The sync object does all the hard work. We just forward to it.
  private final Sync sync = new Sync();
 
  public void lock()        { sync.acquire(1); }
  public boolean tryLock()     { return sync.tryAcquire(1); }
  public void unlock()       { sync.release(1); }
  public Condition newCondition()  { return sync.newCondition(); }
  public boolean isLocked()     { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
   sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
   return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
 }}

Mutex實(shí)現(xiàn)的功能是:使用0來代表可以獲得同步變量,使用1來代表需要等待同步變量被釋放再獲取,這是一個(gè)簡單的獨(dú)占鎖實(shí)現(xiàn),任何時(shí)刻只會(huì)有一個(gè)線程獲得鎖,其他請求獲取鎖的線程都會(huì)阻塞等待直到鎖被釋放,等待的線程將再次競爭來獲得鎖。Mutex給了我們很好的范例,我們要實(shí)現(xiàn)自己的線程同步器,那么就繼承AbstractQueuedSynchronizer實(shí)現(xiàn)其三個(gè)抽象方法,然后使用該實(shí)現(xiàn)類來做lock和unlock的操作,可以發(fā)現(xiàn),AbstractQueuedSynchronizer框架為我們鋪平了道路,我們只需要做一點(diǎn)點(diǎn)改變就可以實(shí)現(xiàn)高效安全的線程同步去,下文中將分析AbstractQueuedSynchronizer是如何為我么提供如此強(qiáng)大得同步能力的。

AbstractQueuedSynchronizer實(shí)現(xiàn)細(xì)節(jié)

獨(dú)占模式

AbstractQueuedSynchronizer使用一個(gè)volatile類型的int來作為同步變量,任何想要獲得鎖的線程都需要來競爭該變量,獲得鎖的線程可以繼續(xù)業(yè)務(wù)流程的執(zhí)行,而沒有獲得鎖的線程會(huì)被放到一個(gè)FIFO的隊(duì)列中去,等待再次競爭同步變量來獲得鎖。AbstractQueuedSynchronizer為每個(gè)沒有獲得鎖的線程封裝成一個(gè)Node再放到隊(duì)列中去,下面先來分析一下Node這個(gè)數(shù)據(jù)結(jié)構(gòu):

/** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL  = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

上面展示的是Node定義的四個(gè)狀態(tài),需要注意的是只有一個(gè)狀態(tài)是大于0的,也就是CANCELLED,也就是被取消了,不需要為此線程協(xié)調(diào)同步變量的競爭了。其他幾個(gè)的意義見注釋。上一小節(jié)說到,AbstractQueuedSynchronizer提供獨(dú)占式和共享式兩種模式,AbstractQueuedSynchronizer使用下面的兩個(gè)變量來標(biāo)志是共享還是獨(dú)占模式:

/** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

有趣的是,Node使用了一個(gè)變量nextWaiter來代表兩種含義,當(dāng)在獨(dú)占模式下,nextWaiter表示下一個(gè)等在ConditionObject上的Node,在共享模式下就是SHARED,因?yàn)閷τ谌魏我粋€(gè)同步器來說,都不可能同時(shí)實(shí)現(xiàn)共享和獨(dú)占兩種模式的,更為專業(yè)的解釋為:

/**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

AbstractQueuedSynchronizer使用雙向鏈表來管理請求同步的Node,保存了鏈表的head和tail,新的Node將會(huì)被插到鏈表的尾端,而鏈表的head總是代表著獲得鎖的線程,鏈表頭的線程釋放了鎖之后會(huì)通知后面的線程來競爭共享變量。下面分析一下AbstractQueuedSynchronizer是如何實(shí)現(xiàn)獨(dú)占模式下的acquire和release的。

首先,使用方法acquire(int)可以競爭同步變量,下面是調(diào)用鏈路:

  public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }
  
  private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
      }
    }
    enq(node);
    return node;
  }
  
  final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  }

首先會(huì)調(diào)用方法tryAcquire來嘗試獲的鎖,而tryAcquire這個(gè)方法是需要子類來實(shí)現(xiàn)的,子類的實(shí)現(xiàn)無非就是通過compareAndSetState、getState、setState三個(gè)方法來操作同步變量state,子類的方法實(shí)現(xiàn)需要根據(jù)各自的需求場景來實(shí)現(xiàn)。繼續(xù)分析上面的acquire流程,如果tryAcquire返回true了,也就是成功改變了state的值了,也就是獲得了同步鎖了,那么就可以退出了。如果返回false,說明有其他的線程獲得鎖了,這個(gè)時(shí)候AbstractQueuedSynchronizer會(huì)使用addWaiter將當(dāng)前線程添加到等待隊(duì)列的尾部等待再次競爭。需要注意的是將當(dāng)前線程標(biāo)記為了獨(dú)占模式。然后重頭戲來了,方法acquireQueued使得新添加的Node在一個(gè)for死循環(huán)中不斷的輪詢,也就是自旋,acquireQueued方法退出的條件是:

  1. 該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),頭結(jié)點(diǎn)代表的是獲得鎖的節(jié)點(diǎn),只有它釋放了state其他線程才能獲得這個(gè)變量的所有權(quán)

  2. 在條件1的前提下,方法tryAcquire返回true,也就是可以獲得同步資源state

滿足上面兩個(gè)條件之后,這個(gè)Node就會(huì)獲得鎖,根據(jù)AbstractQueuedSynchronizer的規(guī)定,獲得鎖的Node必須是鏈表的頭結(jié)點(diǎn),所以,需要將當(dāng)前節(jié)點(diǎn)設(shè)定為頭結(jié)點(diǎn)。那如果不符合上面兩個(gè)條件的Node會(huì)怎么樣呢?看for循環(huán)里面的第二個(gè)分支,首先是shouldParkAfterFailedAcquire方法,看名字應(yīng)該是說判斷是否應(yīng)該park當(dāng)前該線程,然后是方法parkAndCheckInterrupt,這個(gè)方法是在shouldParkAfterFailedAcquire返回true的前提之下才會(huì)之下,意思就是首先判斷一下是否需要park該Node,如果需要,那么就park它。關(guān)于線程的park和unpark,AbstractQueuedSynchronizer使用了偏向底層的技術(shù)來實(shí)現(xiàn),在此先不做分析。現(xiàn)在來分析一下再什么情況下Node會(huì)被park(block):

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
      /*
       * This node has already set status asking a release
       * to signal it, so it can safely park.
       */
      return true;
    if (ws > 0) {
      /*
       * Predecessor was cancelled. Skip over predecessors and
       * indicate retry.
       */
      do {
        node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
    } else {
      /*
       * waitStatus must be 0 or PROPAGATE. Indicate that we
       * need a signal, but don't park yet. Caller will need to
       * retry to make sure it cannot acquire before parking.
       */
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
  }

可以發(fā)現(xiàn),只有當(dāng)Node的前驅(qū)節(jié)點(diǎn)的狀態(tài)為Node.SIGNAL的時(shí)候才會(huì)返回true,也就是說,只有當(dāng)前驅(qū)節(jié)點(diǎn)的狀態(tài)變?yōu)榱薔ode.SIGNAL,才會(huì)去通知當(dāng)前節(jié)點(diǎn),所以如果前驅(qū)節(jié)點(diǎn)是Node.SIGNAL的,那么當(dāng)前節(jié)點(diǎn)就可以放心的park就好了,前驅(qū)節(jié)點(diǎn)在完成工作之后在釋放資源的時(shí)候會(huì)unpark它的后繼節(jié)點(diǎn)。下面看一下release的過程:

  public final boolean release(int arg) {
    if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
      return true;
    }
    return false;
  }
  
  private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
          s = t;
    }
    if (s != null)
      LockSupport.unpark(s.thread);
  }

首先通過tryRelease方法來保證資源安全完整的釋放了之后,如果發(fā)現(xiàn)節(jié)點(diǎn)的狀態(tài)小于0,會(huì)變?yōu)?。0代表的是初始化的狀態(tài),當(dāng)前的線程已經(jīng)完成了工作,釋放了鎖,就要恢復(fù)原來的樣子。然后會(huì)獲取該節(jié)點(diǎn)的后繼節(jié)點(diǎn),如果沒有后續(xù)節(jié)點(diǎn)了,或者后繼節(jié)點(diǎn)已經(jīng)被取消了,那么從尾部開始向前找第一個(gè)符合要求的節(jié)點(diǎn),然后unpark它。

上面介紹了一對acquire-release,如果希望線程可以在競爭的時(shí)候被中斷,可以使用acquireInterruptibly。如果希望加上獲取鎖的時(shí)間限制,可以使用tryAcquireNanos(int, long)方法來獲取。

共享模式

和獨(dú)占模式一樣,分析一下acquireShared的過程:

  public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
      doAcquireShared(arg);
  }
  
  private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
          int r = tryAcquireShared(arg);
          if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            if (interrupted)
              selfInterrupt();
            failed = false;
            return;
          }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  }

獲取鎖的流程如下:

  1. 嘗試使用tryAcquireShared方法,如果返回值大于等于0則表示成功,否則運(yùn)行doAcquireShared方法

  2. 將當(dāng)前競爭同步的線程添加到鏈表尾部,然后自旋

  3. 獲取前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),也就是說前驅(qū)節(jié)點(diǎn)現(xiàn)在持有鎖,那么繼續(xù)運(yùn)行4,否則park該節(jié)點(diǎn)等待被unpark

  4. 使用tryAcquireShared方法來競爭,如果返回值大于等于0,那么就算是獲取成功了,否則繼續(xù)自旋嘗試

共享模式下的release流程:

  public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
    }
    return false;
  }
  
  private void doReleaseShared() {
    for (;;) {
      Node h = head;
      if (h != null && h != tail) {
        int ws = h.waitStatus;
        if (ws == Node.SIGNAL) {
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
            continue;      // loop to recheck cases
          unparkSuccessor(h);
        }
        else if (ws == 0 &&
             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
          continue;        // loop on failed CAS
      }
      if (h == head)          // loop if head changed
        break;
    }
  }  

  private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
          s = t;
    }
    if (s != null)
      LockSupport.unpark(s.thread);
  }

首先嘗試使用tryReleaseShared方法來釋放資源,如果釋放失敗,則返回false,如果釋放成功了,那么繼續(xù)執(zhí)行doReleaseShared方法喚醒后續(xù)節(jié)點(diǎn)來競爭資源。需要注意的是,共享模式和獨(dú)占模式的區(qū)別在于,獨(dú)占模式只允許一個(gè)線程獲得資源,而共享模式允許多個(gè)線程獲得資源。所以在獨(dú)占模式下只有當(dāng)tryAcquire返回true的時(shí)候我們才能確定獲得資源了,而在共享模式下,只要tryAcquireShared返回值大于等于0就可以說明獲得資源了,所以你要確保你需要實(shí)現(xiàn)的需求和AbstractQueuedSynchronizer希望的是一致的。

桶獨(dú)占模式一樣,共享模式也有其他的兩種API:

  • acquireSharedInterruptibly:支持相應(yīng)中斷的資源競爭

  • tryAcquireSharedNanos:可以設(shè)定時(shí)間的資源競爭

關(guān)于如何在Java中使用AbstractQueuedSynchronizer同步框架就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。


本文標(biāo)題:如何在Java中使用AbstractQueuedSynchronizer同步框架
瀏覽路徑:http://weahome.cn/article/igdodd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部