聊聊 IM 系統(tǒng)重構(gòu)到 SDK 設(shè)計(jì)的最佳實(shí)踐
SDK 設(shè)計(jì)
圖片
在之前提到了 cim 在做集成測(cè)試的時(shí)候遇到的問(wèn)題,需要提供一個(gè) SDK 來(lái)解決,于是我花了一些時(shí)間編寫(xiě)了 SDK,同時(shí)也將 cim-client 重構(gòu)了。
重構(gòu)后的代碼長(zhǎng)這個(gè)樣子:
@Bean
public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor callbackThreadPool,
Event event) {
OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(3, TimeUnit.SECONDS)
.readTimeout(3, TimeUnit.SECONDS)
.writeTimeout(3, TimeUnit.SECONDS)
.retryOnConnectionFailure(true).build();
return Client.builder()
.auth(ClientConfigurationData.Auth.builder()
.userName(appConfiguration.getUserName())
.userId(appConfiguration.getUserId())
.build())
.routeUrl(appConfiguration.getRouteUrl())
.loginRetryCount(appConfiguration.getReconnectCount())
.event(event)
.reconnectCheck(client -> !shutDownSign.checkStatus())
.okHttpClient(okHttpClient)
.messageListener(new MsgCallBackListener(msgLogger))
.callbackThreadPool(callbackThreadPool)
.build();
}
配合 springboot 使用時(shí)只需要?jiǎng)?chuàng)建一個(gè) Client 即可,這個(gè) Client 里維護(hù)了核心的:
- 長(zhǎng)鏈接創(chuàng)建、狀態(tài)維護(hù)
- 心跳檢測(cè)
- 超時(shí)、網(wǎng)絡(luò)異常重連等
同時(shí)也提供了簡(jiǎn)易的 API 可以直接收發(fā)消息:
圖片
這樣在集成到業(yè)務(wù)代碼中時(shí)會(huì)更方便。
以前的代碼耦合度非常高,同時(shí)因?yàn)榛A(chǔ)代碼是 18 年寫(xiě)的,現(xiàn)在真的沒(méi)有眼看了;
重構(gòu)的過(guò)程中使用一些 Java8+ 的一些語(yǔ)法糖精簡(jiǎn)了許多代碼,各個(gè)模塊間的組織關(guān)系也重新梳理,現(xiàn)在會(huì)更易維護(hù)了。
比如由于創(chuàng)建客戶端需要許多可選參數(shù),于是就提供了 Builder 模式的創(chuàng)建選項(xiàng):
public interface ClientBuilder {
Client build();
ClientBuilder auth(ClientConfigurationData.Auth auth);
ClientBuilder routeUrl(String routeUrl);
ClientBuilder loginRetryCount(int loginRetryCount);
ClientBuilder event(Event event);
ClientBuilder reconnectCheck(ReconnectCheck reconnectCheck);
ClientBuilder okHttpClient(OkHttpClient okHttpClient);
ClientBuilder messageListener(MessageListener messageListener);
ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool);
}
以上部分 API 的設(shè)計(jì)借鑒了 Pulsar。
Proxy 優(yōu)化
除此之外還優(yōu)化了請(qǐng)求代理,這個(gè) Proxy 主要是用于方便在各個(gè)服務(wù)中發(fā)起 rest 調(diào)用,我這里為了輕量也沒(méi)有使用 Dubbo、SpringCloud 這類(lèi)服務(wù)框架。
但如果都硬編碼 http client 去請(qǐng)求時(shí)會(huì)有許多重復(fù)冗余的代碼,比如創(chuàng)建連接、請(qǐng)求參數(shù)、響應(yīng)解析、異常處理等。
于是在之前的版本中就提供了一個(gè) ProxyManager 的基本實(shí)現(xiàn):
@Override
public List<OnlineUsersResVO.DataBodyBean> onlineUsers() throws Exception{
RouteApi routeApi = new ProxyManager<>(RouteApi.class, routeUrl, okHttpClient).getInstance();
Response response = null;
OnlineUsersResVO onlineUsersResVO = null;
try {
response = (Response) routeApi.onlineUser();
String json = response.body().string() ;
onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);
}catch (Exception e){
log.error("exception",e);
}finally {
response.body().close();
}
return onlineUsersResVO.getDataBody();
}
雖然提供了一些連接管理和參數(shù)封裝等基礎(chǔ)功能,但只實(shí)現(xiàn)了一半。
從上面的代碼也可以看出序列化都得自己實(shí)現(xiàn),這些代碼完全是冗余的。
經(jīng)過(guò)重構(gòu)后以上的代碼可以精簡(jiǎn)到如下:
// 聲明接口
@Request(method = Request.GET)
BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception;
// 初始化
routeApi = RpcProxyManager.create(RouteApi.class, routeUrl, okHttpClient);
public Set<CIMUserInfo> onlineUser() throws Exception {
BaseResponse<Set<CIMUserInfo>> onlineUsersResVO = routeApi.onlineUser();
return onlineUsersResVO.getDataBody();
}
這個(gè)調(diào)整之后就非常類(lèi)似于 Dubbo gRPC 這類(lèi) RPC 框架的使用,只需要把接口定義好,就和調(diào)用本地函數(shù)一樣的簡(jiǎn)單。
為了方便后續(xù)可能調(diào)用一些外部系統(tǒng),在此基礎(chǔ)上還支持了指定多種請(qǐng)求 method、指定 URL 、返回結(jié)果嵌套泛型等。
@Request(url = "sample-request?author=beeceptor")
EchoGeneric<EchoResponse.HeadersDTO> echoGeneric(EchoRequest message);
@Test
public void testGeneric() {
OkHttpClient client = new OkHttpClient();
String url = "http://echo.free.beeceptor.com";
Echo echo = RpcProxyManager.create(Echo.class, url, client);
EchoRequest request = new EchoRequest();
request.setName("crossoverJie");
request.setAge(18);
request.setCity("shenzhen");
// 支持泛型解析
EchoGeneric<EchoResponse.HeadersDTO> response = echo.echoGeneric(request);
Assertions.assertEquals(response.getHeaders().getHost(), "echo.free.beeceptor.com");
}
支持動(dòng)態(tài) URL 調(diào)用
圖片
還有一個(gè) todo:希望可以將 ProxyManager 交給 Spring 去管理,之前是在每次調(diào)用的地方都會(huì)創(chuàng)建一個(gè) Proxy 對(duì)象,完全沒(méi)有必要,代碼也很冗余。
但有網(wǎng)友在實(shí)現(xiàn)過(guò)程中發(fā)現(xiàn),有個(gè)場(chǎng)景的請(qǐng)求地址是動(dòng)態(tài)的,如果是交給 Spring 管理為單例后是沒(méi)法修改 URL 地址的,因?yàn)檫@個(gè)地址是在創(chuàng)建對(duì)象的時(shí)候初始化的。
所以我就在這里新增了一個(gè)動(dòng)態(tài) URL 的特性:
EchoResponse echoTarget(EchoRequest message, @DynamicUrl(useMethodEndpoint = false) String url);
Echo echo = RpcProxyManager.create(Echo.class, client);
String url = "http://echo.free.beeceptor.com/sample-request?author=beeceptor";
EchoResponse response = echo.echoTarget(request, url);
在聲明接口的時(shí)候使用 @DynamicUrl 的方法參數(shù)注解,告訴代理這個(gè)參數(shù)是 URL。這樣就可以允許在創(chuàng)建 Proxy 對(duì)象的時(shí)候不指定 URL,而是在實(shí)際調(diào)用時(shí)候再傳入具體的 URL,更方便創(chuàng)建單例了。
集成測(cè)試優(yōu)化
同時(shí)還優(yōu)化了集成測(cè)試,支持了 server 的集群版測(cè)試。
@Test
public void testReconnect() throws Exception {
super.startTwoServer();
super.startRoute();
String routeUrl = "http://localhost:8083";
String cj = "cj";
String zs = "zs";
Long cjId = super.registerAccount(cj);
Long zsId = super.registerAccount(zs);
var auth1 = ClientConfigurationData.Auth.builder()
.userName(cj)
.userId(cjId)
.build();
var auth2 = ClientConfigurationData.Auth.builder()
.userName(zs)
.userId(zsId)
.build();
@Cleanup
Client client1 = Client.builder()
.auth(auth1)
.routeUrl(routeUrl)
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state = client1.getState();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));
AtomicReference<String> client2Receive = new AtomicReference<>();
@Cleanup
Client client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state2 = client2.getState();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state2));
Optional<CIMServerResVO> serverInfo2 = client2.getServerInfo();
Assertions.assertTrue(serverInfo2.isPresent());
System.out.println("client2 serverInfo = " + serverInfo2.get());
// send msg
String msg = "hello";
client1.sendGroup(msg);
Awaitility.await()
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
client2Receive.set("");
System.out.println("ready to restart server");
TimeUnit.SECONDS.sleep(3);
Optional<CIMServerResVO> serverInfo = client1.getServerInfo();
Assertions.assertTrue(serverInfo.isPresent());
System.out.println("server info = " + serverInfo.get());
super.stopServer(serverInfo.get().getCimServerPort());
System.out.println("stop server success! " + serverInfo.get());
// Waiting server stopped, and client reconnect.
TimeUnit.SECONDS.sleep(30);
System.out.println("reconnect state: " + client1.getState());
Awaitility.await().atMost(15, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));
serverInfo = client1.getServerInfo();
Assertions.assertTrue(serverInfo.isPresent());
System.out.println("client1 reconnect server info = " + serverInfo.get());
// Send message again.
log.info("send message again, client2Receive = {}", client2Receive.get());
client1.sendGroup(msg);
Awaitility.await()
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
super.stopTwoServer();
}
比如在這里編寫(xiě)了一個(gè)客戶端重連的單測(cè),代碼有點(diǎn)長(zhǎng),但它的主要流程如下:
- 啟動(dòng)兩個(gè) Server:Server1,Server2
- 啟動(dòng) Route
- 在啟動(dòng)兩個(gè) Client 發(fā)送消息
- 校驗(yàn)消息發(fā)送是否成功
- 停止 Client1 連接的 Server
- 等待 Client 自動(dòng)重連到另一個(gè) Server
- 再次發(fā)送消息
- 校驗(yàn)消息發(fā)送是否成功
這樣就可以驗(yàn)證在服務(wù)端 Server 宕機(jī)后整個(gè)服務(wù)是否可用,消息收發(fā)是否正常。
public void startTwoServer() {
if (!zooKeeperContainer.isRunning()){
zooKeeperContainer.start();
} zookeeperAddr = String.format("%s:%d", zooKeeperContainer.getHost(), zooKeeperContainer.getMappedPort(ZooKeeperContainer.DEFAULT_CLIENT_PORT));
SpringApplication server = new SpringApplication(CIMServerApplication.class);
String[] args1 = new String[]{
"--cim.server.port=11211",
"--server.port=8081",
"--app.zk.addr=" + zookeeperAddr,
}; ConfigurableApplicationContext run1 = server.run(args1);
runMap.put(Integer.parseInt("11211"), run1);
SpringApplication server2 = new SpringApplication(CIMServerApplication.class);
String[] args2 = new String[]{
"--cim.server.port=11212",
"--server.port=8082",
"--app.zk.addr=" + zookeeperAddr,
}; ConfigurableApplicationContext run2 = server2.run(args2);
runMap.put(Integer.parseInt("11212"), run2);
}
public void stopServer(Integer port) {
runMap.get(port).close();
runMap.remove(port);
}
這里的啟動(dòng)兩個(gè) Server 就是創(chuàng)建了兩個(gè) Server 應(yīng)用,然后保存好端口和應(yīng)用之間的映射關(guān)系。
這樣就可以根據(jù)客戶端連接的 Server 信息指定停止哪一個(gè) Server,更方便做測(cè)試。
這次重啟 cim 的維護(hù)后會(huì)盡量維護(hù)下去,即便更新時(shí)間慢一點(diǎn)。
后續(xù)還會(huì)加上消息 ack、離線消息等之前呼聲很高的功能,感興趣的完全可以一起參與。
源碼地址:https://github.com/crossoverJie/cim