真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網站制作重慶分公司

jdk自帶線程池實例詳解

二、簡介

為神木等地區(qū)用戶提供了全套網頁設計制作服務,及神木網站建設行業(yè)解決方案。主營業(yè)務為成都網站設計、網站制作、神木網站設計,以傳統(tǒng)方式定制建設網站,并提供域名空間備案等一條龍服務,秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!

多線程技術主要解決處理器單元內多個線程執(zhí)行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力,但頻繁的創(chuàng)建線程的開銷是很大的,那么如何來減少這部分的開銷了,那么就要考慮使用線程池了。線程池就是一個線程的容器,每次只執(zhí)行額定數量的線程,線程池就是用來管理這些額定數量的線程。

三、涉及線程池的類結構圖

jdk自帶線程池實例詳解

其中供我們使用的,主要是ThreadPoolExecutor類。

四、如何創(chuàng)建線程池

我們創(chuàng)建線程池一般有以下幾種方法:

1、使用Executors工廠類

Executors主要提供了下面幾種創(chuàng)建線程池的方法:

jdk自帶線程池實例詳解

下面來看下使用示例:

1)newFixedThreadPool(固定大小的線程池)

public class FixedThreadPool { 
  public static void main(String[] args) { 
    ExecutorService pool = Executors.newFixedThreadPool(5);// 創(chuàng)建一個固定大小為5的線程池 
    for (int i = 0; i < 10; i++) { 
      pool.submit(new MyThread()); 
    } 
    pool.shutdown(); 
  } 
} 
public class MyThread extends Thread { 
  @Override 
  public void run() { 
    System.out.println(Thread.currentThread().getName() + "正在執(zhí)行。。。"); 
  } 
} 

測試結果如下:

pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-2正在執(zhí)行。。。 
pool-1-thread-3正在執(zhí)行。。。 
pool-1-thread-2正在執(zhí)行。。。 
pool-1-thread-3正在執(zhí)行。。。 
pool-1-thread-2正在執(zhí)行。。。 
pool-1-thread-2正在執(zhí)行。。。 
pool-1-thread-3正在執(zhí)行。。。 
pool-1-thread-5正在執(zhí)行。。。 
pool-1-thread-4正在執(zhí)行。。。 

固定大小的線程池:每次提交一個任務就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結束,那么線程池會補充一個新線程線。

2)newSingleThreadExecutor(單線程的線程池)

public class SingleThreadPool { 
  public static void main(String[] args) {  
    ExecutorService pool=Executors.newSingleThreadExecutor();//創(chuàng)建一個單線程池  
    for(int i=0;i<100;i++){  
      pool.submit(new MyThread());  
    }  
    pool.shutdown();  
  } 
} 

測試結果如下:

pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 

單線程的線程池:這個線程池只有一個線程在工作,也就是相當于單線程串行執(zhí)行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執(zhí)行順序按照任務的提交順序執(zhí)行。

3)newScheduledThreadPool

public class ScheduledThreadPool { 
  public static void main(String[] args) {  
    ScheduledExecutorService pool=Executors.newScheduledThreadPool(6);  
    for(int i=0;i<10000;i++){  
      pool.submit(new MyThread());  
    }  
      
    pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);  
    pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);  
    pool.shutdown();  
  }  
} 

測試結果如下:

pool-1-thread-1正在執(zhí)行。。。 
pool-1-thread-6正在執(zhí)行。。。 
pool-1-thread-5正在執(zhí)行。。。 
pool-1-thread-4正在執(zhí)行。。。 
pool-1-thread-2正在執(zhí)行。。。 
pool-1-thread-3正在執(zhí)行。。。 
pool-1-thread-4正在執(zhí)行。。。 
pool-1-thread-5正在執(zhí)行。。。 
pool-1-thread-6正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 
…………此處會延時1S………… 
pool-1-thread-4正在執(zhí)行。。。 
pool-1-thread-1正在執(zhí)行。。。 

測試結果的最后兩個線程都是在延時1S之后,才開始執(zhí)行的。此線程池支持定時以及周期性執(zhí)行任務的需求

4)newCachedThreadPool(可緩存的線程池)

public class CachedThreadPool { 
  public static void main(String[] args) {  
    ExecutorService pool=Executors.newCachedThreadPool();  
    for(int i=0;i<100;i++){  
      pool.submit(new MyThread());  
    }  
    pool.shutdown();  
  }  
} 

測試結果如下:

pool-1-thread-5正在執(zhí)行。。。 
pool-1-thread-7正在執(zhí)行。。。 
pool-1-thread-5正在執(zhí)行。。。 
pool-1-thread-16正在執(zhí)行。。。 
pool-1-thread-17正在執(zhí)行。。。 
pool-1-thread-16正在執(zhí)行。。。 
pool-1-thread-5正在執(zhí)行。。。 
pool-1-thread-7正在執(zhí)行。。。 
pool-1-thread-16正在執(zhí)行。。。 
pool-1-thread-18正在執(zhí)行。。。 
pool-1-thread-10正在執(zhí)行。。。 

可緩存的線程池:如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。

官方建議程序員使用較為方便的Executors工廠方法Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個后臺線程),這幾種線程池均為大多數使用場景預定義了默認配置。

2、繼承ThreadPoolExecutor類,并復寫父類的構造方法。

在介紹這種方式之前,我們來分析下前面幾個創(chuàng)建線程池的底層代碼是怎樣的?

public class Executors { 
  public static ExecutorService newFixedThreadPool(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
                   0L, TimeUnit.MILLISECONDS, 
                   new LinkedBlockingQueue()); 
} 
  public static ExecutorService newSingleThreadExecutor() { 
    return new FinalizableDelegatedExecutorService 
      (new ThreadPoolExecutor(1, 1, 
                  0L, TimeUnit.MILLISECONDS, 
                  new LinkedBlockingQueue())); 
  } 
} 

從Executors工廠類的底層代碼可以看出,工廠類提供的創(chuàng)建線程池的方法,其實都是通過構造ThreadPoolExecutor來實現(xiàn)的。ThreadPoolExecutor構造方法代碼如下:

public ThreadPoolExecutor(int corePoolSize, 
               int maximumPoolSize, 
               long keepAliveTime, 
               TimeUnit unit, 
               BlockingQueue workQueue, 
               ThreadFactory threadFactory, 
               RejectedExecutionHandler handler) { 
    if (corePoolSize < 0 || 
      maximumPoolSize <= 0 || 
      maximumPoolSize < corePoolSize || 
      keepAliveTime < 0) 
      throw new IllegalArgumentException(); 
    if (workQueue == null || threadFactory == null || handler == null) 
      throw new NullPointerException(); 
    this.corePoolSize = corePoolSize; 
    this.maximumPoolSize = maximumPoolSize; 
    this.workQueue = workQueue; 
    this.keepAliveTime = unit.toNanos(keepAliveTime); 
    this.threadFactory = threadFactory; 
    this.handler = handler; 
  } 

那么接下來,我們就來談談這個ThreadPoolExecutor構造方法。在這個構造方法中,主要有以下幾個參數:

corePoolSize--池中所保存的線程數,包括空閑線程。

maximumPoolSize--池中允許的最大線程數。

keepAliveTime--當線程數大于corePoolSize時,此為終止空閑線程等待新任務的最長時間。

Unit--keepAliveTime 參數的時間單位。

workQueue--執(zhí)行前用于保持任務的隊列。此隊列僅保持由 execute方法提交的 Runnable任務。

threadFactory--執(zhí)行程序創(chuàng)建新線程時使用的工廠。

Handler--由于超出線程范圍和隊列容量而使執(zhí)行被阻塞時所使用的處理程序。

接下來,咋們來說下這幾個參數之間的關系。當線程池剛創(chuàng)建的時候,線程池里面是沒有任何線程的(注意,并不是線程池一創(chuàng)建,里面就創(chuàng)建了一定數量的線程),當調用execute()方法添加一個任務時,線程池會做如下的判斷:

1)如果當前正在運行的線程數量小于corePoolSize,那么立刻創(chuàng)建一個新的線程,執(zhí)行這個任務。

2)如果當前正在運行的線程數量大于或等于corePoolSize,那么這個任務將會放入隊列中。

3)如果線程池的隊列已經滿了,但是正在運行的線程數量小于maximumPoolSize,那么還是會創(chuàng)建新的線程,執(zhí)行這個任務。

4)如果隊列已經滿了,且當前正在運行的線程數量大于或等于maximumPoolSize,那么線程池會根據拒絕執(zhí)行策略來處理當前的任務。

5)當一個任務執(zhí)行完后,線程會從隊列中取下一個任務來執(zhí)行,如果隊列中沒有需要執(zhí)行的任務,那么這個線程就會處于空閑狀態(tài),如果超過了keepAliveTime存活時間,則這個線程會被線程池回收(注:回收線程是有條件的,如果當前運行的線程數量大于corePoolSize的話,這個線程就會被銷毀,如果不大于corePoolSize,是不會銷毀這個線程的,線程的數量必須保持在corePoolSize數量內).為什么不是線程一空閑就回收,而是需要等到超過keepAliveTime才進行線程的回收了,原因很簡單:因為線程的創(chuàng)建和銷毀消耗很大,更不能頻繁的進行創(chuàng)建和銷毀,當超過keepAliveTime后,發(fā)現(xiàn)確實用不到這個線程了,才會進行銷毀。這其中unit表示keepAliveTime的時間單位,unit的定義如下:

public enum TimeUnit { 
  NANOSECONDS { 
    // keepAliveTime以納秒為單位 
  }, 
  MICROSECONDS { 
    // keepAliveTime以微秒為單位 
  }, 
  MILLISECONDS { 
    // keepAliveTime以毫秒為單位 
  }, 
  SECONDS { 
    // keepAliveTime以秒為單位 
  }, 
  MINUTES { 
    // keepAliveTime以分鐘為單位 
  }, 
  HOURS { 
    // keepAliveTime以小時為單位 
  }, 
  DAYS { 
    // keepAliveTime以天為單位 
  }; 

下面從源碼來分析一下,對于上面的幾種情況,主要涉及到的源碼有以下幾塊:

private boolean addIfUnderCorePoolSize(Runnable firstTask) { 
    Thread t = null; 
    final ReentrantLock mainLock = this.mainLock; 
    mainLock.lock(); 
    try { 
      if (poolSize < corePoolSize && runState == RUNNING) 
        t = addThread(firstTask); 
    } finally { 
      mainLock.unlock(); 
    } 
    if (t == null) 
      return false; 
    t.start(); 
    return true; 
} 

其實,這段代碼很簡單,主要描述的就是,如果當前的線程池小于corePoolSize的時候,是直接新建一個線程來處理任務。 

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { 
    Thread t = null; 
    final ReentrantLock mainLock = this.mainLock; 
    mainLock.lock(); 
    try { 
      if (poolSize < maximumPoolSize && runState == RUNNING) 
        t = addThread(firstTask); 
    } finally { 
      mainLock.unlock(); 
    } 
    if (t == null) 
      return false; 
    t.start(); 
    return true; 
} 

上面這段代碼描述的是,如果當前線程池的數量小于maximumPoolSize的時候,也會創(chuàng)建一個線程,來執(zhí)行任務 

五、線程池的隊列

線程池的隊列,總的來說有3種:

直接提交:工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續(xù)到達時,此策略允許無界線程具有增長的可能性。

無界隊列:使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立于其他任務,即任務執(zhí)行互不影響時,適合于使用無界隊列;例如,在 Web頁服務器中。這種排隊可用于處理瞬態(tài)突發(fā)請求,當命令以超過隊列所能處理的平均數連續(xù)到達時,此策略允許無界線程具有增長的可能性。

有界隊列:當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

下面就來說下線程池的隊列,類結構圖如下:

jdk自帶線程池實例詳解

1)SynchronousQueue

該隊列對應的就是上面所說的直接提交,首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加。

2)LinkedBlockingQueue

該隊列對應的就是上面的無界隊列。

3)ArrayBlockingQueue

該隊列對應的就是上面的有界隊列。ArrayBlockingQueue有以下3中構造方法:

public ArrayBlockingQueue(int capacity) { 
    this(capacity, false); 
  } 
  public ArrayBlockingQueue(int capacity, boolean fair) { 
    if (capacity <= 0) 
      throw new IllegalArgumentException(); 
    this.items = (E[]) new Object[capacity]; 
    lock = new ReentrantLock(fair); 
    notEmpty = lock.newCondition(); 
    notFull = lock.newCondition(); 
} 
  public ArrayBlockingQueue(int capacity, boolean fair, 
               Collection<? extends E> c) { 
    this(capacity, fair); 
    if (capacity < c.size()) 
      throw new IllegalArgumentException(); 
    for (Iterator<? extends E> it = c.iterator(); it.hasNext();) 
      add(it.next()); 
  } 

下面我們重點來說下這個fair,fair表示隊列訪問線程的競爭策略,當為true的時候,任務插入隊列遵從FIFO的規(guī)則,如果為false,則可以“插隊”。舉個例子,假如現(xiàn)在有很多任務在排隊,這個時候正好一個線程執(zhí)行完了任務,同時又新來了一個任務,如果為false的話,這個任務不用在隊列中排隊,可以直接插隊,然后執(zhí)行。如下圖所示:

jdk自帶線程池實例詳解

六、線程池的拒絕執(zhí)行策略

當線程的數量達到最大值時,這個時候,任務還在不斷的來,這個時候,就只好拒絕接受任務了。

ThreadPoolExecutor 允許自定義當添加任務失敗后的執(zhí)行策略。你可以調用線程池的 setRejectedExecutionHandler() 方法,用自定義的RejectedExecutionHandler 對象替換現(xiàn)有的策略,ThreadPoolExecutor提供的默認的處理策略是直接丟棄,同時拋異常信息,ThreadPoolExecutor 提供 4 個現(xiàn)有的策略,分別是:
ThreadPoolExecutor.AbortPolicy:表示拒絕任務并拋出異常,源碼如下:

public static class AbortPolicy implements RejectedExecutionHandler { 
    /** 
     * Creates an AbortPolicy. 
     */ 
    public AbortPolicy() { } 
    /** 
     * Always throws RejectedExecutionException. 
     * @param r the runnable task requested to be executed 
     * @param e the executor attempting to execute this task 
     * @throws RejectedExecutionException always. 
     */ 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
      throw new RejectedExecutionException(); //拋異常 
    } 
  } 

 ThreadPoolExecutor.DiscardPolicy:表示拒絕任務但不做任何動作,源碼如下:

public static class DiscardPolicy implements RejectedExecutionHandler { 
    /** 
     * Creates a DiscardPolicy. 
     */ 
    public DiscardPolicy() { } 
    /** 
     * Does nothing, which has the effect of discarding task r. 
     * @param r the runnable task requested to be executed 
     * @param e the executor attempting to execute this task 
     */ 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
    } // 直接拒絕,但不做任何操作 
  } 

ThreadPoolExecutor.CallerRunsPolicy:表示拒絕任務,并在調用者的線程中直接執(zhí)行該任務,源碼如下:

public static class CallerRunsPolicy implements RejectedExecutionHandler { 
    /** 
     * Creates a CallerRunsPolicy. 
     */ 
    public CallerRunsPolicy() { } 
    /** 
     * Executes task r in the caller's thread, unless the executor 
     * has been shut down, in which case the task is discarded. 
     * @param r the runnable task requested to be executed 
     * @param e the executor attempting to execute this task 
     */ 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
      if (!e.isShutdown()) { 
        r.run(); // 直接執(zhí)行任務 
      } 
    } 
  } 

 ThreadPoolExecutor.DiscardOldestPolicy:表示先丟棄任務隊列中的第一個任務,然后把這個任務加進隊列。源碼如下:

public static class DiscardOldestPolicy implements RejectedExecutionHandler { 
    /** 
     * Creates a DiscardOldestPolicy for the given executor. 
     */ 
    public DiscardOldestPolicy() { } 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
      if (!e.isShutdown()) { 
        e.getQueue().poll(); // 丟棄隊列中的第一個任務 
        e.execute(r); // 執(zhí)行新任務 
      } 
    } 
} 

當任務源源不斷到來的時候,會從Queue中poll一個任務出來,然后執(zhí)行新的任務 

總結

以上所述是小編給大家介紹的jdk自帶線程池詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對創(chuàng)新互聯(lián)網站的支持!


新聞標題:jdk自帶線程池實例詳解
網站URL:http://weahome.cn/article/jhihjs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部