自己動手擼一個分布式IM(即時通訊) 系統(tǒng)
之前分享過一篇《設(shè)計一個***的消息推送系統(tǒng)》,雖然在文中貼了一些偽代碼,但是有朋友希望能直接分享一些可以運行的源碼,這么久了是時候把坑填上了。
于是我在之前的基礎(chǔ)上完善了一些內(nèi)容,先來看看這個項目的介紹吧:CIM(CROSS-IM) 一款面向開發(fā)者的 IM(即時通訊)系統(tǒng),同時提供了一些組件幫助開發(fā)者構(gòu)建一款屬于自己可水平擴展的 IM 。
借助 CIM 你可以實現(xiàn)以下需求:
- IM 即時通訊系統(tǒng)。
- 適用于 App 的消息推送中間件。
- IOT 海量連接場景中的消息透傳中間件。
完整源碼托管在 GitHub :
- https://github.com/crossoverJie/cim
本次主要涉及到 IM 即時通訊,所以特地錄了兩段視頻演示(群聊、私聊)。
群聊
私聊
架構(gòu)設(shè)計
下面來看看具體的架構(gòu)設(shè)計:
- CIM 中的各個組件均采用 Spring Boot 構(gòu)建。
- 采用 Netty + Google Protocol Buffer 構(gòu)建底層通信。
- Redis 存放各個客戶端的路由信息、賬號信息、在線狀態(tài)等。
- Zookeeper 用于 IM-server 服務(wù)的注冊與發(fā)現(xiàn)。
整體主要由以下模塊組成:
- cim-server,IM 服務(wù)端:用于接收 Client 連接、消息透傳、消息推送等功能。支持集群部署。
- cim-forward-route,消息路由服務(wù)器:用于處理消息路由、消息轉(zhuǎn)發(fā)、用戶登錄、用戶下線以及一些運營工具(獲取在線用戶數(shù)等)。
- cim-client,IM 客戶端:給用戶使用的消息終端,一個命令即可啟動并向其他人發(fā)起通訊(群聊、私聊);同時內(nèi)置了一些常用命令方便使用。
流程圖
整體的流程也比較簡單,流程圖如下:
- 客戶端向 Route 發(fā)起登錄。
- 登錄成功從 Zookeeper 中選擇可用 im-server 返回給客戶端,并保存登錄、路由信息到 Redis。
- 客戶端向 im-server 發(fā)起長連接,成功后保持心跳。
- 客戶端下線時通過 Route 清除狀態(tài)信息。
所以當我們自己部署時需要以下步驟:
- 搭建基礎(chǔ)中間件 Redis、Zookeeper。
- 部署 cim-server,這是真正的 IM 服務(wù)器,為了滿足性能需求所以支持水平擴展,只需要注冊到同一個 Zookeeper 即可。
- 部署 cim-forward-route,這是路由服務(wù)器,所有的消息都需要經(jīng)過它。由于它是無狀態(tài)的,所以也可以利用 Nginx 代理提高可用性。
- cim-client 真正面向用戶的客戶端;啟動之后會自動連接 IM 服務(wù)器便可以在控制臺收發(fā)消息了。
更多使用介紹可以參考快速啟動。
詳細設(shè)計
接下來重點看看具體的實現(xiàn),比如群聊、私聊消息如何流轉(zhuǎn);IM 服務(wù)端負載均衡;服務(wù)如何注冊發(fā)現(xiàn)等等。
IM 服務(wù)端
先來看看服務(wù)端;主要是實現(xiàn)客戶端上下線、消息下發(fā)等功能。
首先是服務(wù)啟動:
由于是在 Spring Boot 中搭建的,所以在應(yīng)用啟動時需要啟動 Netty 服務(wù)。
從 Pipline 中可以看出使用了 Protobuf 的編解碼(具體報文在客戶端中分析)。
注冊發(fā)現(xiàn)
需要滿足 IM 服務(wù)端的水平擴展需求,所以 cim-server 是需要將自身數(shù)據(jù)發(fā)布到注冊中心的。
所以在應(yīng)用啟動成功后需要將自身數(shù)據(jù)注冊到 Zookeeper 中。
最主要的目的就是將當前應(yīng)用的 ip + cim-server-port+ http-port 注冊上去。
上圖是我在演示環(huán)境中注冊的兩個 cim-server 實例(由于在一臺服務(wù)器,所以只是端口不同)。
這樣在客戶端(監(jiān)聽這個 Zookeeper 節(jié)點)就能實時的知道目前可用的服務(wù)信息。
登錄
當客戶端請求 cim-forward-route 中的登錄接口(詳見下文)做完業(yè)務(wù)驗證(就相當于日常登錄其他網(wǎng)站一樣)之后,客戶端會向服務(wù)端發(fā)起一個長連接,如之前的流程所示:
這時客戶端會發(fā)送一個特殊報文,表明當前是登錄信息。服務(wù)端收到后就需要將該客戶端的 userID 和當前 Channel 通道關(guān)系保存起來。
同時也緩存了用戶的信息,也就是 userID 和用戶名。
離線
當客戶端斷線后也需要將剛才緩存的信息清除掉。
同時也需要調(diào)用 Route 接口清除相關(guān)信息(具體接口看下文)。
IM 路由
從架構(gòu)圖中可以看出,路由層是非常重要的一環(huán);它提供了一系列的 HTTP 服務(wù)承接了客戶端和服務(wù)端。目前主要是以下幾個接口:
①注冊接口
由于每一個客戶端都是需要登錄才能使用的,所以***步自然是注冊。
這里就設(shè)計的比較簡單,直接利用 Redis 來存儲用戶信息;用戶信息也只有 ID 和 userName 而已。
只是為了方便查詢在 Redis 中的 KV 又反過來存儲了一份 VK,這樣 ID 和 userName 都必須唯一。
②登錄接口
這里的登錄和 cim-server 中的登錄不一樣,具有業(yè)務(wù)性質(zhì):
- 登錄成功之后需要判斷是否是重復(fù)登錄(一個用戶只能運行一個客戶端)。
- 登錄成功后需要從 Zookeeper 中獲取服務(wù)列表(cim-server)并根據(jù)某種算法選擇一臺服務(wù)返回給客戶端。
- 登錄成功之后還需要保存路由信息,也就是當前用戶分配的服務(wù)實例保存到 Redis 中。
為了實現(xiàn)只能一個用戶登錄,使用了 Redis 中的 Set 來保存登錄信息;利用 userID 作為 Key ,重復(fù)的登錄就會寫入失敗。
類似于 Java 中的 HashSet,只能去重保存。
獲取一臺可用的路由實例也比較簡單:
- 先從 Zookeeper 獲取所有的服務(wù)實例做一個內(nèi)部緩存。
- 輪詢選擇一臺服務(wù)器(目前只有這一種算法,后續(xù)會新增)。
當然要獲取 Zookeeper 中的服務(wù)實例前,自然是需要監(jiān)聽 cim-server 之前注冊上去的那個節(jié)點。
具體代碼如下:
也是在應(yīng)用啟動之后監(jiān)聽 Zookeeper 中的路由節(jié)點,一旦發(fā)生變化就會更新內(nèi)部緩存。
這里使用的是 Guava 的 Cache,它基于 Concurrent HashMap,所以可以保證清除、新增緩存的原子性。
③群聊接口
這是一個真正發(fā)消息的接口,實現(xiàn)的效果就是其中一個客戶端發(fā)消息,其余所有客戶端都能收到!
流程肯定是客戶端發(fā)送一條消息到服務(wù)端,服務(wù)端收到后在上文介紹的 SessionSocketHolder 中遍歷所有 Channel(通道)然后下發(fā)消息即可。
服務(wù)端是單機倒也可以,但現(xiàn)在是集群設(shè)計。所以所有的客戶端會根據(jù)之前的輪詢算法分配到不同的 cim-server 實例中。
因此就需要路由層來發(fā)揮作用了:
路由接口收到消息后首先遍歷出所有的客戶端和服務(wù)實例的關(guān)系。路由關(guān)系在 Redis 中的存放如下:
由于 Redis 單線程的特質(zhì),當數(shù)據(jù)量大時;一旦使用 Keys 匹配所有 cim-route:* 數(shù)據(jù),會導(dǎo)致 Redis 不能處理其他請求。
所以這里改為使用 Scan 命令來遍歷所有的 cim-route:*。接著會挨個調(diào)用每個客戶端所在的服務(wù)端的 HTTP 接口用于推送消息。
在 cim-server 中的實現(xiàn)如下:
cim-server 收到消息后會在內(nèi)部緩存中查詢該 userID 的通道,接著只需要發(fā)消息即可。
④在線用戶接口
這是一個輔助接口,可以查詢出當前在線用戶信息。
實現(xiàn)也很簡單,也就是查詢之前保存 ”用戶登錄狀態(tài)的那個去重 set “即可。
⑤私聊接口
之所以說獲取在線用戶是一個輔助接口,其實就是用于輔助私聊使用的。
一般我們使用私聊的前提肯定得知道當前哪些用戶在線,接著你才會知道你要和誰進行私聊。
類似于這樣:
在我們這個場景中,私聊的前提就是需要獲得在線用戶的 userID。
所以私聊接口在收到消息后需要查詢到接收者所在的 cim-server 實例信息,后續(xù)的步驟就和群聊一致了。調(diào)用接收者所在實例的 HTTP 接口下發(fā)信息。
只是群聊是遍歷所有的在線用戶,私聊只發(fā)送一個的區(qū)別。
⑥下線接口
一旦客戶端下線,我們就需要將之前存放在 Redis 中的一些信息刪除掉(路由信息、登錄狀態(tài))。
IM 客戶端
客戶端中的一些邏輯其實在上文已經(jīng)談到一些了。
登錄
***步也就是登錄,需要在啟動時調(diào)用 Route 的登錄接口,獲得 cim-server 信息再創(chuàng)建連接。
登錄過程中 Route 接口會判斷是否為重復(fù)登錄,重復(fù)登錄則會直接退出程序。
接下來是利用 Route 接口返回的 cim-server 實例信息(ip+port)創(chuàng)建連接。
***一步就是發(fā)送一個登錄標志的信息到服務(wù)端,讓它保持客戶端和 Channel 的關(guān)系。
自定義協(xié)議
上文提到的一些登錄報文、真正的消息報文這些都是在我們自定義協(xié)議中可以區(qū)別出來的。
由于是使用 Google Protocol Buffer 編解碼,所以先看看原始格式。
其實這個協(xié)議中目前一共就三個字段:
- requestId 可以理解為 userId。
- reqMsg 就是真正的消息。
- type 也就是上文提到的消息類別。
目前主要是三種類型,分別對應(yīng)不同的業(yè)務(wù):
心跳
為了保持客戶端和服務(wù)端的連接,每隔一段時間沒有發(fā)送消息都需要自動的發(fā)送心跳。
目前的策略是每隔一分鐘就發(fā)送一個心跳包到服務(wù)端:
這樣服務(wù)端每隔一分鐘沒有收到業(yè)務(wù)消息時就會收到 Ping 的心跳包:
內(nèi)置命令
客戶端也內(nèi)置了一些基本命令來方便使用。
比如輸入 :q 就會退出客戶端,同時會關(guān)閉一些系統(tǒng)資源。
當輸入 :olu(onlineUser 的簡寫)就會去調(diào)用 Route 的獲取所有在線用戶接口。
群聊
群聊的使用非常簡單,只需要在控制臺輸入消息回車即可。這時會去調(diào)用 Route 的群聊接口。
私聊
私聊也是同理,但前提是需要觸發(fā)關(guān)鍵字;使用 userId;; 消息內(nèi)容這樣的格式才會給某個用戶發(fā)送消息,所以一般都需要先使用 :olu 命令獲取所有在線用戶才方便使用。
消息回調(diào)
為了滿足一些定制需求,比如消息需要保存之類的。所以在客戶端收到消息之后會回調(diào)一個接口,在這個接口中可以自定義實現(xiàn)。
因此先創(chuàng)建了一個 Caller 的 Bean,這個 Bean 中包含了一個 CustomMsgHandleListener 接口,需要自行處理只需要實現(xiàn)此接口即可。
自定義界面
由于我自己不怎么會寫界面,但保不準有其他大牛會寫。所以客戶端中的群聊、私聊、獲取在線用戶、消息回調(diào)等業(yè)務(wù)(以及之后的業(yè)務(wù))都是以接口形式提供。
也方便后面做頁面集成,只需要調(diào)這些接口就行了;具體實現(xiàn)不用怎么關(guān)心。
總結(jié)
Cim 目前只是***版,Bug 多,功能少(只拉了幾個群友做了測試);不過后續(xù)還會接著完善,至少這一版會給那些沒有相關(guān)經(jīng)驗的朋友帶來一些思路。
后續(xù)計劃: