通過(guò)之前的一篇文章,我們總結(jié)了Executor框架。而在Executor框架中,ThreadPoolExecutor 是最核心的類。
ThreadPoolExecutor 看字面意思,是線程池的執(zhí)行器。我們本篇文章就基于ThreadPoolExecutor 這個(gè)類來(lái)展開(kāi)總結(jié)線程池。
下篇文章會(huì)從源碼的角度解析ThreadPoolExecutor原理。

構(gòu)造方法源碼如下:
public ThreadPoolExecutor(int corePoolSize,//核心線程數(shù)
int maximumPoolSize,//大線程數(shù),可同時(shí)運(yùn)行的大線程數(shù)
long keepAliveTime,//除核心線程數(shù)之外的,空閑下來(lái)的線程的存活時(shí)間
TimeUnit unit,//時(shí)間單位
BlockingQueueworkQueue,//裝任務(wù)的隊(duì)列,存儲(chǔ)等待執(zhí)行的任務(wù)
ThreadFactory threadFactory,//線程工廠,可自定義一個(gè)產(chǎn)生線程的類
RejectedExecutionHandler handler) {//拒絕策略
if (corePoolSize< 0 ||
maximumPoolSize<= 0 ||
maximumPoolSize< corePoolSize ||
keepAliveTime< 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我們做個(gè)詳解:
corePoolSize : 核心線程數(shù),就是可同時(shí)運(yùn)行的最小數(shù)量,不會(huì)被銷毀的線程數(shù),空閑時(shí)不還給操作系統(tǒng)。
maximumPoolSize:大線程數(shù),任務(wù)有一個(gè)等待隊(duì)列,當(dāng)這個(gè)隊(duì)列滿了,會(huì)啟動(dòng)除核心線程之外的線程,而當(dāng)前可啟動(dòng)的大線程數(shù)就是這個(gè)參數(shù)值了。
workQueue:任務(wù)隊(duì)列,如果新來(lái)了任務(wù),先判斷當(dāng)前運(yùn)行的線程數(shù)量是否達(dá)到核心線程數(shù),沒(méi)達(dá)到就執(zhí)行,如果達(dá)到的話,新任務(wù)就會(huì)被存放在隊(duì)列中。
keepAliveTime :當(dāng)線程池中的線程數(shù)量大于核心線程數(shù)的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待,直到等待的時(shí)間超過(guò)了這個(gè)時(shí)間才會(huì)被回收銷毀;
unit:keepAliveTime的時(shí)間單位;
threadFactory:生產(chǎn)線程的類;
handler:拒絕策略,如果當(dāng)前同時(shí)運(yùn)行的線程數(shù)量達(dá)到大線程數(shù)量并且隊(duì)列也已經(jīng)被放滿了任務(wù)時(shí),ThreadPoolExecutor會(huì)員拒絕策略。
拒絕策略有以下四種:
2、ThreadPoolExecutor使用AbortPolicy:拋出 RejectedExecutionException來(lái)拒絕新任務(wù)的處理。(拋異常拒絕)
DiscardPolicy :不處理新任務(wù),直接丟棄掉。(不處理)
DiscardOldestPolicy:丟掉最早的未處理的任務(wù)。(丟隊(duì)列最前端的任務(wù))
CallerRunsPolicy:調(diào)用執(zhí)行自己的線程運(yùn)行任務(wù),也就是直接在調(diào)用execute方法的線程中運(yùn)行(run)被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。因此這種策略會(huì)降低對(duì)于新任務(wù)提交速度,影響程序的整體性能。如果您的應(yīng)用程序可以承受此延遲并且你要求任何一個(gè)任務(wù)請(qǐng)求都要被執(zhí)行的話,你可以選擇這個(gè)策略。(執(zhí)行此任務(wù))當(dāng)大池被填滿時(shí),此策略為我們提供可伸縮隊(duì)列。
示例代碼如下:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {//創(chuàng)建任務(wù)
Callablecallable = () ->Thread.currentThread().getName();
//線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
//提交并執(zhí)行任務(wù)
for (int i = 0; i< 10; i++) {Futuresubmit = executor.submit(callable);
String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
}
//關(guān)閉線程池
executor.shutdown();
} 運(yùn)行結(jié)果如下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5二、常見(jiàn)線程池
1、FixedThreadPool(指定線程數(shù))其構(gòu)造函數(shù)源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
} 從源碼可以看出,F(xiàn)ixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設(shè)置為我們自己傳遞的nThreads參數(shù),所以,除核心線程外沒(méi)有多余的線程。
《Java 并發(fā)編程的藝術(shù)》圖片如下:
線程數(shù)小于nThreads時(shí),來(lái)新任務(wù)就創(chuàng)建新線程,等于nThreads時(shí),就加入隊(duì)列等待,然后線程閑下來(lái)就立刻從隊(duì)列中取任務(wù)執(zhí)行。
為什么不推薦使用FixedThreadPool?
因?yàn)閙aximumPoolSize無(wú)效,而LinkedBlockingQueue隊(duì)列的大值是 Integer.MAX_VALUE,運(yùn)行中的線程池會(huì)一直接受任務(wù),直到隊(duì)列滿了還會(huì)接受,極端情況下會(huì)造成OOM。
構(gòu)造函數(shù)源碼如下:
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory));
} 就核心線程和大線程就是1。
《Java 并發(fā)編程的藝術(shù)》圖片如下:
和FixedThreadPool一樣,但是就1個(gè)線程,任務(wù)來(lái)了就加入隊(duì)列等待。
為什么不推薦使用SingleThreadExecutor ?
和FixedThreadPool一樣,而LinkedBlockingQueue隊(duì)列的大值是 Integer.MAX_VALUE,運(yùn)行中的線程池會(huì)一直接受任務(wù),直到隊(duì)列滿了還會(huì)接受,極端情況下會(huì)造成OOM。
構(gòu)造函數(shù)代碼如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue(),
threadFactory);
} 從代碼可以看出,其沒(méi)有核心線程,來(lái)一個(gè)任務(wù)就創(chuàng)建一個(gè)線程(如果之前的線程不閑下來(lái)的話),一直創(chuàng)建到Integer.MAX_VALUE為止。極端情況下,這樣會(huì)導(dǎo)致耗盡 cpu 和內(nèi)存資源。
《Java 并發(fā)編程的藝術(shù)》圖片如下:
1.首先執(zhí)行 SynchronousQueue.offer(Runnable task) 提交任務(wù)到任務(wù)隊(duì)列。如果當(dāng)前 maximumPool 中有閑線程正在執(zhí)行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行 offer 操作與空閑線程執(zhí)行的 poll 操作配對(duì)成功,主線程把任務(wù)交給空閑線程執(zhí)行,execute()方法執(zhí)行完成,否則執(zhí)行下面的步驟 2;(有空閑線程就執(zhí)行任務(wù))
2.當(dāng)初始 maximumPool 為空,或者 maximumPool 中沒(méi)有空閑線程時(shí),將沒(méi)有線程執(zhí)行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟 1 將失敗,此時(shí) CachedThreadPool 會(huì)創(chuàng)建新線程執(zhí)行任務(wù),execute 方法執(zhí)行完成;(沒(méi)有空閑線程就創(chuàng)建線程執(zhí)行任務(wù))
為什么不推薦使用CachedThreadPool?
允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量線程,從而導(dǎo)致 OOM。
主要用來(lái)在給定的延遲后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。
構(gòu)造函數(shù)代碼如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}ScheduledThreadPoolExecutor 使用的任務(wù)隊(duì)列 DelayQueue 封裝了一個(gè) PriorityQueue,PriorityQueue會(huì)對(duì)隊(duì)列中的任務(wù)進(jìn)行排序,執(zhí)行所需時(shí)間短的放在前面先被執(zhí)行(ScheduledFutureTask 的 time變量小的先執(zhí)行),如果執(zhí)行所需時(shí)間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTask 的 squenceNumber 變量小的先執(zhí)行)。
運(yùn)行機(jī)制圖片如下:
1、當(dāng)調(diào)用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法時(shí),會(huì)向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一個(gè)實(shí)現(xiàn)了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。
2、線程池中的線程從 DelayQueue 中獲取 ScheduledFutureTask,然后執(zhí)行任務(wù)。
為什么不推薦使用ScheduledThreadPoolExecutor ?
允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量線程,從而導(dǎo)致 OOM。而且在實(shí)際項(xiàng)目中應(yīng)用較少,了解即可。
《阿里開(kāi)發(fā)手冊(cè)》華山版的并發(fā)編程這一節(jié),有如下規(guī)定,參考:
多線程編程中一般線程的個(gè)數(shù)都大于 CPU 核心的個(gè)數(shù),而一個(gè) CPU 核心在任意時(shí)刻只能被一個(gè)線程使用,為了讓這些線程都能得到有效執(zhí)行,CPU 采取的策略是為每個(gè)線程分配時(shí)間片并輪轉(zhuǎn)的形式。當(dāng)一個(gè)線程的時(shí)間片用完的時(shí)候就會(huì)重新處于就緒狀態(tài)讓給其他線程使用,這個(gè)過(guò)程就屬于一次上下文切換。
概括來(lái)說(shuō)就是:當(dāng)前任務(wù)在執(zhí)行完 CPU 時(shí)間片切換到另一個(gè)任務(wù)之前會(huì)先保存自己的狀態(tài),以便下次再切換回這個(gè)任務(wù)時(shí),可以再加載這個(gè)任務(wù)的狀態(tài)。
任務(wù)從保存到再加載的過(guò)程就是一次上下文切換。
簡(jiǎn)單的公式:
CPU 密集型任務(wù)(N+1): 這種任務(wù)消耗的主要是 CPU 資源,可以將線程數(shù)設(shè)置為 N(CPU 核心數(shù))+1,比 CPU 核心數(shù)多出來(lái)的一個(gè)線程是為了防止線程偶發(fā)的缺頁(yè)中斷,或者其它原因?qū)е碌娜蝿?wù)暫停而帶來(lái)的影響。一旦任務(wù)暫停,CPU 就會(huì)處于空閑狀態(tài),而在這種情況下多出來(lái)的一個(gè)線程就可以充分利用 CPU 的空閑時(shí)間。
I/O 密集型任務(wù)(2N): 這種任務(wù)應(yīng)用起來(lái),系統(tǒng)會(huì)用大部分的時(shí)間來(lái)處理 I/O 交互,而線程在處理 I/O 的時(shí)間段內(nèi)不會(huì)占用 CPU 來(lái)處理,這時(shí)就可以將 CPU 交出給其它線程使用。因此在 I/O 密集型任務(wù)的應(yīng)用中,我們可以多配置一些線程,具體的計(jì)算方法是 2N。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
新聞標(biāo)題:多線程與高并發(fā)(15)——線程池詳解(非源碼層面)-創(chuàng)新互聯(lián)
轉(zhuǎn)載源于:http://www.chinadenli.net/article6/deejog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供Google、網(wǎng)站收錄、網(wǎng)站營(yíng)銷、手機(jī)網(wǎng)站建設(shè)、移動(dòng)網(wǎng)站建設(shè)、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容