教你用純Java實(shí)現(xiàn)一個(gè)即時(shí)通訊系統(tǒng)(附源碼)
項(xiàng)目背景
和各位讀者大致介紹下具體場(chǎng)景,線上的小程序中開(kāi)放一些語(yǔ)音麥克風(fēng)的房間,讓用戶進(jìn)入房間之后可以互相通過(guò)語(yǔ)音聊天的方式進(jìn)行互動(dòng)。
這里分享一下相關(guān)的技術(shù)設(shè)計(jì)方案。這款系統(tǒng)的核心點(diǎn)設(shè)計(jì)在于如何能讓一個(gè)用戶發(fā)出的語(yǔ)音通知到其他用戶上邊。語(yǔ)音數(shù)據(jù)在客戶端同事的處理下最終變成了io數(shù)據(jù)流請(qǐng)求到了后端,后端只需要將這些數(shù)據(jù)流傳達(dá)給各個(gè)不同的終端即可達(dá)到廣播通知的效果。
單機(jī)版架構(gòu)
最初期上線的時(shí)候,為了趕速度,快速試錯(cuò),所以簡(jiǎn)單地采用了單機(jī)版架構(gòu)去設(shè)計(jì)。結(jié)合技術(shù)棧為 SpringBoot,WebSocket,MySQL技術(shù)。
線上一間語(yǔ)音房間的同時(shí)在線人數(shù)并不會(huì)特別多,大概在15-50人的區(qū)間段內(nèi),系統(tǒng)核心代碼是通過(guò)SpringBoot內(nèi)部的WebSocket技術(shù)去進(jìn)行數(shù)據(jù)的主動(dòng)推送。
設(shè)計(jì)思路
整體的設(shè)計(jì)圖比較簡(jiǎn)單,基本就是一臺(tái)服務(wù)器存儲(chǔ)WebSocket連接,如下圖所示:
用戶進(jìn)行WebSocket初始化連接的時(shí)候需要一個(gè)連接分配和存儲(chǔ)的過(guò)程:
早期的存儲(chǔ)是存放在了服務(wù)器本地的一個(gè)Map集合中。
當(dāng)WebSocket進(jìn)行連接的時(shí)候就會(huì)往內(nèi)存中寫(xiě)入一條數(shù)據(jù)信息,當(dāng)鏈接斷開(kāi)的時(shí)候,就將內(nèi)存中的數(shù)據(jù)移除。然后進(jìn)行語(yǔ)音廣播的時(shí)候需要結(jié)合WebSocket內(nèi)部的廣播發(fā)送功能進(jìn)行通知
看似設(shè)計(jì)比較簡(jiǎn)單,但是在后期業(yè)務(wù)變得龐大的時(shí)候出現(xiàn)了瓶頸。因?yàn)殡S著參加語(yǔ)音活動(dòng)用戶的增加,越來(lái)越多的WebSocketSession對(duì)象需要被存儲(chǔ)到內(nèi)存當(dāng)中,這種有狀態(tài)性的存儲(chǔ)對(duì)于單機(jī)擴(kuò)容不靈活。
設(shè)計(jì)缺陷
1.假設(shè)原先的服務(wù)器擴(kuò)容到了A,B兩臺(tái)機(jī)器,A用戶在A機(jī)器上邊建立了WebSocketSession,B用戶在B機(jī)器上邊建立的WebSocketSession連接。此時(shí)如果A想要和B進(jìn)行對(duì)話發(fā)送,需要先查找到具體WebSocketSession存放在哪臺(tái)機(jī)器上邊。
2.當(dāng)用戶出現(xiàn)了網(wǎng)絡(luò)異常,臨時(shí)斷開(kāi)連接進(jìn)行重連的時(shí)候,也可能會(huì)出現(xiàn)1所說(shuō)的問(wèn)題。
集群架構(gòu)
設(shè)計(jì)思路
一旦出現(xiàn)需要發(fā)送語(yǔ)音通知的時(shí)候,發(fā)送一條廣播的mq消息,每個(gè)機(jī)器都接收到消息之后,觸發(fā)自己的廣播操作即可。
RocketMq的接入系統(tǒng)設(shè)計(jì)里面mq采用的是廣播模式,這和我們通常使用的集群模式有一定的區(qū)別。
消息隊(duì)列RocketMQ版是基于發(fā)布或訂閱模型的消息系統(tǒng)。消費(fèi)者,即消息的訂閱方訂閱關(guān)注的Topic,以獲取并消費(fèi)消息。由于消費(fèi)者應(yīng)用一般是分布式系統(tǒng),以集群方式部署,因此消息隊(duì)列RocketMQ版約定以下概念:
- 集群:使用相同Group ID的消費(fèi)者屬于同一個(gè)集群。同一個(gè)集群下的消費(fèi)者消費(fèi)邏輯必須完全一致(包括Tag的使用)。
- 集群消費(fèi):當(dāng)使用集群消費(fèi)模式時(shí),消息隊(duì)列RocketMQ版認(rèn)為任意一條消息只需要被集群內(nèi)的任意一個(gè)消費(fèi)者處理即可。
- 廣播消費(fèi):當(dāng)使用廣播消費(fèi)模式時(shí),消息隊(duì)列RocketMQ版會(huì)將每條消息推送給集群內(nèi)所有注冊(cè)過(guò)的消費(fèi)者,保證消息至少被每個(gè)消費(fèi)者消費(fèi)一次。
集群消費(fèi)模式適用場(chǎng)景 適用于消費(fèi)端集群化部署,每條消息只需要被處理一次的場(chǎng)景。此外,由于消費(fèi)進(jìn)度在服務(wù)端維護(hù),可靠性更高。具體消費(fèi)示例如下圖所示。
注意事項(xiàng)
- 集群消費(fèi)模式下,每一條消息都只會(huì)被分發(fā)到一臺(tái)機(jī)器上處理。如果需要被集群下的每一臺(tái)機(jī)器都處理,請(qǐng)使用廣播模式。
- 集群消費(fèi)模式下,不保證每一次失敗重投的消息路由到同一臺(tái)機(jī)器上。
廣播消費(fèi)模式適用場(chǎng)景 適用于消費(fèi)端集群化部署,每條消息需要被集群下的每個(gè)消費(fèi)者處理的場(chǎng)景。具體消費(fèi)示例如下圖所示。
注意事項(xiàng)
- 廣播消費(fèi)模式下不支持順序消息。
- 廣播消費(fèi)模式下不支持重置消費(fèi)位點(diǎn)。
- 每條消息都需要被相同訂閱邏輯的多臺(tái)機(jī)器處理。
- 消費(fèi)進(jìn)度在客戶端維護(hù),出現(xiàn)重復(fù)消費(fèi)的概率稍大于集群模式。
- 廣播模式下,消息隊(duì)列RocketMQ版保證每條消息至少被每臺(tái)客戶端消費(fèi)一次,但是并不會(huì)重投消費(fèi)失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費(fèi)失敗的情況。
- 廣播模式下,客戶端每一次重啟都會(huì)從最新消息消費(fèi)??蛻舳嗽诒煌V蛊陂g發(fā)送至服務(wù)端的消息將會(huì)被自動(dòng)跳過(guò),請(qǐng)謹(jǐn)慎選擇。
- 廣播模式下,每條消息都會(huì)被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。
- 廣播模式下服務(wù)端不維護(hù)消費(fèi)進(jìn)度,所以消息隊(duì)列RocketMQ版控制臺(tái)不支持消息堆積查詢、消息堆積報(bào)警和訂閱關(guān)系查詢功能。
這里面的應(yīng)用場(chǎng)景需要對(duì)集群內(nèi)部對(duì)每個(gè)消費(fèi)者都對(duì)服務(wù)器內(nèi)存中的socket連接進(jìn)行session是否存在對(duì)判斷,因此需要采用mq的廣播模式。
關(guān)于mq部分的接入代碼
Consumer模塊的配置:
- package org.idea.web.socket.config;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- /**
- * @Author linhao
- * @Date created in 10:30 上午 2021/5/10
- */
- @ConfigurationProperties(prefix = "rocketmq.consumer")
- public class MqConsumerConfig {
- private boolean isOn;
- private String groupName;
- private String nameSrvAddr;
- private String topics;
- private Integer consumeThreadMin;
- private Integer consumeThreadMax;
- private Integer consumeMessageBatchMaxSize;
- /**
- getter 和 setter部分省略
- **/
- }
Producer模塊的配置展示:
- package org.idea.web.socket.config;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- /**
- * @Author linhao
- * @Date created in 10:26 上午 2021/5/10
- */
- @ConfigurationProperties(prefix = "rocketmq.producer")
- public class MqProducerConfig {
- private boolean isOn;
- private String groupName;
- private String nameSrvAddr;
- private Integer maxMessageSize;
- private Integer sendMsgTimeout;
- private Integer retryTimesWhenSendFailed;
- /**
- getter 和 setter部分省略
- **/
- }
RocketMq內(nèi)部的消費(fèi)端Bean配置
- package org.idea.web.socket.mq;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
- import org.idea.web.socket.config.MqConsumerConfig;
- import org.idea.web.socket.config.MqProducerConfig;
- import org.springframework.boot.autoconfigure.AutoConfigureAfter;
- import org.springframework.boot.autoconfigure.AutoConfigureBefore;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
- import org.springframework.boot.context.properties.EnableConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.annotation.Resource;
- /**
- * @Author linhao
- * @Date created in 10:34 上午 2021/5/10
- */
- @Configuration
- @Slf4j
- @EnableConfigurationProperties({MqConsumerConfig.class})
- public class MqConsumerAutoConfig {
- @Resource
- private MqConsumerConfig mqConsumerConfig;
- @Resource
- //這個(gè)接口需要手動(dòng)實(shí)現(xiàn)順序消費(fèi)的邏輯 每次獲取到消息隊(duì)列的第一條數(shù)據(jù)
- private MessageListenerHandler messageListenerConcurrently;
- @Bean
- @ConditionalOnMissingBean
- public DefaultMQPushConsumer defaultMQPushConsumer() {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
- consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());
- consumer.setConsumerGroup(mqConsumerConfig.getGroupName());
- consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());
- consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());
- consumer.registerMessageListener(messageListenerConcurrently);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- //消費(fèi)模型是什么?
- consumer.setMessageModel(MessageModel.BROADCASTING);
- //默認(rèn)一次拉取一條消費(fèi)
- consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());
- //*表示訂閱所有的tag
- try {
- consumer.subscribe(mqConsumerConfig.getTopics(), "*");
- consumer.start();
- log.info("【 MqConsumerAutoConfig 】mq consumer is started!");
- } catch (Exception e) {
- log.error("mq start fail,e is ", e);
- }
- return consumer;
- }
- }
RocketMq的服務(wù)生產(chǎn)者Bean配置
- package org.idea.web.socket.mq;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.idea.web.socket.config.MqProducerConfig;
- import org.springframework.boot.autoconfigure.AutoConfigureAfter;
- import org.springframework.boot.autoconfigure.AutoConfigureBefore;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
- import org.springframework.boot.context.properties.EnableConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.annotation.Resource;
- /**
- * @Author linhao
- * @Date created in 11:05 上午 2021/5/10
- */
- @Configuration
- @Slf4j
- @EnableConfigurationProperties({MqProducerConfig.class})
- public class MqProducerAutoConfig {
- @Resource
- private MqProducerConfig mqProducerConfig;
- @Bean
- @ConditionalOnMissingBean
- //意味著DefaultMQProducer的配置可以被覆蓋
- public DefaultMQProducer defaultMQProducer() {
- DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());
- producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());
- //沒(méi)有則自動(dòng)創(chuàng)建topic的key
- // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
- producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());
- producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());
- producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());
- try {
- producer.start();
- log.info("【 MqProducerAutoConfig 】mq producer is started!");
- } catch (Exception e) {
- log.error("[MqProducerAutoConfig] start fail, e is ", e);
- }
- return producer;
- }
- }
然后是對(duì)RocketMq內(nèi)部發(fā)送消息事件的一層函數(shù)封裝
- package org.idea.web.socket.mq;
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.idea.web.socket.config.MqProducerConfig;
- import org.idea.web.socket.dto.BroadcastMqDTO;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.io.UnsupportedEncodingException;
- /**
- * 消息廣播發(fā)送端
- *
- * @Author linhao
- * @Date created in 10:43 下午 2021/5/9
- */
- @Component
- @Slf4j
- public class BroadcastMqProducer {
- @Resource
- private DefaultMQProducer defaultMQProducer;
- @Resource
- private MqProducerConfig mqProducerConfig;
- private static String TOPIC = "ws-topic";
- private static String TAGS = "ws-tag";
- public static Integer ALL_USER_RECEIVE_TYPE = 1;
- public static Integer ONE_USER_RECEIVE_TYPE = 2;
- /**
- * 點(diǎn)對(duì)點(diǎn)之間的消息發(fā)送
- *
- * @param destSessionKey
- * @param msg
- * @return
- */
- public SendResult sendWebSocketToUser(String destSessionKey,String msg) {
- if (StringUtils.isEmpty(msg)) {
- log.error("[sendWebSocketToUser] msg can not be null!");
- return null;
- }
- Message message = null;
- SendResult sendResult = null;
- try {
- BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
- broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);
- broadcastMqDTO.setMessage(msg);
- broadcastMqDTO.setSessionKey(destSessionKey);
- message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
- sendResult = defaultMQProducer.send(message);
- } catch (Exception e) {
- log.error("[sendWebSocketBroadcastMsg] e is ", e);
- }
- return sendResult;
- }
- /**
- * 廣播消息發(fā)送
- *
- * @param msg
- * @return
- */
- public SendResult sendWebSocketBroadcastMsg(String msg) {
- if (StringUtils.isEmpty(msg)) {
- log.error("[sendWebSocketBroadcastMsg] msg can not be null!");
- return null;
- }
- Message message = null;
- SendResult sendResult = null;
- try {
- BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();
- broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);
- broadcastMqDTO.setMessage(msg);
- message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));
- sendResult = defaultMQProducer.send(message);
- } catch (Exception e) {
- log.error("[sendWebSocketBroadcastMsg] e is ", e);
- }
- return sendResult;
- }
- }
對(duì)消息的訂閱模塊實(shí)現(xiàn)代碼如下:
- package org.idea.web.socket.mq;
- import com.alibaba.fastjson.JSON;
- import com.oracle.tools.packager.Log;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.idea.web.socket.dto.BroadcastMqDTO;
- import org.idea.web.socket.manager.SocketManager;
- import org.springframework.messaging.simp.SimpMessagingTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import org.springframework.web.socket.WebSocketSession;
- import javax.annotation.Resource;
- import java.util.List;
- import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
- import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
- /**
- * @Author linhao
- * @Date created in 10:59 上午 2021/5/10
- */
- @Component
- @Slf4j
- public class MessageListenerHandler implements MessageListenerConcurrently {
- @Resource
- private SocketManager socketManager;
- @Resource
- private SimpMessagingTemplate template;
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- if (CollectionUtils.isEmpty(list)) {
- Log.info("receive empty msg");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- MessageExt messageExt = list.get(0);
- byte[] bytes = messageExt.getBody();
- String json = new String(bytes);
- BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);
- log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);
- if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
- log.info("[consumeMessage] 廣播發(fā)送消息:觸發(fā)----》消息內(nèi)容為:" + broadcastMqDTO);
- template.convertAndSend("/topic/sendTopic", broadcastMqDTO);
- } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {
- String sessionKey = broadcastMqDTO.getSessionKey();
- WebSocketSession webSocketSession = socketManager.get(sessionKey);
- if (webSocketSession != null) {
- template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());
- log.info("[consumeMessage] 點(diǎn)對(duì)點(diǎn)發(fā)送消息;觸發(fā)----》消息內(nèi)容為:" + broadcastMqDTO);
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
整體設(shè)計(jì)結(jié)構(gòu)如下圖:
于是按照這個(gè)結(jié)構(gòu)進(jìn)行了一版本的緊急開(kāi)發(fā)迭代,原先的單臺(tái)服務(wù)器擴(kuò)展為了服務(wù)集群。
業(yè)務(wù)拓展后續(xù)產(chǎn)品經(jīng)理提出一個(gè)需求,要求支持在同一間房?jī)?nèi)的兩個(gè)用戶之間發(fā)送悄悄話功能。這就需要我們進(jìn)行一個(gè)點(diǎn)對(duì)點(diǎn)之間傳輸通訊的功能了。因此需要在mq通知到每臺(tái)機(jī)器的時(shí)候加一個(gè)本地Session遍歷的邏輯,如果當(dāng)前機(jī)器存有用戶token對(duì)應(yīng)的session變量,那么就單獨(dú)針對(duì)那個(gè)Session進(jìn)行WebSocket的發(fā)送通知。
設(shè)計(jì)弊端一旦某臺(tái)機(jī)器出現(xiàn)了異常崩潰,那么就意味著這臺(tái)機(jī)器上的所有語(yǔ)音連接可能會(huì)出現(xiàn)中斷情況。目前這一塊的問(wèn)題也在考慮解決,計(jì)劃是將WebSocketSession存入到分布式緩存的redis中保證數(shù)據(jù)可靠存儲(chǔ),但是在后續(xù)嘗試的時(shí)候發(fā)現(xiàn)WebSocketSession對(duì)象沒(méi)有實(shí)現(xiàn)序列化接口,在存儲(chǔ)到Redis的時(shí)候會(huì)出現(xiàn)異常。目前這個(gè)問(wèn)題還在尋找解決思路中,不知道各位讀者朋友們有什么好的思路。
遇到的問(wèn)題點(diǎn)用戶請(qǐng)求直接訪問(wèn)到了我們的內(nèi)部服務(wù)器,如果在請(qǐng)求的中間加入一臺(tái)nginx做負(fù)載均衡則需要在nginx中配置一些額外信息。
項(xiàng)目的源代碼比較多,這里我把核心部分的代碼整理了一份,感興趣的朋友可以到我的gitee上邊去下載:
https://gitee.com/IdeaHome_admin/socket-framework