Go 中的 Channel 與 Java BlockingQueue 的本質(zhì)區(qū)別
本文轉(zhuǎn)載自微信公眾號「crossoverJie」,作者crossoverJie。轉(zhuǎn)載本文請聯(lián)系crossoverJie公眾號。
前言
最近在實現(xiàn)兩個需求,由于兩者之間并沒有依賴關(guān)系,所以想利用隊列進行解耦;但在 Go 的標(biāo)準(zhǔn)庫中并沒有現(xiàn)成可用并且并發(fā)安全的數(shù)據(jù)結(jié)構(gòu);但 Go 提供了一個更加優(yōu)雅的解決方案,那就是 channel。
channel 應(yīng)用
Go 與 Java 的一個很大的區(qū)別就是并發(fā)模型不同,Go 采用的是 CSP(Communicating sequential processes) 模型;用 Go 官方的說法:
Do not communicate by sharing memory; instead, share memory by communicating.
翻譯過來就是:不用使用共享內(nèi)存來通信,而是用通信來共享內(nèi)存。
而這里所提到的通信,在 Go 里就是指代的 channel。
只講概念并不能快速的理解與應(yīng)用,所以接下來會結(jié)合幾個實際案例更方便理解。
futrue task
Go 官方?jīng)]有提供類似于 Java 的 FutureTask 支持:
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- Task task = new Task();
- FutureTask<String> futureTask = new FutureTask<>(task);
- executorService.submit(futureTask);
- String s = futureTask.get();
- System.out.println(s);
- executorService.shutdown();
- }
- }
- class Task implements Callable<String> {
- @Override
- public String call() throws Exception {
- // 模擬http
- System.out.println("http request");
- Thread.sleep(1000);
- return "request success";
- }
- }
但我們可以使用 channel 配合 goroutine 實現(xiàn)類似的功能:
- func main() {
- ch := Request("https://github.com")
- select {
- case r := <-ch:
- fmt.Println(r)
- }
- }
- func Request(url string) <-chan string {
- ch := make(chan string)
- go func() {
- // 模擬http請求
- time.Sleep(time.Second)
- ch <- fmt.Sprintf("url=%s, res=%s", url, "ok")
- }()
- return ch
- }
goroutine 發(fā)起請求后直接將這個 channel 返回,調(diào)用方會在請求響應(yīng)之前一直阻塞,直到 goroutine 拿到了響應(yīng)結(jié)果。
goroutine 互相通信
- /**
- * 偶數(shù)線程
- */
- public static class OuNum implements Runnable {
- private TwoThreadWaitNotifySimple number;
- public OuNum(TwoThreadWaitNotifySimple number) {
- this.number = number;
- }
- @Override
- public void run() {
- for (int i = 0; i < 11; i++) {
- synchronized (TwoThreadWaitNotifySimple.class) {
- if (number.flag) {
- if (i % 2 == 0) {
- System.out.println(Thread.currentThread().getName() + "+-+偶數(shù)" + i);
- number.flag = false;
- TwoThreadWaitNotifySimple.class.notify();
- }
- } else {
- try {
- TwoThreadWaitNotifySimple.class.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
- /**
- * 奇數(shù)線程
- */
- public static class JiNum implements Runnable {
- private TwoThreadWaitNotifySimple number;
- public JiNum(TwoThreadWaitNotifySimple number) {
- this.number = number;
- }
- @Override
- public void run() {
- for (int i = 0; i < 11; i++) {
- synchronized (TwoThreadWaitNotifySimple.class) {
- if (!number.flag) {
- if (i % 2 == 1) {
- System.out.println(Thread.currentThread().getName() + "+-+奇數(shù)" + i);
- number.flag = true;
- TwoThreadWaitNotifySimple.class.notify();
- }
- } else {
- try {
- TwoThreadWaitNotifySimple.class.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
這里截取了”兩個線程交替打印奇偶數(shù)“的部分代碼。
Java 提供了 object.wait()/object.notify() 這樣的等待通知機制,可以實現(xiàn)兩個線程間通信。
go 通過 channel 也能實現(xiàn)相同效果:
- func main() {
- ch := make(chan struct{})
- go func() {
- for i := 1; i < 11; i++ {
- ch <- struct{}{}
- //奇數(shù)
- if i%2 == 1 {
- fmt.Println("奇數(shù):", i)
- }
- }
- }()
- go func() {
- for i := 1; i < 11; i++ {
- <-ch
- if i%2 == 0 {
- fmt.Println("偶數(shù):", i)
- }
- }
- }()
- time.Sleep(10 * time.Second)
- }
本質(zhì)上他們都是利用了線程(goroutine)阻塞然后喚醒的特性,只是 Java 是通過 wait/notify 機制;
而 go 提供的 channel 也有類似的特性:
向 channel 發(fā)送數(shù)據(jù)時(ch<-struct{}{})會被阻塞,直到 channel 被消費(<-ch)。
以上針對于無緩沖 channel。
channel 本身是由 go 原生保證并發(fā)安全的,不用額外的同步措施,可以放心使用。
廣播通知
不僅是兩個 goroutine 之間通信,同樣也能廣播通知,類似于如下 Java 代碼:
- public static void main(String[] args) throws InterruptedException {
- for (int i = 0; i < 10; i++) {
- new Thread(() -> {
- try {
- synchronized (NotifyAll.class){
- NotifyAll.class.wait();
- }
- System.out.println(Thread.currentThread().getName() + "done....");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
- }
- Thread.sleep(3000);
- synchronized (NotifyAll.class){
- NotifyAll.class.notifyAll();
- }
- }
主線程將所有等待的子線程全部喚醒,這個本質(zhì)上也是通過 wait/notify 機制實現(xiàn)的,區(qū)別只是通知了所有等待的線程。
換做是 go 的實現(xiàn):
- func main() {
- notify := make(chan struct{})
- for i := 0; i < 10; i++ {
- go func(i int) {
- for {
- select {
- case <-notify:
- fmt.Println("done.......",i)
- return
- case <-time.After(1 * time.Second):
- fmt.Println("wait notify",i)
- }
- }
- }(i)
- }
- time.Sleep(1 * time.Second)
- close(notify)
- time.Sleep(3 * time.Second)
- }
當(dāng)關(guān)閉一個 channel 后,會使得所有獲取 channel 的 goroutine 直接返回,不會阻塞,正是利用這一特性實現(xiàn)了廣播通知所有 goroutine 的目的。
注意,同一個 channel 不能反復(fù)關(guān)閉,不然會出現(xiàn)panic。
channel 解耦
以上例子都是基于無緩沖的 channel,通常用于 goroutine 之間的同步;同時 channel 也具備緩沖的特性:
- ch :=make(chan T, 100)
可以直接將其理解為隊列,正是因為具有緩沖能力,所以我們可以將業(yè)務(wù)之間進行解耦,生產(chǎn)方只管往 channel 中丟數(shù)據(jù),消費者只管將數(shù)據(jù)取出后做自己的業(yè)務(wù)。
同時也具有阻塞隊列的特性:
- 當(dāng) channel 寫滿時生產(chǎn)者將會被阻塞。
- 當(dāng) channel 為空時消費者也會阻塞。
從上文的例子中可以看出,實現(xiàn)相同的功能 go 的寫法會更加簡單直接,相對的 Java 就會復(fù)雜許多(當(dāng)然這也和這里使用的偏底層 api 有關(guān))。
Java 中的 BlockingQueue
這些特性都與 Java 中的 BlockingQueue 非常類似,他們具有以下的相同點:
- 可以通過兩者來進行 goroutine/thread 通信。
- 具備隊列的特征,可以解耦業(yè)務(wù)。
- 支持并發(fā)安全。
同樣的他們又有很大的區(qū)別,從表現(xiàn)上看:
- channel 支持 select 語法,對 channel 的管理更加簡潔直觀。
- channel 支持關(guān)閉,不能向已關(guān)閉的 channel 發(fā)送消息。
- channel 支持定義方向,在編譯器的幫助下可以在語義上對行為的描述更加準(zhǔn)確。
當(dāng)然還有本質(zhì)上的區(qū)別就是 channel 是 go 推薦的 CSP 模型的核心,具有編譯器的支持,可以有很輕量的成本實現(xiàn)并發(fā)通信。
而 BlockingQueue 對于 Java 來說只是一個實現(xiàn)了并發(fā)安全的數(shù)據(jù)結(jié)構(gòu),即便不使用它也有其他的通信方式;只是他們都具有阻塞隊列的特征,所有在初步接觸 channel 時容易產(chǎn)生混淆。
相同點 | channel 特有 |
---|---|
阻塞策略 | 支持select |
設(shè)置大小 | 支持關(guān)閉 |
并發(fā)安全 | 自定義方向 |
普通數(shù)據(jù)結(jié)構(gòu) | 編譯器支持 |
總結(jié)
有過一門編程語言的使用經(jīng)歷在學(xué)習(xí)其他語言是確實是要方便許多,比如之前寫過 Java 再看 Go 時就會發(fā)現(xiàn)許多類似之處,只是實現(xiàn)不同。
拿這里的并發(fā)通信來說,本質(zhì)上是因為并發(fā)模型上的不同;
Go 更推薦使用通信來共享內(nèi)存,而 Java 大部分場景都是使用共享內(nèi)存來通信(這樣就得加鎖來同步)。
帶著疑問來學(xué)習(xí)確實會事半功倍。