本文基于jdk1.8進行分析。
創(chuàng)新互聯(lián)堅實的技術研發(fā)基礎贏得了行業(yè)內(nèi)的良好口碑,公司成立十余年來,為1000+企業(yè)提供過網(wǎng)站建設、軟件開發(fā)、搜索引擎優(yōu)化技術、互聯(lián)網(wǎng)大數(shù)據(jù)整合營銷服務,多年的技術服務成功經(jīng)驗、眾多的客戶使我們能懂得更多,做得更好。"讓您的網(wǎng)站跑起來"是我們一直追求的目標!
ReentrantLock是一個可重入鎖,在ConcurrentHashMap中使用了ReentrantLock。
首先看一下源碼中對ReentrantLock的介紹。如下圖。ReentrantLock是一個可重入的排他鎖,它和synchronized的方法和代碼有著相同的行為和語義,但有更多的功能。ReentrantLock是被最后一個成功lock鎖并且還沒有unlock的線程擁有著。如果鎖沒有被別的線程擁有,那么一個線程調(diào)用lock方法,就會成功獲取鎖并返回。如果當前線程已經(jīng)擁有該鎖,那么lock方法會立刻返回。這個可以通過isHeldByCurrentThread方法和getHoldCount方法進行驗證。除了這部分介紹外,類前面的javadoc文檔很長,就不在這里全部展開。隨著后面介紹源碼,會一一涉及到。
/** * A reentrant mutual exclusion {@link Lock} with the same basic * behavior and semantics as the implicit monitor lock accessed using * {@code synchronized} methods and statements, but with extended * capabilities. *A {@code ReentrantLock} is owned by the thread last * successfully locking, but not yet unlocking it. A thread invoking * {@code lock} will return, successfully acquiring the lock, when * the lock is not owned by another thread. The method will return * immediately if the current thread already owns the lock. This can * be checked using methods {@link #isHeldByCurrentThread}, and {@link * #getHoldCount}.
首先看一下成員變量,如下圖。ReentrantLock只有一個成員變量sync,即同步器,這個同步器提供所有的機制。Sync是AbstractQueuedSynchronizer的子類,同時,Sync有2個子類,NonfairSync和FairSync,分別是非公平鎖和公平鎖。Sync,NonfaireSync和FairSync的具體實現(xiàn)后面再講。
/** Synchronizer providing all implementation mechanics **/ private final Sync sync;
下面看一下構造函數(shù)。如下圖。可以看到,ReentrantLock默認是非公平鎖,它可以通過參數(shù),指定初始化為公平鎖或非公平鎖。
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. **/ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * @param fair {@code true} if this lock should use a fair ordering policy **/ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
下面看一下ReentrantLock的主要方法。首先是lock方法。如下圖。lock方法的實現(xiàn)很簡單,就是調(diào)用Sync的lock方法。而Sync的lock方法是個抽象的,具體實現(xiàn)在NonfairSync和FairSync中。這里我們先不展開講,而是先讀一下lock方法的注釋,看看它的作用。lock方法的作用是獲取該鎖。分為3種情況。
1,如果鎖沒有被別的線程占有,那么當前線程就可以獲取到鎖并立刻返回,并把鎖計數(shù)設置為1。
2,如果當前線程已經(jīng)占有該鎖了,那么就會把鎖計數(shù)加1,立刻返回。
3,如果鎖被另一個線程占有了,那么當前線程就無法再被線程調(diào)度,并且開始睡眠,直到獲取到鎖,在獲取到到鎖時,會把鎖計數(shù)設置為1。
lockInterruptibly方法與lock功能類似,但lockInterruptibly方法在等待的過程中,可以響應中斷。
/** * Acquires the lock. *Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. *
If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. *
If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. **/ public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
下面,詳細看一下非公平鎖和公平鎖中對lock函數(shù)的實現(xiàn)。如下圖。下圖同時列出了公平鎖和非公平鎖中l(wèi)ock的實現(xiàn)邏輯。從注釋和代碼邏輯中,都可以看出,非公平鎖進行l(wèi)ock時,先嘗試立刻闖入(搶占),如果成功,則獲取到鎖,如果失敗,再執(zhí)行通常的獲取鎖的行為,即acquire(1)。
/** * 非公平鎖中的lock * Performs lock. Try immediate barge, backing up to normal * acquire on failure. **/ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } //公平鎖中的lock final void lock() { acquire(1); }
那么,我們首先了解下,非公平鎖“嘗試立刻闖入”,究竟做了什么。稍后再繼續(xù)講解通常的獲取鎖的行為。下圖是立即闖入行為compareAndSetState(0, 1)的實現(xiàn)。從compareAndSetState函數(shù)的注釋中,可以知道,如果同步狀態(tài)值與期望值相等,那么就把它的值設置為updated值。否則同步狀態(tài)值與期望值不相等,則返回false。這個操作和volatile有著相同的內(nèi)存語義,也就是說,這個操作對其他線程是可見的。compareAndSetState函數(shù)注釋里描述的功能,是通過unsafe.compareAndSwapInt方法實現(xiàn)的,而unsafe.compareAndSwapInt是一個native方法,是用c++實現(xiàn)的。那么繼續(xù)追問,c++底層是怎么實現(xiàn)的?C++底層是通過CAS指令來實現(xiàn)的。什么是CAS指令呢?來自維基百科的解釋是,CAS,比較和交換,Compare and Swap,是用用于實現(xiàn)多線程原子同步的指令。它將內(nèi)存位置的內(nèi)容和給定值比較,只有在相同的情況下,將該內(nèi)存的值設置為新的給定值。這個操作是原子操作。那么繼續(xù)追問,CAS指令的原子性,是如何實現(xiàn)的呢?我們都知道指令時CPU來執(zhí)行的,在多CPU系統(tǒng)中,內(nèi)存是共享的,內(nèi)存和多個cpu都掛在總線上,當一個CPU執(zhí)行CAS指令時,它會先將總線LOCK位點設置為高電平。如果別的CPU也要執(zhí)行CAS執(zhí)行,它會發(fā)現(xiàn)總線LOCK位點已經(jīng)是高電平了,則無法執(zhí)行CAS執(zhí)行。CPU通過LOCK保證了指令的原子執(zhí)行。
現(xiàn)在來看一下非公平鎖的lock行為,compareAndSetState(0, 1),它期望鎖狀態(tài)為0,即沒有別的線程占用,并把新狀態(tài)設置為1,即標記為占用狀態(tài)。如果成功,則非公平鎖成功搶到鎖,之后setExclusiveOwnerThread,把自己設置為排他線程。非公平鎖這小子太壞了。如果搶占失敗,則執(zhí)行與公平鎖相同的操作。
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. **/ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
下面看一下公平鎖獲取鎖時的行為。如下圖。這部分的邏輯有些多,請閱讀代碼中的注釋進行理解。
/** * 公平鎖的lock **/ final void lock() { acquire(1); } /** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. **/ public final void acquire(int arg) { /** * acquire首先進行tryAcquire()操作。如果tryAcquire()成功時則獲取到鎖,即刻返回。 * 如果tryAcquire()false時,會執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) * 操作。如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)true時,則當前線程中斷自己。 * 如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)false,則返回。 * 其中tryAcquire()操作在NonfairSync中和FairSync中實現(xiàn)又有所區(qū)別。 **/ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * NonfairSync中的tryAcquire。 * @param acquires * @return **/ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. **/ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //首先獲取當前同步狀態(tài)值 int c = getState(); if (c == 0) { //c為0,表示目前沒有線程占用鎖。沒有線程占用鎖時,當前線程嘗試搶鎖,如果搶鎖成功,則返回true。 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //c不等于0時表示鎖被線程占用。如果是當前線程占用了,則將鎖計數(shù)加上acquires,并返回true。 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //以上情況都不是時,返回false,表示非公平搶鎖失敗。 return false; } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. * 這個是公平版本的tryAcquire **/ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //c=0時表示鎖未被占用。這里是先判斷隊列中前面是否有別的線程。沒有別的線程時,才進行CAS操作。 //公平鎖之所以公平,正是因為這里。它發(fā)現(xiàn)鎖未被占用時,首先判斷等待隊列中是否有別的線程已經(jīng)在等待了。 //而非公平鎖,發(fā)現(xiàn)鎖未被占用時,根本不管隊列中的排隊情況,上來就搶。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * 當搶鎖失敗時,先執(zhí)行addWaiter(Node.EXCLUSIVE),將當前線程加入等待隊列,再執(zhí)行該方法。 * 該方法的作用是中斷當前線程,并進行檢查,知道當前線程是隊列中的第一個線程,并且搶鎖成功時, * 該方法返回。 * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting **/ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
接下來是tryLock方法。代碼如下。從注釋中我們可以理解到,只有當調(diào)用tryLock時鎖沒有被別的線程占用,tryLock才會獲取鎖。如果鎖沒有被另一個線程占用,那么就獲取鎖,并立刻返回true,并把鎖計數(shù)設置為1. 甚至在鎖被設置為公平排序的情況下,若果鎖可用,調(diào)用tryLock會立刻獲取鎖,而不管有沒有別的線程在等待鎖了。從這里我們總結出,不管可重入鎖是公平鎖還是非公平鎖,tryLock方法只會是非公平的。
/** * Acquires the lock only if it is not held by another thread at the time * of invocation. *Acquires the lock if it is not held by another thread and * returns immediately with the value {@code true}, setting the * lock hold count to one. Even when this lock has been set to use a * fair ordering policy, a call to {@code tryLock()} will * immediately acquire the lock if it is available, whether or not * other threads are currently waiting for the lock. * This "barging" behavior can be useful in certain * circumstances, even though it breaks fairness. If you want to honor * the fairness setting for this lock, then use * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) } * which is almost equivalent (it also detects interruption). *
If the current thread already holds this lock then the hold * count is incremented by one and the method returns {@code true}. *
If the lock is held by another thread then this method will return * immediately with the value {@code false}. * @return {@code true} if the lock was free and was acquired by the * current thread, or the lock was already held by the current * thread; and {@code false} otherwise **/ public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
接下來是釋放鎖的方法unlock。代碼如下。unlock方式的實現(xiàn),是以參數(shù)1來調(diào)用sync.release方法。而release方法是如何實現(xiàn)的呢?release方法首先會調(diào)用tryRelease方法,如果tryRelease成功,則喚醒后繼者線程。而tryRelease的實現(xiàn)過程十分清晰,首先獲取鎖狀態(tài),鎖狀態(tài)減去參數(shù)(放鎖次數(shù)),得到新狀態(tài)。然后判斷持有鎖的線程是否為當前線程,如果不是當前線程,則拋出IllegalMonitorStateException。然后判斷,如果新狀態(tài)為0,說明放鎖成功,則把持有鎖的線程設置為null,并返回true。如果新狀態(tài)不為0,則返回false。從tryRelease的返回值來看,它返回的true或false,指的是否成功的釋放了該鎖。成功的釋放該鎖的意思是徹底釋放鎖,別的線程就可以獲取鎖了。這里要認識到,即便tryRelease返回false,它也只是說明了鎖沒有完全釋放,本次調(diào)用的這個釋放次數(shù)值,依然是釋放成功的。
/** * Attempts to release this lock. *If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * @throws IllegalMonitorStateException if the current thread does not * hold this lock **/ public void unlock() { sync.release(1); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} **/ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /** * Wakes up node's successor, if one exists. * @param node the node **/ private void unparkSuccessor(Node node) { /** * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. **/ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /** * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. **/ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
接下來是newCondition方法。關于Condition這里不展開介紹,只是了解下該方法的作用。如下圖。該方法返回一個和這個鎖實例一起使用的Condition實例。返回的Condition實例支持和Object的監(jiān)控方法例如wait-notify和notifyAll相同的用法。
/** * Returns a {@link Condition} instance for use with this * {@link Lock} instance. *The returned {@link Condition} instance supports the same * usages as do the {@link Object} monitor methods ({@link * Object#wait() wait}, {@link Object#notify notify}, and {@link * Object#notifyAll notifyAll}) when used with the built-in * monitor lock. *
可重入鎖還有一些其他的方法,這里就不一一介紹了。This is the end.
總結
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對創(chuàng)新互聯(lián)的支持。如果你想了解更多相關內(nèi)容請查看下面相關鏈接