從本課開(kāi)始學(xué)習(xí)并發(fā)編程的內(nèi)容。主要介紹并發(fā)編程的基礎(chǔ)知識(shí)、鎖、內(nèi)存模型、線程池、各種并發(fā)容器的使用。
成都創(chuàng)新互聯(lián)公司主要從事做網(wǎng)站、網(wǎng)站設(shè)計(jì)、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)讓胡路,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):028-86922220
并發(fā)編程
線程通信
AQS
Condition
Lock
本節(jié)學(xué)習(xí)線程間的通信,并手寫緩存隊(duì)列。
有兩種:
synchronized
結(jié)合wait()
、notify()
、notifyAll()
來(lái)實(shí)現(xiàn)Lock
并且結(jié)合Condition
來(lái)實(shí)現(xiàn)本節(jié)內(nèi)容主要講解Condition。
是個(gè)接口。實(shí)現(xiàn)類是ConditionObject
,AQS的一個(gè)內(nèi)部類
public interface Condition {
void await() throws InterruptedException;
void awaitUnInterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
await()
,會(huì)使當(dāng)前線程等待,并且釋放鎖;當(dāng)其他線程執(zhí)行signal()
或signalAll()
時(shí),線程會(huì)重新獲取鎖并繼續(xù)執(zhí)行;或者當(dāng)線程被中斷時(shí),會(huì)使線程跳出等待。該方法和Object.wait()
功能相似awaitUnInterruptibly()
,與await
類似,但是不會(huì)響應(yīng)中斷,即使是在等待狀態(tài)signal()
,用于喚醒一個(gè)等待的線程。相對(duì)的signalAll()
方法可以喚醒所有等待的線程。和Object.notify()
功能類似condition.await()必須在lock和unlock之間使用
使用lock.newCondition()
來(lái)獲取Condition
當(dāng)執(zhí)行await()
或signal()
時(shí),線程不一定立即響應(yīng),此時(shí)會(huì)出現(xiàn)虛假等待和虛假喚醒。這是對(duì)基礎(chǔ)平臺(tái)語(yǔ)義的讓步。若使用"if (!條件)"來(lái)做判斷的話會(huì)有問(wèn)題,所以一般使用 "while(!條件)"來(lái)防止這種情況
不用IF,使用WHILE
if (!條件) {
condition.await();
}
while (!條件) {
condition.await();
}
上代碼(生產(chǎn)者消費(fèi)者模式)
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description: 緩沖隊(duì)列
* @Author: lsw
* @Version: 1.0
*/
public class BoundedBuffer {
final Lock lock = new ReentrantLock(); // 鎖對(duì)象
final Condition notFull = lock.newCondition(); // 寫條件
final Condition notEmpty = lock.newCondition(); // 讀條件
final Object[] items = new Object[100]; // 容器
int putIdx, // 寫索引
takeIdx, // 讀索引
count; // 當(dāng)前數(shù)量
public void put(Object it) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); // 當(dāng)容器存滿時(shí),使寫線程等待
}
// 正常情況
items[putIdx] = it;
putIdx++;
// 存到尾部,則再?gòu)念^開(kāi)始
if (putIdx == items.length) {
putIdx = 0;
}
count++;
// 存入對(duì)象,就通知讀線程進(jìn)行讀取
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 當(dāng)容器空,則使讀線程等待
}
// 正常情況
Object it = items[takeIdx];
takeIdx++;
// 如果讀到尾部,則從頭開(kāi)始
if (takeIdx == items.length) {
takeIdx = 0;
}
count--;
// 喚醒寫線程
notFull.signal();
return it;
} finally {
lock.unlock();
}
}
}
通過(guò)針對(duì)同一個(gè)Lock創(chuàng)建多個(gè)Condition,可以非常靈活的控制各個(gè)線程執(zhí)行或者等待。這就是Condition的強(qiáng)大之處
使用Lock實(shí)現(xiàn)加鎖解鎖以及Condition對(duì)線程進(jìn)行狀態(tài)操作時(shí),底層都會(huì)用到LockSupport.part()
或者LockSupport.unpark()
。下面我們來(lái)研究下這個(gè)工具類。
public class LockSupport {
static void park() {}
static void park(Object blocker) {}
static void parkNanos(long nanos) {}
static void parkNanos(Object blocker, long nanos) {}
static void parkUntil(long deadline) {}
static void park(Object blocker, long deadline) {}
static void unpark(Thread t) {}
}
park()
方法的作用是使當(dāng)前線程進(jìn)入等待WAITING隊(duì)列,直到調(diào)用unpark()
或者響應(yīng)中斷
parkNanos()
方法是指使當(dāng)前線程進(jìn)入等待隊(duì)列,且等待時(shí)間不可超過(guò)指定的時(shí)長(zhǎng)
parkUntil()
方法是指使當(dāng)前線程進(jìn)入等待隊(duì)列,直到某個(gè)截止時(shí)間退出等待
參數(shù)blocker
是可用于記錄導(dǎo)致線程等待的對(duì)象,方便排查問(wèn)題
unpark()
用于喚醒指定的線程
這些功能的底層是調(diào)用的Unsafe
本地類庫(kù)的UNSAFE.park()
和UNSAFE.unpark()