????AbstractQueuedSynchronizer (AQS)類如其名,抽象的隊(duì)列式同步容器,AQS定義類一套多線程訪問(wèn)共享資源的同步器,許多同步類的實(shí)現(xiàn)都依賴于它,比如之前學(xué)習(xí)的ReentrantLock/Semaphore/CountDownLatch。
在堆龍德慶等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場(chǎng)前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都做網(wǎng)站、成都網(wǎng)站制作 網(wǎng)站設(shè)計(jì)制作定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,營(yíng)銷型網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,堆龍德慶網(wǎng)站建設(shè)費(fèi)用合理。
AQS阻塞隊(duì)列.png
1。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取于釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在頂層實(shí)現(xiàn)好了,自定義同步容器實(shí)現(xiàn)時(shí)主要實(shí)現(xiàn)以下幾種方法:
2.isHeldExclusively():該線程是否正在獨(dú)占資源,只有用到condition才需要取實(shí)現(xiàn)它。
3.tryAcquire(int):獨(dú)占方式,嘗試獲取資源,成功則返回true,失敗則返回false。
4.tryRelease(int):獨(dú)占方式,嘗試釋放資源,成功則返回true,失敗則返回false。
5.tryAcquireShared(int):共享方式,嘗試獲取資源,附屬表示獲取失敗,0表示獲取成功,但是沒(méi)有剩余資源可用,其他線程不能在獲取,正數(shù)表示獲取成功,并且有剩余資源,也就是自己可以重復(fù)獲取(可重入)。
6.tryReleaseShare(int):共享方式。嘗試釋放資源,成功返回true,失敗返回false。
7.以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()的時(shí)候,會(huì)調(diào)用tryAcquire()獨(dú)占該鎖并將state+1。此后其他線程在tryAcquire()獨(dú)占該鎖并將state+1。此后表示其他線程再tryAcquire()時(shí)就會(huì)失敗,直到A線程unlock()到state=0(幾鎖釋放)為止,其他線程才有機(jī)會(huì)獲取該鎖。當(dāng)然,釋放鎖之前,A線程自己是可以重復(fù)獲取此鎖的(state會(huì)累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的。
2.1 Reentrantlock類結(jié)構(gòu)
public class ReentrantLock implements Lock, java.io.Serializable
可以看出Reentrantlock 實(shí)現(xiàn)類lock接口,首先看下這個(gè)lock接口。
public interface Lock {
//普通的獲取鎖的方法,lock會(huì)阻塞直到成功
void lock();
//與lock不同的時(shí),它可以相應(yīng)中斷,
void lockInterruptibly() throws InterruptedException;
//嘗試獲取鎖,立即返回,不阻塞,成功返回true,失敗返回false
boolean tryLock();
//先嘗試獲取鎖,如果成功則返回true,失敗則阻塞等待,等待時(shí)間由參數(shù)決定,在等待的同時(shí)相應(yīng)中斷,拋出中斷異常,如果在等待的過(guò)程中獲得類鎖則返回true,否則直到時(shí)間超時(shí),則返回false。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//普通釋放鎖的方法
void unlock();
//新建一個(gè)條件,lock可以關(guān)聯(lián)多個(gè)條件,關(guān)于條件在以后篇幅中記錄
Condition newCondition();
}
????可以看出,顯示鎖,與synchronied相比,支持非阻塞的方式獲取鎖,可以響應(yīng)中斷,可以限時(shí),這使得它非常的靈活。
?????現(xiàn)在回到Reentrantlock類,可以看到其內(nèi)部基本上定義類三個(gè)內(nèi)部類:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
.....
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
static final class NonfairSync extends Sync {
....
}
....
static final class FairSync extends Sync {
.....
}
.....
}
??????很顯然從上面的類結(jié)構(gòu),我們可以得出jdk利用CAS算法和AQS實(shí)現(xiàn)類ReentrantLock。
2.2. 構(gòu)造函數(shù)分析
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
???ReentrantLock出于性能考慮,默認(rèn)采用非公平鎖。
2.3. 獲取鎖
2.3.1. 非公平模式下的加鎖
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//直接通過(guò)CAS算法,通過(guò)改變共享資源-state值,成功則設(shè)置當(dāng)前線程為持有鎖的線程(獨(dú)占),失敗則調(diào)用頂層AQS的acquire方法,再次嘗試,失敗則將線程放到阻塞隊(duì)列
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}
2.3.2. 公平模式下的加鎖
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
//獲取鎖
final void lock() {
acquire(1); //在這里調(diào)用的時(shí)AQS頂層方法,獲取失敗則將線程放到阻塞隊(duì)列末尾
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//當(dāng)前資源還沒(méi)有被其他線程鎖定
if (c == 0) {
//沒(méi)有其他線程正在競(jìng)爭(zhēng)當(dāng)前的臨界資源,并且通過(guò)CAS加鎖成功,則設(shè)置當(dāng)前的線程為獨(dú)占此資源的線程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果此時(shí)臨界資源已經(jīng)被鎖定,則先判斷持有者是否時(shí)當(dāng)前線程
else if (current == getExclusiveOwnerThread()) {
//如果是持有者,則可重復(fù)獲取鎖,同時(shí)更新狀態(tài)值,注意加幾次鎖就要釋放幾次,直到為0
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2.3.3、acquireInterruptibly解析
public void lockInterruptibly() throws InterruptedException {
//調(diào)用AQS頂層方法
sync.acquireInterruptibly(1);
}
//AQS頂層方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//中斷響應(yīng)
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大致過(guò)程如下:
(1)如果線程已經(jīng)中斷則直接拋出異常
(2)如果線程還沒(méi)中斷,則通過(guò)自旋方式,申請(qǐng)鎖直至到當(dāng)前線程獲得鎖,或失敗,然后響應(yīng)中斷。
2.4 釋放鎖
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
其釋放獨(dú)占鎖的過(guò)程大致如下:
(1)先獲取當(dāng)前資源狀態(tài),減去釋放release,一般是1;
(2)判斷當(dāng)前線程是否鎖定資源的線程,如果不是則拋出異常;
(3)假如資源狀態(tài)值為0,則說(shuō)明鎖已經(jīng)釋放,將獨(dú)占線程至空,更新狀態(tài)值,返回設(shè)置成功或失敗