本篇內(nèi)容主要講解“Java并發(fā)編程在各主流框架中怎么應用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java并發(fā)編程在各主流框架中怎么應用”吧!
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名注冊、虛擬空間、營銷軟件、網(wǎng)站建設、定興網(wǎng)站維護、網(wǎng)站推廣。
JVM 規(guī)范定義了 Java 內(nèi)存模型來屏蔽掉各種操作系統(tǒng)、虛擬機實現(xiàn)廠商和硬件的內(nèi)存訪問差異,以確保 Java 程序在所有操作系統(tǒng)和平臺上能夠達到一致的內(nèi)存訪問效果。
Java 內(nèi)存模型規(guī)定所有的變量都存儲在主內(nèi)存中,每個線程都有自己獨立的工作內(nèi)存,工作內(nèi)存保存了對應該線程使用的變量的主內(nèi)存副本拷貝。 線程對這些變量的操作都在自己的工作內(nèi)存中進行,不能直接操作主內(nèi)存和其他工作內(nèi)存中存儲的變量或者變量副本。線程間的變量傳遞需通過主內(nèi)存來完成,三者的關(guān)系如下圖所示。
Java 內(nèi)存模型定義了 8 種操作來完成主內(nèi)存和工作內(nèi)存的變量訪問,具體如下。
?read:把一個變量的值從主內(nèi)存?zhèn)鬏數(shù)骄€程的工作內(nèi)存中,以便隨后的 load 動作使用。?load:把從主內(nèi)存中讀取的變量值載入工作內(nèi)存的變量副本中。?use:把工作內(nèi)存中一個變量的值傳遞給 Java 虛擬機執(zhí)行引擎。?assign:把從執(zhí)行引擎接收到的變量的值賦值給工作內(nèi)存中的變量。?store:把工作內(nèi)存中一個變量的值傳送到主內(nèi)存中,以便隨后的 write 操作。?write:工作內(nèi)存?zhèn)鬟f過來的變量值放入主內(nèi)存中。?lock:把主內(nèi)存的一個變量標識為某個線程獨占的狀態(tài)。?unlock:把主內(nèi)存中 一個處于鎖定狀態(tài)的變量釋放出來,被釋放后的變量才可以被其他線程鎖定。
這個概念與事務中的原子性大概一致,表明此操作是不可分割,不可中斷的,要么全部執(zhí)行,要么全部不執(zhí)行。Java 內(nèi)存模型直接保證的原子性操作包括 read、load、use、assign、store、write、lock、unlock 這八個。
可見性是指當一個線程修改了共享變量的值,其他線程能夠立即得知這個修改。 Java 內(nèi)存模型是通過在變量修改后將新值同步回主內(nèi)存,在變量讀取前從主內(nèi)存刷新變量值這種依賴主內(nèi)存作為傳遞媒介的方式來實現(xiàn)可見性的,無論是普通變量還是 volatile 變量都是如此,普通變量與 volatile 變量的區(qū)別是,volatile 的特殊規(guī)則保證了新值能立即同步到主內(nèi)存,以及每次使用前立即從主內(nèi)存刷新。因此,可以說 volatile 保證了多線程操作時變量的可見性,而普通變量則不能保證這一點。除了 volatile 外,synchronized 也提供了可見性,synchronized 的可見性是由 “對一個變量執(zhí)行 unlock 操作 之前,必須先把此變量同步回主內(nèi)存中(執(zhí)行 store、write 操作)” 這條規(guī)則獲得。
單線程環(huán)境下,程序會 “有序的”執(zhí)行,即:線程內(nèi)表現(xiàn)為串行語義。但是在多線程環(huán)境下,由于指令重排,并發(fā)執(zhí)行的正確性會受到影響。在 Java 中使用 volatile 和 synchronized 關(guān)鍵字,可以保證多線程執(zhí)行的有序性。volatile 通過加入內(nèi)存屏障指令來禁止內(nèi)存的重排序。synchronized 通過加鎖,保證同一時刻只有一個線程來執(zhí)行同步代碼。
打開 NioEventLoop 的代碼中,有一個控制 IO 操作 和 其他任務運行比例的,用 volatile 修飾的 int 類型字段 ioRatio,代碼如下。
private volatile int ioRatio = 50;
這里為什么要用 volatile 修飾呢?我們首先對 volatile 關(guān)鍵字進行說明,然后再結(jié)合 Netty 的代碼進行分析。
關(guān)鍵字 volatile 是 Java 提供的最輕量級的同步機制,Java 內(nèi)存模型對 volatile 專門定義了一些特殊的訪問規(guī)則。下面我們就看它的規(guī)則。當一個變量被 volatile 修飾后,它將具備以下兩種特性。
?線程可見性:當一個線程修改了被 volatile 修飾的變量后,無論是否加鎖,其他線程都可以立即看到最新的修改,而普通變量卻做不到這點。?禁止指令重排序優(yōu)化:普通的變量僅僅保證在該方法的執(zhí)行過程中所有依賴賦值結(jié)果的地方都能獲取正確的結(jié)果,而不能保證變量賦值操作的順序與程序代碼的執(zhí)行順序一致。舉個簡單的例子說明下指令重排序優(yōu)化問題,代碼如下。
public class ThreadStopExample { private static boolean stop; public static void main(String[] args) throws InterruptedException { Thread workThread = new Thread(new Runnable() { public void run() { int i= 0; while (!stop) { i++; try{ TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }); workThread.start(); TimeUnit.SECONDS.sleep(3); stop = true; }}
我們預期程序會在 3s 后停止,但是實際上它會一直執(zhí)行下去,原因就是虛擬機對代碼進行了指令重排序和優(yōu)化,優(yōu)化后的指令如下。
if (!stop) While(true) ......
workThread 線程在執(zhí)行重排序后的代碼時,是無法發(fā)現(xiàn)變量 stop 被其它線程修改的,因此無法停止運行。要解決這個問題,只要將 stop 前增加 volatile 修飾符即可。volatile 解決了如下兩個問題。第一,主線程對 stop 的修改在 workThread 線程 中可見,也就是說 workThread 線程 立即看到了其他線程對于 stop 變量 的修改。第二,禁止指令重排序,防止因為重排序?qū)е碌牟l(fā)訪問邏輯混亂。
一些人認為使用 volatile 可以代替?zhèn)鹘y(tǒng)鎖,提升并發(fā)性能,這個認識是錯誤的。volatile 僅僅解決了可見性的問題,但是它并不能保證互斥性,也就是說多個線程并發(fā)修改某個變量時,依舊會產(chǎn)生多線程問題。因此,不能靠 volatile 來完全替代傳統(tǒng)的鎖。根據(jù)經(jīng)驗總結(jié),volatile 最適用的場景是 “ 一個線程寫,其他線程讀 ”,如果有多個線程并發(fā)寫操作,仍然需要使用鎖或者線程安全的容器或者原子變量來代替。下面我們繼續(xù)對 Netty 的源碼做分析。上面講到了 ioRatio 被定義成 volatile,下面看看代碼為什么要這樣定義。
final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
通過代碼分析我們發(fā)現(xiàn),在 NioEventLoop 線程 中,ioRatio 并沒有被修改,它是只讀操作。既然沒有修改,為什么要定義成 volatile 呢?繼續(xù)看代碼,我們發(fā)現(xiàn) NioEventLoop 提供了重新設置 IO 執(zhí)行時間比例的公共方法。
public void setIoRatio(int ioRatio) { if (ioRatio <= 0 || ioRatio > 100) { throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } this.ioRatio = ioRatio; }
首先,NioEventLoop 線程 沒有調(diào)用該 set 方法,說明調(diào)整 IO 執(zhí)行時間比例 是外部發(fā)起的操作,通常是由業(yè)務的線程調(diào)用該方法,重新設置該參數(shù)。這樣就形成了一個線程寫、一個線程讀。根據(jù)前面針對 volatile 的應用總結(jié),此時可以使用 volatile 來代替?zhèn)鹘y(tǒng)的 synchronized 關(guān)鍵字,以提升并發(fā)訪問的性能。
ThreadLocal 又稱為線程本地存儲區(qū)(Thread Local Storage,簡稱為 TLS),每個線程都有自己的私有的本地存儲區(qū)域,不同線程之間彼此不能訪問對方的 TLS 區(qū)域。使用 ThreadLocal 變量 的 set(T value)
方法 可以將數(shù)據(jù)存入該線程本地存儲區(qū),使用 get() 方法 可以獲取到之前存入的值。
不使用 ThreadLocal。
public class SessionBean { public static class Session { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } } public Session createSession() { return new Session(); } public void setId(Session session, String id) { session.setId(id); } public String getId(Session session) { return session.getId(); } public static void main(String[] args) { //沒有使用ThreadLocal,在方法間共享session需要進行session在方法間的傳遞 new Thread(() -> { SessionBean bean = new SessionBean(); Session session = bean.createSession(); bean.setId(session, "susan"); System.out.println(bean.getId(session)); }).start(); }}
上述代碼中,session 需要在方法間傳遞才可以修改和讀取,保證線程中各方法操作的是一個。下面看一下使用 ThreadLocal 的代碼。
public class SessionBean {//定義一個靜態(tài)ThreadLocal變量session,就能夠保證各個線程有自己的一份,并且方法可以方便獲取,不用傳遞 private static ThreadLocalsession = new ThreadLocal<>(); public static class Session { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } } public void createSession() { session.set(new Session()); } public void setId(String id) { session.get().setId(id); } public String getId() { return session.get().getId(); } public static void main(String[] args) { new Thread(() -> { SessionBean bean = new SessionBean(); bean.createSession(); bean.setId("susan"); System.out.println(bean.getId()); }).start(); }}
在方法的內(nèi)部實現(xiàn)中,直接可以通過 session.get() 獲取到當前線程的 session,省掉了參數(shù)在方法間傳遞的環(huán)節(jié)。
一般,類屬性中的數(shù)據(jù)是多個線程共享的,但 ThreadLocal 類型的數(shù)據(jù) 聲明為類屬性,卻可以為每一個使用它(通過 set(T value)
方法)的線程存儲線程私有的數(shù)據(jù),通過其源碼我們可以發(fā)現(xiàn)其中的原理。
public class ThreadLocal{ /** * 下面的 getMap()方法 傳入當前線程,獲得一個ThreadLocalMap對象,說明每一個線程維護了 * 自己的一個 map,保證讀取出來的value是自己線程的。 * * ThreadLocalMap 是ThreadLocal靜態(tài)內(nèi)部類,存儲value的鍵值就是ThreadLocal本身。 * * 因此可以斷定,每個線程維護一個ThreadLocalMap的鍵值對映射Map。不同線程的Map的 key值 是一樣的, * 都是ThreadLocal,但 value 是不同的。 */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }}
Spring 事務處理的設計與實現(xiàn)中大量使用了 ThreadLocal 類,比如,TransactionSynchronizationManager 維護了一系列的 ThreadLocal 變量,用于存儲線程私有的 事務屬性及資源。源碼如下。
/** * 管理每個線程的資源和事務同步的中心幫助程序。供資源管理代碼使用,但不供典型應用程序代碼使用。 * * 資源管理代碼應該檢查線程綁定的資源,如,JDBC連接 或 Hibernate Sessions。 * 此類代碼通常不應該將資源綁定到線程,因為這是事務管理器的職責。另一個選項是, * 如果事務同步處于活動狀態(tài),則在首次使用時延遲綁定,以執(zhí)行跨任意數(shù)量資源的事務。 */public abstract class TransactionSynchronizationManager { /** * 一般是一個線程持有一個 獨立的事務,以相互隔離地處理各自的事務。 * 所以這里使用了很多 ThreadLocal對象,為每個線程綁定 對應的事務屬性及資源, * 以便后續(xù)使用時能直接獲取。 */ private static final ThreadLocal
Mybatis 的 SqlSession 對象 也是各線程私有的資源,所以對其的管理也使用到了 ThreadLocal 類。源碼如下。
public class SqlSessionManager implements SqlSessionFactory, SqlSession { private final ThreadLocallocalSqlSession = new ThreadLocal<>(); public void startManagedSession() { this.localSqlSession.set(openSession()); } public void startManagedSession(boolean autoCommit) { this.localSqlSession.set(openSession(autoCommit)); } public void startManagedSession(Connection connection) { this.localSqlSession.set(openSession(connection)); } public void startManagedSession(TransactionIsolationLevel level) { this.localSqlSession.set(openSession(level)); } public void startManagedSession(ExecutorType execType) { this.localSqlSession.set(openSession(execType)); } public void startManagedSession(ExecutorType execType, boolean autoCommit) { this.localSqlSession.set(openSession(execType, autoCommit)); } public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) { this.localSqlSession.set(openSession(execType, level)); } public void startManagedSession(ExecutorType execType, Connection connection) { this.localSqlSession.set(openSession(execType, connection)); } public boolean isManagedSessionStarted() { return this.localSqlSession.get() != null; } @Override public Connection getConnection() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot get connection. No managed session is started."); } return sqlSession.getConnection(); } @Override public void clearCache() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot clear the cache. No managed session is started."); } sqlSession.clearCache(); } @Override public void commit() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot commit. No managed session is started."); } sqlSession.commit(); } @Override public void commit(boolean force) { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot commit. No managed session is started."); } sqlSession.commit(force); } @Override public void rollback() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } sqlSession.rollback(); } @Override public void rollback(boolean force) { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } sqlSession.rollback(force); } @Override public List flushStatements() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot rollback. No managed session is started."); } return sqlSession.flushStatements(); } @Override public void close() { final SqlSession sqlSession = localSqlSession.get(); if (sqlSession == null) { throw new SqlSessionException("Error: Cannot close. No managed session is started."); } try { sqlSession.close(); } finally { localSqlSession.set(null); } }}
首先通過 ThreadPoolExecutor 的源碼 看一下線程池的主要參數(shù)及方法。
public class ThreadPoolExecutor extends AbstractExecutorService { /** * 核心線程數(shù) * 當向線程池提交一個任務時,若線程池已創(chuàng)建的線程數(shù)小于corePoolSize,即便此時存在空閑線程, * 也會通過創(chuàng)建一個新線程來執(zhí)行該任務,直到已創(chuàng)建的線程數(shù)大于或等于corePoolSize */ private volatile int corePoolSize; /** * 最大線程數(shù) * 當隊列滿了,且已創(chuàng)建的線程數(shù)小于maximumPoolSize,則線程池會創(chuàng)建新的線程來執(zhí)行任務。 * 另外,對于無界隊列,可忽略該參數(shù) */ private volatile int maximumPoolSize; /** * 線程存活保持時間 * 當線程池中線程數(shù) 超出核心線程數(shù),且線程的空閑時間也超過 keepAliveTime時, * 那么這個線程就會被銷毀,直到線程池中的線程數(shù)小于等于核心線程數(shù) */ private volatile long keepAliveTime; /** * 任務隊列 * 用于傳輸和保存等待執(zhí)行任務的阻塞隊列 */ private final BlockingQueueworkQueue; /** * 線程工廠 * 用于創(chuàng)建新線程。threadFactory 創(chuàng)建的線程也是采用 new Thread() 方式,threadFactory * 創(chuàng)建的線程名都具有統(tǒng)一的風格:pool-m-thread-n(m為線程池的編號,n為線程池中線程的編號 */ private volatile ThreadFactory threadFactory; /** * 線程飽和策略 * 當線程池和隊列都滿了,再加入的線程會執(zhí)行此策略 */ private volatile RejectedExecutionHandler handler; /** * 構(gòu)造方法提供了多種重載,但實際上都使用了最后一個重載 完成了實例化 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** * 執(zhí)行一個任務,但沒有返回值 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } /** * 提交一個線程任務,有返回值。該方法繼承自其父類 AbstractExecutorService,有多種重載,這是最常用的一個。 * 通過future.get()獲取返回值(阻塞直到任務執(zhí)行完) */ public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; } /** * 關(guān)閉線程池,不再接收新的任務,但會把已有的任務執(zhí)行完 */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } /** * 立即關(guān)閉線程池,已有的任務也會被拋棄 */ public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean isShutdown() { return ! isRunning(ctl.get()); }}
線程池執(zhí)行流程,如下圖所示。
Executors 類 通過 ThreadPoolExecutor 封裝了 4 種常用的線程池:CachedThreadPool,F(xiàn)ixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。
1.CachedThreadPool:用來創(chuàng)建一個幾乎可以無限擴大的線程池(最大線程數(shù)為 Integer.MAX_VALUE),適用于執(zhí)行大量短生命周期的異步任務。2.FixedThreadPool:創(chuàng)建一個固定大小的線程池,保證線程數(shù)可控,不會造成線程過多,導致系統(tǒng)負載更為嚴重。3.SingleThreadExecutor:創(chuàng)建一個單線程的線程池,可以保證任務按調(diào)用順序執(zhí)行。4.ScheduledThreadPool:適用于執(zhí)行 延時 或者 周期性 任務。
?CPU 密集型任務
盡量使用較小的線程池,一般為 CPU 核心數(shù)+1。因為 CPU 密集型任務 使得 CPU 使用率 很高,若開過多的線程數(shù),會造成 CPU 過度切換。?IO 密集型任務
可以使用稍大的線程池,一般為 2*CPU 核心數(shù)。IO 密集型任務 CPU 使用率 并不高,因此可以讓 CPU 在等待 IO 的時候有其他線程去處理別的任務,充分利用 CPU 時間。
Tomcat 在分發(fā) web 請求時使用了線程池來處理。
public interface BlockingQueueextends Queue { // 將給定元素設置到隊列中,如果設置成功返回true, 否則返回false。如果是往限定了長度的隊列中設置值,推薦使用offer()方法。 boolean add(E e); // 將給定的元素設置到隊列中,如果設置成功返回true, 否則返回false. e的值不能為空,否則拋出空指針異常。 boolean offer(E e); // 將元素設置到隊列中,如果隊列中沒有多余的空間,該方法會一直阻塞,直到隊列中有多余的空間。 void put(E e) throws InterruptedException; // 將給定元素在給定的時間內(nèi)設置到隊列中,如果設置成功返回true, 否則返回false. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 從隊列中獲取值,如果隊列中沒有值,線程會一直阻塞,直到隊列中有值,并且該方法取得了該值。 E take() throws InterruptedException; // 在給定的時間里,從隊列中獲取值,時間到了直接調(diào)用普通的 poll()方法,為null則直接返回null。 E poll(long timeout, TimeUnit unit) throws InterruptedException; // 獲取隊列中剩余的空間。 int remainingCapacity(); // 從隊列中移除指定的值。 boolean remove(Object o); // 判斷隊列中是否擁有該值。 public boolean contains(Object o); // 將隊列中值,全部移除,并發(fā)設置到給定的集合中。 int drainTo(Collection super E> c); // 指定最多數(shù)量限制將隊列中值,全部移除,并發(fā)設置到給定的集合中。 int drainTo(Collection super E> c, int maxElements);}
?ArrayBlockingQueue
基于數(shù)組的阻塞隊列實現(xiàn),在 ArrayBlockingQueue 內(nèi)部,維護了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象,這是一個常用的阻塞隊列,除了一個定長數(shù)組外,ArrayBlockingQueue 內(nèi)部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數(shù)組中的位置。
ArrayBlockingQueue 在生產(chǎn)者放入數(shù)據(jù) 和 消費者獲取數(shù)據(jù)時,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于 LinkedBlockingQueue。ArrayBlockingQueue 和 LinkedBlockingQueue 間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而后者則會生成一個額外的 Node 對象。這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對于 GC 的影響還是存在一定的區(qū)別。而在創(chuàng)建 ArrayBlockingQueue 時,我們還可以控制對象的內(nèi)部鎖是否采用公平鎖,默認采用非公平鎖。
?LinkedBlockingQueue
基于鏈表的阻塞隊列,同 ArrayListBlockingQueue 類似,其內(nèi)部也維持著一個數(shù)據(jù)緩沖隊列(該隊列由一個鏈表構(gòu)成),當生產(chǎn)者往隊列中放入一個數(shù)據(jù)時,隊列會從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊列內(nèi)部,而生產(chǎn)者立即返回;只有當隊列緩沖區(qū)達到最大值緩存容量時(LinkedBlockingQueue 可以通過構(gòu)造函數(shù)指定該值),才會阻塞生產(chǎn)者隊列,直到消費者從隊列中消費掉一份數(shù)據(jù),生產(chǎn)者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而 LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因為其對于生產(chǎn)者端和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
需要注意的是,如果構(gòu)造一個 LinkedBlockingQueue 對象,而沒有指定其容量大小,LinkedBlockingQueue 會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。
?PriorityBlockingQueue
基于優(yōu)先級的阻塞隊列(優(yōu)先級的判斷通過構(gòu)造函數(shù)傳入的 Compator 對象來決定),但需要注意的是 PriorityBlockingQueue 并不會阻塞數(shù)據(jù)生產(chǎn)者,而只會在沒有可消費的數(shù)據(jù)時,阻塞數(shù)據(jù)的消費者。因此使用的時候要特別注意,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對不能快于消費者消費數(shù)據(jù)的速度,否則時間一長,會最終耗盡所有的可用堆內(nèi)存空間。在實現(xiàn) PriorityBlockingQueue 時,內(nèi)部控制線程同步的鎖采用的是公平鎖。
互斥同步最主要的問題就是進行線程阻塞和喚醒所帶來的性能的額外損耗,因此這種同步被稱為阻塞同步,它屬于一種悲觀的并發(fā)策略,我們稱之為悲觀鎖。隨著硬件和操作系統(tǒng)指令集的發(fā)展和優(yōu)化,產(chǎn)生了非阻塞同步,被稱為樂觀鎖。簡單地說,就是先進行操作,操作完成之后再判斷操作是否成功,是否有并發(fā)問題,如果有則進行失敗補償,如果沒有就算操作成功,這樣就從根本上避免了同步鎖的弊端。
目前,在 Java 中應用最廣泛的非阻塞同步就是 CAS。從 JDK1.5 以后,可以使用 CAS 操作,該操作由 sun.misc.Unsafe 類里的 compareAndSwapInt() 和 compareAndSwapLong() 等方法實現(xiàn)。通常情況下 sun.misc.Unsafe 類 對于開發(fā)者是不可見的,因此,JDK 提供了很多 CAS 包裝類 簡化開發(fā)者的使用,如 AtomicInteger。使用 Java 自帶的 Atomic 原子類,可以避免同步鎖帶來的并發(fā)訪問性能降低的問題,減少犯錯的機會。
到此,相信大家對“Java并發(fā)編程在各主流框架中怎么應用”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!