本篇內(nèi)容介紹了“Java中J.U.C并發(fā)包誕生分析”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
從網(wǎng)站建設(shè)到定制行業(yè)解決方案,為提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作服務(wù)體系,各種行業(yè)企業(yè)客戶提供網(wǎng)站建設(shè)解決方案,助力業(yè)務(wù)快速發(fā)展。創(chuàng)新互聯(lián)將不斷加快創(chuàng)新步伐,提供優(yōu)質(zhì)的建站服務(wù)。
JCP是Java Community Process的簡寫,是一種開發(fā)和修訂Java技術(shù)規(guī)范的流程,同時(shí),我們提到JCP一般是指維護(hù)管理這套流程的組織,這個(gè)組織主要由Java開發(fā)者以及被授權(quán)的非盈利組織組成,他們掌管著Java的發(fā)展方向。JCP是Sun公司1998年12月8日提出的,旨在通過java社區(qū)的力量推進(jìn)Java的發(fā)展。截止目前,已經(jīng)從1.0版本發(fā)展到了最新的2019年7月21日頒布的2.11版本。JCP流程中有四個(gè)主要階段,分別是啟動(dòng)、發(fā)布草案、最終版本、維護(hù),一個(gè)Java的新功能從啟動(dòng)階段提出,到順利走完整套流程后,就會(huì)出現(xiàn)在下個(gè)版本的JDK中了。
JSR是Java Specification Requests的簡寫,是服務(wù)JCP啟動(dòng)階段提出草案的規(guī)范,任何人注冊成為JCP的會(huì)員后,都可以向JCP提交JSR。比如,你覺得JDK中String的操作方法沒有g(shù)uava中的實(shí)用,你提個(gè)JSR增強(qiáng)String中的方法,只要能夠通過JCP的審核,就可以在下個(gè)版本的JDK中看到了。我們熟知的提案有,Java緩存api的JSR-107、Bean屬性校驗(yàn)JSR-303等,當(dāng)然還有本篇要講的Java并發(fā)包JSR-166.
JCP官網(wǎng):https://jcp.org
Doug Lea,中文名為道格·利。是美國的一個(gè)大學(xué)教師,大神級的人物,J.U.C就是出自他之手。JDK1.5之前,我們控制程序并發(fā)訪問同步代碼只能使用synchronized,那個(gè)時(shí)候synchronized的性能還沒優(yōu)化好,性能并不好,控制線程也只能使用Object的wait和notify方法。這個(gè)時(shí)候Doug Lea給JCP提交了JSR-166的提案,在提交JSR-166之前,Doug Lea已經(jīng)使用了類似J.U.C包功能的代碼已經(jīng)三年多了,這些代碼就是J.U.C的原型,下面簡單看下這些具有歷史味道的代碼,同時(shí)也能引發(fā)我們的一些思考,如果JDK中沒有,那么就自己造呀!
public class Mutex implements Sync { /** The lock status **/ protected boolean inuse_ = false; @Override public void acquire() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized(this) { try { //如果inuse_為true就wait住線程 while (inuse_) { wait(); } inuse_ = true; } catch (InterruptedException ex) { notify(); throw ex; } } } /** * 釋放鎖,通知線程繼續(xù)執(zhí)行 */ @Override public synchronized void release() { inuse_ = false; notify(); } @Override public boolean attempt(long msecs) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized(this) { if (!inuse_) { inuse_ = true; return true; } else if (msecs <= 0) { return false; } else { long waitTime = msecs; long start = System.currentTimeMillis(); try { for (;;) { wait(waitTime); if (!inuse_) { inuse_ = true; return true; } else { waitTime = msecs - (System.currentTimeMillis() - start); if (waitTime <= 0) return false; } } } catch (InterruptedException ex) { notify(); throw ex; } } } } }
public class CountDown implements Sync { protected final int initialCount_; protected int count_; /** * Create a new CountDown with given count value **/ public CountDown(int count) { count_ = initialCount_ = count; } @Override public void acquire() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized (this) { while (count_ > 0) { wait(); } } } @Override public boolean attempt(long msecs) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } synchronized (this) { if (count_ <= 0) { return true; } else if (msecs <= 0) { return false; } else { long waitTime = msecs; long start = System.currentTimeMillis(); for (; ; ) { wait(waitTime); if (count_ <= 0) { return true; } else { waitTime = msecs - (System.currentTimeMillis() - start); if (waitTime <= 0) { return false; } } } } } } /** * Decrement the count. * After the initialCount'th release, all current and future * acquires will pass **/ @Override public synchronized void release() { if (--count_ == 0) { notifyAll(); } } /** * Return the initial count value **/ public int initialCount() { return initialCount_; } public synchronized int currentCount() { return count_; } }
了解J.U.C的都知道,在這個(gè)包里面AbstractQueuedSynchronizer是精髓所在,這就是我們在聊并發(fā)包時(shí)俗稱的AQS,這個(gè)框架設(shè)計(jì)為同步狀態(tài)的原子性管理、線程的阻塞和解除阻塞以及排隊(duì)提供一種通用的機(jī)制。并發(fā)包下ReentrantLock、CountDownLatch等都是基于AQS來實(shí)現(xiàn)的,在看下上面的原型實(shí)現(xiàn)都是實(shí)現(xiàn)的Sync接口,是不是似曾相識(shí),下面的Sync就是AbstractQueuedSynchronizer的原型了
AQS設(shè)計(jì)論文:http://gee.cs.oswego.edu/dl/papers/aqs.pdf
中文譯文:https://www.cnblogs.com/dennyzhangdd/p/7218510.html
public interface Sync { public void acquire() throws InterruptedException; public boolean attempt(long msecs) throws InterruptedException; public void release(); /** One second, in milliseconds; convenient as a time-out value **/ public static final long ONE_SECOND = 1000; /** One minute, in milliseconds; convenient as a time-out value **/ public static final long ONE_MINUTE = 60 * ONE_SECOND; /** One hour, in milliseconds; convenient as a time-out value **/ public static final long ONE_HOUR = 60 * ONE_MINUTE; /** One day, in milliseconds; convenient as a time-out value **/ public static final long ONE_DAY = 24 * ONE_HOUR; /** One week, in milliseconds; convenient as a time-out value **/ public static final long ONE_WEEK = 7 * ONE_DAY; /** One year in milliseconds; convenient as a time-out value **/ public static final long ONE_YEAR = (long)(365.2425 * ONE_DAY); /** One century in milliseconds; convenient as a time-out value **/ public static final long ONE_CENTURY = 100 * ONE_YEAR; }
這個(gè)JSR的目標(biāo)類似于JDK1.2 Collections包的目標(biāo):
1.標(biāo)準(zhǔn)化一個(gè)簡單,可擴(kuò)展的框架,該框架將常用的實(shí)用程序組織成一個(gè)足夠小的包,以便用戶可以輕松學(xué)習(xí)并由開發(fā)人員維護(hù)。
2.提供一些高質(zhì)量的實(shí)現(xiàn)。
該包將包含接口和類,這些接口和類在各種編程樣式和應(yīng)用程序中都很有用。這些類包括:
原子變量。
專用鎖,屏障,信號量和條件變量。
為多線程使用而設(shè)計(jì)的隊(duì)列和相關(guān)集合。
線程池和自定義執(zhí)行框架。
我們還將研究核心語言和庫中的相關(guān)支持。 請注意,這些與J2EE中使用的事務(wù)并發(fā)控制框架完全不同。(但是,對于那些創(chuàng)建此類框架的人來說,它們會(huì)很有用。)
J2SE
底層線程原語(例如synchronized塊,Object.wait和Object.notify)不足以用于許多編程任務(wù)。因此,應(yīng)用程序員經(jīng)常被迫實(shí)現(xiàn)自己的更高級別的并發(fā)工具。這導(dǎo)致了巨大的重復(fù)工作。此外,眾所周知,這些設(shè)施難以正確,甚至更難以優(yōu)化。應(yīng)用程序員編寫的并發(fā)工具通常不正確或效率低下。提供一組標(biāo)準(zhǔn)的并發(fā)實(shí)用程序?qū)⒑喕帉懜鞣N多線程應(yīng)用程序的任務(wù),并通??梢蕴岣呤褂盟鼈兊膽?yīng)用程序的質(zhì)量。
目前,開發(fā)人員只能使用Java語言本身提供的并發(fā)控制結(jié)構(gòu)。對于某些應(yīng)用程序來說,這些級別太低,而對其他應(yīng)用程序則不完整
絕大多數(shù)軟件包將在低級Java構(gòu)造之上實(shí)現(xiàn)。但是,有一些關(guān)于原子性和監(jiān)視器的關(guān)鍵JVM /語言增強(qiáng)功能是獲得高效和正確語義所必需的。
java.util.concurrent中
只是間接地,因?yàn)樵诓煌脚_(tái)上運(yùn)行的JVM可能能夠以不同方式優(yōu)化某些構(gòu)造。
沒有
沒有
沒有
目標(biāo)是將此規(guī)范包含在J2SE 1.5(Tiger)的JSR中。
電子郵件,電話會(huì)議和不常見的會(huì)議。我們還將使用或創(chuàng)建一個(gè)開放的郵件列表,供專家組以外的其他感興趣的人討論。
前面說到AQS是并發(fā)包下的精髓所在,那么LockSupport和Unsafe就是整個(gè)JSR-166并發(fā)包的所有功能實(shí)現(xiàn)的靈魂,縱觀整個(gè)并發(fā)包下的代碼,無處不見LockSupport和Unsafe的身影。LockSupport提供了兩個(gè)關(guān)鍵方法,park和unpark,用來操作線程的阻塞和放行,功能可以類比Object的wait和notify,但是比這兩個(gè)api更靈活。下面是博主簡化了的實(shí)現(xiàn)(JDK中不是這樣的)
/** 用于創(chuàng)建鎖和其他同步類的基本線程阻塞基礎(chǔ)類,提供基礎(chǔ)的線程控制功能。 */ public class LockSupport { private LockSupport() {} /** 解除park的阻塞,如果還沒阻塞,它對{@code park}的下一次調(diào)用將保證不會(huì)阻塞 */ public static void unpark(Thread thread) { if (thread != null) { UNSAFE.unpark(thread); } } /** 阻塞當(dāng)前線程,除非先調(diào)用了unpark()方法。 */ public static void park() { UNSAFE.park(false, 0L); } //Hotspot implementation via intrinsics API private static final Unsafe UNSAFE; static { try { try { final PrivilegedExceptionActionaction = new PrivilegedExceptionAction () { @Override public Unsafe run() throws Exception { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe) theUnsafe.get(null); } }; UNSAFE = AccessController.doPrivileged(action); } catch (Exception e) { throw new RuntimeException("Unable to load unsafe", e); } } catch (Exception ex) { throw new Error(ex); } } }
有了park和unpark后,我們也可以這樣來實(shí)現(xiàn)Lock的功能,代碼如下,有了ConcurrentLinkedQueue加持后,就可以在基本的鎖的功能上,實(shí)現(xiàn)公平鎖的語義了。
/** * 簡版先進(jìn)先出的公平鎖實(shí)現(xiàn) * @author: kl @kailing.pub * @date: 2019/9/2 */ public class FIFOMutex { private final AtomicBoolean locked = new AtomicBoolean(false); private final Queuewaiters = new ConcurrentLinkedQueue<>(); public void lock() { boolean wasInterrupted = false; Thread current = Thread.currentThread(); waiters.add(current); // 在隊(duì)列中不是第一個(gè)或無法獲取鎖時(shí)阻塞 while (waiters.peek() != current || !locked.compareAndSet(false, true)) { LockSupport.park(this); if (Thread.interrupted()) { wasInterrupted = true; } } waiters.remove(); if (wasInterrupted) { current.interrupt(); } } public void unlock() { locked.set(false); LockSupport.unpark(waiters.peek()); } }
心細(xì)的你可能發(fā)現(xiàn)了LockSupport最終還是基于Unsafe的park和unpark來實(shí)現(xiàn)的,Unsafe在JDK1.5之前就存在的,那JSR166后增加了哪些內(nèi)容呢?先來看下Unsafe是什么來頭。JDK源碼中是這樣描述的:一組用于執(zhí)行低層、不安全操作的方法。盡管該類和所有方法都是公共的,但是該類的使用受到限制,因?yàn)橹挥惺苄湃蔚拇a才能獲得該類的實(shí)例。如其名,不安全的,所以在JDK1.8后直接不提供源碼了,JDK中其他的代碼都可以在IDE中直接看到.java的文件,而Unsafe只有.class編譯后的代碼。因?yàn)閁nsafe是真的有黑魔法,可以直接操作系統(tǒng)級的資源,比如系統(tǒng)內(nèi)存、線程等。JDK不直接對外暴露Unsafe的api,如果直接在自己的應(yīng)用程序中像JDK中那么獲取Unsafe的實(shí)例,如:
private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
會(huì)直接拋異常SecurityException("Unsafe"),正確的獲取方式如下:
private static final Unsafe UNSAFE; static { try { try { final PrivilegedExceptionActionaction = new PrivilegedExceptionAction () { @Override public Unsafe run() throws Exception { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe) theUnsafe.get(null); } }; UNSAFE = AccessController.doPrivileged(action); } catch (Exception e) { throw new RuntimeException("Unable to load unsafe", e); } } catch (Exception ex) { throw new Error(ex); } }
其實(shí),深入到Unsafe后,會(huì)發(fā)現(xiàn)深入不下去了,Unsafe中的方法是都是native標(biāo)記的本地方法,沒有實(shí)現(xiàn),如:
public native void unpark(Object var1); public native void park(boolean var1, long var2);
如果是在Windows下,最終調(diào)用的就是使用C++開發(fā)的最終編譯成.dll的包,所以只要看到C++相關(guān)的代碼就知道怎么回事了
JDK源碼下載地址:http://www.java.net/download/openjdk/jdk8/promoted/b132/openjdk-8-src-b132-03_mar_2014.zip
JDK源碼倉庫:http://hg.openjdk.java.net/
首先定位到Unsafe.cpp,文件位置在:openjdk\hotspot\src\share\vm\prims\Unsafe.cpp,會(huì)發(fā)現(xiàn)和JSR166相關(guān)的都有注釋,如:// These are the methods prior to the JSR 166 changes in 1.6.0。根據(jù)這些信息,得知JSR166在Unsafe中新增了五個(gè)方法,分別是compareAndSwapObject、compareAndSwapInt、compareAndSwapLong、park、unpark,這就是并發(fā)包中CAS原子操作和線程控制的核心所在了,并發(fā)包中的大部分功能都是基于他們來實(shí)現(xiàn)的。最后我們看下park和unpark的具體實(shí)現(xiàn),在學(xué)校學(xué)的C語言丟的差不好多了,但是下面的代碼還語義還是很清晰的
// JSR166 // ------------------------------------------------------- /* * The Windows implementation of Park is very straightforward: Basic * operations on Win32 Events turn out to have the right semantics to * use them directly. We opportunistically resuse the event inherited * from Monitor. * void Parker::park(bool isAbsolute, jlong time) { guarantee (_ParkEvent != NULL, "invariant") ; // First, demultiplex/decode time arguments if (time < 0) { // don't wait return; } else if (time == 0 && !isAbsolute) { time = INFINITE; } else if (isAbsolute) { time -= os::javaTimeMillis(); // convert to relative time if (time <= 0) // already elapsed return; } else { // relative time /= 1000000; // Must coarsen from nanos to millis if (time == 0) // Wait for the minimal time unit if zero time = 1; } JavaThread* thread = (JavaThread*)(Thread::current()); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; // Don't wait if interrupted or already triggered if (Thread::is_interrupted(thread, false) || WaitForSingleObject(_ParkEvent, 0) == WAIT_OBJECT_0) { ResetEvent(_ParkEvent); return; } else { ThreadBlockInVM tbivm(jt); OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); WaitForSingleObject(_ParkEvent, time); ResetEvent(_ParkEvent); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } } } void Parker::unpark() { guarantee (_ParkEvent != NULL, "invariant") ; SetEvent(_ParkEvent); }
“Java中J.U.C并發(fā)包誕生分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!