Java新的結(jié)構(gòu)化并行模式入門指南
譯文譯者 | 布加迪
審校 | 重樓
結(jié)構(gòu)化并發(fā)是Java中使用多線程的一種新方式。它允許開發(fā)人員在充分利用傳統(tǒng)線程和虛擬線程的同時(shí)考慮邏輯組中的工作。結(jié)構(gòu)化并發(fā)出現(xiàn)在Java 21的預(yù)覽版中,它是決定Java未來(lái)的一個(gè)關(guān)鍵方面,所以現(xiàn)在是開始使用它的好時(shí)機(jī)。
為什么我們需要結(jié)構(gòu)化并發(fā)?
編寫并發(fā)軟件是軟件開發(fā)者面臨的最大挑戰(zhàn)之一。Java的線程模式使其成為并發(fā)語(yǔ)言中的有力競(jìng)爭(zhēng)者,但是多線程一直天生很棘手。結(jié)構(gòu)化并發(fā)允許您使用具有結(jié)構(gòu)化編程語(yǔ)法的多線程。實(shí)質(zhì)上,它提供了一種使用熟悉的程序流程和構(gòu)件編寫并發(fā)軟件的方法。這讓開發(fā)者可以專注于手頭的事務(wù),而不是線程編排。正如結(jié)構(gòu)化并發(fā)性的JEP所說:“如果一個(gè)任務(wù)分成并發(fā)子任務(wù),它們都回到相同的位置,即任務(wù)的代碼塊。”
虛擬線程現(xiàn)在是Java的一項(xiàng)正式特性,它可以低成本生成線程,從而獲得并發(fā)性能。結(jié)構(gòu)化并發(fā)提供了這么做的簡(jiǎn)單語(yǔ)法。因此,Java現(xiàn)在有了一個(gè)獨(dú)特的、高度優(yōu)化的線程系統(tǒng),而且易于理解。
新的StructuredTaskScope類
結(jié)構(gòu)化并發(fā)中的主要類是java.util.concurrent.StructuredTaskScope。Java 21文檔包含如何使用結(jié)構(gòu)化并發(fā)的示例。截止本文發(fā)稿時(shí),您需要使用--enable-preview和--source 21或--source 22來(lái)啟用Java程序中的結(jié)構(gòu)化并發(fā)。我的$java --version是openjdk 22-ea,所以我們使用Maven的示例將為編譯步驟指定--enable-preview --source 22,為執(zhí)行步驟指定--enable-preview。注意,SDKMan對(duì)于管理多個(gè)JDK安裝是一個(gè)很好的選擇。
您可以在本文的GitHub代碼存儲(chǔ)庫(kù)中找到示例代碼。注意為執(zhí)行設(shè)置—enable-preview的.mvn/jvm.config文件。若要運(yùn)行代碼,使用$mvn clean compile exec:java。
具有結(jié)構(gòu)化并發(fā)的多線程
就本文示例而言,我們將向Star Wars API(SWAPI)發(fā)出幾個(gè)請(qǐng)求,通過行星的ID獲取有關(guān)行星的信息。如果我們?cè)跇?biāo)準(zhǔn)的同步Java中執(zhí)行此操作,可能會(huì)使用Apache HTTPClient執(zhí)行類似代碼片段1的操作。
代碼片段1. 類似傳統(tǒng)方法的多API調(diào)用
package com.infoworld;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
public class App {
public String getPlanet(int planetId) throws Exception {
System.out.println("BEGIN getPlanet()");
String url = "https://swapi.dev/api/planets/" + planetId + "/";
String ret = "?";
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpGet request = new HttpGet(url);
CloseableHttpResponse response = httpClient.execute(request);
// Check the response status code
if (response.getStatusLine().getStatusCode() != 200) {
System.err.println("Error fetching planet information for ID: " + planetId);
throw new RuntimeException("Error fetching planet information for ID: " + planetId);
} else {
// Parse the JSON response and extract planet information
ret = EntityUtils.toString(response.getEntity());
System.out.println("Got a Planet: " + ret);
}
// Close the HTTP response and client
response.close();
httpClient.close();
return ret;
}
void sync() throws Exception {
int[] planetIds = {1,2,3,4,5};
for (int planetId : planetIds) {
getPlanet(planetId);
}
}
public static void main(String[] args) {
var myApp = new App();
System.out.println("\n\r-- BEGIN Sync");
try {
myApp.sync();
} catch (Exception e){
System.err.println("Error: " + e);
}
}
}
在代碼片段1中,我們有一個(gè)調(diào)用sync()方法的主方法,該方法在調(diào)用“https://swapi.dev/api/planets/”+ planetId端點(diǎn)時(shí),只是對(duì)一組ID進(jìn)行迭代處理。這些調(diào)用通過getPlanet()方法發(fā)出,該方法使用Apache HTTP庫(kù)來(lái)處理樣板請(qǐng)求、響應(yīng)和錯(cuò)誤處理。實(shí)際上,該方法接收每個(gè)響應(yīng),如果正確(200),輸出到控制臺(tái),否則拋出錯(cuò)誤。(這些示例使用了最少的錯(cuò)誤,所以在這種情況下我們只拋出RuntimeException。)
輸出是這樣的:
-- BEGIN Sync
BEGIN getPlanet()
Got a Planet: {"name":"Tatooine"}
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
BEGIN getPlanet()
Got a Planet: {"name":"Yavin”}
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
現(xiàn)在不妨使用結(jié)構(gòu)化并發(fā)嘗試同一個(gè)示例。如代碼片段2所示,結(jié)構(gòu)化并發(fā)允許我們將調(diào)用分解成并發(fā)請(qǐng)求,并將所有內(nèi)容放在相同的代碼空間中。在代碼片段2中,我們添加了必要的StructuredTaskScope導(dǎo)入,然后使用其核心方法fork()和join(),將每個(gè)請(qǐng)求分解成各自的線程,然后等待它們?nèi)客瓿伞?/span>
代碼片段2. 使用StructuredTaskScopeNow的多API調(diào)用
package com.infoworld;
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
//...
public class App {
public String getPlanet(int planetId) throws Exception {
// ... same ...
}
void sync() throws Exception {
int[] planetIds = {1,2,3,4,5};
for (int planetId : planetIds) {
getPlanet(planetId);
}
}
void sc() throws Exception {
int[] planetIds = {1,2,3,4,5};
try (var scope = new StructuredTaskScope<Object>()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
}catch (Exception e){
System.out.println("Error: " + e);
}
}
public static void main(String[] args) {
var myApp = new App();
// ...
System.out.println("\n\r-- BEGIN Structured Concurrency");
try {
myApp.sc();
} catch (Exception e){
System.err.println("Error: " + e);
}
}
}
如果我們運(yùn)行代碼片段2,將得到類似的輸出,但速度要快不少,這是由于請(qǐng)求是同時(shí)發(fā)出、并發(fā)進(jìn)行的。不妨考慮sc()方法(使用多線程)與sync()方法(使用同步代碼)之間的區(qū)別。結(jié)構(gòu)化并發(fā)方法沒有想象的那么難,提供結(jié)果的速度卻快得多。
處理任務(wù)和子任務(wù)
默認(rèn)情況下,StructuredTaskScope被創(chuàng)建時(shí),它使用虛擬線程,所以我們實(shí)際上并沒有在這里配置操作系統(tǒng)線程;相反,我們告訴JVM以最有效的方式編排請(qǐng)求。(StructuredTaskScope的構(gòu)造函數(shù)也接受ThreadFactory。)
在代碼片段2中,我們?cè)趖ry-with-resource塊中創(chuàng)建StructuredTaskScope對(duì)象,這是它原本的使用方式。我們可以使用fork()創(chuàng)建任意數(shù)量的作業(yè)。fork()方法接受任何實(shí)現(xiàn)Callable的程序,也就是說,任何方法或函數(shù)。這里,我們將getPlanet()方法包裝在一個(gè)匿名函數(shù)中:()-> getPlanet(planetId)——這是一種向目標(biāo)函數(shù)傳遞參數(shù)的實(shí)用語(yǔ)法。
當(dāng)我們調(diào)用join()時(shí),我們告訴作用域等待所有被分叉的作業(yè)。實(shí)質(zhì)上,join()將我們帶回到同步模式。分叉的作業(yè)將按照TaskScope的配置進(jìn)行處理。
關(guān)閉任務(wù)作用域
由于我們?cè)趖ry-with-resource塊中創(chuàng)建了TaskScope,因此當(dāng)該塊結(jié)束時(shí),作用域將自動(dòng)關(guān)閉。這為作用域調(diào)用shutdown()進(jìn)程,作用域可以定制,以便根據(jù)需要來(lái)處理運(yùn)行中線程的處置。如果需要在作用域關(guān)閉之前關(guān)閉它,也可以手動(dòng)調(diào)用shutdown()方法。
StructuredTaskScope包括兩個(gè)實(shí)現(xiàn)內(nèi)置關(guān)閉策略的類:ShutDownOnSuccess和ShutDownOnFailure。這些類監(jiān)視成功或出錯(cuò)的子任務(wù),然后取消其余運(yùn)行中的線程。使用目前的設(shè)置,我們可以這樣使用這些類:
代碼片段3. 內(nèi)置關(guān)閉策略
void failFast() throws ExecutionException, InterruptedException {
int[] planetIds = {1,2,3,-1,4};
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
}
}
void succeedFast() throws ExecutionException, InterruptedException {
int[] planetIds = {1,2};
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
} catch (Exception e){
System.out.println("Error: " + e);
}
}
public static void main(String[] args) {
var myApp = new App();
System.out.println("\n\r-- BEGIN succeedFast");
try {
myApp. succeedFast();
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("\n\r-- BEGIN failFast");
try {
myApp.failFast();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
這些策略將給出類似以下的輸出:
-- BEGIN succeedFast
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Alderaan"}
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt
-- BEGIN failFast
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Error fetching planet information for ID: -1
org.apache.http.impl.execchain.RetryExec execute
INFO: I/O exception (java.net.SocketException) caught when processing request to {s}->https://swapi.dev:443: Closed by interrupt
因此,我們擁有的是一種簡(jiǎn)單的機(jī)制,可以并發(fā)啟動(dòng)所有請(qǐng)求,然后在一個(gè)請(qǐng)求成功或失敗時(shí)取消其余的請(qǐng)求。這里,可以進(jìn)行任何定制。結(jié)構(gòu)化并發(fā)文檔包括一個(gè)示例,在子任務(wù)成功或失敗時(shí)收集子任務(wù)結(jié)果,然后返回結(jié)果。這很容易完成,只需通過覆蓋join()方法,并觀察每個(gè)任務(wù)的結(jié)果。
StructuredTaskScope.Subtask
在我們的示例中沒有看到的一件事是觀察子任務(wù)的返回值。每次StructuredTaskScope.fork()被調(diào)用時(shí),就返回StructuredTaskScope.SubTask對(duì)象。我們可以利用它來(lái)觀察任務(wù)的狀態(tài)。比如在sc()方法中,我們可以這么做:
代碼片段4. 使用StructuredTaskScope.Subtask觀察狀態(tài)
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.ArrayList;
void sc() throws Exception {
int[] planetIds = {1,2,3,4,5};
ArrayList<Subtask> tasks = new ArrayList<Subtask>(planetIds.length);
try (var scope = new StructuredTaskScope<Object>()) {
for (int planetId : planetIds) {
tasks.add(scope.fork(() -> getPlanet(planetId)));
}
scope.join();
}catch (Exception e){
System.out.println("Error: " + e);
}
for (Subtask t : tasks){
System.out.println("Task: " + t.state());
}
}
在這個(gè)示例中,我們將每個(gè)任務(wù)保存在ArrayList中,然后在進(jìn)行join()操作之后輸出它們的狀態(tài)。注意,Subtask的可用狀態(tài)被定義為enum。這個(gè)新方法將輸出類似以下的內(nèi)容:
-- BEGIN Structured Concurrency
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
Got a Planet: {"name":"Hoth"}
Got a Planet: {"name":"Tatooine"}
Got a Planet: {"name":"Yavin IV"}
Got a Planet: {"name":"Alderaan"}
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
Task: SUCCESS
結(jié)論
在虛擬線程和結(jié)構(gòu)化并發(fā)之間,Java開發(fā)者擁有一種引人注目的新機(jī)制,可以將幾乎所有代碼分解成并發(fā)任務(wù),不會(huì)有太大的開銷。上下文和需求很重要,所以不要僅僅因?yàn)?/span>存在這些新的并發(fā)工具就使用它們。與此同時(shí),這種組合確實(shí)提供了一些強(qiáng)大的力量。一旦您遇到出現(xiàn)許多任務(wù)的瓶頸時(shí),您可以輕松地將它們?nèi)拷唤o虛擬線程引擎,該引擎將找到編排它們的最佳方法。具有結(jié)構(gòu)化并發(fā)的新線程模式還使您易于定制和微調(diào)這種行為。
至于開發(fā)者將來(lái)如何在我們的應(yīng)用程序、框架和服務(wù)器中使用這些新的并發(fā)功能,值得我們拭目以待。
小知識(shí):結(jié)構(gòu)化并發(fā)中的線程樹
結(jié)構(gòu)化并發(fā)包括對(duì)調(diào)試和理解線程之間關(guān)系的支持。特別是,結(jié)構(gòu)化并發(fā)將所有線程關(guān)聯(lián)到樹結(jié)構(gòu)中,作用域位于根。這樣一來(lái),查看線程之間的關(guān)系就變得很簡(jiǎn)單,即便使用嵌套作用域也是如此。說明文檔提供了一個(gè)好的示例,表明如何使用Java診斷命令(jcmd)實(shí)用程序,將線程的運(yùn)行時(shí)布局轉(zhuǎn)儲(chǔ)到控制臺(tái)。
原文標(biāo)題:Get started with Java's new structured concurrency model,作者:Matthew Tyson