【技術革命】JDK21虛擬線程來襲,讓系統(tǒng)的吞吐量翻倍!
1. 虛擬線程簡介
虛擬線程是一種輕量級線程,可大大減少編寫、維護和觀察高吞吐量并發(fā)應用程序的工作量。從JDK19開始發(fā)布了虛擬線程的預覽功能,直到JDK21最終確定虛擬線程。
虛擬線程既廉價(相比平臺線程)又可以創(chuàng)建非常的多,因此絕不應池化:每個應用任務都應創(chuàng)建一個新的虛擬線程。因此,大多數虛擬線程的壽命都很短,調用堆棧也很淺,只需執(zhí)行一次 HTTP 客戶端調用或一次 JDBC 查詢。相比之下,平臺線程重量級、成本高,因此通常必須池化。這些線程的壽命往往較長,具有較深的調用堆棧,可在多個任務之間共享。
總之,虛擬線程保留了可靠的每請求線程風格,這種風格與 Java 平臺的設計相協調,同時還能優(yōu)化利用可用硬件。使用虛擬線程不需要學習新的概念,但可能需要放棄為應對當前線程的高成本而養(yǎng)成的習慣。虛擬線程不僅能幫助應用程序開發(fā)人員,還能幫助框架設計人員提供易于使用的 API,這些 API 與平臺設計兼容,同時又不影響可擴展性。
虛擬線程是 java.lang.Thread 的一個實例,它在底層操作系統(tǒng)線程上運行 Java 代碼,但在代碼的整個生命周期中不會捕獲操作系統(tǒng)線程。這意味著許多虛擬線程可以在同一個操作系統(tǒng)線程上運行 Java 代碼,從而有效地共享操作系統(tǒng)線程。平臺線程會壟斷寶貴的操作系統(tǒng)線程,而虛擬線程不會。虛擬線程的數量可能遠遠大于操作系統(tǒng)線程的數量。
虛擬線程是線程的一種輕量級實現,由 JDK 而不是操作系統(tǒng)提供。它們是用戶模式線程的一種形式,在其他多線程語言(如 Go 中的 goroutines(協程(輕量級線程)) 和 Erlang 中的進程)中取得了成功。用戶模式線程在 Java 早期版本中甚至被稱為 "綠色線程",當時操作系統(tǒng)線程尚未成熟和普及。然而,Java 的綠色線程都共享一個操作系統(tǒng)線程(M:1 調度),最終被作為操作系統(tǒng)線程包裝器(1:1 調度)實現的平臺線程所超越。虛擬線程采用 M:N 調度,即大量(M)虛擬線程被安排運行在較少數量(N)的操作系統(tǒng)線程上。
虛擬線程是 java.lang.Thread 的一個實例,與特定操作系統(tǒng)線程無關。相比之下,平臺線程是以傳統(tǒng)方式實現的 java.lang.Thread 實例,是操作系統(tǒng)線程的薄包裝。
2. 傳統(tǒng)請求線程模型
通常服務器應用程序處理相互獨立的并發(fā)請求時,在請求的整個持續(xù)聲明周期內為該請求指定一個線程來處理該請求。這種按請求線程的風格易于理解、易于編程、易于調試和配置。
對于一個請求處理的處理時間,應用程序同時處理的請求數(即并發(fā)數)必須與吞吐量成比例增長。例如,假設一個平均延遲為 50 毫秒的請求并發(fā)處理 10 個請求,實現了每秒 200 個請求的吞吐量。若要將該應用的吞吐量提高到到每秒 2000 個請求,則需要并發(fā)處理 100 個請求。如果每個請求在請求持續(xù)時間內都由一個線程處理,那么要使應用程序跟上進度,線程數必須隨著吞吐量的增加而增加。
由于 JDK 將線程作為操作系統(tǒng)(OS)線程的包裝器來實現。操作系統(tǒng)線程的成本很高,所以我們不能擁有太多的線程,這就使得線程的實現不適合按請求執(zhí)行的方式。如果每個請求在其生命周期內都要使用一個線程,也就是一個操作系統(tǒng)線程,那么在 CPU 或網絡連接等其他資源耗盡之前,線程數量往往就已經成為限制因素了。JDK 當前的線程實現將應用程序的吞吐量限制在遠低于硬件支持的水平。即使對線程進行了池化,也會出現這種情況,因為池化有助于避免啟動新線程的高昂成本,但不會增加線程總數。
3. 虛擬線程使用
使用方式1:
// 創(chuàng)建一個執(zhí)行器,為每個任務啟動一個新的虛擬線程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, 10_000).forEach(i -> {
executor.submit(() -> {
Thread.sleep(Duration.ofSeconds(1));
return i;
});
});
}
本例中的任務是簡單的代碼--休眠1秒--現代硬件可以輕松支持 10,000 個虛擬線程同時運行此類代碼。而實際上,JDK 只在少量操作系統(tǒng)線程(可能只有一個)上運行此代碼代碼。
如果該程序使用 ExecutorService(例如 Executors.newCachedThreadPool())為每個任務創(chuàng)建一個新的平臺線程,情況就會截然不同。ExecutorService 會嘗試創(chuàng)建 10,000 個平臺線程,從而創(chuàng)建 10,000 個操作系統(tǒng)線程,根據機器和操作系統(tǒng)的不同,程序可能會崩潰。
即便使用Executors.newFixedThreadPool(200)創(chuàng)建固定數量的線程,情況也不會好到哪里去。ExecutorService 將創(chuàng)建 200 個平臺線程,供所有 10,000 個任務共享,因此許多任務將順序運行而非并發(fā)運行,程序將需要很長時間才能完成。對于該程序而言,擁有 200 個平臺線程的池每秒只能完成 200 個任務,而虛擬線程每秒可完成約 10,000 個任務(經過充分預熱后)。此外,如果將示例程序中的 10_000 改為 1_000_000,那么程序將提交 1,000,000 個任務,創(chuàng)建 1,000,000 個虛擬線程并發(fā)運行,(充分預熱后)吞吐量將達到每秒約 1,000,000 個任務。
注意:如果程序中的任務在一秒鐘內執(zhí)行計算(例如對一個巨大的數組進行排序),而不僅僅是休眠,那么增加線程數超過處理器內核數將無濟于事,無論它們是虛擬線程還是平臺線程。虛擬線程不是更快的線程--它們運行代碼的速度并不比平臺線程快。它們的存在是為了提供規(guī)模(更高的吞吐量),而不是速度(更低的延遲)。虛擬線程的數量可能比平臺線程多得多,因此根據利特爾定律,虛擬線程可以提供更高吞吐量所需的更高并發(fā)性。
使用方式2:
手動創(chuàng)建虛擬線程
// 創(chuàng)建虛擬線程
OfVirtual virtual = Thread.ofVirtual().name("pack") ;
virtual.start(() -> {
System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 創(chuàng)建不自動啟動的線程
Thread thread = virtual.unstarted(() -> {
System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 手動啟動虛擬線程
thread.start() ;
// 打印線程對象:VirtualThread[#21,pack]/runnable
System.out.println(thread) ;
// 創(chuàng)建普通線程
OfPlatform platform = Thread.ofPlatform().name("pack") ;
Thread thread = platform.start(() -> {
System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 這里輸出:Thread[#21,pack,5,main]
System.out.println(thread) ;
在上面的代碼中,打印thread輸出的不是對應的平臺線程,而是虛擬線程
VirtualThread[#21,pack]/runnable
在執(zhí)行的任務中通過Thread.currentThread().getName()方法是沒有任何信息,我們可以通過上面的name()方法來設置線程的名稱及相關的前綴。如下:
Thread.ofPlatform().name("pack") ;
Thread.ofVirtual().name("pack", 0) ;
使用方式3:
通過ThreadFactory工廠創(chuàng)建
ThreadFactory threadFactory = Thread.ofVirtual().factory() ;
threadFactory.newThread(() -> {
System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}).start() ;
使用方式4:
直接通過Thread靜態(tài)方法
Thread.startVirtualThread(() -> {
System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
4. 虛擬線程與傳統(tǒng)線程池對比
使用虛擬線程
public class Demo06 {
static class Task implements Runnable {
@Override
public void run() {
System.err.printf("start - %d%n", System.currentTimeMillis()) ;
try {
Thread.sleep(Duration.ofSeconds(1));
} catch (InterruptedException e) {}
System.err.printf(" end - %d%n", System.currentTimeMillis()) ;
}
}
public static void main(String[] args) throws Exception {
ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ;
es.submit(new Task()) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
System.in.read() ;
}
}
輸出結果:
start - 1698827467289
start - 1698827467289
start - 1698827467291
end - 1698827468317
end - 1698827468317
end - 1698827468317
從結果看出,基本是同時開始,結束也是基本一起結束,總耗時1s。
使用傳統(tǒng)線程
任務都一樣,只是創(chuàng)建線程池的類型修改
public static void main(String[] args) throws Exception {
ExecutorService es= Executors.newFixedThreadPool(1) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
es.submit(new Task()) ;
}
輸出結果:
start - 1698827686133
end - 1698827687165
start - 1698827687165
end - 1698827688177
start - 1698827688177
end - 1698827689178
從結果知道這里是一個任務一個任務的執(zhí)行串行化,但是你注意觀察,其實每個任務的的開始start 的輸出都是要等前一個線程執(zhí)行完了后才能執(zhí)行。結合上面的虛擬線程對比,start是同時輸出的,這也是虛擬線程的有點了。
5. 使用案例
這是一個遠程接口調用的示例:
遠程3個接口,如下:
@GetMapping("/userinfo")
public Object queryUserInfo() {
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢用戶信息" ;
}
@GetMapping("/stock")
public Object queryStock() {
try {
TimeUnit.SECONDS.sleep(3) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢庫存信息" ;
}
@GetMapping("/order")
public Object queryOrder() {
try {
TimeUnit.SECONDS.sleep(4) ;
} catch (InterruptedException e) {e.printStackTrace();}
return "查詢訂單信息" ;
}
接口調用服務,如下:
@Resource
private RestTemplate restTemplate ;
public Map<String, Object> rpc() {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var start = System.currentTimeMillis() ;
// 1.查詢用戶信息
var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo"));
// 2.查詢庫存信息
var stock = executor.submit(() -> query("http://localhost:8080/demos/stock"));
// 3.查詢訂單信息
var order = executor.submit(() -> query("http://localhost:8080/demos/order"));
Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ;
System.out.printf("總計耗時:%d毫秒%n", (System.currentTimeMillis() - start)) ;
return res ;
} catch (Exception e) {
return Map.of() ;
}
}
private Object query(String url) {
return this.restTemplate.getForObject(url, String.class) ;
}
在這個案例中,如果使用傳統(tǒng)的線程池,如果并發(fā)量大,那么很可能很多的任務都要排隊等待,或者你需要創(chuàng)建更多的平臺線程來滿足吞吐量問題。但是現在有了虛擬線程你可以不用再考慮線程不夠用的情況了,每個任務的執(zhí)行都會被一個虛擬的線程執(zhí)行(不是平臺線程,可能這些虛擬線程只會對應到一個平臺線程)。
虛擬線程可在以下情況顯著提高應用吞吐量:
- 并發(fā)任務的數量很高(超過幾千)
- 工作負載不受cpu限制,因為在這種情況下,線程比處理器內核多并不能提高吞吐量
6. 結構化并發(fā)(預覽功能)
結構化并發(fā)目前還是預覽功能,并沒有在JDK21中正式發(fā)布,不過我們可以先來看看什么是結構化并發(fā)。
結構化并發(fā) API 是來簡化并發(fā)編程。結構化并發(fā)將在不同線程中運行的一組相關任務視為一個工作單元,從而簡化了錯誤處理和取消,提高了可靠性,并增強了可觀察性。
結構化并發(fā)的目標是:
- 推廣一種并發(fā)編程風格,消除因取消和關閉而產生的常見風險,如線程泄漏和取消延遲。
- 提高并發(fā)代碼的可觀察性。
通過示例來理解結構化并發(fā)。
如下示例是通過傳統(tǒng)線程池的方式并發(fā)的從遠程獲取信息,代碼如下:
static RestTemplate restTemplate = new RestTemplate() ;
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(2) ;
Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ;
Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ;
Object userInfoRet = userInfo.get() ;
System.out.printf("執(zhí)行結果:用戶信息:%s%n", userInfoRet.toString()) ;
Object stockRet = stock.get() ;
System.out.printf("執(zhí)行結果:庫存信息:%s%n", stockRet.toString()) ;
}
public static Object queryUserInfo() {
return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ;
}
public static Object queryStock() {
return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}
上面的代碼中沒有什么問題,程序都能夠運行的正常,結果如下:
08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結果:用戶信息:查詢用戶信息
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結果:庫存信息:查詢庫存信息
但是如果其中一個任務執(zhí)行失敗了后會如何呢?將其中一個任務拋出異常,如下代碼:
public static Object queryStock() {
System.out.println(1 / 0) ;
return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}
再次執(zhí)行代碼,結果如下:
發(fā)生異常:java.lang.ArithmeticException: / by zero
09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結果:庫存信息:查詢庫存信息
從結果看出,獲取用戶信息子任務發(fā)生異常后,并不會影響到獲取庫存子任務的執(zhí)行。
通過結構化并發(fā)方式
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<Object> userInfo = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ;
Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ;
// 等待在此任務范圍內啟動的所有子任務完成或某個子任務失敗。
scope.join() ;
Object userInfoRet = userInfo.get() ;
System.out.printf("執(zhí)行結果:用戶信息:%s%n", userInfoRet.toString()) ;
Object stockRet = stock.get() ;
System.out.printf("執(zhí)行結果:庫存信息:%s%n", stockRet.toString()) ;
}
當一個子任務發(fā)生錯誤時,其它的子任務會在未完成的情況下取消,執(zhí)行結果如下:
08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully
at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936)
at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26)
at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)
從控制臺的輸出看出,獲取庫存的調用被取消了。
完畢!??!