自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

【技術革命】JDK21虛擬線程來襲,讓系統(tǒng)的吞吐量翻倍!

開發(fā) 前端
通常服務器應用程序處理相互獨立的并發(fā)請求時,在請求的整個持續(xù)聲明周期內為該請求指定一個線程來處理該請求。這種按請求線程的風格易于理解、易于編程、易于調試和配置。

1. 虛擬線程簡介

虛擬線程是一種輕量級線程,可大大減少編寫、維護和觀察高吞吐量并發(fā)應用程序的工作量。從JDK19開始發(fā)布了虛擬線程的預覽功能,直到JDK21最終確定虛擬線程。

虛擬線程既廉價(相比平臺線程)又可以創(chuàng)建非常的多,因此絕不應池化:每個應用任務都應創(chuàng)建一個新的虛擬線程。因此,大多數虛擬線程的壽命都很短,調用堆棧也很淺,只需執(zhí)行一次 HTTP 客戶端調用或一次 JDBC 查詢。相比之下,平臺線程重量級、成本高,因此通常必須池化。這些線程的壽命往往較長,具有較深的調用堆棧,可在多個任務之間共享。

總之,虛擬線程保留了可靠的每請求線程風格,這種風格與 Java 平臺的設計相協調,同時還能優(yōu)化利用可用硬件。使用虛擬線程不需要學習新的概念,但可能需要放棄為應對當前線程的高成本而養(yǎng)成的習慣。虛擬線程不僅能幫助應用程序開發(fā)人員,還能幫助框架設計人員提供易于使用的 API,這些 API 與平臺設計兼容,同時又不影響可擴展性。

虛擬線程是 java.lang.Thread 的一個實例,它在底層操作系統(tǒng)線程上運行 Java 代碼,但在代碼的整個生命周期中不會捕獲操作系統(tǒng)線程。這意味著許多虛擬線程可以在同一個操作系統(tǒng)線程上運行 Java 代碼,從而有效地共享操作系統(tǒng)線程。平臺線程會壟斷寶貴的操作系統(tǒng)線程,而虛擬線程不會。虛擬線程的數量可能遠遠大于操作系統(tǒng)線程的數量。

虛擬線程是線程的一種輕量級實現,由 JDK 而不是操作系統(tǒng)提供。它們是用戶模式線程的一種形式,在其他多線程語言(如 Go 中的 goroutines(協程(輕量級線程)) 和 Erlang 中的進程)中取得了成功。用戶模式線程在 Java 早期版本中甚至被稱為 "綠色線程",當時操作系統(tǒng)線程尚未成熟和普及。然而,Java 的綠色線程都共享一個操作系統(tǒng)線程(M:1 調度),最終被作為操作系統(tǒng)線程包裝器(1:1 調度)實現的平臺線程所超越。虛擬線程采用 M:N 調度,即大量(M)虛擬線程被安排運行在較少數量(N)的操作系統(tǒng)線程上。

虛擬線程是 java.lang.Thread 的一個實例,與特定操作系統(tǒng)線程無關。相比之下,平臺線程是以傳統(tǒng)方式實現的 java.lang.Thread 實例,是操作系統(tǒng)線程的薄包裝。

2. 傳統(tǒng)請求線程模型

通常服務器應用程序處理相互獨立的并發(fā)請求時,在請求的整個持續(xù)聲明周期內為該請求指定一個線程來處理該請求。這種按請求線程的風格易于理解、易于編程、易于調試和配置。

對于一個請求處理的處理時間,應用程序同時處理的請求數(即并發(fā)數)必須與吞吐量成比例增長。例如,假設一個平均延遲為 50 毫秒的請求并發(fā)處理 10 個請求,實現了每秒 200 個請求的吞吐量。若要將該應用的吞吐量提高到到每秒 2000 個請求,則需要并發(fā)處理 100 個請求。如果每個請求在請求持續(xù)時間內都由一個線程處理,那么要使應用程序跟上進度,線程數必須隨著吞吐量的增加而增加。

由于 JDK 將線程作為操作系統(tǒng)(OS)線程的包裝器來實現。操作系統(tǒng)線程的成本很高,所以我們不能擁有太多的線程,這就使得線程的實現不適合按請求執(zhí)行的方式。如果每個請求在其生命周期內都要使用一個線程,也就是一個操作系統(tǒng)線程,那么在 CPU 或網絡連接等其他資源耗盡之前,線程數量往往就已經成為限制因素了。JDK 當前的線程實現將應用程序的吞吐量限制在遠低于硬件支持的水平。即使對線程進行了池化,也會出現這種情況,因為池化有助于避免啟動新線程的高昂成本,但不會增加線程總數。

3. 虛擬線程使用

使用方式1:

// 創(chuàng)建一個執(zhí)行器,為每個任務啟動一個新的虛擬線程
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    IntStream.range(0, 10_000).forEach(i -> {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1));
            return i;
        });
    });
}

本例中的任務是簡單的代碼--休眠1秒--現代硬件可以輕松支持 10,000 個虛擬線程同時運行此類代碼。而實際上,JDK 只在少量操作系統(tǒng)線程(可能只有一個)上運行此代碼代碼。

如果該程序使用 ExecutorService(例如 Executors.newCachedThreadPool())為每個任務創(chuàng)建一個新的平臺線程,情況就會截然不同。ExecutorService 會嘗試創(chuàng)建 10,000 個平臺線程,從而創(chuàng)建 10,000 個操作系統(tǒng)線程,根據機器和操作系統(tǒng)的不同,程序可能會崩潰。

即便使用Executors.newFixedThreadPool(200)創(chuàng)建固定數量的線程,情況也不會好到哪里去。ExecutorService 將創(chuàng)建 200 個平臺線程,供所有 10,000 個任務共享,因此許多任務將順序運行而非并發(fā)運行,程序將需要很長時間才能完成。對于該程序而言,擁有 200 個平臺線程的池每秒只能完成 200 個任務,而虛擬線程每秒可完成約 10,000 個任務(經過充分預熱后)。此外,如果將示例程序中的 10_000 改為 1_000_000,那么程序將提交 1,000,000 個任務,創(chuàng)建 1,000,000 個虛擬線程并發(fā)運行,(充分預熱后)吞吐量將達到每秒約 1,000,000 個任務。

注意:如果程序中的任務在一秒鐘內執(zhí)行計算(例如對一個巨大的數組進行排序),而不僅僅是休眠,那么增加線程數超過處理器內核數將無濟于事,無論它們是虛擬線程還是平臺線程。虛擬線程不是更快的線程--它們運行代碼的速度并不比平臺線程快。它們的存在是為了提供規(guī)模(更高的吞吐量),而不是速度(更低的延遲)。虛擬線程的數量可能比平臺線程多得多,因此根據利特爾定律,虛擬線程可以提供更高吞吐量所需的更高并發(fā)性。

使用方式2:

手動創(chuàng)建虛擬線程

// 創(chuàng)建虛擬線程
OfVirtual virtual = Thread.ofVirtual().name("pack") ;
virtual.start(() -> {
  System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 創(chuàng)建不自動啟動的線程
Thread thread = virtual.unstarted(() -> {
  System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 手動啟動虛擬線程
thread.start() ; 
// 打印線程對象:VirtualThread[#21,pack]/runnable
System.out.println(thread) ;
// 創(chuàng)建普通線程
OfPlatform platform = Thread.ofPlatform().name("pack") ;
Thread thread = platform.start(() -> {
  System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;
// 這里輸出:Thread[#21,pack,5,main]
System.out.println(thread) ;

在上面的代碼中,打印thread輸出的不是對應的平臺線程,而是虛擬線程

VirtualThread[#21,pack]/runnable

在執(zhí)行的任務中通過Thread.currentThread().getName()方法是沒有任何信息,我們可以通過上面的name()方法來設置線程的名稱及相關的前綴。如下:

Thread.ofPlatform().name("pack") ;
Thread.ofVirtual().name("pack", 0) ;

使用方式3:

通過ThreadFactory工廠創(chuàng)建

ThreadFactory threadFactory = Thread.ofVirtual().factory() ;
threadFactory.newThread(() -> {
  System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}).start() ;

使用方式4:

直接通過Thread靜態(tài)方法

Thread.startVirtualThread(() -> {
  System.out.printf("%s - 任務執(zhí)行完成", Thread.currentThread().getName()) ;
}) ;

4. 虛擬線程與傳統(tǒng)線程池對比

使用虛擬線程

public class Demo06 {


  static class Task implements Runnable {
    @Override
    public void run() {
      System.err.printf("start - %d%n", System.currentTimeMillis()) ;
      try {
        Thread.sleep(Duration.ofSeconds(1));
      } catch (InterruptedException e) {}
      System.err.printf("  end - %d%n", System.currentTimeMillis()) ;
    }
  }


  public static void main(String[] args) throws Exception {
    ExecutorService es= Executors.newVirtualThreadPerTaskExecutor() ;
    es.submit(new Task()) ;
    es.submit(new Task()) ;
    es.submit(new Task()) ;
    System.in.read() ;
  }


}

輸出結果:

start - 1698827467289
start - 1698827467289
start - 1698827467291
  end - 1698827468317
  end - 1698827468317
  end - 1698827468317

從結果看出,基本是同時開始,結束也是基本一起結束,總耗時1s。

使用傳統(tǒng)線程

任務都一樣,只是創(chuàng)建線程池的類型修改

public static void main(String[] args) throws Exception {
  ExecutorService es= Executors.newFixedThreadPool(1) ;
  es.submit(new Task()) ;
  es.submit(new Task()) ;
  es.submit(new Task()) ;
}

輸出結果:

start - 1698827686133
  end - 1698827687165
start - 1698827687165
  end - 1698827688177
start - 1698827688177
  end - 1698827689178

從結果知道這里是一個任務一個任務的執(zhí)行串行化,但是你注意觀察,其實每個任務的的開始start 的輸出都是要等前一個線程執(zhí)行完了后才能執(zhí)行。結合上面的虛擬線程對比,start是同時輸出的,這也是虛擬線程的有點了。

5. 使用案例

這是一個遠程接口調用的示例:

遠程3個接口,如下:

@GetMapping("/userinfo")
public Object queryUserInfo() {
  try {
    TimeUnit.SECONDS.sleep(2) ;
  } catch (InterruptedException e) {e.printStackTrace();}
  return "查詢用戶信息" ;
}


@GetMapping("/stock")
public Object queryStock() {
  try {
    TimeUnit.SECONDS.sleep(3) ;
  } catch (InterruptedException e) {e.printStackTrace();}
  return "查詢庫存信息" ;
}


@GetMapping("/order")
public Object queryOrder() {
  try {
    TimeUnit.SECONDS.sleep(4) ;
  } catch (InterruptedException e) {e.printStackTrace();}
  return "查詢訂單信息" ;
}

接口調用服務,如下:

@Resource
private RestTemplate restTemplate ;


public Map<String, Object> rpc() {


  try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    var start = System.currentTimeMillis() ;
    // 1.查詢用戶信息
    var userinfo = executor.submit(() -> query("http://localhost:8080/demos/userinfo"));
    // 2.查詢庫存信息
    var stock = executor.submit(() -> query("http://localhost:8080/demos/stock"));
    // 3.查詢訂單信息
    var order = executor.submit(() -> query("http://localhost:8080/demos/order"));
    Map<String, Object> res = Map.of("userinfo", userinfo.get(), "stock", stock.get(), "order", order.get()) ;
    System.out.printf("總計耗時:%d毫秒%n", (System.currentTimeMillis() - start)) ;
    return res ;
  } catch (Exception e) {
    return Map.of() ;
  }
}
private Object query(String url) {
  return this.restTemplate.getForObject(url, String.class) ;
}

在這個案例中,如果使用傳統(tǒng)的線程池,如果并發(fā)量大,那么很可能很多的任務都要排隊等待,或者你需要創(chuàng)建更多的平臺線程來滿足吞吐量問題。但是現在有了虛擬線程你可以不用再考慮線程不夠用的情況了,每個任務的執(zhí)行都會被一個虛擬的線程執(zhí)行(不是平臺線程,可能這些虛擬線程只會對應到一個平臺線程)。

虛擬線程可在以下情況顯著提高應用吞吐量:

  • 并發(fā)任務的數量很高(超過幾千)
  • 工作負載不受cpu限制,因為在這種情況下,線程比處理器內核多并不能提高吞吐量

6. 結構化并發(fā)(預覽功能)

結構化并發(fā)目前還是預覽功能,并沒有在JDK21中正式發(fā)布,不過我們可以先來看看什么是結構化并發(fā)。

結構化并發(fā) API 是來簡化并發(fā)編程。結構化并發(fā)將在不同線程中運行的一組相關任務視為一個工作單元,從而簡化了錯誤處理和取消,提高了可靠性,并增強了可觀察性。

結構化并發(fā)的目標是:

  • 推廣一種并發(fā)編程風格,消除因取消和關閉而產生的常見風險,如線程泄漏和取消延遲。
  • 提高并發(fā)代碼的可觀察性。

通過示例來理解結構化并發(fā)。

如下示例是通過傳統(tǒng)線程池的方式并發(fā)的從遠程獲取信息,代碼如下:

static RestTemplate restTemplate = new RestTemplate() ;
public static void main(String[] args) throws Exception {
  ExecutorService es = Executors.newFixedThreadPool(2) ;
  Future<Object> userInfo = es.submit(UnstructuredConcurrentDemo::queryUserInfo) ;
  Future<Object> stock = es.submit(UnstructuredConcurrentDemo::queryStock) ;
  Object userInfoRet = userInfo.get() ;
  System.out.printf("執(zhí)行結果:用戶信息:%s%n", userInfoRet.toString()) ;
  Object stockRet = stock.get() ;
  System.out.printf("執(zhí)行結果:庫存信息:%s%n", stockRet.toString()) ;
}
public static Object queryUserInfo() {
  return restTemplate.getForObject("http://localhost:8080/demos/userinfo", String.class) ;
}
public static Object queryStock() {
  return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}

上面的代碼中沒有什么問題,程序都能夠運行的正常,結果如下:

08:49:53.502 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:53.504 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結果:用戶信息:查詢用戶信息
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
08:49:54.493 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結果:庫存信息:查詢庫存信息

但是如果其中一個任務執(zhí)行失敗了后會如何呢?將其中一個任務拋出異常,如下代碼:

public static Object queryStock() {
  System.out.println(1 / 0) ;
  return restTemplate.getForObject("http://localhost:8080/demos/stock", String.class) ;
}

再次執(zhí)行代碼,結果如下:

發(fā)生異常:java.lang.ArithmeticException: / by zero
09:06:05.938 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
09:06:05.948 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
09:06:08.972 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Response 200 OK
09:06:08.974 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate -- Reading to [java.lang.String] as "text/plain;charset=UTF-8"
執(zhí)行結果:庫存信息:查詢庫存信息

從結果看出,獲取用戶信息子任務發(fā)生異常后,并不會影響到獲取庫存子任務的執(zhí)行。

通過結構化并發(fā)方式

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  Supplier<Object>  userInfo  = scope.fork(UnstructuredConcurrentDemo::queryUserInfo) ;
  Supplier<Object> stock = scope.fork(UnstructuredConcurrentDemo::queryStock) ;
  // 等待在此任務范圍內啟動的所有子任務完成或某個子任務失敗。
  scope.join() ;
  Object userInfoRet = userInfo.get() ;
  System.out.printf("執(zhí)行結果:用戶信息:%s%n", userInfoRet.toString()) ;
  Object stockRet = stock.get() ;
  System.out.printf("執(zhí)行結果:庫存信息:%s%n", stockRet.toString()) ;
}

當一個子任務發(fā)生錯誤時,其它的子任務會在未完成的情況下取消,執(zhí)行結果如下:

08:59:51.951 [] DEBUG org.springframework.web.client.RestTemplate -- HTTP GET http://localhost:8080/demos/stock
08:59:51.961 [] DEBUG org.springframework.web.client.RestTemplate -- Accept=[text/plain, application/json, application/*+json, */*]
Exception in thread "main" java.lang.IllegalStateException: Subtask not completed or did not complete successfully
  at java.base/java.util.concurrent.StructuredTaskScope$SubtaskImpl.get(StructuredTaskScope.java:936)
  at com.pack.rpc.UnstructuredConcurrentDemo.structured(UnstructuredConcurrentDemo.java:26)
  at com.pack.rpc.UnstructuredConcurrentDemo.main(UnstructuredConcurrentDemo.java:17)

從控制臺的輸出看出,獲取庫存的調用被取消了。

完畢!??!

責任編輯:武曉燕 來源: Spring全家桶實戰(zhàn)案例源碼
相關推薦

2024-06-06 16:15:00

2023-12-28 10:49:27

響應式編程異步

2023-10-20 08:12:00

JDK21線程池配置

2023-02-09 08:57:11

Callable異步java

2009-02-24 09:28:00

2024-09-12 15:24:29

2024-09-09 14:12:38

2023-08-03 14:18:29

Rust阻塞函數

2023-11-07 15:11:46

Kafka技巧

2023-09-20 09:07:01

Java 21開發(fā)工具包

2013-04-19 09:45:20

AMPLabHadoopHDFS

2024-05-23 16:41:40

2025-03-28 01:03:46

高并發(fā)技術異步

2024-09-21 10:21:47

2019-08-20 00:20:47

TCPHOL吞吐量

2013-08-02 10:17:38

2018-09-06 14:15:06

區(qū)塊鏈區(qū)塊鏈技術

2025-03-04 08:52:21

2023-02-20 15:11:14

物聯網數字經濟

2010-04-14 16:02:09

IDF
點贊
收藏

51CTO技術棧公眾號