自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Springboot 整合 Websocket 輕松實現(xiàn) IM 及時通訊

開發(fā) 前端
項目中碰到需要及時通訊的場景,使用Springboot集成Websocket,即可瞬間破局。本文介紹Springboot如何集成Websocket、IM及時通訊需要哪些模塊、開發(fā)和部署過程中遇到的問題、以及實現(xiàn)小型IM及時通訊的代碼。

一、方案實踐

集成分為三步:添加依賴、增加配置類和消息核心類、前端集成。

1.1、添加依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.13.RELEASE</version>
</dependency>

1.2、增加WebSocket配置類

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket配置
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

1.3、增加消息核心類WebSocketServer

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
    // 消息存儲
    private static MessageStore messageStore;
    // 消息發(fā)送
    private static MessageSender messageSender;

    public static void setMessageStore(MessageStore messageStore) {
        WebSocketServer.messageStore = messageStore;
    }

    public static void setMessageSender(MessageSender messageSender) {
        WebSocketServer.messageSender = messageSender;
    }

    /**
     * 連接建立成功調用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        messageStore.saveSession(session);
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        messageStore.deleteSession(session);
    }

    /**
     * 收到客戶端消息后調用的方法
     *
     * @ Param message 客戶端發(fā)送過來的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) throws Exception {
        log.warn("=========== 收到來自窗口" + session.getId() + "的信息:" + message);
        handleTextMessage(session, new TextMessage(message));
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, @PathParam("userId") String userId, Throwable error) {
        log.error("=========== 發(fā)生錯誤");
        error.printStackTrace();
//        msgStore.deleteSession(session);
    }

    public void handleTextMessage(Session session, TextMessage message) throws Exception {
        log.warn("=========== Received message: {}", message.getPayload());
    }
}

1.4、前端頁面加入socket

<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/html">
  <head>
    <title>WebSocket Example</title>
  </head>
  <body>
    登錄用戶ID:<input type="text" id="sendUserId" /></br>
    接受用戶ID:<input type="text" id="receivedUserId" /></br>
    發(fā)送消息內容:<input type="text" id="messageInput" /></br>
    接受消息內容:<input type="text" id="messageReceive" /></br>
    <button onclick="sendMessage()">Send</button>

    <script>
      var socket = new WebSocket("ws://localhost:8080/websocket/aaa");
      var roomId = "123456";
      // 隨機產出六位數(shù)字
      var sendUserId = Math.floor(Math.random() * 1000000);

      document.getElementById("sendUserId").value = sendUserId;
      var messageReceive = document.getElementById("messageReceive");


      socket.onopen = function (event) {
        console.log("WebSocket is open now.");
        let loginInfo = {
          msgType: 2, //登錄消息
          sendUserId: sendUserId,
          bizType: 1, //業(yè)務類型
          bizOptModule: 1, //業(yè)務模塊
          roomId: roomId,
          msgBody: {},
        };
        socket.send(JSON.stringify(loginInfo));
      };

      socket.onmessage = function (event) {
        var message = event.data;
        console.log("Received message: " + message);
        messageReceive.value = message;
      };

      socket.onclose = function (event) {
        console.log("WebSocket is closed now.");
      };

      function sendMessage() {
        var message = document.getElementById("messageInput").value;
        var receivedUserId = document.getElementById("receivedUserId").value;
        let operateInfo = {
          msgType: 100, //業(yè)務消息
          sendUserId: sendUserId,
          bizType: 1, //業(yè)務類型
          bizOptModule: 1, //業(yè)務模塊
          roomId: roomId,
          receivedUserId: receivedUserId,
          msgBody: {
            operateType: 1, //操作類型:禁言
            operateContent: "1",
          },
        };
        socket.send(JSON.stringify(operateInfo));
      }

      setInterval(() => {
        socket.send("ping");
      }, 30000);
    </script>
  </body>
</html>

二、小型及時通訊包含的模塊

以上只是集成了Websocket框架,實現(xiàn)了基本的全雙工通信,服務器和客戶端都可以同時發(fā)送和接收數(shù)據(jù)。要想實現(xiàn)一些小型完整的及時通訊,還需要具備以下幾個核心模塊。架構圖如下:

2.1、架構圖

圖片圖片

2.2、消息對象模型

組織消息內容,比如消息類型、發(fā)送者用戶ID、接受者用戶ID、具體的消息體等。比如:

public class SocketMsg<T> {

    /**
     * 消息類型:1心跳  2登錄 3業(yè)務操作
     */
    private Integer msgType;

    /**
     * 發(fā)送者用戶ID
     */
    private String sendUserId;
    /**
     * 接受者用戶ID
     */
    private String receivedUserId;

    /**
     * 業(yè)務類型
     */
    private Integer bizType;

    /**
     * 業(yè)務操作模塊
     */
    private Integer bizOptModule;

    /**
     * 消息內容
     */
    private T msgBody;
}

2.3、消息存儲模塊

負責存儲消息內容、用戶ID和sessionID的關系,防止數(shù)據(jù)丟失或者服務器重啟等。

2.4、消息發(fā)送模塊

功能開發(fā)完畢,一般部署到分布式集群環(huán)境,所以通訊時session會分布在多臺服務器。比如用戶A的session在機器1,用戶B的session在機器2,此時A發(fā)送給B,就無法單獨通過機器1完成。

因為機器1拿不到機器2里的session,所以消息發(fā)不過去。此時只能借助別的中間件來實現(xiàn),比如借助消息中間件kafka實現(xiàn)。

機器1將消息發(fā)送給kafka,然后機器1和機器2都監(jiān)聽kafka,然后查看用戶對應的session是否在本機,如果在本機則發(fā)送出去。

2.5、消息推送模塊

模塊3提到的消息發(fā)送流程中,消息發(fā)送給 消息中間件,然后服務器消費到消費,在通過本機的session推送給客戶端。

三、遇到的幾個問題

3.1、連接自動斷開

WebSocket連接之后,發(fā)現(xiàn)一個問題:每隔一段時間如果不傳送數(shù)據(jù)的話,與前端的連接就會自動斷開。可以采用心跳消息的方式來解決這個問題。比如客服端每隔30秒自動發(fā)送ping消息給服務端,服務端返回pong。

3.2、Session無法被序列化

分布式場景會存在這樣的問題:當一次請求負載到第一臺服務器時,session在第一臺服務器線程上,第二次請求,負載到第二臺服務器上,此時通過userId查找當前用戶的session時,是查找不到的。

本來想著把session存入到redis中,就可以從redis獲取用戶的session,希望用這種方式來解決分布式場景下消息發(fā)送的問題。但是會出現(xiàn)如下錯誤:

The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method

翻看了session源碼,發(fā)現(xiàn)session無法被序列化。所以這個方案只能放棄。解決方案請看下面的問題5或者上面的架構圖。

3.3、對象無法自動注入

使用了@ServerEndpoint注解的類中使用@Resource或@Autowired注入對象都會失敗,并且報空指針異常。

原因是WebSocket服務是線程安全的,那么當我們去發(fā)起一個ws連接時,就會創(chuàng)建一個端點對象。WebSocket服務是多對象的,不是單例的。而我們的Spring的Bean默認就是單例的,在非單例類中注入一個單例的Bean是沖突的。

或者說:

Spring管理采用單例模式(singleton),而 WebSocket 是多對象的,即每個客戶端對應后臺的一個 WebSocket 對象,也可以理解成 new 了一個 WebSocket,這樣當然是不能獲得自動注入的對象了,因為這兩者剛好沖突。

@Autowired 注解注入對象操作是在啟動時執(zhí)行的,而不是在使用時,而 WebSocket 是只有連接使用時才實例化對象,且有多個連接就有多個對象。所以我們可以得出結論,這個 Service 根本就沒有注入到 WebSocket 當中。

如何解決呢?

使用靜態(tài)對象,并且對外暴露set方法,這樣在對象初始化的時候,將其注入到WebSocketServer中。比如說這樣:

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
  private static MessageStore messageStore;
  private static MessageSender messageSender;

  public static void setMessageStore(MessageStore messageStore) {
      WebSocketServer.messageStore = messageStore;
  }

  public static void setMessageSender(MessageSender messageSender) {
      WebSocketServer.messageSender = messageSender;
  }
}


@Slf4j
@Service
public class MessageStore {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @PostConstruct
    public void init() {
        WebSocketServer.setMessageStore(this);
    }
}

3.4、分布式場景消息如何發(fā)給客戶端

問題2中提到了分布式場景下存在的session不在本機的問題,這種場景可以通過發(fā)送消息中間件的方式解決。具體這樣解決:

每次連接時,都將userId和對應的session存入到本機,發(fā)送消息時,直接發(fā)送給MQ-Broker,然后每臺應用負載都去消費這個消息,拿到消息之后,判斷在本機能根據(jù)userId是否能找到session,找到session則推送到客戶端。

3.5、部署時Nginx配置問題

代碼開發(fā)完畢之后,本機跑通后,然后部署到服務器之后,還差很重要的一步:配置nginx代理。

3.5.1、給后端應用部署獨立域名

要給后端應用部署獨立域名,nginx代理直接轉發(fā)到應用的獨立域名,不要走微服務的gateway網(wǎng)關轉發(fā)過去。

3.5.2、多層nginx轉發(fā)問題

當只有一層nginx的時候,配置比較簡單,如下:

location ~* ^/api/websocket/* {
      proxy_pass http://mangodwsstest.mangod.top;
      
      proxy_read_timeout 300s;
      proxy_send_timeout 300s;
      proxy_set_header Host mangodwsstest.mangod.top;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "Upgrade";
      proxy_set_header X-Real-IP $remote_addr;
 }

但是,當有兩層nginx轉發(fā)的時候,問題就出現(xiàn)了。

在最外層的nginx需要使用如下配置,不能照抄后面一層的配置。proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for和proxy_set_header X-Forwarded-Proto $scheme這兩個配置不能少,用來將協(xié)議和真實IP傳遞給后面一層的nginx。

location ~* ^/api/websocket/* {
      proxy_pass http://mangodwsstest.mangod.top;

      proxy_read_timeout 300s;
      proxy_send_timeout 300s;
      proxy_set_header  Host $http_host;
      proxy_set_header  X-Real-IP  $remote_addr;
      proxy_set_header  X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header  X-Forwarded-Proto $scheme;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection $connection_upgrade;
}

四、完整代碼和示例

4.1、頁面效果如下

開啟兩個web頁面,用戶1輸入用戶2的用戶ID,輸入發(fā)送消息內容,點擊發(fā)送。在用戶2的頁面的接受消息內容可以看到發(fā)送的消息。

圖片圖片

圖片圖片

4.2、代碼結構

圖片圖片

4.3、代碼地址

https://github.com/yclxiao/spring-websocket.git

五、總結

本文聊了Springboot如何集成Websocket、IM及時通訊需要哪些模塊、開發(fā)和部署過程中遇到的問題、以及實現(xiàn)小型IM及時通訊的代碼。

責任編輯:武曉燕 來源: Java極客技術
相關推薦

2023-01-13 00:02:41

2023-01-05 09:17:58

2023-08-09 08:01:00

WebSockett服務器web

2021-03-26 08:16:32

SpringbootWebsocket前端

2012-08-13 13:03:31

Web

2024-11-14 12:22:37

SpringMail郵件

2015-06-02 11:24:06

容聯(lián)云通訊

2024-09-12 14:50:08

2021-03-25 08:29:33

SpringBootWebSocket即時消息

2023-08-14 08:01:12

websocket8g用戶

2024-11-18 17:04:03

Vue3C#

2023-07-26 07:28:55

WebSocket服務器方案

2024-03-21 08:34:49

Vue3WebSocketHTTP

2021-08-14 09:23:03

即時通訊IM互聯(lián)網(wǎng)

2020-04-23 15:08:41

SpringBootMyCatJava

2021-02-05 07:28:11

SpringbootNettyWebsocke

2022-04-28 07:31:41

Springkafka數(shù)據(jù)量

2024-09-05 08:58:37

2024-08-29 08:58:30

JPA編寫數(shù)據(jù)操

2024-08-02 09:00:17

NettyWebSocketNIO
點贊
收藏

51CTO技術棧公眾號