線程池,你會(huì)用嗎?(沒(méi)有做到精通的請(qǐng)進(jìn))
在Java并發(fā)編程領(lǐng)域,線程池是一種至關(guān)重要的工具,它能顯著提升應(yīng)用程序的性能與資源管理效率。通過(guò)復(fù)用線程,線程池避免了頻繁創(chuàng)建和銷毀線程所帶來(lái)的開(kāi)銷。在本教程中,我們將深入探討Java和Guava庫(kù)中線程池的使用。
一、Java中的線程池
(一)Executor框架
Java的java.util.concurrent包提供了Executor框架,這是管理線程池的核心。
Executor接口是該框架的基礎(chǔ),它定義了一個(gè)簡(jiǎn)單的方法execute(Runnable task),用于提交任務(wù)執(zhí)行。
Executor接口本身并不直接管理線程,而是將任務(wù)的執(zhí)行委托給實(shí)現(xiàn)類。
(二)ExecutorService
ExecutorService接口擴(kuò)展了Executor接口,提供了更豐富的功能,用于管理線程池的生命周期以及任務(wù)的提交與執(zhí)行。它包含了啟動(dòng)、關(guān)閉線程池的方法,以及提交任務(wù)并獲取執(zhí)行結(jié)果的方法。
2.1 創(chuàng)建線程池
在Java中,我們可以使用Executors類的靜態(tài)方法來(lái)創(chuàng)建不同類型的線程池:
- FixedThreadPool:創(chuàng)建一個(gè)固定大小的線程池,線程池中的線程數(shù)量在創(chuàng)建時(shí)就被確定,并且不會(huì)改變。如果提交的任務(wù)數(shù)量超過(guò)了線程池的容量,任務(wù)將被放入隊(duì)列中等待執(zhí)行。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
- CachedThreadPool:創(chuàng)建一個(gè)可緩存的線程池,如果線程池中的線程在一段時(shí)間內(nèi)沒(méi)有被使用,它們將被回收。如果提交的任務(wù)數(shù)量超過(guò)了當(dāng)前線程池中的線程數(shù)量,新的線程將被創(chuàng)建。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
- SingleThreadExecutor:創(chuàng)建一個(gè)單線程的線程池,它只使用一個(gè)線程來(lái)執(zhí)行任務(wù)。所有提交的任務(wù)將按照順序依次執(zhí)行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- ScheduledThreadPool:創(chuàng)建一個(gè)支持定時(shí)及周期性任務(wù)執(zhí)行的線程池??梢园才湃蝿?wù)在指定的延遲后執(zhí)行,或者定期重復(fù)執(zhí)行。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
2.2 提交任務(wù)
一旦創(chuàng)建了線程池,我們可以使用submit方法提交任務(wù)。submit方法有多種重載形式,可接受Runnable或Callable任務(wù),并返回Future對(duì)象,通過(guò)Future對(duì)象可以獲取任務(wù)的執(zhí)行結(jié)果。
Future<Integer> future = fixedThreadPool.submit(() -> {
// 執(zhí)行任務(wù)并返回結(jié)果
return 42;
});
try {
Integer result = future.get();
System.out.println("任務(wù)執(zhí)行結(jié)果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
2.3 關(guān)閉線程池
在應(yīng)用程序結(jié)束時(shí),我們需要正確關(guān)閉線程池,以確保所有任務(wù)都能正常完成,并釋放資源。ExecutorService提供了shutdown和shutdownNow方法來(lái)實(shí)現(xiàn)這一點(diǎn)。
- shutdown:?jiǎn)?dòng)一個(gè)有序關(guān)閉過(guò)程,不再接受新任務(wù),但會(huì)繼續(xù)執(zhí)行已提交的任務(wù)。
fixedThreadPool.shutdown();
- shutdownNow:嘗試停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。
List<Runnable> tasks = fixedThreadPool.shutdownNow();
(三)示例:使用線程池進(jìn)行并行計(jì)算
假設(shè)我們有一個(gè)簡(jiǎn)單的任務(wù),需要計(jì)算一組數(shù)字的平方。我們可以使用線程池來(lái)并行執(zhí)行這些計(jì)算,以提高效率。
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Future<Integer>> futures = new ArrayList<>();
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
for (int number : numbers) {
Future<Integer> future = executorService.submit(() -> number * number);
futures.add(future);
}
executorService.shutdown();
for (Future<Integer> future : futures) {
try {
System.out.println("平方結(jié)果: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
運(yùn)行結(jié)果是:
平方結(jié)果: 1
平方結(jié)果: 4
平方結(jié)果: 9
平方結(jié)果: 16
平方結(jié)果: 25
二、Guava中的線程池
Guava庫(kù)提供了ListeningExecutorService接口,它擴(kuò)展了ExecutorService,并提供了更方便的異步任務(wù)處理方式。ListeningExecutorService允許我們注冊(cè)監(jiān)聽(tīng)器,以便在任務(wù)完成時(shí)得到通知。
(一)創(chuàng)建ListeningExecutorService
在Guava中,我們可以使用MoreExecutors類的靜態(tài)方法來(lái)創(chuàng)建ListeningExecutorService。例如,我們可以將一個(gè)普通的ExecutorService包裝成ListeningExecutorService:
ExecutorService executorService = Executors.newFixedThreadPool(5);
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
(二)提交任務(wù)并注冊(cè)監(jiān)聽(tīng)器
提交任務(wù)后,我們可以使用Futures.addCallback方法注冊(cè)一個(gè)回調(diào),當(dāng)任務(wù)完成時(shí),回調(diào)的onSuccess或onFailure方法將被調(diào)用。
Future<Integer> future = listeningExecutorService.submit(() -> {
// 執(zhí)行任務(wù)并返回結(jié)果
return 42;
});
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out.println("任務(wù)成功執(zhí)行,結(jié)果: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("任務(wù)執(zhí)行失敗: " + t.getMessage());
}
});
(三)示例:使用Guava線程池進(jìn)行異步任務(wù)處理
以下是一個(gè)完整的示例,展示如何使用Guava的線程池進(jìn)行異步任務(wù)處理,并注冊(cè)監(jiān)聽(tīng)器來(lái)處理任務(wù)結(jié)果。
public class GuavaThreadPoolExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
ListenableFuture<Integer> future = listeningExecutorService.submit(() -> {
// 模擬任務(wù)執(zhí)行
Thread.sleep(2000);
return 42;
});
final ExecutorService callbackExecutor = Executors.newFixedThreadPool(3);
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out.println("任務(wù)成功執(zhí)行,結(jié)果: " + result);
callbackExecutor.shutdown();
}
@Override
public void onFailure(Throwable t) {
System.out.println("任務(wù)執(zhí)行失敗: " + t.getMessage());
callbackExecutor.shutdown();
}
}, callbackExecutor);
// 關(guān)閉線程池
executorService.shutdown();
}
}
運(yùn)行結(jié)果是:
任務(wù)成功執(zhí)行,結(jié)果: 42
三、補(bǔ)充
補(bǔ)充一下Executors的工廠方法:
方法 | 描述 | 適用場(chǎng)景 |
| 創(chuàng)建一個(gè)可緩存的線程池。如果線程池的當(dāng)前線程數(shù)超過(guò)了處理需求,則會(huì)回收空閑線程;如果需求增加,則可以添加新線程。 | 執(zhí)行大量短期異步任務(wù) |
| 創(chuàng)建一個(gè)固定大小的線程池。線程池中的線程數(shù)量固定,如果所有線程都在忙,新的任務(wù)會(huì)在隊(duì)列中等待。 | 負(fù)載較重且任務(wù)量穩(wěn)定的場(chǎng)景 |
| 創(chuàng)建一個(gè)支持定時(shí)及周期性任務(wù)執(zhí)行的線程池??梢哉{(diào)度命令在給定的延遲后運(yùn)行,或定期執(zhí)行。 | 需要定時(shí)執(zhí)行任務(wù)的場(chǎng)景 |
| 創(chuàng)建一個(gè)單線程化的線程池。確保所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。 | 需要保證任務(wù)順序執(zhí)行的場(chǎng)景 |
| 創(chuàng)建一個(gè)單線程的定時(shí)任務(wù)執(zhí)行器。支持定時(shí)及周期性任務(wù)執(zhí)行。 | 需要單線程執(zhí)行定時(shí)任務(wù)的場(chǎng)景 |
| 創(chuàng)建一個(gè)為每個(gè)任務(wù)創(chuàng)建新線程的執(zhí)行器。每個(gè)任務(wù)都會(huì)啟動(dòng)一個(gè)新的線程來(lái)執(zhí)行。 | 任務(wù)之間完全獨(dú)立且不需要復(fù)用線程的場(chǎng)景 |
| 創(chuàng)建一個(gè)為每個(gè)任務(wù)創(chuàng)建虛擬線程的執(zhí)行器。虛擬線程是輕量級(jí)線程,適用于高并發(fā)場(chǎng)景。 | 需要高并發(fā)且任務(wù)量大的場(chǎng)景 |
| 創(chuàng)建一個(gè)工作竊取線程池。使用 ForkJoinPool 實(shí)現(xiàn),線程池中的線程會(huì)主動(dòng)“竊取”其他線程的任務(wù)來(lái)執(zhí)行,提高 CPU 利用率。 | 計(jì)算密集型任務(wù),可以充分利用多核處理器的優(yōu)勢(shì) |
文末總結(jié)
線程池是Java并發(fā)編程中的重要工具,無(wú)論是Java原生的Executor框架還是Guava庫(kù)提供的擴(kuò)展,都為我們提供了強(qiáng)大的異步任務(wù)處理能力。通過(guò)合理使用線程池,我們可以有效提高應(yīng)用程序的性能和資源利用率。在實(shí)際應(yīng)用中,根據(jù)具體需求選擇合適的線程池類型和使用方式至關(guān)重要。