讓我們一起學習管道模式,你會了嗎?
本文轉載自微信公眾號「JavaKeeper」,作者海星。轉載本文請聯(lián)系JavaKeeper公眾號。
管道模式,不屬于23種設計模式之一(是責任鏈模式的一種變體),但是在我們實際業(yè)務架構中還是有很多場景適用的,主要用于將復雜的進程分解成多個獨立的子任務,像流水線一樣去執(zhí)行,了解一下唄
一、開場
假設我們有這樣的一個需求,讀取文件內容,并過濾包含 “hello” 的字符串,然后將其反轉
Linux 一行搞定
- cat hello.txt | grep "hello" | rev
用世界上最好語言 Java 實現(xiàn)也很簡單
- File file = new File("/Users/starfish/Documents/hello.txt");
- String content = FileUtils.readFileToString(file,"UTF-8");
- List<String> helloStr = Stream.of(content).filter(s -> s.contains("hello")).collect(Collectors.toList());
- System.out.println(new StringBuilder(String.join("",helloStr)).reverse().toString());
再假設我們上邊的場景是在一個大型系統(tǒng)中,有這樣的數(shù)據(jù)流需要多次進行復雜的邏輯處理,還是簡單粗暴的把一系列流程像上邊那樣放在一個大組件中嗎?
這樣的設計完全違背了單一職責原則,我們在增改,或者減少一些處理邏輯的時候,就必須對整個組件進行改動??蓴U展性和可重用性幾乎沒有~~
那有沒有一種模式可以將整個處理流程進行詳細劃分,劃分出的每個小模塊互相獨立且各自負責一小段邏輯處理,這些小模塊可以按順序連起來,前一模塊的輸出作為后一模塊的輸入,最后一個模塊的輸出為最終的處理結果呢?
如此一來修改邏輯時只針對某個模塊修改,添加或減少處理邏輯也可細化到某個模塊顆粒度,并且每個模塊可重復利用,可重用性大大增強。
恩,這就是我們要說的管道模式
二、定義
管道模式(Pipeline Pattern) 是責任鏈模式(Chain of Responsibility Pattern)的常用變體之一。
顧名思義,管道模式就像一條管道把多個對象連接起來,整體看起來就像若干個閥門嵌套在管道中,而處理邏輯就放在閥門上,需要處理的對象進入管道后,分別經過各個閥門,每個閥門都會對進入的對象進行一些邏輯處理,經過一層層的處理后從管道尾出來,此時的對象就是已完成處理的目標對象。
管道模式用于將復雜的進程分解成多個獨立的子任務。每個獨立的任務都是可復用的,因此這些任務可以被組合成復雜的進程。
PS:純的責任鏈模式在鏈上只會有一個處理器用于處理數(shù)據(jù),而管道模式上多個處理器都會處理數(shù)據(jù)。
三、角色
管道模式:對于管道模式來說,有 3 個對象:
- 閥門:處理數(shù)據(jù)的節(jié)點,或者叫過濾器、階段
- 管道:組織各個閥門
- 客戶端:構造管道,并調用
四、實例
程序員還是看代碼消化才快些,我們用管道模式實現(xiàn)下文章開頭的小需求
1、處理器(管道的各個階段)
- public interface Handler<I,O> {
- O process(I input);
- }
2、定義具體的處理器(閥門)
- public class FileProcessHandler implements Handler<File,String>{
- @Override
- public String process(File file) {
- System.out.println("===文件處理===");
- try{
- return FileUtils.readFileToString(file,"UTF-8");
- }catch (IOException e){
- e.printStackTrace();
- }
- return null;
- }
- }
- public class CharacterFilterHandler implements Handler<String, String> {
- @Override
- public String process(String input) {
- System.out.println("===字符過濾===");
- List<String> hello = Stream.of(input).filter(s -> s.contains("hello")).collect(Collectors.toList());
- return String.join("",hello);
- }
- }
- public class CharacterReverseHandler implements Handler<String,String>{
- @Override
- public String process(String input) {
- System.out.println("===反轉字符串===");
- return new StringBuilder(input).reverse().toString();
- }
- }
3、管道
- public class Pipeline<I,O> {
- private final Handler<I,O> currentHandler;
- Pipeline(Handler<I, O> currentHandler) {
- this.currentHandler = currentHandler;
- }
- <K> Pipeline<I, K> addHandler(Handler<O, K> newHandler) {
- return new Pipeline<>(input -> newHandler.process(currentHandler.process(input)));
- }
- O execute(I input) {
- return currentHandler.process(input);
- }
- }
4、 客戶端使用
- import lombok.val;
- public class ClientTest {
- public static void main(String[] args) {
- File file = new File("/Users/apple/Documents/hello.txt");
- val filters = new Pipeline<>(new FileProcessHandler())
- .addHandler(new CharacterFilterHandler())
- .addHandler(new CharacterReverseHandler());
- System.out.println(filters.execute(file));
- }
- }
5、結果
UML 類圖
產品他么的又來了,這次是刪除 hello.txt 中的 world 字符
三下五除二,精通 shell 編程的我搞定了
- cat hello.txt |grep hello |rev | tr -d 'world'
Java 怎么搞,你應該很清晰了吧
五、優(yōu)缺點
Pipeline 模式的核心思想是將一個任務處理分解為若干個處理階段(Stage),其中每個處理階段的輸出作為下一個處理階段的輸入,并且各個處理階段都有相應的工作者線程去執(zhí)行相應的計算。因此,處理一批任務時,各個任務的各個處理階段是并行(Parallel)的。通過并行計算,Pipeline 模式使應用程序能夠充分利用多核 CPU 資源,提高其計算效率。 ——《Java 多線程編程實戰(zhàn)指南》
優(yōu)點
- 將復雜的處理流程分解成獨立的子任務,解耦上下游處理邏輯,也方便您對每個子任務的測試
- 被分解的子任務還可以被不同的處理進程復用
- 在復雜進程中添加、移除和替換子任務非常輕松,對已存在的進程沒有任何影響,這就加大了該模式的擴展性和靈活性
- 對于每個處理單元又可以打補丁,做監(jiān)聽。(這就是切面編程了)
模式需要注意的東西
- Pipeline的深度:Pipeline 中 Pipe 的個數(shù)被稱作 Pipeline 的深度。所以我們在用 Pipeline 的深度與 JVM 宿主機的 CPU 個數(shù)間的關系。如果 Pipeline 實例所處的任務多屬于 CPU 密集型,那么深度最好不超過 Ncpu。如果 Pipeline 所處理的任務多屬于 I/O 密集型,那么 Pipeline 的深度最好不要超過 2*Ncpu。
- 基于線程池的 Pipe:如果 Pipe 實例使用線程池,由于有多個 Pipe 實例,更容易出現(xiàn)線程死鎖的問題,需要仔細考慮。
- 錯誤處理:Pipe 實例對其任務進行過程中跑出的異常可能需要相應 Pipe 實例之外進行處理。
- 此時,處理方法通常有兩種:一是各個 Pipe 實例捕獲到異常后調用 PipeContext 實例的 handleError 進行錯誤處理。另一個是創(chuàng)建一個專門負責錯我處理的 Pipe 實例,其他 Pipe 實例捕獲異常后提交相關數(shù)據(jù)給該 Pipe 實例處理。
- 可配置的 Pipeline:Pipeline 模式可以用代碼的方式將若干個 Pipe 實例添加,也可以用配置文件的方式實現(xiàn)動態(tài)方式添加 Pipe。
六、Java Function
如果,你的管道邏輯真的很簡單,也直接用 Java8 提供的 Function 就,具體實現(xiàn)如下這樣
- File file = new File("/Users/apple/Documents/hello.txt");
- Function<File,String> readFile = input -> {
- System.out.println("===文件處理===");
- try{
- return FileUtils.readFileToString(input,"UTF-8");
- }catch (IOException e){
- e.printStackTrace();
- }
- return null;
- };
- Function<String, String> filterCharacter = input -> {
- System.out.println("===字符過濾===");
- List<String> hello = Stream.of(input).filter(s -> s.contains("hello")).collect(Collectors.toList());
- return String.join("",hello);
- };
- Function<String, String> reverseCharacter = input -> {
- System.out.println("===反轉字符串===");
- return new StringBuilder(input).reverse().toString();
- };
- final Function<File,String> pipe = readFile
- .andThen(filterCharacter)
- .andThen(reverseCharacter);
- System.out.println(pipe.apply(file));
最后
但是,并不是一碰到這種類似流式處理的任務就需要用管道,Pipeline 模式中各個處理階段所用的工作者線程或者線程池,表示各個階段的輸入/輸出對象的創(chuàng)建和一定(進出隊列)都有其自身的時間和空間開銷,所以使用 Pipeline 模式的時候需要考慮它所付出的代價。建議處理規(guī)模較大的任務,否則可能得不償失。
參考
https://java-design-patterns.com/patterns/pipeline/
https://developer.aliyun.com/article/778865
https://yasinshaw.com/articles/108
《Java多線程編程實戰(zhàn)指南(設計模式篇)》