使用消息队列轻松实现分布式WebSocket
知识星球中的球友问了一个关于WebSocket的问题,大致如下:
图片
简单地概括一下:如果我们的项目是分布式环境,登录的用户被Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?
今天就来解答一下球友的问题:其实,要解决这个问题就需要实现分布式WebSocket,而分布式WebSocket一般可以通过以下两种方案来实现:
- 将消息(<用户id,消息内容>)统一推送到一个消息队列(Redis、Kafka等)的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)
- 在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)
实现方案
下面将以第一种方案来具体实现,实现方式如下:
已加入星球的小伙伴如需案例源码,请联系陈某!
1. 定义一个WebSocket Channel枚举类
/**
 * Channels over which the server pushes WebSocket messages to clients.
 *
 * <p>Each constant carries a unique code, a human-readable description and the
 * destination URL that WebSocket clients subscribe to. Constants are immutable.
 */
public enum WebSocketChannelEnum {

    /** Simple point-to-point chat channel used for testing. */
    CHAT("CHAT", "测试使用的简易点对点聊天", "/topic/reply");

    /** Unique code identifying the channel. */
    private final String code;

    /** Human-readable description of the channel. */
    private final String description;

    /** URL that WebSocket clients subscribe to for this channel. */
    private final String subscribeUrl;

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {
        this.code = code;
        this.description = description;
        this.subscribeUrl = subscribeUrl;
    }

    /**
     * @return the unique code of this channel
     */
    public String getCode() {
        return code;
    }

    /**
     * @return the human-readable description of this channel
     */
    public String getDescription() {
        return description;
    }

    /**
     * @return the URL that WebSocket clients subscribe to
     */
    public String getSubscribeUrl() {
        return subscribeUrl;
    }

    /**
     * Looks up a channel by its code (exact, case-sensitive match).
     *
     * @param code the channel code; may be {@code null} or blank
     * @return the matching channel, or {@code null} when {@code code} is
     *         {@code null}, blank, or does not match any channel
     */
    public static WebSocketChannelEnum fromCode(String code) {
        if (code == null) {
            return null;
        }
        // Blank input needs no separate check: no channel has a blank code,
        // so it simply fails to match below.
        for (WebSocketChannelEnum channelEnum : values()) {
            if (channelEnum.code.equals(code)) {
                return channelEnum;
            }
        }
        return null;
    }
}
2. 配置基于Redis的消息队列
需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用Kafka、RabbitMQ等专业的消息队列中间件。
/**
 * Redis configuration: a Jedis-based cluster connection factory, a
 * {@code RedisTemplate} with String keys and JSON values, and the pub/sub
 * listener container that hands messages published on the configured topic
 * to {@code MessageReceiver.receiveMessage}.
 */
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {

    // NOTE(review): injected but not applied anywhere in this class — the
    // connection factory is built without an explicit timeout. Confirm intent.
    @Value("${spring.redis.timeout}")
    private String timeOut;

    /** Comma-separated list of cluster nodes, each in {@code host:port} form. */
    @Value("${spring.redis.cluster.nodes}")
    private String nodes;

    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;

    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;

    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWait;

    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;

    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;

    /** Name of the Redis pub/sub topic used to relay WebSocket messages. */
    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    /**
     * Builds the Jedis connection pool settings from the
     * {@code spring.redis.jedis.pool.*} properties.
     */
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);
        return config;
    }

    /**
     * Builds the cluster configuration from the comma-separated node list.
     *
     * <p>The property is a single String such as
     * {@code "host1:6379,host2:6379"}; it must be split into one entry per
     * node. Passing the unsplit String through {@code Arrays.asList} would
     * yield a single malformed "node".
     */
    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        // Tolerate optional whitespace around the separating commas.
        String[] clusterNodes = nodes.trim().split("\\s*,\\s*");
        RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(clusterNodes));
        configuration.setMaxRedirects(maxRedirects);
        return configuration;
    }

    /**
     * Jedis connection factory for the cluster, using the pool settings above.
     */
    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration, JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(configuration, jedisPoolConfig);
    }

    /**
     * Jackson-based serializer used for Redis values and pub/sub payloads.
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // NOTE(review): default typing embeds class names in the JSON and lets
        // the payload choose the class to instantiate on read. This is unsafe if
        // untrusted parties can write to Redis, and the method is deprecated in
        // newer Jackson releases. Left unchanged to keep stored data readable;
        // consider a restricted polymorphic type validator.
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);
        return serializer;
    }

    /**
     * {@code RedisTemplate} with String-serialized keys and JSON-serialized values.
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        // Keys (and hash keys) are stored as plain strings.
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // Values (and hash values) are stored as JSON.
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Adapter that deserializes incoming pub/sub messages and delegates them
     * to the receiver's {@code receiveMessage} method.
     */
    @Bean
    MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
        // Delegate object and the name of the method invoked for each message.
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
        // Payloads are deserialized with the same JSON serializer used to publish them.
        messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);
        return messageListenerAdapter;
    }

    /**
     * Listener container that subscribes the adapter to the configured topic.
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Register the listener for the configured topic name (treated as a pattern).
        container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));
        return container;
    }
}
需要注意的是,这里使用的配置如下所示:
spring:
  # ... (other Spring settings omitted)
  # Redis
  redis:
    cluster:
      # Comma-separated host:port list of the cluster nodes
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    # Custom setting: name of the pub/sub topic the application nodes listen on
    message:
      topic-name: topic-test
3. 定义一个Redis消息的处理者
/**
 * Receives messages published on the Redis topic and forwards each one to the
 * addressed WebSocket user, but only when that user is registered with this
 * application node.
 */
@Component
public class MessageReceiver {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /**
     * Handles one relayed WebSocket message.
     *
     * <p>The message is ignored when the receiver is not found in this node's
     * user registry or when its channel code does not map to a known channel.
     *
     * @param redisWebsocketMsg the message taken from the Redis topic
     */
    public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
        logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));

        // Step 1: is the receiver connected to this node?
        String receiverName = redisWebsocketMsg.getReceiver();
        SimpUser localUser = userRegistry.getUser(receiverName);
        if (localUser == null || !StringUtils.isNoneBlank(localUser.getName())) {
            return;
        }

        // Step 2: resolve the destination the client subscribed to.
        WebSocketChannelEnum channel = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
        if (channel == null) {
            return;
        }

        // Step 3: push the content to the client.
        messagingTemplate.convertAndSendToUser(receiverName, channel.getSubscribeUrl(), redisWebsocketMsg.getContent());
    }
}
4. 在Controller中发送WebSocket消息
/**
 * HTTP endpoints for sending a WebSocket message to a named user and for
 * pulling messages that were stored while the user had no connection.
 */
@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Name of the Redis pub/sub topic used to relay messages to other nodes. */
    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * Sends a chat message from the logged-in user to the given receiver.
     *
     * <p>Reads the {@code receiver} and {@code msg} request parameters.
     *
     * @param request the current HTTP request
     * @return the literal string {@code "ok"} once the message has been handed off
     * @throws IllegalArgumentException if the {@code receiver} parameter is missing or blank
     * @throws IllegalStateException    if no logged-in user is stored in the session
     */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        // Message receiver
        String receiver = request.getParameter("receiver");
        // Message text
        String msg = request.getParameter("msg");
        // Fail with a clear message instead of an NPE deeper in the call chain.
        if (StringUtils.isBlank(receiver)) {
            throw new IllegalArgumentException("Request parameter 'receiver' must not be blank");
        }

        User loginUser = currentUser();
        if (loginUser == null) {
            throw new IllegalStateException("No logged-in user found in the current session");
        }

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));
        return "ok";
    }

    /**
     * Delivers a message to a user, covering three cases: the receiver is
     * connected to this node, is connected to another node, or is offline.
     *
     * @param sender      name of the sending user (used for logging only)
     * @param receiver    name of the receiving user
     * @param destination destination the receiver subscribes to
     * @param payload     message body
     */
    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser simpUser = userRegistry.getUser(receiver);
        if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
            // Receiver is registered on this node: deliver directly.
            messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        } else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
            // Receiver is in the shared online-user set but not on this node:
            // publish to the topic so that the node holding the connection delivers it.
            // NOTE(review): the channel code is fixed to CHAT and does not depend
            // on 'destination'; fine for the single current caller.
            RedisWebsocketMsg<String> redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
            redisService.convertAndSend(topicName, redisWebsocketMsg);
        } else {
            // Receiver is offline: store the message so it can be pulled later.
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中", receiver, sender, payload, listKey));
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    /**
     * Returns, and then removes, the unread messages stored for the logged-in
     * user under the given destination.
     *
     * @param destination the subscribed destination whose unread messages are wanted
     * @return a map with {@code code} ("200" or "500"), {@code result} (the
     *         message list, present only when non-empty) and, on failure, {@code msg}
     */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination) {
        Map<String, Object> result = new HashMap<>();
        try {
            User loginUser = currentUser();
            if (loginUser == null) {
                // Same "500" code the NPE used to produce, but with a usable message.
                result.put("code", "500");
                result.put("msg", "No logged-in user found in the current session");
                return result;
            }
            // Redis list holding this user's unread messages for the destination
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            // Fetch every stored message
            List<Object> messageList = redisService.rangeList(listKey, 0, -1);
            result.put("code", "200");
            if (messageList != null && !messageList.isEmpty()) {
                // NOTE(review): read-then-delete is not atomic; a message appended
                // between the two calls would be deleted without being returned.
                redisService.delete(listKey);
                result.put("result", messageList);
            }
        } catch (Exception e) {
            // Log the failure instead of swallowing it; the client only sees the message.
            logger.error("Failed to pull unread messages for destination " + destination, e);
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }
        return result;
    }

    /**
     * Reads the logged-in user from the current HTTP session.
     *
     * @return the user, or {@code null} when there is no session or no user in it
     */
    private User currentUser() {
        HttpSession session = SpringContextUtils.getSession();
        if (session == null) {
            return null;
        }
        return (User) session.getAttribute(Constants.SESSION_USER);
    }
}
5. WebSocket相关配置
/**
 * STOMP-over-WebSocket setup: registers the SockJS endpoint, configures the
 * destination prefixes of the simple broker, and installs the inbound channel
 * interceptor.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    /**
     * Installs the custom interceptor on messages arriving from clients.
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }

    /**
     * Configures destination prefixes.
     *
     * <ul>
     *   <li>{@code /message} - prefix for messages clients send to the application</li>
     *   <li>{@code /user/} - prefix for user-specific destinations (the framework default)</li>
     *   <li>{@code /topic} - prefix served by the simple broker; clients subscribe below it</li>
     * </ul>
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/message")
                .setUserDestinationPrefix("/user/");
        registry.enableSimpleBroker("/topic");
    }

    /**
     * Exposes the {@code /chat-websocket} endpoint with SockJS fallback, the
     * custom handshake handler and the authentication handshake interceptor.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat-websocket")
                .setHandshakeHandler(myHandshakeHandler)
                .addInterceptors(authHandshakeInterceptor)
                .withSockJS();
    }
}
6. 示例页面
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8"/>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Chat With STOMP Message</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script th:src="@{/layui/layui.js}"></script>
<script th:src="@{/layui/lay/modules/layer.js}"></script>
<link th:href="@{/layui/css/layui.css}" rel="stylesheet">
<link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
<link th:href="@{/css/style.css}" rel="stylesheet">
<style type="text/css">
/* Fixed-width panel, centred horizontally */
#connect-container {
margin: 0 auto;
width: 400px;
}
/* Spacing for each row inside the panel */
#connect-container div {
padding: 5px;
margin: 0 7px 10px 0;
}
/* Same spacing for the receiver / message text inputs */
.message input {
padding: 5px;
margin: 0 7px 10px 0;
}
/* Keep buttons on one line */
.layui-btn {
display: inline-block;
}
</style>
<script type="text/javascript">
// Shared STOMP client handle; null whenever no connection is open.
var stompClient = null;

// Once the DOM is ready, expand the relative endpoint path in #target into an
// absolute URL on the current host ('http://' only when the page itself was
// loaded over plain http, 'https://' in every other case).
$(function () {
    var endpointInput = $("#target");
    var scheme = (window.location.protocol === 'http:') ? 'http://' : 'https://';
    endpointInput.val(scheme + window.location.host + endpointInput.val());
});
// Switch the three buttons between the "connected" and "disconnected" look:
// Connect is disabled while connected; Disconnect and Send are disabled otherwise.
function setConnected(connected) {
    var disabledClass = "layui-btn-disabled";
    var isConnected = !!connected;

    var connectBtn = $("#connect");
    connectBtn.toggleClass(disabledClass, isConnected);
    connectBtn.attr("disabled", connected);

    var disconnectBtn = $("#disconnect");
    disconnectBtn.toggleClass(disabledClass, !isConnected);
    disconnectBtn.attr("disabled", !connected);

    var sendBtn = $("#echo");
    sendBtn.toggleClass(disabledClass, !isConnected);
    sendBtn.attr("disabled", !connected);
}
// Open a SockJS/STOMP connection to the endpoint entered in #target.
function connect() {
    var target = $("#target").val();
    var client = Stomp.over(new SockJS(target));
    stompClient = client;
    client.connect({}, function () {
        setConnected(true);
        log('Info: STOMP connection opened.');
        // Subscribe to the user-specific /topic/reply destination BEFORE pulling
        // stored messages, so that a message pushed while the pull request is
        // still in flight is not lost for lack of a subscription.
        client.subscribe("/user/topic/reply", function (response) {
            log(JSON.parse(response.body).content);
        });
        // Then fetch the messages that were stored while we were offline.
        pullUnreadMessage("/topic/reply");
    }, function () {
        // Connection failed or was lost. Drop the stale client so that
        // sendMessage() reports "not established" instead of silently assuming
        // a live connection. Only clear it if no newer connect() replaced it.
        if (stompClient === client) {
            stompClient = null;
        }
        setConnected(false);
        log('Info: STOMP connection closed.');
    });
}
// Close the STOMP connection (if one exists) and reset the buttons.
function disconnect() {
    var hasClient = (stompClient !== null && stompClient !== undefined);
    if (hasClient) {
        stompClient.disconnect();
        stompClient = null;
    }
    setConnected(false);
    log('Info: STOMP connection closed.');
}
// Send the text in #message to the user named in #receiver via the HTTP endpoint.
function sendMessage() {
    // Refuse to send while there is no STOMP client.
    if (stompClient == null) {
        layer.msg('STOMP connection not established, please connect.', {
            offset: 'auto',
            icon: 2
        });
        return;
    }

    var payload = {
        "receiver": $("#receiver").val(),
        "msg": $("#message").val()
    };
    log('Sent: ' + JSON.stringify(payload));

    $.ajax({
        url: "/wsTemplate/sendToUser",
        type: "POST",
        dataType: "json",
        async: true,
        data: payload,
        success: function (data) {
        }
    });
}
// Fetch the unread messages stored on the server for the given destination
// and log the content of each one; show the server's error text on a "500" reply.
function pullUnreadMessage(destination) {
    var handleReply = function (data) {
        if (data.result != null) {
            // Each stored item is a JSON string carrying a 'content' field.
            $.each(data.result, function (i, item) {
                log(JSON.parse(item).content);
            });
            return;
        }
        if (data.code != null && data.code == "500") {
            layer.msg(data.msg, {
                offset: 'auto',
                icon: 2
            });
        }
    };

    $.ajax({
        url: "/wsTemplate/pullUnreadMessage",
        type: "POST",
        dataType: "json",
        async: true,
        data: {
            "destination": destination
        },
        success: handleReply
    });
}
// Log output: writes the message to the browser console at debug level.
function log(message) {
console.debug(message);
}
</script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div>
<div id="connect-container" class="layui-elem-field">
<legend>Chat With STOMP Message</legend>
<div>
<input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
</div>
<div>
<button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
<button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
onclick="disconnect();">Disconnect
</button>
</div>
<div class="message">
<input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="接收者姓名" value=""/>
<input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="消息内容" value=""/>
</div>
<div>
<button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"
onclick="sendMessage();">Send Message
</button>
</div>
</div>
</div>
</body>
</html>