并發(fā)編程的原則
10年積累的成都網(wǎng)站設(shè)計、網(wǎng)站制作經(jīng)驗,可以快速應對客戶對網(wǎng)站的新想法和需求。提供各種問題對應的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡服務。我雖然不認識你,你也不認識我。但先網(wǎng)站設(shè)計后付款的網(wǎng)站建設(shè)流程,更有宜黃免費網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
原子性是指在一個操作中就是cpu不可以在中途暫停然后再調(diào)度,既不被中斷操作,即一個操作或者多個操作 要么全部執(zhí)行并且執(zhí)行的過程不會被任何因素打斷,要么就都不執(zhí)行。
對于可見性,Java提供了volatile關(guān)鍵字來保證可見性。當一個共享變量被volatile修飾時,它會保證修改的值會立即被更新到主存,當有其他線程需要讀取時,它會去內(nèi)存中讀取新值。而普通的共享變量不能保證可見性,因為普通共享變量被修改之后,什么時候被寫入主存是不確定的,當其他線程去讀取時,此時內(nèi)存中可能還是原來的舊值,因此無法保證可見性。另外,通過synchronized和Lock也能夠保證可見性,synchronized和Lock能保證同一時刻只有一個線程獲取鎖然后執(zhí)行同步代碼,并且在釋放鎖之前會將對變量的修改刷新到主存當中。
在Java內(nèi)存模型中,允許編譯器和處理器對指令進行重新排序,但是重新排序過程不會影響到單線程程序的執(zhí)行,卻會影響到多線程并發(fā)執(zhí)行的正確性。
Runnable和Thread
這里只說一下實現(xiàn)Runnable接口和繼承Thread類的區(qū)別:以賣10張票的任務為例,如果繼承Thread類的話,啟動三個線程就相當于開了三個窗口,每個窗口都有賣10張票的任務,各賣各的;如果實現(xiàn)Runnable接口的話,啟動三個線程相當開了三個窗口賣票,這三個窗口一共賣10張票。
1.?synchronized對象鎖
synchronized(this)和synchronized方法都是鎖當前對象,synchronized(obj)鎖臨界對象。使用synchronized的話最好是鎖臨界對象。如果想要使得任意多個線程任意多個用戶訪問的時候都不出任何問題,可以考慮一下用鎖當前對象的方法,因為鎖當前對象量級較重,所以一般不用。
如下面Sync類中的兩個方法test_01和test_02()鎖的都是程序創(chuàng)建的Sync對象,細粒度控制推薦用test_02()。
public synchronized void test_01() {
System.out.println("鎖當前對象");
}
public void test_02() {
synchronized (this) {
System.out.println("鎖當前對象");
}
}
下面這個方法鎖的是Sync對象中的object對象(即臨界對象)
public void test_03() {
synchronized (object) {
System.out.println("鎖臨界對象");
}
}
2.?synchronized使用在靜態(tài)方法中鎖定當前類
靜態(tài)同步方法鎖的是當前類型的類對象,如在Sync類中的static test_04()方法上加了同步鎖synchronized,那么此時synchronized鎖的是Sync.class。
// 下面兩個方法都是靜態(tài)同步方法
public static synchronized void test_04() {
System.out.println("鎖Sync.class");
}
public static void test_05() {
synchronized (Sync.class) {
System.out.println("鎖Sync.class類");
}
}
3.?synchronized作用于靜態(tài)和非靜態(tài)方法的區(qū)別
synchronized作用與非靜態(tài)方法,相當于鎖定單個對象,不同對象之間沒有競爭關(guān)系;而作用于靜態(tài)方法時,鎖加載類上,即鎖定class,這時相當于所有對象競爭同一把鎖。
如下例子,線程1會在i=5的時候拋出異常,此時線程1鎖被釋放,線程2開始調(diào)用方法。
public class Test {
static class Test02 implements Runnable {
private int i = 0;
@Override
public synchronized void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "_" + i++);
if (i == 5) { // 當i==5時拋出異常,鎖被釋放
i = 1 / 0;
}
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored) { }
}
}
}
public static void main(String[] args) {
Test02 test02 = new Test02();
new Thread(test02, "LQ").start();
new Thread(test02, "WH").start();
}
}
在下面代碼中,object被LQ鎖定,WH阻塞。
public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object){
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}
static class Test01 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
static class Test02 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
public static void main(String[] args) {
Test01 test01 = new Test01();
Thread thread = new Thread(test01, "LQ");
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ignored) {}
Test02 test02 = new Test02();
thread = new Thread(test02, "WH");
thread.start();
}
}
在WH線程中新創(chuàng)建了一個Object,WH正常運行。
public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object) {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored){}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}
static class Test01 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
static class Test02 implements Runnable {
@Override
public void run() {
object = new Object();
new Test().m();
}
}
public static void main(String[] args) {
Test01 test01 = new Test01();
Thread thread = new Thread(test01, "LQ");
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ignored) {}
Test02 test02 = new Test02();
thread = new Thread(test02, "WH");
thread.start();
}
}
上面代碼中,WH線程啟動后會一只處于等待狀態(tài),因為object被LQ線程鎖著,但如果在WH線程中重新new Object()并賦值給object,這樣的話WH線程就能夠正常運行了,原因是:同步鎖鎖定的是對內(nèi)存中的對象,所以LQ鎖定的是第一次new的對象而WH鎖定的是第二次new的對象,如下圖。
?
對于常量:String a = “aaa” 和String b = “aaa”是同一個對象,因此,假如A方法鎖定了a,B方法鎖定了b,啟動LQ線程調(diào)用A方法,然后啟動WH線程調(diào)用B方法,這樣的話WH線程會等到LQ線程結(jié)束后才執(zhí)行。因此,在定義同步代碼塊時,不要使用常量作為鎖的目標對象。
volatile關(guān)鍵字
計算機中有CPU、內(nèi)存和緩存,當CPU運行的時候,默認找緩存中的數(shù)據(jù)。當CPU有中斷的時候,根據(jù)操作系統(tǒng)對CPU的管理特性,可能會清空緩存,重新將內(nèi)存中的數(shù)據(jù)讀到緩存中,也可能不清空緩存,仍舊使用緩存中的數(shù)據(jù)進行后續(xù)的計算。如果CPU不中斷的話,默認CPU只會找緩存數(shù)據(jù)。volatile這個關(guān)鍵字不是改變緩存數(shù)據(jù)特性的,而是直接改變內(nèi)存中的數(shù)據(jù)特性,當對一個對象加了volatile關(guān)鍵字修飾的時候,相當于通知了底層OS操作系統(tǒng),告訴CPU每次進行計算的時候最好去看一下內(nèi)存數(shù)據(jù)是否發(fā)生了變更,這就是內(nèi)存的可見性。volatile關(guān)鍵字就是為了保證內(nèi)存的可見性。
如下代碼會發(fā)生死鎖現(xiàn)象。
public class Volatile01 {
private static boolean b = true;
private void m() {
System.out.println("start...");
while (b) {}
System.out.println("end...");
}
static class Volatile_01 implements Runnable {
@Override
public void run() {
new Volatile01().m();
}
}
public static void main(String[] args) {
Volatile_01 = new Volatile_01();
new Thread(volatile_01).start();
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored) {}
b = false;
}
}
當將上述代碼塊中的共享變量b用volatile修飾時(保證了可見性),就能夠跳出循環(huán)了。
public class Volatile01 {
private static volatile boolean b = true;
private void m() {
System.out.println("start...");
while (b){}
System.out.println("end...");
}
static class Volatile_01 implements Runnable {
@Override
public void run(){
new Volatile01().m();
}
}
public static void main(String[] args) {
Volatile_01 = new Volatile_01();
new Thread(volatile_01).start();
try{
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored){}
b = false;
}
}
join()方法
將多個線程連在一起,阻塞線程,直到調(diào)用join的線程執(zhí)行完成。
如下程序打印的結(jié)果時100000,如果不用join()的話打印的結(jié)果將遠遠小于100000。用join()可以用來等待一組線程執(zhí)行完畢后再進行后續(xù)邏輯處理,以保證數(shù)據(jù)的正確。
public class Test {
private static volatile int count = 0;
private void m() {
for (int i = 0; i < 10000; i++) {
count++;
}
}
static class Test02 implements Runnable {
@Override
public synchronized void run() {
new Test().m();
}
}
public static void main(String[] args) {
Test02 test02 = new Test02();
List threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(test02));
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(count);
}
}
上述代碼中用了synchronized關(guān)鍵字來實現(xiàn)原子性,也可以不用synchronized而用AtomicInteger對象,因為AtomicInteger是一個原子性操作對象,代碼如下。
public class Test{
private static AtomicInteger count = new AtomicInteger();
private void m(){
for (int i = 0; i < 10000; i++){
count.incrementAndGet();
}
}
static class Test02 implements Runnable{
@Override
public void run(){
new Test().m();
}
}
public static void main(String[] args){
Test02 test02 = new Test02();
List threads = new ArrayList<>();
for (int i = 0; i < 10; i++){
threads.add(new Thread(test02));
}
for (Thread thread : threads){
thread.start();
try{
thread.join();
}catch (InterruptedException e){
e.printStackTrace();
}
}
System.out.println(count);
}
}
CountDownLatch對象
CountDownLatch相當于一個門閂,在創(chuàng)建門閂對象的時候可以指定鎖的個數(shù),若某個方法調(diào)用了門閂的await()方法,那么該方法執(zhí)行到await()的時候會被阻塞等待門閂釋放,當門閂上沒有鎖也就是門閂開放的時候繼續(xù)執(zhí)行。減門閂上鎖的方法時countDown()。
如下例,當在m1中調(diào)用了await(),在m2中調(diào)用了countDown(),因此根據(jù)m2的邏輯當m2執(zhí)行完了之后門閂上的鎖數(shù)量就為0了,此時m1方法可以繼續(xù)執(zhí)行了。
public class Test {
private CountDownLatch countDownLatch = new CountDownLatch(5);
private void m1() {
try {
countDownLatch.await(); // 等待門閂開放
} catch (Exception ignored) {
}
System.out.println("method m1.");
}
private void m2() {
while (countDownLatch.getCount() != 0) {
countDownLatch.countDown(); // 減門閂上的鎖
System.out.println("method m2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
}
}
public static void main(String[] args) {
Test count01 = new Test();
new Thread(count01::m2).start();
new Thread(count01::m1).start();
}
}
門閂可以和鎖混合使用,或替代鎖的功能,再門閂開放之前等待,當門閂完全開放之后執(zhí)行,可避免鎖的效率低下問題。
wait()、notify()和notifyAll()
wait():在對象上調(diào)用wait(), 會使當前線程進入等待狀態(tài), 直至另一個線程對這個對象調(diào)用了notify() 或notifyAll() 方法喚醒線程。
notify():喚醒對象正在等待的一個線程。
notifyAll():當調(diào)用對象的notifyAll()方法時,所有waiting狀態(tài)的線程都會被喚醒。
(生產(chǎn)者消費者)自定義同步容器,容器上限為10,可以在多線程中應用,并保證數(shù)據(jù)線程安全。
public class DeviceSingleton {
private DeviceSingleton() {
}
private final int max = 10;
private int count = 0;
private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();
public static DeviceSingleton getInstance() {
return DEVICE_SINGLETON;
}
private final List devices = new ArrayList<>();
/**
* 添加
*/
public synchronized void add(E data) {
// 當容器滿了之后進入等待狀態(tài)
while (devices.size() == max) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("add: " + data);
ThreadUtils.sleep(1000);
devices.add(data);
count++;
this.notify();
}
/**
* 獲取
*/
public synchronized E get() {
E data = null;
while (devices.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ThreadUtils.sleep(1000);
data = devices.remove(0);
count--;
this.notifyAll();
return data;
}
/**
* 獲取長度
*/
public synchronized int size() {
return count;
}
@Data
static class Device {
private int id;
private String name;
public Device(int id, String name) {
this.id = id;
this.name = name;
}
}
static class ThreadUtils {
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (Exception ignore) {}
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
Thread.sleep(2000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
}
}, "producer").start();
}
}
}
ReentrantLock鎖
為盡量避免使用synchronized和同步方法出現(xiàn)的一種多線程鎖機制,建議使用的同步方式,效率比synchronized高。使用重入鎖時,需要手動釋放鎖(lock.unlock())。示例如下:
public class ReentrantLockTest {
private final Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); // 加鎖
for (int i = 0; i < 10; i++) {
System.out.println("method m1() " + i);
ThreadUtils.sleep(1000);
}
lock.unlock(); // 解鎖
}
private void m2() {
lock.lock(); // 加鎖
System.out.println("method m2()");
lock.unlock(); // 解鎖
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(reentrantLockTest::m1).start();
new Thread(reentrantLockTest::m2).start();
}
}
如果沒有獲取到鎖標記則返回false,當前線程等待,如果獲取到了鎖標記,則返回true,當前線程被鎖定執(zhí)行。示例如下:
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); // 加鎖
for (int i = 0; i < 10; i++) {
ThreadUtils.sleep(1000);
System.out.println("method m1() " + i);
}
lock.unlock(); // 解鎖
}
private void m2() {
boolean isLocked = false;
try {
/*
嘗試鎖,如果有鎖,則無法獲取鎖標記,返回false,否則返回true
如果無法獲取到鎖標記,則說明別的線程正在使用鎖,該線程等待
如果獲取到了鎖標記,則該線程的代碼塊被鎖定
下面是獲取鎖標記的無參方法,當執(zhí)行到該語句的時候立刻獲取鎖標記
也可以用有參的,即當執(zhí)行到該語句多長時間之內(nèi)獲取鎖標記,如果超時,不等待,直接返回。如isLocked = lock.tryLock(5, TimeUnit.SECONDS);表示5秒之內(nèi)獲取鎖標記(5秒之內(nèi)任何時間獲取到鎖標記都會繼續(xù)執(zhí)行),如果超時則直接返回。
*/
isLocked = lock.tryLock();
System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 嘗試鎖在解除鎖標記的時候一定要判斷是否獲取到鎖標記
if (isLocked) {
lock.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(reentrantLockTest::m1).start();
new Thread(reentrantLockTest::m2).start();
}
}
非可中斷鎖當客戶端調(diào)用interrupt方法時,只是簡單的去設(shè)置interrupted中斷狀態(tài),并沒有進一步拋出異常,而可中斷鎖在監(jiān)測到中斷請求時會拋出InterruptedException ,進而中斷線程執(zhí)行。示例如下:
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); // 加鎖
for (int i = 0; i < 5; i++) {
ThreadUtils.sleep(1000);
System.out.println("method m1() " + i);
}
lock.unlock(); // 解鎖
}
private void m2() {
try {
/*
可打斷鎖,阻塞等待鎖,可以被其他的線程打斷阻塞狀態(tài)
*/
lock.lockInterruptibly(); // 可嘗試打斷
System.out.println("method m2()");
} catch (InterruptedException e) {
System.out.println("鎖被打斷");
} finally {
try {
lock.unlock();
} catch (Exception ignored) {
}
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
Thread thread1 = new Thread(reentrantLockTest::m1);
thread1.start();
ThreadUtils.sleep(1000);
Thread thread2 = new Thread(reentrantLockTest::m2);
thread2.start();
ThreadUtils.sleep(1000);
thread2.interrupt(); // 打斷線程休眠
}
}
注意:用ReentrantLock打斷鎖,如果要打斷的話是用線程打斷,跟喚醒不同,notifyAll喚醒是用對象區(qū)喚醒。(打斷thread.interruped(); 喚醒object.notifyAll())。
線程打斷有什么用呢?
我們在用Windows的時候經(jīng)常會遇到軟件鎖死的問題,這時候我們往往會通過打開任務管理器來結(jié)束進程,這種結(jié)束進程可以認為是打斷鎖的阻塞狀態(tài)(即非正常結(jié)束)。
先到先得。若沒有特殊情況,不建議使用公平鎖,如果使用公平鎖的話,一般來說并發(fā)量<=10,如果并發(fā)量較大,而不可避免的有訪問先后順序的話,建議采用別的方法。
public class ReentrantLockTest {
static class TestReentrantLock extends Thread {
// 在創(chuàng)建ReentrantLock對象的時候傳參為true就代表創(chuàng)建公平鎖
private ReentrantLock lock = new ReentrantLock(true);
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get lock.");
ThreadUtils.sleep(1000);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
TestReentrantLock lock = new TestReentrantLock();
lock.start();
new Thread(lock).start();
new Thread(lock).start();
}
}
為Lock增加條件,當條件滿足時做一些事情,如加鎖或解鎖、等待或喚醒等。下面示例就是使用Condition實現(xiàn)的生產(chǎn)者消費者。
public class DeviceContainer
private DeviceContainer() {
}
private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();
public static DeviceContainer getInstance() {
return DEVICE_CONTAINER;
}
private final List list = new LinkedList<>();
private final int max = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void add(T t) {
lock.lock();
try {
while (this.size() == max) {
System.out.println(Thread.currentThread().getName() + " 等待");
// 當數(shù)據(jù)長度為max的時候,生產(chǎn)者進入等待隊列,釋放鎖標記
// 借助條件進入的等待隊列
producer.await();
}
System.out.println(Thread.currentThread().getName() + " 添加");
list.add(t);
count++;
// 借助條件喚醒所有的消費者
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
lock.lock();
try {
while (this.size() == 0) {
System.out.println(Thread.currentThread().getName() + " 等待");
// 借助條件使消費者進入等待隊列
consumer.await();
}
System.out.println(Thread.currentThread().getName() + " 獲取");
t = list.remove(0);
count--;
// 借助條件喚醒所有生產(chǎn)者
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
private int size() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceContainer
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
ThreadUtils.sleep(1000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new Device(j, "device " + j));
}
}, "producer-" + i).start();
}
}
}
ConcurrentHashMap/ConcurrentHashSet:底層哈希實現(xiàn)的Map/Set,效率高,使用底層技術(shù)實現(xiàn)的線程安全,量級較synchronized輕。key和value不能為null(不同于HashMap和HashSet)
ConcurrentSkipListMap/ConcurrentSkipListSet:底層跳表實現(xiàn)的Map/Set,有序,線程安全,效率較ConcurrentHashMap/ConcurrentHashSet低。
CopyOnWriteArraySet:底層數(shù)組,線程安全,增加和刪除效率低,查詢效率高。
CopyOnWriteArrayList:底層數(shù)組,線程安全,增加和刪除效率低,查詢效率高。
ConcurrentLinkedQueue/ ConcurrentLinkedDeue:基礎(chǔ)鏈表同步隊列,非阻塞,ConcurrentLinkedQueue底層單向鏈表,ConcurrentLinkedDeue底層雙向鏈表,均***。
ArrayBlockingQueue/LinkedBlockingQueue:阻塞隊列,隊列容量不足自動阻塞,隊列容量為0自動阻塞。ArrayBlockingQueue底層使用數(shù)組,有界;LinkedBlockingQueue底層使用鏈表,默認***。ArrayBlockingQueue根據(jù)調(diào)用API的不同,有不同的特性。當容量不足的時候有阻塞能力。add方法在容量不足的時候會拋出異常;put方法在容量不足時阻塞等待;offer默認不阻塞,當容量不足的時候返回false,否則返回true;三參offer可設(shè)定阻塞時長,若在阻塞時長內(nèi)有容量空閑,則添加并返回true,如果阻塞時長范圍內(nèi)無容量空閑,放棄新增數(shù)據(jù)并返回false。LinkedBlockingQueue的add方法在容量不足的時候會拋出異常;offer方法在容量不足時返回false,否則返回true;三參offer可設(shè)定阻塞時長,若在阻塞時長內(nèi)有容量空閑,則添加并返回true,如果阻塞時長范圍內(nèi)無容量空閑,放棄新增數(shù)據(jù)并返回false。
PriorityQueue:有限集隊列,底層數(shù)組,***。
PriorityBlockingQueue:優(yōu)先級阻塞隊列,底層數(shù)組,***。
LinkedTransferQueue:轉(zhuǎn)移隊列,使用transfer方法實現(xiàn)數(shù)據(jù)的即時處理。隊列使用add保存數(shù)據(jù),不做阻塞等待。transfer是TransferQueue的特有方法,轉(zhuǎn)移隊列必須要有消費者(take()方法的調(diào)用者)。如果沒有任何線程消費數(shù)據(jù),則transfer方法阻塞。一般用于處理即時消息。
SynchronousQueue:阻塞的同步隊列,有界。是一個容量為0的隊列,是一個特殊的TransferQuque。必須先有消費線程等待才能使用的隊列。add方法無阻塞,若沒有消費線程阻塞等待數(shù)據(jù),則拋出異常。put方法有阻塞,若沒有消費線程阻塞等待數(shù)據(jù),則put方法阻塞。
DelayQueue:延時阻塞隊列,***。類似輪詢機制,一般用來做定時任務。業(yè)務場景舉例:具有過期時間的緩存,訂單過期自動取消等。
?
線程池
線程池是一個進程級的資源,默認的生命周期和JVM一致,即從開啟線程池開始,到JVM關(guān)閉為止,是線程池的默認生命周期。如果顯式調(diào)用shutdown方法,那么線程池執(zhí)行所有的任務后自動關(guān)閉。
Executor接口
線程池頂級接口。Executor中只有一個方法execute,是用來處理任務的一個服務方法。調(diào)用者提供Runnable接口的實現(xiàn),線程池通過執(zhí)行線程執(zhí)行這個Runnable。
public class Executor01 {
public static void main(String[] args) {
new Executor_01().execute(() ->
System.out.println(Thread.currentThread().getName() + " test executor.")
);
}
static class Executor_01 implements Executor {@Override
br/>@Override
new Thread(command).start();
}
}
}
ExecutorService
Executor的子接口,與Executor不同的是,它還提供了一個返回值為Future的服務方法submit。
Executors工具類
Executor的工具類,為線程池提供工具方法,可快速創(chuàng)建線程池,所有的線程池類型都實現(xiàn)了這個接口,實現(xiàn)了這個接口就代表有提供線程池的能力。常用方法有:void execute(),F(xiàn)uture submit(Callable),F(xiàn)uture submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。
public class Test {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個長度為5的線程池對象
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
System.out.println(executorService);
// 優(yōu)雅關(guān)閉
executorService.shutdown();
// 是否已經(jīng)結(jié)束,相當于判斷是否回收了資源,因為線程睡眠,此時還未回收,因此為false
System.out.println(executorService.isTerminated());
// 是否已經(jīng)關(guān)閉,即是否調(diào)用過shutdown方法
System.out.println(executorService.isShutdown());
System.out.println(executorService);
ThreadUtils.sleep(1000);
// 因為上面睡了5秒,任務都已經(jīng)執(zhí)行完了,資源也被回收了,因此為true
System.out.println(executorService.isTerminated());
System.out.println(executorService.isShutdown());
System.out.println(executorService);
}
}
Future
未來結(jié)果,代表線程執(zhí)行結(jié)束后的結(jié)果。通過get方法獲取線程執(zhí)行結(jié)果。
常用方法:get()、get(long, TimeUnit)和isDown()。
get():阻塞等待線程執(zhí)行結(jié)束并得到返回結(jié)果;
get(long, TimeUnit):阻塞固定時長,等待線程結(jié)束后的結(jié)果,如果在阻塞時長范圍內(nèi)線程未執(zhí)行結(jié)束,拋出異常。
isDown():判斷線程是否結(jié)束即判斷call方法是否已完成,要特別注意,這里的isDown與ExecutorService中的isShutdown不同,isShutdown是用來判斷線程是否關(guān)閉的。
public class ExecutorServiceTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
testExecutorService();
}
private static void testExecutorService() throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(1);
Future future = service.submit(() -> {
ThreadUtils.sleep(1000);
return Thread.currentThread().getName() + " submit.";
});
// 查看任務是否完成即線程是否結(jié)束即call方法是否執(zhí)行結(jié)束,
// 要注意的是,這里判斷是否結(jié)束,跟ExecutorService中的isShutDowm不同, isShutdowm是判斷線程是否結(jié)束,而shutdown表示關(guān)閉線程
System.out.println(future.isDone());
// 獲取call方法的返回值
System.out.println(future.get()); // false
System.out.println(future.isDone());
System.out.println(future.get()); // true
// 關(guān)閉線程池
service.shutdown();
}
}
Callable接口
可執(zhí)行接口。類似Runnable接口,也是可以啟動線程的接口。
接口方法:call(),相當于Runnable中的run方法,區(qū)別在于call方法有返回值。
Callable和Runnable的選擇:當需要返回值或需要拋出異常時,使用Callable,其他情況任意選。
ThreadPoolExecutor創(chuàng)建線程池
通過new ThreadPoolExecutor來創(chuàng)建,下圖是ThreadPoolExecutor的三個構(gòu)造方法:
參數(shù)說明:
corePoolSize? 核心線程數(shù)
maximumPoolSize? 最大線程數(shù)
keepAliveTime? 線程最大空閑時間
unitTimeUnit? 時間單位
workQueueBlockingQueue
threadFactoryThreadFactory? 線程創(chuàng)建工廠
handlerRejectedExecutionHandler? 拒絕策略
?
核心線程數(shù)和最大線程數(shù):
當提交一個新任務到線程池時首先判斷核心線程數(shù)corePoolSize是否已達上限,若未達到corePoolSize上限,創(chuàng)建一個工作線程來執(zhí)行任務;否則,再判斷線程池工作隊列workQueueBlockingQueue是否已滿,若沒滿,則將新提交的任務存儲在工作隊列里;否則,線程池將判斷最大線程數(shù)是否已達上限,若未達到maximumPoolSize上限,則創(chuàng)建一個新的工作線程來執(zhí)行任務,滿了,則交給飽和策略來處理這個任務。如果線程池中的線程數(shù)量大于核心線程數(shù) corePoolSize 時,線程空閑時間超過線程最大空閑時間keepAliveTime,則線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize。
自定義線程池
public class ExecutorThreadPoolTest {
public static void main(String[] args) {
testExecutorThreadPool();
}
private static void testExecutorThreadPool() {
// 創(chuàng)建線程池,核心線程數(shù)為2,最大線程數(shù)為4,最大空閑時間為10
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
4,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new MyTreadFactory(),
new MyIgnorePolicy());
// 啟動所有核心線程,使其出與等待狀態(tài)
executor.prestartAllCoreThreads();
// 創(chuàng)建并執(zhí)行任務
for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
}
static class MyTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
Thread t = new Thread(runnable, "線程【" + mThreadNum.getAndIncrement() + "】");
System.out.println(t.getName() + " 已創(chuàng)建");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
doLog(runnable, executor);
}
private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
System.err.println(runnable.toString() + " 被拒絕");
}
}
@Data
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(this.toString() + " 正在運行");
ThreadUtils.sleep(1000);
}
@Override
public String toString() {
return "線程【" + name + "】";
}
}
}
FixedThreadPool線程池
固定容量的線程池,可由Executors來創(chuàng)建,活動狀態(tài)和線程池容量是有上限的,需要手動銷毀線程池。構(gòu)造方法如下:
由此可見,該線程池核心線程數(shù)和最大線程數(shù)均為構(gòu)造參數(shù)值nThreads,線程最大空閑時間為0,任務隊列采用LinkedBlockingQueue,默認容量上限是Integer.MAX_VALUE。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建容量為10的FixedThreadPool線程池
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
service.execute(()-> System.out.println(Thread.currentThread().getName()));
}
// 銷毀線程池
service.shutdown();
}
}
CachedThreadPool線程池
緩存線程池,通過Executors來創(chuàng)建,默認最大容量為Integer.MAX_VALUE,自動擴容,執(zhí)行完后自動銷毀(這一點與FixedThreadPool不同,F(xiàn)ixedThreadPool的銷毀需要手動調(diào)用shutdown方法)。構(gòu)造方法如下:
由構(gòu)造方法可見,核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,最大空閑時間為60秒,任務隊列使用SynchronousQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建緩存線程池
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 5; i++) {
service.execute(() -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName() + " executor.");
});
}
System.out.println(service);
ThreadUtils.sleep(65);
System.out.println(service);
}
}
ScheduledThreadPool線程池
計劃任務線程池,可以根據(jù)任務自動執(zhí)行計劃的線程池,由Executors創(chuàng)建,需要手動銷毀。計劃任務時選用,如需要定時整理數(shù)據(jù)、服務器定期清除無效文件等。構(gòu)造方法如下:
核心線程數(shù)為構(gòu)造參數(shù)大小,最大線程數(shù)為Integer.MAX_VALUE,最大空閑時間0,任務隊列使用DelayedWorkQuquq。
常用方法有:scheduledAtFixedRate、schedule、execute等。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建計劃任務線程池
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
System.out.println(service);
// 定期任務,線程池啟動500毫秒后第一次執(zhí)行任務,以后每300毫秒執(zhí)行一次
service.scheduleAtFixedRate(() -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName() + " executor.");
}, 500, 300, TimeUnit.MILLISECONDS);
System.out.println(service);
service.shutdown();
}
}
SingleThreadExecutor線程池
單一容量的線程池。需要手動銷毀。有保證任務順序需求時可選用。如大廳中的公共頻道聊天,固定數(shù)量商品的秒殺等。構(gòu)造方法如下:
核心線程數(shù)和最大線程數(shù)均為1,任務隊列為LinkedBlockingQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 創(chuàng)建單一容量線程池
ExecutorService service = Executors.newSingleThreadExecutor();
System.out.println(service);
for (int i = 0; i < 5; i++) {
service.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
service.shutdown();
}
}
ForkJoinPool線程池
分支合并線程池,適用于處理復雜任務。初始化線程容量與CPU核心數(shù)有關(guān)。
ForkJoinPool沒有所謂的容量,默認都是一個線程,根據(jù)任務自動分支新的子線程,,當子線程結(jié)束后自動合并。所謂自動合并,是用fork和join兩個方法實現(xiàn)的(手動調(diào)用)。
線程池中運行的可分治合并的任務必須是ForkJoinTask的子類型(RecursiveTask或RecursiveAction,二者的區(qū)別在于一個運行完之后有返回值,一個沒有),其中提供了分支和合并能力。
ForkJoinTask提供了兩個抽象子類型RecursiveTask和RecursiveAction,RecursiveTask是有返回結(jié)果的分支合并任務,RecursiveAction是無返回結(jié)果的分支合并任務(類似Callable和Runnable的區(qū)別)。
ForkJoinTask提供了一個compute方法,這個方法里面就是任務的執(zhí)行邏輯。
該線程池主要用于大量數(shù)據(jù)的計算、數(shù)據(jù)分析等。
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long result = 0L;
for (int NUMBER : NUMBERS) {
result += NUMBER;
}
System.out.println(result);
ForkJoinPool pool = new ForkJoinPool();
// 分支合并任務
AddTask task = new AddTask(0, NUMBERS.length);
// 提交任務
Future future = pool.submit(task);
System.out.println(future.get());
}
private static final int[] NUMBERS = new int[1000000];
private static final int MAX_SIZE = 50000;
private static final Random RANDOM = new Random();
static {
for (int i = 0; i < NUMBERS.length; i++) {
NUMBERS[i] = RANDOM.nextInt(1000);
}
}
static class AddTask extends RecursiveTask {
int begin, end;
AddTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Long compute() {
if ((end - begin) < MAX_SIZE) {
long sum = 0L;
for (int i = begin; i < end; i++) {
sum += NUMBERS[i];
}
return sum;
} else {
// 當結(jié)束值減去開始值大于臨界值的時候進行分支
int middle = begin + (end - begin) / 2;
AddTask task1 = new AddTask(begin, middle);
AddTask task2 = new AddTask(middle, end);
// 分支的工作,就是開啟一個新的線程任務
task1.fork();
task2.fork();
// join就是合并,將任務的結(jié)果獲取,是一個阻塞方法,一定會得到結(jié)果數(shù)據(jù)
return task1.join() + task2.join();
}
}
}
}
一組線程的集合,線程組中多個線程執(zhí)行同一批任務,線程之間是隔離的,互不影響。同一組的線程之間可以通信,但不同組的線程之間不能通信,這樣就做到了線程屏蔽,保證了線程安全。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
ThreadGroup group = new ThreadGroup("LQ");
Thread thread = new Thread(group, () ->
System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
);
thread.start();
}
}
朋友們覺得內(nèi)容有什么錯誤、不足之處,或者有什么疑問,盡可留言指出來,一起學習哦。