SpringBoot使用WebSocket實(shí)現(xiàn)即時(shí)消息
環(huán)境:SpringBoot2.4.12.
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
定義消息類型
- 抽象消息對(duì)象
public class AbstractMessage {
/**
* 消息類型
*/
protected String type ;
/**
* 消息內(nèi)容
*/
protected String content ;
/**
* 消息日期
*/
protected String date ;
}
消息對(duì)象子類
1、Ping檢查消息
public class PingMessage extends AbstractMessage {
public PingMessage() {}
public PingMessage(String type) {
this.type = type ;
}
}
2、系統(tǒng)消息
public class SystemMessage extends AbstractMessage {
public SystemMessage() {}
public SystemMessage(String type, String content) {
this.type = type ;
this.content = content ;
}
}
3、點(diǎn)對(duì)點(diǎn)消息
public class PersonMessage extends AbstractMessage {
private String fromName ;
private String toName ;
}
消息類型定義
public enum MessageType {
/**
* 系統(tǒng)消息 0000;心跳檢查消息 0001;點(diǎn)對(duì)點(diǎn)消息2001
*/
SYSTEM("0000"), PING("0001"), PERSON("2001") ;
private String type ;
private MessageType(String type) {
this.type = type ;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
WebSocket服務(wù)端點(diǎn)
該類作用就是定義客戶端連接的地址
@ServerEndpoint(value = "/message/{username}",
encoders = {WsMessageEncoder.class},
decoders = {WsMessageDecoder.class},
subprotocols = {"gmsg"},
configurator = MessageConfigurator.class)
@Component
public class GMessageListener {
public static ConcurrentMap<String, UserSession> sessions = new ConcurrentHashMap<>();
private static Logger logger = LoggerFactory.getLogger(GMessageListener.class) ;
private String username ;
@OnOpen
public void onOpen(Session session, EndpointConfig config, @PathParam("username") String username){
UserSession userSession = new UserSession(session.getId(), username, session) ;
this.username = username ;
sessions.put(username, userSession) ;
logger.info("【{}】用戶進(jìn)入, 當(dāng)前連接數(shù):{}", username, sessions.size()) ;
}
@OnClose
public void onClose(Session session, CloseReason reason){
UserSession userSession = sessions.remove(this.username) ;
if (userSession != null) {
logger.info("用戶【{}】, 斷開連接, 當(dāng)前連接數(shù):{}", username, sessions.size()) ;
}
}
@OnMessage
public void pongMessage(Session session, PongMessage message) {
ByteBuffer buffer = message.getApplicationData() ;
logger.debug("接受到Pong幀【這是由瀏覽器發(fā)送】:" + buffer.toString());
}
@OnMessage
public void onMessage(Session session, AbstractMessage message) {
if (message instanceof PingMessage) {
logger.debug("這里是ping消息");
return ;
}
if (message instanceof PersonMessage) {
PersonMessage personMessage = (PersonMessage) message ;
if (this.username.equals(personMessage.getToName())) {
logger.info("【{}】收到消息:{}", this.username, personMessage.getContent());
} else {
UserSession userSession = sessions.get(personMessage.getToName()) ;
if (userSession != null) {
try {
userSession.getSession().getAsyncRemote().sendText(new ObjectMapper().writeValueAsString(message)) ;
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
return ;
}
if (message instanceof SystemMessage) {
logger.info("接受到消息類型為【系統(tǒng)消息】") ;
return ;
}
}
@OnError
public void onError(Session session, Throwable error) {
logger.error(error.getMessage()) ;
}
}
WsMessageEncoder.java類
該類的主要作用是,當(dāng)發(fā)送的消息是對(duì)象時(shí),該如何轉(zhuǎn)換
public class WsMessageEncoder implements Encoder.Text<AbstractMessage> {
private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;
@Override
public void init(EndpointConfig endpointConfig) {
}
@Override
public void destroy() {
}
@Override
public String encode(AbstractMessage tm) throws EncodeException {
String message = null ;
try {
message = new ObjectMapper().writeValueAsString(tm);
} catch (JsonProcessingException e) {
logger.error("JSON處理錯(cuò)誤:{}", e) ;
}
return message;
}
}
WsMessageDecoder.java類
該類的作用是,當(dāng)接收到消息時(shí)如何轉(zhuǎn)換成對(duì)象。
public class WsMessageDecoder implements Decoder.Text<AbstractMessage> {
private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;
private static Set<String> msgTypes = new HashSet<>() ;
static {
msgTypes.add(MessageType.PING.getType()) ;
msgTypes.add(MessageType.SYSTEM.getType()) ;
msgTypes.add(MessageType.PERSON.getType()) ;
}
@Override
@SuppressWarnings("unchecked")
public AbstractMessage decode(String s) throws DecodeException {
AbstractMessage message = null ;
try {
ObjectMapper mapper = new ObjectMapper() ;
Map<String,String> map = mapper.readValue(s, Map.class) ;
String type = map.get("type") ;
switch(type) {
case "0000":
message = mapper.readValue(s, SystemMessage.class) ;
break;
case "0001":
message = mapper.readValue(s, PingMessage.class) ;
break;
case "2001":
message = mapper.readValue(s, PersonMessage.class) ;
break;
}
} catch (JsonProcessingException e) {
logger.error("JSON處理錯(cuò)誤:{}", e) ;
}
return message ;
}
// 該方法判斷消息是否可以被解碼(轉(zhuǎn)換)
@Override
@SuppressWarnings("unchecked")
public boolean willDecode(String s) {
Map<String, String> map = new HashMap<>() ;
try {
map = new ObjectMapper().readValue(s, Map.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
logger.debug("檢查消息:【" + s + "】是否可以解碼") ;
String type = map.get("type") ;
if (StringUtils.isEmpty(type) || !msgTypes.contains(type)) {
return false ;
}
return true ;
}
@Override
public void init(EndpointConfig endpointConfig) {
}
@Override
public void destroy() {
}
}
MessageConfigurator.java類
該類的作用是配置服務(wù)端點(diǎn),比如配置握手信息
public class MessageConfigurator extends ServerEndpointConfig.Configurator {
private static Logger logger = LoggerFactory.getLogger(MessageConfigurator.class) ;
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
logger.debug("握手請(qǐng)求頭信息:" + request.getHeaders());
logger.debug("握手響應(yīng)頭信息:" + response.getHeaders());
super.modifyHandshake(sec, request, response);
}
}
WebSocke配置類
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter (){
return new ServerEndpointExporter();
}
}
當(dāng)以jar包形式運(yùn)行時(shí)需要配置該bean,暴露我們配置的@ServerEndpoint;當(dāng)我們以war獨(dú)立tomcat運(yùn)行時(shí)不能配置該bean。
前端頁(yè)面
<!doctype html>
<html>
<head>
<meta charset="UTF-8">
<meta name="Author" content="">
<meta name="Keywords" content="">
<meta name="Description" content="">
<script src="g-messages.js?v=1"></script>
<title>WebSocket</title>
<style type="text/css">
</style>
<script>
let gm = null ;
let username = null ;
function ListenerMsg({url, protocols = ['gmsg'], options = {}}) {
if (!url){
throw new Error("未知服務(wù)地址") ;
}
gm = new window.__GM({
url: url,
protocols: protocols
}) ;
gm.open(options) ;
}
ListenerMsg.init = (user) => {
if (!user) {
alert("未知的當(dāng)前登錄人") ;
return ;
}
let url = `ws://localhost:8080/message/${user}` ;
let msg = document.querySelector("#msg")
ListenerMsg({url, options: {
onmessage (e) {
let data = JSON.parse(e.data) ;
let li = document.createElement("li") ;
li.innerHTML = "【" + data.fromName + "】對(duì)你說:" + data.content ;
msg.appendChild(li) ;
}
}}) ;
}
function enter() {
username = document.querySelector("#nick").value ;
ListenerMsg.init(username) ;
document.querySelector("#chat").style.display = "block" ;
document.querySelector("#enter").style.display = "none" ;
document.querySelector("#cu").innerText = username ;
}
function send() {
let a = document.querySelector("#toname") ;
let b = document.querySelector("#content") ;
let toName = a.value ;
let content = b.value ;
gm.sendMessage({type: "2001", content, fromName: username, toName}) ;
a.value = '' ;
b.value = '' ;
}
</script>
</head>
<body>
<div id="enter">
<input id="nick"/><button type="button" onclick="enter()">進(jìn)入</button>
</div>
<hr/>
<div id="chat" style="display:none;">
當(dāng)前用戶:<b id="cu"></b><br/>
用戶:<input id="toname" name="toname"/><br/><br/>
內(nèi)容:<textarea id="content" rows="3" cols="22"></textarea><br/>
<button type="button" onclick="send()">發(fā)送</button>
</div>
<div>
<ul id="msg">
</ul>
</div>
</body>
</html>
這里有個(gè)g-messages.js文件是我寫的一個(gè)工具類,用來做連接及心跳檢查用的。
到此所有的代碼完畢,接下來測(cè)試
測(cè)試
打開兩個(gè)標(biāo)簽頁(yè),以不同的用戶進(jìn)入。
輸入對(duì)方用戶名發(fā)送消息
成功了,簡(jiǎn)單的websocket。我們生產(chǎn)環(huán)境還就這么完的,8g內(nèi)存跑了6w的用戶。