SpringBoot使用WebSocket實現(xiàn)即時消息
環(huán)境:SpringBoot2.3.9.RELEASE
依賴
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
定義消息類型
抽象消息對象
- public class AbstractMessage {
- /**
- * 消息類型
- */
- protected String type ;
- /**
- * 消息內(nèi)容
- */
- protected String content ;
- /**
- * 消息日期
- */
- protected String date ;
- }
消息對象子類
1、Ping檢查消息
- public class PingMessage extends AbstractMessage {
- public PingMessage() {}
- public PingMessage(String type) {
- this.type = type ;
- }
- }
2、系統(tǒng)消息
- public class SystemMessage extends AbstractMessage {
- public SystemMessage() {}
- public SystemMessage(String type, String content) {
- this.type = type ;
- this.content = content ;
- }
- }
3、點對點消息
- public class PersonMessage extends AbstractMessage {
- private String fromName ;
- private String toName ;
- }
消息類型定義
- public enum MessageType {
- /**
- * 系統(tǒng)消息 0000;心跳檢查消息 0001;點對點消息2001
- */
- SYSTEM("0000"), PING("0001"), PERSON("2001") ;
- private String type ;
- private MessageType(String type) {
- this.type = type ;
- }
- public String getType() {
- return type;
- }
- public void setType(String type) {
- this.type = type;
- }
- }
WebSocket服務(wù)端點
該類作用就是定義客戶端連接的地址
- @ServerEndpoint(value = "/message/{username}",
- encoders = {WsMessageEncoder.class},
- decoders = {WsMessageDecoder.class},
- subprotocols = {"gmsg"},
- configurator = MessageConfigurator.class)
- @Component
- public class GMessageListener {
- public static ConcurrentMap<String, UserSession> sessions = new ConcurrentHashMap<>();
- private static Logger logger = LoggerFactory.getLogger(GMessageListener.class) ;
- private String username ;
- @OnOpen
- public void onOpen(Session session, EndpointConfig config, @PathParam("username") String username){
- UserSession userSession = new UserSession(session.getId(), username, session) ;
- this.username = username ;
- sessions.put(username, userSession) ;
- logger.info("【{}】用戶進(jìn)入, 當(dāng)前連接數(shù):{}", username, sessions.size()) ;
- }
- @OnClose
- public void onClose(Session session, CloseReason reason){
- UserSession userSession = sessions.remove(this.username) ;
- if (userSession != null) {
- logger.info("用戶【{}】, 斷開連接, 當(dāng)前連接數(shù):{}", username, sessions.size()) ;
- }
- }
- @OnMessage
- public void pongMessage(Session session, PongMessage message) {
- ByteBuffer buffer = message.getApplicationData() ;
- logger.debug("接受到Pong幀【這是由瀏覽器發(fā)送】:" + buffer.toString());
- }
- @OnMessage
- public void onMessage(Session session, AbstractMessage message) {
- if (message instanceof PingMessage) {
- logger.debug("這里是ping消息");
- return ;
- }
- if (message instanceof PersonMessage) {
- PersonMessage personMessage = (PersonMessage) message ;
- if (this.username.equals(personMessage.getToName())) {
- logger.info("【{}】收到消息:{}", this.username, personMessage.getContent());
- } else {
- UserSession userSession = sessions.get(personMessage.getToName()) ;
- if (userSession != null) {
- try {
- userSession.getSession().getAsyncRemote().sendText(new ObjectMapper().writeValueAsString(message)) ;
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- }
- }
- return ;
- }
- if (message instanceof SystemMessage) {
- logger.info("接受到消息類型為【系統(tǒng)消息】") ;
- return ;
- }
- }
- @OnError
- public void onError(Session session, Throwable error) {
- logger.error(error.getMessage()) ;
- }
- }
WsMessageEncoder.java類
該類的主要作用是,當(dāng)發(fā)送的消息是對象時,該如何轉(zhuǎn)換
- public class WsMessageEncoder implements Encoder.Text<AbstractMessage> {
- private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;
- @Override
- public void init(EndpointConfig endpointConfig) {
- }
- @Override
- public void destroy() {
- }
- @Override
- public String encode(AbstractMessage tm) throws EncodeException {
- String message = null ;
- try {
- message = new ObjectMapper().writeValueAsString(tm);
- } catch (JsonProcessingException e) {
- logger.error("JSON處理錯誤:{}", e) ;
- }
- return message;
- }
- }
WsMessageDecoder.java類
該類的作用是,當(dāng)接收到消息時如何轉(zhuǎn)換成對象。
- public class WsMessageDecoder implements Decoder.Text<AbstractMessage> {
- private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;
- private static Set<String> msgTypes = new HashSet<>() ;
- static {
- msgTypes.add(MessageType.PING.getType()) ;
- msgTypes.add(MessageType.SYSTEM.getType()) ;
- msgTypes.add(MessageType.PERSON.getType()) ;
- }
- @Override
- @SuppressWarnings("unchecked")
- public AbstractMessage decode(String s) throws DecodeException {
- AbstractMessage message = null ;
- try {
- ObjectMapper mapper = new ObjectMapper() ;
- Map<String,String> map = mapper.readValue(s, Map.class) ;
- String type = map.get("type") ;
- switch(type) {
- case "0000":
- message = mapper.readValue(s, SystemMessage.class) ;
- break;
- case "0001":
- message = mapper.readValue(s, PingMessage.class) ;
- break;
- case "2001":
- message = mapper.readValue(s, PersonMessage.class) ;
- break;
- }
- } catch (JsonProcessingException e) {
- logger.error("JSON處理錯誤:{}", e) ;
- }
- return message ;
- }
- // 該方法判斷消息是否可以被解碼(轉(zhuǎn)換)
- @Override
- @SuppressWarnings("unchecked")
- public boolean willDecode(String s) {
- Map<String, String> map = new HashMap<>() ;
- try {
- map = new ObjectMapper().readValue(s, Map.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- logger.debug("檢查消息:【" + s + "】是否可以解碼") ;
- String type = map.get("type") ;
- if (StringUtils.isEmpty(type) || !msgTypes.contains(type)) {
- return false ;
- }
- return true ;
- }
- @Override
- public void init(EndpointConfig endpointConfig) {
- }
- @Override
- public void destroy() {
- }
- }
MessageConfigurator.java類
該類的作用是配置服務(wù)端點,比如配置握手信息
- public class MessageConfigurator extends ServerEndpointConfig.Configurator {
- private static Logger logger = LoggerFactory.getLogger(MessageConfigurator.class) ;
- @Override
- public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
- logger.debug("握手請求頭信息:" + request.getHeaders());
- logger.debug("握手響應(yīng)頭信息:" + response.getHeaders());
- super.modifyHandshake(sec, request, response);
- }
- }
WebSocke配置類
- @Configuration
- public class WebSocketConfig {
- @Bean
- public ServerEndpointExporter serverEndpointExporter (){
- return new ServerEndpointExporter();
- }
- }
當(dāng)以jar包形式運行時需要配置該bean,暴露我們配置的@ServerEndpoint;當(dāng)我們以war獨立tomcat運行時不能配置該bean。
前端頁面
- <!doctype html>
- <html>
- <head>
- <meta charset="UTF-8">
- <meta name="Author" content="">
- <meta name="Keywords" content="">
- <meta name="Description" content="">
- <script src="g-messages.js?v=1"></script>
- <title>WebSocket</title>
- <style type="text/css">
- </style>
- <script>
- let gm = null ;
- let username = null ;
- function ListenerMsg({url, protocols = ['gmsg'], options = {}}) {
- if (!url){
- throw new Error("未知服務(wù)地址") ;
- }
- gm = new window.__GM({
- url: url,
- protocols: protocols
- }) ;
- gm.open(options) ;
- }
- ListenerMsg.init = (user) => {
- if (!user) {
- alert("未知的當(dāng)前登錄人") ;
- return ;
- }
- let url = `ws://localhost:8080/message/${user}` ;
- let msg = document.querySelector("#msg")
- ListenerMsg({url, options: {
- onmessage (e) {
- let data = JSON.parse(e.data) ;
- let li = document.createElement("li") ;
- li.innerHTML = "【" + data.fromName + "】對你說:" + data.content ;
- msg.appendChild(li) ;
- }
- }}) ;
- }
- function enter() {
- username = document.querySelector("#nick").value ;
- ListenerMsg.init(username) ;
- document.querySelector("#chat").style.display = "block" ;
- document.querySelector("#enter").style.display = "none" ;
- document.querySelector("#cu").innerText = username ;
- }
- function send() {
- let a = document.querySelector("#toname") ;
- let b = document.querySelector("#content") ;
- let toName = a.value ;
- let content = b.value ;
- gm.sendMessage({type: "2001", content, fromName: username, toName}) ;
- a.value = '' ;
- b.value = '' ;
- }
- </script>
- </head>
- <body>
- <div id="enter">
- <input id="nick"/><button type="button" onclick="enter()">進(jìn)入</button>
- </div>
- <hr/>
- <div id="chat" style="display:none;">
- 當(dāng)前用戶:<b id="cu"></b><br/>
- 用戶:<input id="toname" name="toname"/><br/><br/>
- 內(nèi)容:<textarea id="content" rows="3" cols="22"></textarea><br/>
- <button type="button" onclick="send()">發(fā)送</button>
- </div>
- <div>
- <ul id="msg">
- </ul>
- </div>
- </body>
- </html>
到此所有的代碼完畢,接下來測試
測試
打開兩個標(biāo)簽頁,以不同的用戶進(jìn)入。
輸入對方用戶名發(fā)送消息
成功了,簡單的websocket。我們生產(chǎn)環(huán)境還就這么完的,8g內(nèi)存跑了6w的用戶。
完畢!!!