自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

聊聊分布式下的WebSocket解決方案

開發(fā) 前端 分布式
最近自己搭建了個(gè)項(xiàng)目,項(xiàng)目本身很簡單,但是里面有使用WebSocket進(jìn)行消息提醒的功能,大體情況是這樣的。發(fā)布消息者在系統(tǒng)中發(fā)送消息,實(shí)時(shí)的把消息推送給對(duì)應(yīng)的一個(gè)部門下的所有人。

前言

最近自己搭建了個(gè)項(xiàng)目,項(xiàng)目本身很簡單,但是里面有使用WebSocket進(jìn)行消息提醒的功能,大體情況是這樣的。

發(fā)布消息者在系統(tǒng)中發(fā)送消息,實(shí)時(shí)的把消息推送給對(duì)應(yīng)的一個(gè)部門下的所有人。

這里面如果是單機(jī)應(yīng)用的情況時(shí),我們可以通過部門的id和用戶的id組成一個(gè)唯一的key,與應(yīng)用服務(wù)器建立WebSocket長連接,然后就可以接收到發(fā)布消息者發(fā)送的消息了。

但是真正把項(xiàng)目應(yīng)用于生產(chǎn)環(huán)境中時(shí),我們是不可能就部署一個(gè)單機(jī)應(yīng)用的,而是要部署一個(gè)集群。

[[343393]]

所以我通過Nginx+兩臺(tái)Tomcat搭建了一個(gè)簡單的負(fù)載均衡集群,作為測試使用

但是問題出現(xiàn)了,我們的客戶端瀏覽器只會(huì)與一臺(tái)服務(wù)器建立WebSocket長連接,所以發(fā)布消息者在發(fā)送消息時(shí),就沒法保證所有目標(biāo)部門的人都能接收到消息(因?yàn)檫@些人連接的可能不是一個(gè)服務(wù)器)。

本篇文章就是針對(duì)于這么一個(gè)問題展開討論,提出一種解決方案,當(dāng)然解決方案不止一種,那我們開始吧。

WebSocket單體應(yīng)用介紹

在介紹分布式集群之前,我們先來看一下王子的WebSocket代碼實(shí)現(xiàn),先來看java后端代碼如下:

  1. import javax.websocket.*; 
  2. import javax.websocket.server.PathParam; 
  3. import javax.websocket.server.ServerEndpoint; 
  4. import com.alibaba.fastjson.JSON; 
  5. import com.alibaba.fastjson.JSONObject;import java.io.IOException; 
  6. import java.util.Map; 
  7. import java.util.concurrent.ConcurrentHashMap; 
  8. @ServerEndpoint("/webSocket/{key}"
  9. public class WebSocket { 
  10.     private static int onlineCount = 0; 
  11.     /** 
  12.      * 存儲(chǔ)連接的客戶端 
  13.      */ 
  14.     private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>(); 
  15.     private Session session; 
  16.     /** 
  17.      * 發(fā)送的目標(biāo)科室code 
  18.      */ 
  19.     private String key
  20.     @OnOpen 
  21.     public void onOpen(@PathParam("key") String key, Session session) throws IOException { 
  22.         this.key = key
  23.         this.session = session; 
  24.         if (!clients.containsKey(key)) { 
  25.             addOnlineCount();        }        clients.put(key, this); 
  26.         Log.info(key+"已連接消息服務(wù)!"); 
  27.     }    @OnClose 
  28.     public void onClose() throws IOException { 
  29.         clients.remove(key);        subOnlineCount();    }    @OnMessage 
  30.     public void onMessage(String message) throws IOException { 
  31.         if(message.equals("ping")){ 
  32.             return ; 
  33.         }        JSONObject jsonTo = JSON.parseObject(message);        String mes = (String) jsonTo.get("message"); 
  34.         if (!jsonTo.get("to").equals("All")){ 
  35.             sendMessageTo(mes, jsonTo.get("to").toString()); 
  36.         }else
  37.             sendMessageAll(mes);        }    }    @OnError 
  38.     public void onError(Session session, Throwable error) { 
  39.         error.printStackTrace();    }    private void sendMessageTo(String message, String To) throws IOException { 
  40.         for (WebSocket item : clients.values()) { 
  41.             if (item.key.contains(To) ) 
  42.                 item.session.getAsyncRemote().sendText(message);        }    }    private void sendMessageAll(String message) throws IOException { 
  43.         for (WebSocket item : clients.values()) { 
  44.             item.session.getAsyncRemote().sendText(message);        }    }    public static synchronized int getOnlineCount() { 
  45.         return onlineCount; 
  46.     }    public static synchronized void addOnlineCount() { 
  47.         WebSocket.onlineCount++;    }    public static synchronized void subOnlineCount() { 
  48.         WebSocket.onlineCount--;    }    public static synchronized Map<String, WebSocket> getClients() { 
  49.         return clients; 
  50.     }} 

示例代碼中并沒有使用Spring,用的是原生的java web編寫的,簡單和大家介紹一下里面的方法。

  • onOpen:在客戶端與WebSocket服務(wù)連接時(shí)觸發(fā)方法執(zhí)行
  • onClose:在客戶端與WebSocket連接斷開的時(shí)候觸發(fā)執(zhí)行
  • onMessage:在接收到客戶端發(fā)送的消息時(shí)觸發(fā)執(zhí)行
  • onError:在發(fā)生錯(cuò)誤時(shí)觸發(fā)執(zhí)行

可以看到,在onMessage方法中,我們直接根據(jù)客戶端發(fā)送的消息,進(jìn)行消息的轉(zhuǎn)發(fā)功能,這樣在單體消息服務(wù)中是沒有問題的。

再來看一下js代碼

  1. var host = document.location.host; 
  2.     // 獲得當(dāng)前登錄科室    var deptCodes='${sessionScope.$UserContext.departmentID}'
  3.     deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, ""); 
  4.     var key = '${sessionScope.$UserContext.userID}'+deptCodes; 
  5.     var lockReconnect = false;  //避免ws重復(fù)連接 
  6.     var ws = null;          // 判斷當(dāng)前瀏覽器是否支持WebSocket    var wsUrl = 'ws://' + host + '/webSocket/'key
  7.     createWebSocket(wsUrl);   //連接ws    function createWebSocket(url) { 
  8.         try{            if('WebSocket' in window){ 
  9.                 ws = new WebSocket(url);            }else if('MozWebSocket' in window){   
  10.                 ws = new MozWebSocket(url);            }else
  11.                   layer.alert("您的瀏覽器不支持websocket協(xié)議,建議使用新版谷歌、火狐等瀏覽器,請(qǐng)勿使用IE10以下瀏覽器,360瀏覽器請(qǐng)使用極速模式,不要使用兼容模式!");  
  12.             }            initEventHandle();        }catch(e){            reconnect(url);            console.log(e); 
  13.         }         }    function initEventHandle() { 
  14.         ws.onclose = function () { 
  15.             reconnect(wsUrl);            console.log("llws連接關(guān)閉!"+new Date().toUTCString()); 
  16.         };        ws.onerror = function () { 
  17.             reconnect(wsUrl);            console.log("llws連接錯(cuò)誤!"); 
  18.         };        ws.onopen = function () { 
  19.             heartCheck.reset().start();      //心跳檢測重置            console.log("llws連接成功!"+new Date().toUTCString()); 
  20.         };        ws.onmessage = function (event) {    //如果獲取到消息,心跳檢測重置 
  21.             heartCheck.reset().start();      //拿到任何消息都說明當(dāng)前連接是正常的//接收到消息實(shí)際業(yè)務(wù)處理        ...        };    }    // 監(jiān)聽窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時(shí),主動(dòng)去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會(huì)拋異常。    window.onbeforeunload = function() { 
  22.         ws.close(); 
  23.     }      function reconnect(url) { 
  24.         if(lockReconnect) return
  25.         lockReconnect = true
  26.         setTimeout(function () {     //沒連接上會(huì)一直重連,設(shè)置延遲避免請(qǐng)求過多 
  27.             createWebSocket(url);            lockReconnect = false
  28.         }, 2000); 
  29.     }    //心跳檢測    var heartCheck = {        timeout: 300000,        //5分鐘發(fā)一次心跳 
  30.         timeoutObj: null,        serverTimeoutObj: null,        reset: function(){ 
  31.             clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this; 
  32.         },        start: function(){ 
  33.             var self = this;            this.timeoutObj = setTimeout(function(){ 
  34.                 //這里發(fā)送一個(gè)心跳,后端收到后,返回一個(gè)心跳消息,                //onmessage拿到返回的心跳就說明連接正常                ws.send("ping"); 
  35.                 console.log("ping!"
  36.                 self.serverTimeoutObj = setTimeout(function(){//如果超過一定時(shí)間還沒重置,說明后端主動(dòng)斷開了 
  37.                     ws.close();     //如果onclose會(huì)執(zhí)行reconnect,我們執(zhí)行ws.close()就行了.如果直接執(zhí)行reconnect 會(huì)觸發(fā)onclose導(dǎo)致重連兩次 
  38.                 }, self.timeout)            }, this.timeout)        }  } 

js部分使用的是原生H5編寫的,如果為了更好的兼容瀏覽器,也可以使用SockJS,有興趣小伙伴們可以自行百度。

接下來我們就手動(dòng)的優(yōu)化代碼,實(shí)現(xiàn)WebSocket對(duì)分布式架構(gòu)的支持。

解決方案的思考

現(xiàn)在我們已經(jīng)了解單體應(yīng)用下的代碼結(jié)構(gòu),也清楚了WebSocket在分布式環(huán)境下面臨的問題,那么是時(shí)候思考一下如何能夠解決這個(gè)問題了。

我們先來看一看發(fā)生這個(gè)問題的根本原因是什么。

簡單思考一下就能明白,單體應(yīng)用下只有一臺(tái)服務(wù)器,所有的客戶端連接的都是這一臺(tái)消息服務(wù)器,所以當(dāng)發(fā)布消息者發(fā)送消息時(shí),所有的客戶端其實(shí)已經(jīng)全部與這臺(tái)服務(wù)器建立了連接,直接群發(fā)消息就可以了。

換成分布式系統(tǒng)后,假如我們有兩臺(tái)消息服務(wù)器,那么客戶端通過Nginx負(fù)載均衡后,就會(huì)有一部分連接到其中一臺(tái)服務(wù)器,另一部分連接到另一臺(tái)服務(wù)器,所以發(fā)布消息者發(fā)送消息時(shí),只會(huì)發(fā)送到其中的一臺(tái)服務(wù)器上,而這臺(tái)消息服務(wù)器就可以執(zhí)行群發(fā)操作,但問題是,另一臺(tái)服務(wù)器并不知道這件事,也就無法發(fā)送消息了。

現(xiàn)在我們知道了根本原因是生產(chǎn)消息時(shí),只有一臺(tái)消息服務(wù)器能夠感知到,所以我們只要讓另一臺(tái)消息服務(wù)器也能感知到就可以了,這樣感知到之后,它就可以群發(fā)消息給連接到它上邊的客戶端了。

那么什么方法可以實(shí)現(xiàn)這種功能呢,王子很快想到了引入消息中間件,并使用它的發(fā)布訂閱模式來通知所有消息服務(wù)器就可以了。

引入RabbitMQ解決分布式下的WebSocket問題

在消息中間件的選擇上,王子選擇了RabbitMQ,原因是它的搭建比較簡單,功能也很強(qiáng)大,而且我們只是用到它群發(fā)消息的功能。

RabbitMQ有一個(gè)廣播模式(fanout),我們使用的就是這種模式。

首先我們寫一個(gè)RabbitMQ的連接類:

  1. import com.rabbitmq.client.Connection
  2. import com.rabbitmq.client.ConnectionFactory; 
  3. import java.io.IOException; 
  4. import java.util.concurrent.TimeoutException; 
  5. public class RabbitMQUtil { 
  6.     private static Connection connection
  7.     /** 
  8.      * 與rabbitmq建立連接 
  9.      * @return 
  10.      */ 
  11.     public static Connection getConnection() { 
  12.         if (connection != null&&connection.isOpen()) { 
  13.             return connection
  14.         }        ConnectionFactory factory = new ConnectionFactory(); 
  15.         factory.setVirtualHost("/"); 
  16.         factory.setHost("192.168.220.110"); // 用的是虛擬IP地址 
  17.         factory.setPort(5672); 
  18.         factory.setUsername("guest"); 
  19.         factory.setPassword("guest"); 
  20.         try { 
  21.             connection = factory.newConnection(); 
  22.         } catch (IOException e) { 
  23.             e.printStackTrace(); 
  24.         } catch (TimeoutException e) { 
  25.             e.printStackTrace(); 
  26.         } 
  27.         return connection
  28.     } 

這個(gè)類沒什么說的,就是獲取MQ連接的一個(gè)工廠類。

然后按照我們的思路,就是每次服務(wù)器啟動(dòng)的時(shí)候,都會(huì)創(chuàng)建一個(gè)MQ的消費(fèi)者監(jiān)聽MQ的消息,王子這里測試使用的是Servlet的監(jiān)聽器,如下:

  1. import javax.servlet.ServletContextEvent; 
  2. import javax.servlet.ServletContextListener; 
  3. public class InitListener implements ServletContextListener { 
  4.     @Override 
  5.     public void contextInitialized(ServletContextEvent servletContextEvent) { 
  6.         WebSocket.init();    }    @Override 
  7.     public void contextDestroyed(ServletContextEvent servletContextEvent) { 
  8.     }} 

記得要在Web.xml中配置監(jiān)聽器信息

  1. <?xml version="1.0" encoding="UTF-8"?> 
  2. <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  4.          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd" 
  5.          version="4.0"
  6.     <listener> 
  7.         <listener-class>InitListener</listener-class> 
  8.     </listener> 
  9. </web-app> 

WebSocket中增加init方法,作為MQ消費(fèi)者部分

  1. public  static void init() { 
  2.         try {            Connection connection = RabbitMQUtil.getConnection();            Channel channel = connection.createChannel();            //交換機(jī)聲明(參數(shù)為:交換機(jī)名稱;交換機(jī)類型) 
  3.             channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT); 
  4.             //獲取一個(gè)臨時(shí)隊(duì)列 
  5.             String queueName = channel.queueDeclare().getQueue();            //隊(duì)列與交換機(jī)綁定(參數(shù)為:隊(duì)列名稱;交換機(jī)名稱;routingKey忽略) 
  6.             channel.queueBind(queueName,"fanoutLogs",""); 
  7.             //這里重寫了DefaultConsumer的handleDelivery方法,因?yàn)榘l(fā)送的時(shí)候?qū)ο⑦M(jìn)行了getByte(),在這里要重新組裝成String 
  8.             Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    super.handleDelivery(consumerTag, envelope, properties, body); 
  9.                     String message = new String(body,"UTF-8"); 
  10.                     System.out.println(message);            //這里可以使用WebSocket通過消息內(nèi)容發(fā)送消息給對(duì)應(yīng)的客戶端 
  11.                 }            };            //聲明隊(duì)列中被消費(fèi)掉的消息(參數(shù)為:隊(duì)列名稱;消息是否自動(dòng)確認(rèn);consumer主體) 
  12.             channel.basicConsume(queueName,true,consumer); 
  13.             //這里不能關(guān)閉連接,調(diào)用了消費(fèi)方法后,消費(fèi)者會(huì)一直連接著rabbitMQ等待消費(fèi) 
  14.         } catch (IOException e) {            e.printStackTrace();        }    } 

同時(shí)在接收到消息時(shí),不是直接通過WebSocket發(fā)送消息給對(duì)應(yīng)客戶端,而是發(fā)送消息給MQ,這樣如果消息服務(wù)器有多個(gè),就都會(huì)從MQ中獲得消息,之后通過獲取的消息內(nèi)容再使用WebSocket推送給對(duì)應(yīng)的客戶端就可以了。

WebSocket的onMessage方法增加內(nèi)容如下:

  1. try { 
  2.             //嘗試獲取一個(gè)連接 
  3.             Connection connection = RabbitMQUtil.getConnection();            //嘗試創(chuàng)建一個(gè)channel 
  4.             Channel channel = connection.createChannel();            //聲明交換機(jī)(參數(shù)為:交換機(jī)名稱; 交換機(jī)類型,廣播模式) 
  5.             channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT); 
  6.             //消息發(fā)布(參數(shù)為:交換機(jī)名稱; routingKey,忽略。在廣播模式中,生產(chǎn)者聲明交換機(jī)的名稱和類型即可) 
  7.             channel.basicPublish("fanoutLogs",""null,msg.getBytes("UTF-8")); 
  8.             System.out.println("發(fā)布消息"); 
  9.             channel.close();        } catch (IOException |TimeoutException e) { 
  10.             e.printStackTrace(); 
  11.         } 

增加后刪除掉原來的Websocket推送部分代碼。

這樣一整套的解決方案就完成了。

總結(jié)

到這里,我們就解決了分布式下WebSocket的推送消息問題。

我們主要是引入了RabbitMQ,通過RabbitMQ的發(fā)布訂閱模式,讓每個(gè)消息服務(wù)器啟動(dòng)的時(shí)候都去訂閱消息,而無論哪臺(tái)消息服務(wù)器在發(fā)送消息的時(shí)候都會(huì)發(fā)送給MQ,這樣每臺(tái)消息服務(wù)器就都會(huì)感知到發(fā)送消息的時(shí)間,從而再通過Websocket發(fā)送給客戶端。

大體流程就是這樣,那么小伙伴們有沒有想過,如果RabbitMQ掛掉了幾分鐘,之后重啟了,消費(fèi)者是否可以重新連接到RabbitMQ?是否還能正常接收消息呢?

生產(chǎn)環(huán)境下,這個(gè)問題是必須考慮的。

這里已經(jīng)測試過,消費(fèi)者是支持自動(dòng)重連的,所以我們可以放心的使用這套架構(gòu)來解決此問題。

本文到這里就結(jié)束了,歡迎各位小伙伴留言討論,一起學(xué)習(xí),一起進(jìn)步。

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2021-09-28 09:43:11

微服務(wù)架構(gòu)技術(shù)

2023-03-05 18:23:38

分布式ID節(jié)點(diǎn)

2025-04-28 00:44:04

2020-05-28 09:35:05

分布式事務(wù)方案

2023-09-14 15:44:46

分布式事務(wù)數(shù)據(jù)存儲(chǔ)

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2010-07-21 13:53:41

SQL Server分

2022-09-07 08:18:26

分布式灰度方案分支號(hào)

2023-11-30 07:19:08

.NET開源

2024-03-26 12:08:53

分布式事務(wù)存儲(chǔ)

2018-07-19 14:53:23

秒殺websocket異步

2021-05-08 08:01:05

Session登錄瀏覽器

2022-02-10 08:57:45

分布式線程鎖

2023-05-18 14:02:00

分布式系統(tǒng)冪等性

2024-06-13 08:04:23

2019-01-11 18:22:07

阿里巴巴技術(shù)開源

2023-09-28 08:39:23

分布式鎖Redis

2021-06-28 10:03:44

分布式數(shù)據(jù)庫架構(gòu)

2020-03-31 16:13:26

分布式事務(wù)方案TCC

2019-07-25 15:32:35

分布式事務(wù)微服務(wù)系統(tǒng)架構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)