聊聊分布式下的WebSocket解決方案
前言
最近自己搭建了個(gè)項(xiàng)目,項(xiàng)目本身很簡單,但是里面有使用WebSocket進(jìn)行消息提醒的功能,大體情況是這樣的。
發(fā)布消息者在系統(tǒng)中發(fā)送消息,實(shí)時(shí)的把消息推送給對(duì)應(yīng)的一個(gè)部門下的所有人。
這里面如果是單機(jī)應(yīng)用的情況時(shí),我們可以通過部門的id和用戶的id組成一個(gè)唯一的key,與應(yīng)用服務(wù)器建立WebSocket長連接,然后就可以接收到發(fā)布消息者發(fā)送的消息了。
但是真正把項(xiàng)目應(yīng)用于生產(chǎn)環(huán)境中時(shí),我們是不可能就部署一個(gè)單機(jī)應(yīng)用的,而是要部署一個(gè)集群。
所以我通過Nginx+兩臺(tái)Tomcat搭建了一個(gè)簡單的負(fù)載均衡集群,作為測試使用
但是問題出現(xiàn)了,我們的客戶端瀏覽器只會(huì)與一臺(tái)服務(wù)器建立WebSocket長連接,所以發(fā)布消息者在發(fā)送消息時(shí),就沒法保證所有目標(biāo)部門的人都能接收到消息(因?yàn)檫@些人連接的可能不是一個(gè)服務(wù)器)。
本篇文章就是針對(duì)于這么一個(gè)問題展開討論,提出一種解決方案,當(dāng)然解決方案不止一種,那我們開始吧。
WebSocket單體應(yīng)用介紹
在介紹分布式集群之前,我們先來看一下王子的WebSocket代碼實(shí)現(xiàn),先來看java后端代碼如下:
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;import java.io.IOException;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- @ServerEndpoint("/webSocket/{key}")
- public class WebSocket {
- private static int onlineCount = 0;
- /**
- * 存儲(chǔ)連接的客戶端
- */
- private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
- private Session session;
- /**
- * 發(fā)送的目標(biāo)科室code
- */
- private String key;
- @OnOpen
- public void onOpen(@PathParam("key") String key, Session session) throws IOException {
- this.key = key;
- this.session = session;
- if (!clients.containsKey(key)) {
- addOnlineCount(); } clients.put(key, this);
- Log.info(key+"已連接消息服務(wù)!");
- } @OnClose
- public void onClose() throws IOException {
- clients.remove(key); subOnlineCount(); } @OnMessage
- public void onMessage(String message) throws IOException {
- if(message.equals("ping")){
- return ;
- } JSONObject jsonTo = JSON.parseObject(message); String mes = (String) jsonTo.get("message");
- if (!jsonTo.get("to").equals("All")){
- sendMessageTo(mes, jsonTo.get("to").toString());
- }else{
- sendMessageAll(mes); } } @OnError
- public void onError(Session session, Throwable error) {
- error.printStackTrace(); } private void sendMessageTo(String message, String To) throws IOException {
- for (WebSocket item : clients.values()) {
- if (item.key.contains(To) )
- item.session.getAsyncRemote().sendText(message); } } private void sendMessageAll(String message) throws IOException {
- for (WebSocket item : clients.values()) {
- item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() {
- return onlineCount;
- } public static synchronized void addOnlineCount() {
- WebSocket.onlineCount++; } public static synchronized void subOnlineCount() {
- WebSocket.onlineCount--; } public static synchronized Map<String, WebSocket> getClients() {
- return clients;
- }}
示例代碼中并沒有使用Spring,用的是原生的java web編寫的,簡單和大家介紹一下里面的方法。
- onOpen:在客戶端與WebSocket服務(wù)連接時(shí)觸發(fā)方法執(zhí)行
- onClose:在客戶端與WebSocket連接斷開的時(shí)候觸發(fā)執(zhí)行
- onMessage:在接收到客戶端發(fā)送的消息時(shí)觸發(fā)執(zhí)行
- onError:在發(fā)生錯(cuò)誤時(shí)觸發(fā)執(zhí)行
可以看到,在onMessage方法中,我們直接根據(jù)客戶端發(fā)送的消息,進(jìn)行消息的轉(zhuǎn)發(fā)功能,這樣在單體消息服務(wù)中是沒有問題的。
再來看一下js代碼
- var host = document.location.host;
- // 獲得當(dāng)前登錄科室 var deptCodes='${sessionScope.$UserContext.departmentID}';
- deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, "");
- var key = '${sessionScope.$UserContext.userID}'+deptCodes;
- var lockReconnect = false; //避免ws重復(fù)連接
- var ws = null; // 判斷當(dāng)前瀏覽器是否支持WebSocket var wsUrl = 'ws://' + host + '/webSocket/'+ key;
- createWebSocket(wsUrl); //連接ws function createWebSocket(url) {
- try{ if('WebSocket' in window){
- ws = new WebSocket(url); }else if('MozWebSocket' in window){
- ws = new MozWebSocket(url); }else{
- layer.alert("您的瀏覽器不支持websocket協(xié)議,建議使用新版谷歌、火狐等瀏覽器,請(qǐng)勿使用IE10以下瀏覽器,360瀏覽器請(qǐng)使用極速模式,不要使用兼容模式!");
- } initEventHandle(); }catch(e){ reconnect(url); console.log(e);
- } } function initEventHandle() {
- ws.onclose = function () {
- reconnect(wsUrl); console.log("llws連接關(guān)閉!"+new Date().toUTCString());
- }; ws.onerror = function () {
- reconnect(wsUrl); console.log("llws連接錯(cuò)誤!");
- }; ws.onopen = function () {
- heartCheck.reset().start(); //心跳檢測重置 console.log("llws連接成功!"+new Date().toUTCString());
- }; ws.onmessage = function (event) { //如果獲取到消息,心跳檢測重置
- heartCheck.reset().start(); //拿到任何消息都說明當(dāng)前連接是正常的//接收到消息實(shí)際業(yè)務(wù)處理 ... }; } // 監(jiān)聽窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時(shí),主動(dòng)去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會(huì)拋異常。 window.onbeforeunload = function() {
- ws.close();
- } function reconnect(url) {
- if(lockReconnect) return;
- lockReconnect = true;
- setTimeout(function () { //沒連接上會(huì)一直重連,設(shè)置延遲避免請(qǐng)求過多
- createWebSocket(url); lockReconnect = false;
- }, 2000);
- } //心跳檢測 var heartCheck = { timeout: 300000, //5分鐘發(fā)一次心跳
- timeoutObj: null, serverTimeoutObj: null, reset: function(){
- clearTimeout(this.timeoutObj); clearTimeout(this.serverTimeoutObj); return this;
- }, start: function(){
- var self = this; this.timeoutObj = setTimeout(function(){
- //這里發(fā)送一個(gè)心跳,后端收到后,返回一個(gè)心跳消息, //onmessage拿到返回的心跳就說明連接正常 ws.send("ping");
- console.log("ping!")
- self.serverTimeoutObj = setTimeout(function(){//如果超過一定時(shí)間還沒重置,說明后端主動(dòng)斷開了
- ws.close(); //如果onclose會(huì)執(zhí)行reconnect,我們執(zhí)行ws.close()就行了.如果直接執(zhí)行reconnect 會(huì)觸發(fā)onclose導(dǎo)致重連兩次
- }, self.timeout) }, this.timeout) } }
js部分使用的是原生H5編寫的,如果為了更好的兼容瀏覽器,也可以使用SockJS,有興趣小伙伴們可以自行百度。
接下來我們就手動(dòng)的優(yōu)化代碼,實(shí)現(xiàn)WebSocket對(duì)分布式架構(gòu)的支持。
解決方案的思考
現(xiàn)在我們已經(jīng)了解單體應(yīng)用下的代碼結(jié)構(gòu),也清楚了WebSocket在分布式環(huán)境下面臨的問題,那么是時(shí)候思考一下如何能夠解決這個(gè)問題了。
我們先來看一看發(fā)生這個(gè)問題的根本原因是什么。
簡單思考一下就能明白,單體應(yīng)用下只有一臺(tái)服務(wù)器,所有的客戶端連接的都是這一臺(tái)消息服務(wù)器,所以當(dāng)發(fā)布消息者發(fā)送消息時(shí),所有的客戶端其實(shí)已經(jīng)全部與這臺(tái)服務(wù)器建立了連接,直接群發(fā)消息就可以了。
換成分布式系統(tǒng)后,假如我們有兩臺(tái)消息服務(wù)器,那么客戶端通過Nginx負(fù)載均衡后,就會(huì)有一部分連接到其中一臺(tái)服務(wù)器,另一部分連接到另一臺(tái)服務(wù)器,所以發(fā)布消息者發(fā)送消息時(shí),只會(huì)發(fā)送到其中的一臺(tái)服務(wù)器上,而這臺(tái)消息服務(wù)器就可以執(zhí)行群發(fā)操作,但問題是,另一臺(tái)服務(wù)器并不知道這件事,也就無法發(fā)送消息了。
現(xiàn)在我們知道了根本原因是生產(chǎn)消息時(shí),只有一臺(tái)消息服務(wù)器能夠感知到,所以我們只要讓另一臺(tái)消息服務(wù)器也能感知到就可以了,這樣感知到之后,它就可以群發(fā)消息給連接到它上邊的客戶端了。
那么什么方法可以實(shí)現(xiàn)這種功能呢,王子很快想到了引入消息中間件,并使用它的發(fā)布訂閱模式來通知所有消息服務(wù)器就可以了。
引入RabbitMQ解決分布式下的WebSocket問題
在消息中間件的選擇上,王子選擇了RabbitMQ,原因是它的搭建比較簡單,功能也很強(qiáng)大,而且我們只是用到它群發(fā)消息的功能。
RabbitMQ有一個(gè)廣播模式(fanout),我們使用的就是這種模式。
首先我們寫一個(gè)RabbitMQ的連接類:
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class RabbitMQUtil {
- private static Connection connection;
- /**
- * 與rabbitmq建立連接
- * @return
- */
- public static Connection getConnection() {
- if (connection != null&&connection.isOpen()) {
- return connection;
- } ConnectionFactory factory = new ConnectionFactory();
- factory.setVirtualHost("/");
- factory.setHost("192.168.220.110"); // 用的是虛擬IP地址
- factory.setPort(5672);
- factory.setUsername("guest");
- factory.setPassword("guest");
- try {
- connection = factory.newConnection();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- return connection;
- }
- }
這個(gè)類沒什么說的,就是獲取MQ連接的一個(gè)工廠類。
然后按照我們的思路,就是每次服務(wù)器啟動(dòng)的時(shí)候,都會(huì)創(chuàng)建一個(gè)MQ的消費(fèi)者監(jiān)聽MQ的消息,王子這里測試使用的是Servlet的監(jiān)聽器,如下:
- import javax.servlet.ServletContextEvent;
- import javax.servlet.ServletContextListener;
- public class InitListener implements ServletContextListener {
- @Override
- public void contextInitialized(ServletContextEvent servletContextEvent) {
- WebSocket.init(); } @Override
- public void contextDestroyed(ServletContextEvent servletContextEvent) {
- }}
記得要在Web.xml中配置監(jiān)聽器信息
- <?xml version="1.0" encoding="UTF-8"?>
- <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
- version="4.0">
- <listener>
- <listener-class>InitListener</listener-class>
- </listener>
- </web-app>
WebSocket中增加init方法,作為MQ消費(fèi)者部分
- public static void init() {
- try { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); //交換機(jī)聲明(參數(shù)為:交換機(jī)名稱;交換機(jī)類型)
- channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
- //獲取一個(gè)臨時(shí)隊(duì)列
- String queueName = channel.queueDeclare().getQueue(); //隊(duì)列與交換機(jī)綁定(參數(shù)為:隊(duì)列名稱;交換機(jī)名稱;routingKey忽略)
- channel.queueBind(queueName,"fanoutLogs","");
- //這里重寫了DefaultConsumer的handleDelivery方法,因?yàn)榘l(fā)送的時(shí)候?qū)ο⑦M(jìn)行了getByte(),在這里要重新組裝成String
- Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body);
- String message = new String(body,"UTF-8");
- System.out.println(message); //這里可以使用WebSocket通過消息內(nèi)容發(fā)送消息給對(duì)應(yīng)的客戶端
- } }; //聲明隊(duì)列中被消費(fèi)掉的消息(參數(shù)為:隊(duì)列名稱;消息是否自動(dòng)確認(rèn);consumer主體)
- channel.basicConsume(queueName,true,consumer);
- //這里不能關(guān)閉連接,調(diào)用了消費(fèi)方法后,消費(fèi)者會(huì)一直連接著rabbitMQ等待消費(fèi)
- } catch (IOException e) { e.printStackTrace(); } }
同時(shí)在接收到消息時(shí),不是直接通過WebSocket發(fā)送消息給對(duì)應(yīng)客戶端,而是發(fā)送消息給MQ,這樣如果消息服務(wù)器有多個(gè),就都會(huì)從MQ中獲得消息,之后通過獲取的消息內(nèi)容再使用WebSocket推送給對(duì)應(yīng)的客戶端就可以了。
WebSocket的onMessage方法增加內(nèi)容如下:
- try {
- //嘗試獲取一個(gè)連接
- Connection connection = RabbitMQUtil.getConnection(); //嘗試創(chuàng)建一個(gè)channel
- Channel channel = connection.createChannel(); //聲明交換機(jī)(參數(shù)為:交換機(jī)名稱; 交換機(jī)類型,廣播模式)
- channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
- //消息發(fā)布(參數(shù)為:交換機(jī)名稱; routingKey,忽略。在廣播模式中,生產(chǎn)者聲明交換機(jī)的名稱和類型即可)
- channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
- System.out.println("發(fā)布消息");
- channel.close(); } catch (IOException |TimeoutException e) {
- e.printStackTrace();
- }
增加后刪除掉原來的Websocket推送部分代碼。
這樣一整套的解決方案就完成了。
總結(jié)
到這里,我們就解決了分布式下WebSocket的推送消息問題。
我們主要是引入了RabbitMQ,通過RabbitMQ的發(fā)布訂閱模式,讓每個(gè)消息服務(wù)器啟動(dòng)的時(shí)候都去訂閱消息,而無論哪臺(tái)消息服務(wù)器在發(fā)送消息的時(shí)候都會(huì)發(fā)送給MQ,這樣每臺(tái)消息服務(wù)器就都會(huì)感知到發(fā)送消息的時(shí)間,從而再通過Websocket發(fā)送給客戶端。
大體流程就是這樣,那么小伙伴們有沒有想過,如果RabbitMQ掛掉了幾分鐘,之后重啟了,消費(fèi)者是否可以重新連接到RabbitMQ?是否還能正常接收消息呢?
生產(chǎn)環(huán)境下,這個(gè)問題是必須考慮的。
這里已經(jīng)測試過,消費(fèi)者是支持自動(dòng)重連的,所以我們可以放心的使用這套架構(gòu)來解決此問題。
本文到這里就結(jié)束了,歡迎各位小伙伴留言討論,一起學(xué)習(xí),一起進(jìn)步。