Java Executor 框架學(xué)習(xí)總結(jié)
大多數(shù)并發(fā)都是通過任務(wù)執(zhí)行的方式來實(shí)現(xiàn)的。一般有兩種方式執(zhí)行任務(wù):串行和并行。
- class SingleThreadWebServer {
- public static void main(String[] args) throws Exception {
- ServerSocket socket = new ServerSocket(80);
- while(true) {
- Socket conn = socket.accept();
- handleRequest(conn);
- }
- }
- }
- class ThreadPerTaskWebServer {
- public static void main(String[] args) throws Exception {
- ServerSocket socket = new ServerSocket(80);
- while(true) {
- final Socket conn = socket.accept();
- Runnable task = new Runnable() {
- public void run() {
- handleRequest(conn);
- }
- };
- new Thread(task).start();
- }
- }
- }
當(dāng)然上面的這兩種方式都是有問題的。單線程的問題就是并發(fā)量會(huì)是瓶頸,多線程版本就是***制的創(chuàng)建線程會(huì)導(dǎo)致資源不足問題。
Executor 框架
任務(wù)是一組邏輯工作單元,而線程是使任務(wù)異步執(zhí)行的機(jī)制。
JDK 提供了 Executor 接口:
- public interface Executor {
- void execute(Runnable command);
- }
雖然 Executor 接口比較簡單,但是卻是異步任務(wù)執(zhí)行框架的基礎(chǔ),該框架能支持多種不同類型的任務(wù)執(zhí)行策略。它提供了一種標(biāo)準(zhǔn)的方式把任務(wù)的提交過程與執(zhí)行過程進(jìn)行了解 耦。用 Runnable 來代表任務(wù)。Executor 的實(shí)現(xiàn)提供了對生命周期的支持以及統(tǒng)計(jì)信息應(yīng)用程序管理等機(jī)制。
Executor 是基于生產(chǎn)者消費(fèi)者模式的,提交任務(wù)的操作相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)。
基于 Executor 的 WebServer 例子如下:
- public class TaskExecutorWebServer {
- private static final int NTHREADS = 100;
- private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
- public static void main(String[] args) throws Exception {
- ServerSocket serverSocket = new ServerSocket(80);
- while (true) {
- final Socket conn = serverSocket.accept();
- Runnable task = new Runnable() {
- @Override
- public void run() {
- handleRequest(conn);
- }
- };
- exec.execute(task);
- }
- }
- }
另外可以自己實(shí)現(xiàn) Executor 來控制是并發(fā)還是并行的,如下面代碼:
- /**
- * 執(zhí)行已提交的 Runnable 任務(wù)的對象。
- * 此接口提供一種將任務(wù)提交與每個(gè)任務(wù)將如何運(yùn)行的機(jī)制(包括線程使用的細(xì)節(jié)、調(diào)度等)分離開來的方法。
- * 通常使用 Executor 而不是顯式地創(chuàng)建線程。
- *
- *
- * @author renchunxiao
- *
- */
- public class ExecutorDemo {
- public static void main(String[] args) {
- Executor executor = new ThreadExecutor();
- executor.execute(new Runnable() {
- @Override
- public void run() {
- // do something
- }
- });
- Executor executor2 = new SerialExecutor();
- executor2.execute(new Runnable() {
- @Override
- public void run() {
- // do something
- }
- });
- }
- }
- /**
- * 創(chuàng)建一個(gè)線程來執(zhí)行 command
- *
- * @author renchunxiao
- *
- */
- class ThreadExecutor implements Executor {
- @Override
- public void execute(Runnable command) {
- new Thread(command).start();
- }
- }
- /**
- * 串行執(zhí)行 command
- *
- * @author renchunxiao
- *
- */
- class SerialExecutor implements Executor {
- @Override
- public void execute(Runnable command) {
- command.run();
- }
- }
線程池
線程池就是線程的資源池,可以通過 Executors 中的靜態(tài)工廠方法來創(chuàng)建線程池。
-
newFixedThreadPool。創(chuàng)建固定長度的線程池,每次提交任務(wù)創(chuàng)建一個(gè)線程,直到達(dá)到線程池的***數(shù)量,線程池的大小不再變化。
-
newSingleThreadExecutor。單個(gè)線程池。
-
newCachedThreadPool。根據(jù)任務(wù)規(guī)模變動(dòng)的線程池。
-
newScheduledThreadPool。創(chuàng)建固定長度的線程池,以延遲或定時(shí)的方式來執(zhí)行任務(wù)。
JVM 只有在所有非守護(hù)線程全部終止后才會(huì)退出,所以,如果無法正確的關(guān)閉 Executor,那么 JVM 就無法結(jié)束。
為了解決執(zhí)行服務(wù)的生命周期問題,有個(gè)擴(kuò)展 Executor 接口的新接口 ExecutorService。
- public interface ExecutorService extends Executor {
- void shutdown();
- List<Runnable> shutdownNow();
- boolean isShutdown();
- boolean isTerminated();
- boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException;
- <T> Future<T> submit(Callable<T> task);
- <T> Future<T> submit(Runnable task, T result);
- Future<?> submit(Runnable task);
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
ExecutorService 生命周期有三種狀態(tài):運(yùn)行、關(guān)閉、已終止。ExecutorService 在初始創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。shutdown 方法會(huì)平緩關(guān)閉:不在接受新的任務(wù),并且等待已經(jīng)執(zhí)行的任務(wù)執(zhí)行完成(包括那些還未開始的任務(wù))。shutdownNow 方法將粗暴關(guān)閉:它將嘗試取消所有運(yùn)行中的任務(wù),并且不再啟動(dòng)隊(duì)列中尚未開始的任務(wù)。所有任務(wù)都執(zhí)行完成后進(jìn)入到已終止?fàn)顟B(tài)。
Callable 和 Future
Executor 框架使用 Runnable 作為基本的任務(wù)表示形式。Runnable 是一種有局限性的抽象,它的 run 方法不能返回值和拋出一個(gè)受檢查異常。
許多任務(wù)實(shí)際上是存在延時(shí)的計(jì)算,例如數(shù)據(jù)庫查詢,從網(wǎng)絡(luò)獲取資源。對于這些任務(wù),Callable 是更好的抽象,它認(rèn)為 call 將返回一個(gè)值,并且可能拋出異常。
Executor 執(zhí)行的任務(wù)有四個(gè)生命周期階段:創(chuàng)建、提交、開始和完成。由于有些任務(wù)需要很長時(shí)間有可能希望取消,在 Executor 框架當(dāng)中,已提交未開始的任務(wù)可以取消。
Future 表示一個(gè)任務(wù)的生命周期,并且提供了相應(yīng)的方法來判斷是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。