自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

聊聊 IM 系統(tǒng)重構(gòu)到 SDK 設(shè)計(jì)的最佳實(shí)踐

開(kāi)發(fā) 前端
如果我們網(wǎng)絡(luò)環(huán)境發(fā)生了變化,比如從家里的 Wi-Fi 切換到了公司的,需要手動(dòng)刪除下 FE/meta 下的所有文件再次啟動(dòng),BE 則是需要重啟一下容器。

SDK 設(shè)計(jì)

圖片圖片

在之前提到了 cim 在做集成測(cè)試的時(shí)候遇到的問(wèn)題,需要提供一個(gè) SDK 來(lái)解決,于是我花了一些時(shí)間編寫(xiě)了 SDK,同時(shí)也將 cim-client 重構(gòu)了。

重構(gòu)后的代碼長(zhǎng)這個(gè)樣子:

@Bean
    public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor callbackThreadPool,
                              Event event) {
        OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(3, TimeUnit.SECONDS)
                .readTimeout(3, TimeUnit.SECONDS)
                .writeTimeout(3, TimeUnit.SECONDS)
                .retryOnConnectionFailure(true).build();

        return Client.builder()
                .auth(ClientConfigurationData.Auth.builder()
                        .userName(appConfiguration.getUserName())
                        .userId(appConfiguration.getUserId())
                        .build())
                .routeUrl(appConfiguration.getRouteUrl())
                .loginRetryCount(appConfiguration.getReconnectCount())
                .event(event)
                .reconnectCheck(client -> !shutDownSign.checkStatus())
                .okHttpClient(okHttpClient)
                .messageListener(new MsgCallBackListener(msgLogger))
                .callbackThreadPool(callbackThreadPool)
                .build();
    }

配合 springboot 使用時(shí)只需要?jiǎng)?chuàng)建一個(gè) Client 即可,這個(gè) Client 里維護(hù)了核心的:

  • 長(zhǎng)鏈接創(chuàng)建、狀態(tài)維護(hù)
  • 心跳檢測(cè)
  • 超時(shí)、網(wǎng)絡(luò)異常重連等

同時(shí)也提供了簡(jiǎn)易的 API 可以直接收發(fā)消息:

圖片圖片

這樣在集成到業(yè)務(wù)代碼中時(shí)會(huì)更方便。

以前的代碼耦合度非常高,同時(shí)因?yàn)榛A(chǔ)代碼是 18 年寫(xiě)的,現(xiàn)在真的沒(méi)有眼看了;

重構(gòu)的過(guò)程中使用一些 Java8+ 的一些語(yǔ)法糖精簡(jiǎn)了許多代碼,各個(gè)模塊間的組織關(guān)系也重新梳理,現(xiàn)在會(huì)更易維護(hù)了。

比如由于創(chuàng)建客戶端需要許多可選參數(shù),于是就提供了 Builder 模式的創(chuàng)建選項(xiàng):

public interface ClientBuilder {  
  
    Client build();  
    ClientBuilder auth(ClientConfigurationData.Auth auth);  
    ClientBuilder routeUrl(String routeUrl);  
    ClientBuilder loginRetryCount(int loginRetryCount);  
    ClientBuilder event(Event event);  
    ClientBuilder reconnectCheck(ReconnectCheck reconnectCheck);  
    ClientBuilder okHttpClient(OkHttpClient okHttpClient);  
    ClientBuilder messageListener(MessageListener messageListener);  
    ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool);  
}


以上部分 API 的設(shè)計(jì)借鑒了 Pulsar。

Proxy 優(yōu)化

除此之外還優(yōu)化了請(qǐng)求代理,這個(gè) Proxy 主要是用于方便在各個(gè)服務(wù)中發(fā)起 rest 調(diào)用,我這里為了輕量也沒(méi)有使用 Dubbo、SpringCloud 這類(lèi)服務(wù)框架。

但如果都硬編碼 http client 去請(qǐng)求時(shí)會(huì)有許多重復(fù)冗余的代碼,比如創(chuàng)建連接、請(qǐng)求參數(shù)、響應(yīng)解析、異常處理等。

于是在之前的版本中就提供了一個(gè) ProxyManager 的基本實(shí)現(xiàn):

@Override  
public List<OnlineUsersResVO.DataBodyBean> onlineUsers() throws Exception{  
    RouteApi routeApi = new ProxyManager<>(RouteApi.class, routeUrl, okHttpClient).getInstance();  
  
    Response response = null;  
    OnlineUsersResVO onlineUsersResVO = null;  
    try {  
        response = (Response) routeApi.onlineUser();  
        String json = response.body().string() ;  
        onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);  
  
    }catch (Exception e){  
        log.error("exception",e);  
    }finally {  
        response.body().close();  
    }  
    return onlineUsersResVO.getDataBody();  
}

雖然提供了一些連接管理和參數(shù)封裝等基礎(chǔ)功能,但只實(shí)現(xiàn)了一半。

從上面的代碼也可以看出序列化都得自己實(shí)現(xiàn),這些代碼完全是冗余的。

經(jīng)過(guò)重構(gòu)后以上的代碼可以精簡(jiǎn)到如下:

// 聲明接口
@Request(method = Request.GET)  
BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception;

// 初始化
routeApi = RpcProxyManager.create(RouteApi.class, routeUrl, okHttpClient);

public Set<CIMUserInfo> onlineUser() throws Exception {  
    BaseResponse<Set<CIMUserInfo>> onlineUsersResVO = routeApi.onlineUser();  
    return onlineUsersResVO.getDataBody();  
}

這個(gè)調(diào)整之后就非常類(lèi)似于 Dubbo gRPC 這類(lèi) RPC 框架的使用,只需要把接口定義好,就和調(diào)用本地函數(shù)一樣的簡(jiǎn)單。

為了方便后續(xù)可能調(diào)用一些外部系統(tǒng),在此基礎(chǔ)上還支持了指定多種請(qǐng)求 method、指定 URL 、返回結(jié)果嵌套泛型等。

@Request(url = "sample-request?author=beeceptor")  
EchoGeneric<EchoResponse.HeadersDTO> echoGeneric(EchoRequest message);

@Test  
public void testGeneric() {  
    OkHttpClient client = new OkHttpClient();  
    String url = "http://echo.free.beeceptor.com";  
    Echo echo = RpcProxyManager.create(Echo.class, url, client);  
    EchoRequest request = new EchoRequest();  
    request.setName("crossoverJie");  
    request.setAge(18);  
    request.setCity("shenzhen");  
    // 支持泛型解析
    EchoGeneric<EchoResponse.HeadersDTO> response = echo.echoGeneric(request);  
    Assertions.assertEquals(response.getHeaders().getHost(), "echo.free.beeceptor.com");  
}

支持動(dòng)態(tài) URL 調(diào)用

圖片圖片

還有一個(gè) todo:希望可以將 ProxyManager 交給 Spring 去管理,之前是在每次調(diào)用的地方都會(huì)創(chuàng)建一個(gè) Proxy 對(duì)象,完全沒(méi)有必要,代碼也很冗余。

但有網(wǎng)友在實(shí)現(xiàn)過(guò)程中發(fā)現(xiàn),有個(gè)場(chǎng)景的請(qǐng)求地址是動(dòng)態(tài)的,如果是交給 Spring 管理為單例后是沒(méi)法修改 URL 地址的,因?yàn)檫@個(gè)地址是在創(chuàng)建對(duì)象的時(shí)候初始化的。

所以我就在這里新增了一個(gè)動(dòng)態(tài) URL 的特性:

EchoResponse echoTarget(EchoRequest message, @DynamicUrl(useMethodEndpoint = false) String url);

Echo echo = RpcProxyManager.create(Echo.class, client);
String url = "http://echo.free.beeceptor.com/sample-request?author=beeceptor";
EchoResponse response = echo.echoTarget(request, url);

在聲明接口的時(shí)候使用 @DynamicUrl 的方法參數(shù)注解,告訴代理這個(gè)參數(shù)是 URL。這樣就可以允許在創(chuàng)建  Proxy 對(duì)象的時(shí)候不指定 URL,而是在實(shí)際調(diào)用時(shí)候再傳入具體的 URL,更方便創(chuàng)建單例了。

集成測(cè)試優(yōu)化

同時(shí)還優(yōu)化了集成測(cè)試,支持了 server 的集群版測(cè)試。

https://github.com/crossoverJie/cim/blob/4c149f8bda78718e3ecae2c5759aa9732eff9132/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java#L210

@Test  
public void testReconnect() throws Exception {  
    super.startTwoServer();  
    super.startRoute();  
  
    String routeUrl = "http://localhost:8083";  
    String cj = "cj";  
    String zs = "zs";  
    Long cjId = super.registerAccount(cj);  
    Long zsId = super.registerAccount(zs);  
    var auth1 = ClientConfigurationData.Auth.builder()  
            .userName(cj)  
            .userId(cjId)  
            .build();  
    var auth2 = ClientConfigurationData.Auth.builder()  
            .userName(zs)  
            .userId(zsId)  
            .build();  
  
    @Cleanup  
    Client client1 = Client.builder()  
            .auth(auth1)  
            .routeUrl(routeUrl)  
            .build();  
    TimeUnit.SECONDS.sleep(3);  
    ClientState.State state = client1.getState();  
    Awaitility.await().atMost(10, TimeUnit.SECONDS)  
            .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));  
  
  
    AtomicReference<String> client2Receive = new AtomicReference<>();  
    @Cleanup  
    Client client2 = Client.builder()  
            .auth(auth2)  
            .routeUrl(routeUrl)  
            .messageListener((client, message) -> client2Receive.set(message))  
            .build();  
    TimeUnit.SECONDS.sleep(3);  
    ClientState.State state2 = client2.getState();  
    Awaitility.await().atMost(10, TimeUnit.SECONDS)  
            .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state2));  
  
    Optional<CIMServerResVO> serverInfo2 = client2.getServerInfo();  
    Assertions.assertTrue(serverInfo2.isPresent());  
    System.out.println("client2 serverInfo = " + serverInfo2.get());  
  
    // send msg  
    String msg = "hello";  
    client1.sendGroup(msg);  
    Awaitility.await()  
            .untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));  
    client2Receive.set("");  
  
  
    System.out.println("ready to restart server");  
    TimeUnit.SECONDS.sleep(3);  
    Optional<CIMServerResVO> serverInfo = client1.getServerInfo();  
    Assertions.assertTrue(serverInfo.isPresent());  
    System.out.println("server info = " + serverInfo.get());  
  
    super.stopServer(serverInfo.get().getCimServerPort());  
    System.out.println("stop server success! " + serverInfo.get());  
  
  
    // Waiting server stopped, and client reconnect.  
    TimeUnit.SECONDS.sleep(30);  
    System.out.println("reconnect state: " + client1.getState());  
    Awaitility.await().atMost(15, TimeUnit.SECONDS)  
            .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));  
    serverInfo = client1.getServerInfo();  
    Assertions.assertTrue(serverInfo.isPresent());  
    System.out.println("client1 reconnect server info = " + serverInfo.get());  
  
    // Send message again.  
    log.info("send message again, client2Receive = {}", client2Receive.get());  
    client1.sendGroup(msg);  
    Awaitility.await()  
            .untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));  
    super.stopTwoServer();  
}

比如在這里編寫(xiě)了一個(gè)客戶端重連的單測(cè),代碼有點(diǎn)長(zhǎng),但它的主要流程如下:

  • 啟動(dòng)兩個(gè) Server:Server1,Server2
  • 啟動(dòng) Route
  • 在啟動(dòng)兩個(gè) Client 發(fā)送消息
  • 校驗(yàn)消息發(fā)送是否成功
  • 停止 Client1 連接的 Server
  • 等待 Client 自動(dòng)重連到另一個(gè) Server
  • 再次發(fā)送消息
  • 校驗(yàn)消息發(fā)送是否成功

這樣就可以驗(yàn)證在服務(wù)端 Server 宕機(jī)后整個(gè)服務(wù)是否可用,消息收發(fā)是否正常。

public void startTwoServer() {  
    if (!zooKeeperContainer.isRunning()){  
        zooKeeperContainer.start();  
    }    zookeeperAddr = String.format("%s:%d", zooKeeperContainer.getHost(), zooKeeperContainer.getMappedPort(ZooKeeperContainer.DEFAULT_CLIENT_PORT));  
    SpringApplication server = new SpringApplication(CIMServerApplication.class);  
    String[] args1 = new String[]{  
            "--cim.server.port=11211",  
            "--server.port=8081",  
            "--app.zk.addr=" + zookeeperAddr,  
    };    ConfigurableApplicationContext run1 = server.run(args1);  
    runMap.put(Integer.parseInt("11211"), run1);  
  
  
    SpringApplication server2 = new SpringApplication(CIMServerApplication.class);  
    String[] args2 = new String[]{  
            "--cim.server.port=11212",  
            "--server.port=8082",  
            "--app.zk.addr=" + zookeeperAddr,  
    };    ConfigurableApplicationContext run2 = server2.run(args2);  
    runMap.put(Integer.parseInt("11212"), run2);  
}

public void stopServer(Integer port) {  
    runMap.get(port).close();  
    runMap.remove(port);  
}

這里的啟動(dòng)兩個(gè) Server 就是創(chuàng)建了兩個(gè) Server 應(yīng)用,然后保存好端口和應(yīng)用之間的映射關(guān)系。

這樣就可以根據(jù)客戶端連接的 Server 信息指定停止哪一個(gè) Server,更方便做測(cè)試。

這次重啟 cim 的維護(hù)后會(huì)盡量維護(hù)下去,即便更新時(shí)間慢一點(diǎn)。

后續(xù)還會(huì)加上消息 ack、離線消息等之前呼聲很高的功能,感興趣的完全可以一起參與。

源碼地址:https://github.com/crossoverJie/cim

責(zé)任編輯:武曉燕 來(lái)源: crossoverJie
相關(guān)推薦

2023-09-11 08:50:03

Maven工具關(guān)系管理

2016-12-27 08:49:55

API設(shè)計(jì)策略

2013-06-13 09:21:31

RESTful APIRESTfulAPI

2010-12-28 10:12:39

PHP

2025-01-02 09:06:43

2017-10-20 08:25:10

數(shù)據(jù)收集工具數(shù)據(jù)源

2024-08-29 09:32:36

2024-10-14 14:28:19

支付系統(tǒng)設(shè)計(jì)

2014-05-19 10:08:36

IM系統(tǒng)架構(gòu)設(shè)計(jì)

2020-06-08 18:41:07

Kafka微服務(wù)Web

2009-06-22 14:48:21

DRY架構(gòu)設(shè)計(jì)

2020-02-06 08:03:53

疫情設(shè)計(jì)IM系統(tǒng)

2017-03-06 20:39:41

整潔代碼Clean Code

2011-12-31 10:18:33

響應(yīng)設(shè)計(jì)

2020-08-07 09:41:00

微服務(wù)架構(gòu)數(shù)據(jù)

2024-09-23 00:00:00

下拉菜單UI控件

2014-02-26 11:01:28

日志優(yōu)化系統(tǒng)日志

2023-12-07 19:48:42

2021-09-03 23:01:58

CSS 技巧代碼重構(gòu)

2023-09-13 08:00:00

JavaScript循環(huán)語(yǔ)句
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)