新聞中心
(手機橫屏看源碼更方便)
注:java源碼分析部分如無特殊說明均基于 java8 版本。
簡介
Java的線程池是塊硬骨頭,對線程池的源碼做深入研究不僅能提高對Java整個并發(fā)編程的理解,也能提高自己在面試中的表現(xiàn),增加被錄取的可能性。
本系列將分成很多個章節(jié),本章作為線程池的第一章將對整個線程池體系做一個總覽。
體系結(jié)構(gòu)
上圖列舉了線程池中非常重要的接口和類:
(1)Executor,線程池頂級接口;
(2)ExecutorService,線程池次級接口,對Executor做了一些擴展,增加一些功能;
(3)ScheduledExecutorService,對ExecutorService做了一些擴展,增加一些定時任務(wù)相關(guān)的功能;
(4)AbstractExecutorService,抽象類,運用模板方法設(shè)計模式實現(xiàn)了一部分方法;
(5)ThreadPoolExecutor,普通線程池類,這也是我們通常所說的線程池,包含最基本的一些線程池操作相關(guān)的方法實現(xiàn);
(6)ScheduledThreadPoolExecutor,定時任務(wù)線程池類,用于實現(xiàn)定時任務(wù)相關(guān)功能;
(7)ForkJoinPool,新型線程池類,java7中新增的線程池類,基于工作竊取理論實現(xiàn),運用于大任務(wù)拆小任務(wù)、任務(wù)無限多的場景;
(8)Executors,線程池工具類,定義了一些快速實現(xiàn)線程池的方法(謹慎使用);
Executor
線程池頂級接口,只定義了一個執(zhí)行無返回值任務(wù)的方法。
public interface Executor {
// 執(zhí)行無返回值任務(wù),本文由公從號“彤哥讀源碼”原創(chuàng)
void execute(Runnable command);
}
ExecutorService
線程池次級接口,對Executor做了一些擴展,主要增加了關(guān)閉線程池、執(zhí)行有返回值任務(wù)、批量執(zhí)行任務(wù)的方法。
public interface ExecutorService extends Executor {
// 關(guān)閉線程池,不再接受新任務(wù),但已經(jīng)提交的任務(wù)會執(zhí)行完成
void shutdown();
// 立即關(guān)閉線程池,嘗試停止正在運行的任務(wù),未執(zhí)行的任務(wù)將不再執(zhí)行
// 被迫停止及未執(zhí)行的任務(wù)將以列表的形式返回
List shutdownNow();
// 檢查線程池是否已關(guān)閉
boolean isShutdown();
// 檢查線程池是否已終止,只有在shutdown()或shutdownNow()之后調(diào)用才有可能為true
boolean isTerminated();
// 在指定時間內(nèi)線程池達到終止狀態(tài)了才會返回true
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 執(zhí)行有返回值的任務(wù),任務(wù)的返回值為task.call()的結(jié)果
Future submit(Callable task);
// 執(zhí)行有返回值的任務(wù),任務(wù)的返回值為這里傳入的result
// 當然只有當任務(wù)執(zhí)行完成了調(diào)用get()時才會返回
Future submit(Runnable task, T result);
// 執(zhí)行有返回值的任務(wù),任務(wù)的返回值為null
// 當然只有當任務(wù)執(zhí)行完成了調(diào)用get()時才會返回
Future> submit(Runnable task);
// 批量執(zhí)行任務(wù),只有當這些任務(wù)都完成了這個方法才會返回
List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException;
// 在指定時間內(nèi)批量執(zhí)行任務(wù),未執(zhí)行完成的任務(wù)將被取消
// 這里的timeout是所有任務(wù)的總時間,不是單個任務(wù)的時間
List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 返回任意一個已完成任務(wù)的執(zhí)行結(jié)果,未執(zhí)行完成的任務(wù)將被取消
T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException;
// 在指定時間內(nèi)如果有任務(wù)已完成,則返回任意一個已完成任務(wù)的執(zhí)行結(jié)果,未執(zhí)行完成的任務(wù)將被取消
T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ScheduledExecutorService
對ExecutorService做了一些擴展,增加一些定時任務(wù)相關(guān)的功能,主要包含兩大類:執(zhí)行一次,重復(fù)多次執(zhí)行。
public interface ScheduledExecutorService extends ExecutorService {
// 在指定延時后執(zhí)行一次
public ScheduledFuture> schedule(Runnable command,
long delay, TimeUnit unit);
// 在指定延時后執(zhí)行一次
public ScheduledFuture schedule(Callable callable,
long delay, TimeUnit unit);
// 在指定延時后開始執(zhí)行,并在之后以指定時間間隔重復(fù)執(zhí)行(間隔不包含任務(wù)執(zhí)行的時間)
// 相當于之后的延時以任務(wù)開始計算,本文由公從號“彤哥讀源碼”原創(chuàng)
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 在指定延時后開始執(zhí)行,并在之后以指定延時重復(fù)執(zhí)行(間隔包含任務(wù)執(zhí)行的時間)
// 相當于之后的延時以任務(wù)結(jié)束計算
public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
AbstractExecutorService
抽象類,運用模板方法設(shè)計模式實現(xiàn)了一部分方法,主要為執(zhí)行有返回值任務(wù)、批量執(zhí)行任務(wù)的方法。
public abstract class AbstractExecutorService implements ExecutorService {
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return new FutureTask(runnable, value);
}
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}
public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException {
// 略...
}
public T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
// 略...
}
public List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException {
// 略...
}
public List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
// 略...
}
}
可以看到,這里的submit()方法對傳入的任務(wù)都包裝成了FutureTask來進行處理,這是什么東西呢?歡迎關(guān)注后面的章節(jié)。
ThreadPoolExecutor
普通線程池類,這也是我們通常所說的線程池,包含最基本的一些線程池操作相關(guān)的方法實現(xiàn)。
線程池的主要實現(xiàn)邏輯都在這里面,比如線程的創(chuàng)建、任務(wù)的處理、拒絕策略等,我們后面單獨分析這個類。
ScheduledThreadPoolExecutor
定時任務(wù)線程池類,用于實現(xiàn)定時任務(wù)相關(guān)功能,將任務(wù)包裝成定時任務(wù),并按照定時策略來執(zhí)行,我們后面單獨分析這個類。
問題:你知道定時任務(wù)線程池類使用的是什么隊列嗎?
ForkJoinPool
新型線程池類,java7中新增的線程池類,這個線程池與Go中的線程模型特別類似,都是基于工作竊取理論,特別適合于處理歸并排序這種先分后合的場景。
Executors
線程池工具類,定義了一系列快速實現(xiàn)線程池的方法——newXXX(),不過阿里手冊是不建議使用這個類來新建線程池的,彤哥我并不這么認為,只要能掌握其源碼,知道其利敝偶爾還是可以用的,后面我們再來說這個事。
彩蛋
無彩蛋不歡,今天的問題是定時任務(wù)線程池用的是哪種隊列來實現(xiàn)的?
答:延時隊列。定時任務(wù)線程池中并沒有直接使用并發(fā)集合中的DelayQueue,而是自己又實現(xiàn)了一個DelayedWorkQueue,不過跟DelayQueue的實現(xiàn)原理是一樣的。
延時隊列使用什么數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)的呢?
答:堆(DelayQueue中使用的是優(yōu)先級隊列,而優(yōu)先級隊列使用的堆;DelayedWorkQueue直接使用的堆)。
關(guān)于延時隊列、優(yōu)先級隊列和堆的相關(guān)內(nèi)容點擊下面的鏈接直達:
死磕 java集合之DelayQueue源碼分析
死磕 java集合之PriorityQueue源碼分析
拜托,面試別再問我堆(排序)了!
創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國云服務(wù)器,動態(tài)BGP最優(yōu)骨干路由自動選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機房獨有T級流量清洗系統(tǒng)配攻擊溯源,準確進行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動現(xiàn)已開啟,新人活動云服務(wù)器買多久送多久。
當前題目:死磕java線程系列之線程池深入解析——體系結(jié)構(gòu)-創(chuàng)新互聯(lián)
當前網(wǎng)址:http://fisionsoft.com.cn/article/cdoggc.html