自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

自己手寫RPC如何實現(xiàn)同步、異步、單向調(diào)用?直接上代碼!

開發(fā) 前端
那RPC框架只支持同步調(diào)用的話,在高并發(fā)環(huán)境下肯定會出現(xiàn)性能問題,我想讓RPC框架支持同步、異步和單向調(diào)用,這也是很多優(yōu)秀的RPC框架都支持的功能,這個有辦法實現(xiàn)嗎?

很多好用的RPC框架都支持服務(wù)消費者以同步、異步和單向調(diào)用的方式與服務(wù)提供者進行交互,冰河你開發(fā)的這個RPC框架也可以嗎?

一、前言

在前面的章節(jié)中,實現(xiàn)了服務(wù)消費者屏蔽掉基于Netty連接服務(wù)提供者的實現(xiàn)細節(jié)的前提下,以異步轉(zhuǎn)同步的方式調(diào)用服務(wù)提供者。在外部服務(wù)調(diào)用服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)的方法時,能夠直接獲取到服務(wù)提供者調(diào)用真實方法返回的結(jié)果數(shù)據(jù)。

那RPC框架只支持同步調(diào)用的話,在高并發(fā)環(huán)境下肯定會出現(xiàn)性能問題,我想讓RPC框架支持同步、異步和單向調(diào)用,這也是很多優(yōu)秀的RPC框架都支持的功能,這個有辦法實現(xiàn)嗎?

我:安排。。。

二、目標

在服務(wù)提供者一端實現(xiàn)了按照自定義網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼對接收到的數(shù)據(jù)進行解析,并且能夠?qū)⒔馕龅降臄?shù)據(jù)作為參數(shù)調(diào)用真實方法,并接收真實方法返回的結(jié)果數(shù)據(jù),通過自定義網(wǎng)絡(luò)協(xié)議和數(shù)據(jù)編解碼,將數(shù)據(jù)編碼成二進制字節(jié)流,傳輸給服務(wù)消費者。

在服務(wù)消費者一端實現(xiàn)了按照自定義的網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼,將數(shù)據(jù)編碼成二進制字節(jié)流發(fā)送給服務(wù)提供者,能夠接收到服務(wù)提供者響應(yīng)回來的二進制字節(jié)流數(shù)據(jù),并且能夠根據(jù)自定義網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼,將接收到的二進制字節(jié)流數(shù)據(jù)解碼成對應(yīng)的明文數(shù)據(jù),接下來,進行進一步處理。

同時,服務(wù)消費者支持在屏蔽掉基于Netty連接服務(wù)提供者的實現(xiàn)細節(jié)的前提下,使得外部服務(wù)調(diào)用服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)的方法時,能夠直接獲取到服務(wù)提供者調(diào)用真實方法返回的結(jié)果數(shù)據(jù)。

做到這里,已經(jīng)初步實現(xiàn)了RPC框架最基本的功能。這還遠遠不夠,服務(wù)消費者除了能夠以同步的方式調(diào)用服務(wù)提供者,也要支持異步調(diào)用和單向調(diào)用,看看人家Dubbo,做的是真特么牛逼。

好了,不羨慕人家,我們自己踏踏實實手擼吧,接下來,我們就實現(xiàn)服務(wù)消費者以同步、異步、單向調(diào)用的方式與服務(wù)提供者進行交互。

三、設(shè)計

服務(wù)消費者與服務(wù)提供者之間基于同步、異步和單向調(diào)用的設(shè)計圖分別如下圖所示

  • 同步調(diào)用

圖片

  • 異步調(diào)用

圖片

  • 單向調(diào)用

圖片

通過上圖可以看出:

(1)同步調(diào)用的方式,服務(wù)消費者發(fā)起數(shù)據(jù)請求后,會同步等待返回結(jié)果。

(2)異步調(diào)用的方式,服務(wù)消費者發(fā)起數(shù)據(jù)請求后,會立刻返回,后續(xù)會通過異步的方式獲取數(shù)據(jù)。

(3)單向調(diào)用的方式,服務(wù)消費者發(fā)起數(shù)據(jù)請求后,會立刻返回,不必關(guān)注后續(xù)數(shù)據(jù)的處理結(jié)果。

可以看到,從設(shè)計上還是比較簡單的,接下來,我們就一起實現(xiàn)它。

四、實現(xiàn)

1.工程結(jié)構(gòu)

  • bhrpc-annotation:實現(xiàn)bhrpc框架的核心注解工程。
  • bhrpc-codec:實現(xiàn)bhrpc框架的自定義編解碼功能。
  • bhrpc-common:實現(xiàn)bhrpc框架的通用工具類,包含服務(wù)提供者注解與服務(wù)消費者注解的掃描器。
  • bhrpc-constants:存放實現(xiàn)bhrpc框架通用的常量類。
  • bhrpc-consumer:服務(wù)消費者父工程
  • bhrpc-consumer-common:服務(wù)消費者通用工程
  • bhrpc-protocol:實現(xiàn)bhrpc框架的自定義網(wǎng)絡(luò)傳輸協(xié)議的工程。
  • bhrpc-provider:服務(wù)提供者父工程。
  • bhrpc-provider-common:服務(wù)提供者通用工程。
  • bhrpc-provider-native:以純Java方式啟動bhrpc框架的工程。
  • bhrpc-serialization:實現(xiàn)bhrpc框架序列化與反序列化功能的父工程。
  • bhrpc-serialization-api:實現(xiàn)bhrpc框架序列化與反序列化功能的通用接口工程。
  • bhrpc-serialization-jdk:以JDK的方式實現(xiàn)序列化與反序列化功能。
  • bhrpc-test:測試bhrpc框架的父工程。
  • bhrpc-test-consumer-codec:測試服務(wù)消費者基于自定義網(wǎng)絡(luò)協(xié)議與編解碼與服務(wù)提供者進行數(shù)據(jù)交互
  • bhrpc-test-consumer-handler:測試屏蔽服務(wù)消費者基于Netty與服務(wù)提供者建立連接的細節(jié)后,與服務(wù)提供者進行數(shù)據(jù)通信
  • bhrpc-test-api:測試的通用Servcie接口工程
  • bhrpc-test-provider:測試服務(wù)提供者的工程。
  • bhrpc-test-consumer:測試服務(wù)消費者的工程
  • bhrpc-test-scanner:測試掃描器的工程。

2.核心類實現(xiàn)關(guān)系

服務(wù)消費者與服務(wù)提供者之間基于同步、異步和單向調(diào)用的實現(xiàn)類關(guān)系如下圖所示。

圖片

可以看到,核心類之間的實現(xiàn)關(guān)系還是比較清晰的。

3.RPC上下文RpcContext類的實現(xiàn)

RpcContext類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.context.RpcContext,源碼如下所示。

public class RpcContext {

    private RpcContext(){
    }

    /**
     * RpcContext實例
     */
    private static final RpcContext AGENT = new RpcContext();

    /**
     * 存放RPCFuture的InheritableThreadLocal
     */
    private static final InheritableThreadLocal<RPCFuture> RPC_FUTURE_INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<>();
    /**
     * 獲取上下文
     * @return RPC服務(wù)的上下文信息
     */
    public static RpcContext getContext(){
        return AGENT;
    }
    /**
     * 將RPCFuture保存到線程的上下文
     * @param rpcFuture
     */
    public void setRPCFuture(RPCFuture rpcFuture){
        RPC_FUTURE_INHERITABLE_THREAD_LOCAL.set(rpcFuture);
    }
    /**
     * 獲取RPCFuture
     */
    public RPCFuture getRPCFuture(){
        return RPC_FUTURE_INHERITABLE_THREAD_LOCAL.get();
    }
    /**
     * 移除RPCFuture
     */
    public void removeRPCFuture(){
        RPC_FUTURE_INHERITABLE_THREAD_LOCAL.remove();
    }
}

可以看到,在RpcContext類中主要是通過InheritableThreadLocal在維護RPCFuture,并且每個線程維護RPCFuture時,都是相互隔離的。RpcContext類中維護的RPCFuture會在RPC框架全局有效。

4.修改消費者RpcConsumerHandler處理器類

RpcConsumerHandler類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.handler.RpcConsumerHandler,具體修改步驟如下所示。

(1)新增sendRequestSync()方法

sendRequestSync()方法表示同步調(diào)用的方法,源碼如下所示。

private RPCFuture sendRequestSync(RpcProtocol<RpcRequest> protocol) {
    RPCFuture rpcFuture = this.getRpcFuture(protocol);
    channel.writeAndFlush(protocol);
    return rpcFuture;
}

可以看到,在sendRequestSync()方法中,調(diào)用channel的writeAndFlush()方法發(fā)送數(shù)據(jù)后,會返回RPCFuture對象。

(2)新增sendRequestAsync()方法

sendRequestAsync()方法表示異步調(diào)用的方法,源碼如下所示。

private RPCFuture sendRequestAsync(RpcProtocol<RpcRequest> protocol) {
    RPCFuture rpcFuture = this.getRpcFuture(protocol);
    //如果是異步調(diào)用,則將RPCFuture放入RpcContext
    RpcContext.getContext().setRPCFuture(rpcFuture);
    channel.writeAndFlush(protocol);
    return null;
}

可以看到,sendRequestAsync()方法中,會將RPCFuture對象放入RpcContext上下文中,最終返回null。外部服務(wù)調(diào)用服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)后,會通過RpcContext獲取到RPCFuture對象,進而通過RPCFuture對象獲取最終結(jié)果數(shù)據(jù)。

(3)新增sendRequestOneway()方法

sendRequestOneway()方法表示單向調(diào)用的方法,源碼如下所示。

private RPCFuture sendRequestOneway(RpcProtocol<RpcRequest> protocol) {
    channel.writeAndFlush(protocol);
    return null;
}

可以看到,單向調(diào)用方法并不關(guān)心返回結(jié)果。sendRequestOneway()方法直接調(diào)用channel的writeAndFlush()方法,并返回null。

(4)修改sendRequest()方法

在sendRequest()方法的參數(shù)中新增是否是異步調(diào)用的async參數(shù)和是否是單向調(diào)用的oneway參數(shù),以這些參數(shù)來判斷是執(zhí)行同步調(diào)用、異步調(diào)用還是單向調(diào)用,源碼如下所示。

public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol, boolean async, boolean oneway){
    logger.info("服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{}", JSONObject.toJSONString(protocol));
    return oneway ? this.sendRequestOneway(protocol) : async ? 
           sendRequestAsync(protocol) : this.sendRequestSync(protocol);
}

5.修改RpcConsumer服務(wù)消費者類

RpcConsumer類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.RpcConsumer,主要是修改RpcConsumer類中的sendRequest()方法,調(diào)用RpcConsumerHandler處理器類的sendRequest()方法時,需要傳遞是否是異步調(diào)用async的標識和是否是單向調(diào)用oneway的標識,源碼如下所示。

public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception {
    //################省略其他代碼################
    RpcRequest request = protocol.getBody();
    return handler.sendRequest(protocol, request.getAsync(), request.getOneway());
}

至此,整個實現(xiàn)就完畢了。實現(xiàn)起來是不是很簡單呢?

五、測試

1.啟動服務(wù)提供者

整個測試過程不需要修改服務(wù)提供者的代碼,所以,先啟動服務(wù)提供者,啟動bhrpc-test-provider工程下的io.binghe.rpc.test.provider.single.RpcSingleServerTest,輸出的結(jié)果信息如下所示。

INFO BaseServer:82 - Server started on 127.0.0.1:27880

可以看到,服務(wù)提供者啟動成功。

2.測試同步調(diào)用

(1)修改同步調(diào)用的main()方法

修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。

public static void main(String[] args) throws Exception {
    RpcConsumer consumer = RpcConsumer.getInstance();
    RPCFuture future = consumer.sendRequest(getRpcRequestProtocol());
    LOGGER.info("從服務(wù)消費者獲取到的數(shù)據(jù)===>>>" + future.get());
    consumer.close();
}

可以看到,同步調(diào)用時,會直接回去方法調(diào)用的結(jié)果數(shù)據(jù)。

(2)啟動服務(wù)消費者

啟動bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示。

13:45:12,576  INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:45:12,693  INFO RpcConsumerHandler:90 - 服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:45:12,868  INFO RpcConsumerHandler:77 - 服務(wù)消費者接收到的數(shù)據(jù)===>>>{"body":{"async":false,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}
13:45:12,869  INFO RpcConsumerHandlerTest:38 - 從服務(wù)消費者獲取到的數(shù)據(jù)===>>>hello binghe

可以看到,在服務(wù)消費者輸出的信息中,除了向服務(wù)提供者發(fā)送的數(shù)據(jù)與接收服務(wù)提供者響應(yīng)的數(shù)據(jù)外,還在RpcConsumerHandlerTest類的main()方法中打印出了通過自定義的RPCFuture對象獲取的最終結(jié)果數(shù)據(jù)為hello binghe。符合預(yù)期的效果。

(3)再次查看服務(wù)提供者日志

再次查看服務(wù)提供者輸出的日志信息,如下所示。

13:45:12,748  INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:45:12,748  INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe

可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實的方法。

3.測試異步調(diào)用

(1)修改同步調(diào)用的main()方法

修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。

public static void main(String[] args) throws Exception {
    RpcConsumer consumer = RpcConsumer.getInstance();
    consumer.sendRequest(getRpcRequestProtocol());
    RPCFuture future = RpcContext.getContext().getRPCFuture();
    LOGGER.info("從服務(wù)消費者獲取到的數(shù)據(jù)===>>>" + future.get());
    consumer.close();
}

可以看到,執(zhí)行異步調(diào)用時,并沒有從調(diào)用consumer的sendRequest()方法直接獲取返回的RPCFuture結(jié)果數(shù)據(jù),而是通過RpcContext上下文獲取到RPCFuture對象,再由RPCFuture對象獲取結(jié)果數(shù)據(jù)。

(2)修改構(gòu)建RpcProtocol對象的方法

修改getRpcRequestProtocol()方法中構(gòu)建RpcRequest的方法參數(shù),將是否是異步調(diào)用的參數(shù)設(shè)置為true,源碼如下所示。

private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
    //模擬發(fā)送數(shù)據(jù)
    RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
    //################省略其他代碼##########################
    request.setAsync(true);
    request.setOneway(false);
    protocol.setBody(request);
    return protocol;
}

(3)啟動服務(wù)消費者

啟動bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示

13:47:55,800  INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:47:55,905  INFO RpcConsumerHandler:90 - 服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":true,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:47:55,971  INFO RpcConsumerHandler:77 - 服務(wù)消費者接收到的數(shù)據(jù)===>>>{"body":{"async":true,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}
13:47:55,971  INFO RpcConsumerHandlerTest:40 - 從服務(wù)消費者獲取到的數(shù)據(jù)===>>>hello binghe

可以看到,在服務(wù)消費者輸出的信息中,除了向服務(wù)提供者發(fā)送的數(shù)據(jù)與接收服務(wù)提供者響應(yīng)的數(shù)據(jù)外,還在RpcConsumerHandlerTest類的main()方法中打印出了通過自定義的RPCFuture對象獲取的最終結(jié)果數(shù)據(jù)為hello binghe。符合預(yù)期的效果。

(4)再次查看服務(wù)提供者日志

再次查看服務(wù)提供者輸出的日志信息,如下所示。

13:47:55,948  INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:47:55,948  INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe

可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實的方法。

4.測試單向調(diào)用

(1)修改同步調(diào)用的main()方法

修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。

public static void main(String[] args) throws Exception {
    RpcConsumer consumer = RpcConsumer.getInstance();
    consumer.sendRequest(getRpcRequestProtocol());
    LOGGER.info("無需獲取返回的結(jié)果數(shù)據(jù)");
    consumer.close();
}

可以看到,在單向調(diào)用中,并沒有獲取返回結(jié)果。

(2)修改構(gòu)建RpcProtocol對象的方法

修改getRpcRequestProtocol()方法中構(gòu)建RpcRequest的方法參數(shù),將是否是單向調(diào)用的參數(shù)設(shè)置為true,源碼如下所示。

private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
    //模擬發(fā)送數(shù)據(jù)
 //#############省略其他代碼#################
    request.setAsync(false);
    request.setOneway(true);
    protocol.setBody(request);
    return protocol;
}

(3)啟動服務(wù)消費者

啟動bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示。

13:58:26,417  INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:58:26,524  INFO RpcConsumerHandler:90 - 服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":true,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:58:26,531  INFO RpcConsumerHandlerTest:39 - 無需獲取返回的結(jié)果數(shù)據(jù)

可以看到,服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)后,并沒有獲取返回的結(jié)果數(shù)據(jù)。

(4)再次查看服務(wù)提供者日志

再次查看服務(wù)提供者輸出的日志信息,如下所示。

13:58:26,565  INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:58:26,566  INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe

可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實的方法。

六、總結(jié)

目前實現(xiàn)的RPC框架以Java原生進程的方式啟動后,能夠?qū)崿F(xiàn)服務(wù)消費者以同步、異步和單向調(diào)用的方式與服務(wù)提供者之間進行數(shù)據(jù)交互。至此,我們寫的RPC框架的功能又進一步得到了增強。

我們寫的RPC框架正在一步步實現(xiàn)它該有的功能。

責任編輯:姜華 來源: 冰河技術(shù)
相關(guān)推薦

2021-10-19 08:58:48

Java 語言 Java 基礎(chǔ)

2024-05-31 08:45:24

2009-07-01 14:23:46

JavaScript異

2021-08-06 13:30:37

JS手寫題前端

2020-11-02 08:19:18

RPC框架Java

2021-10-21 08:21:10

Java Reflect Java 基礎(chǔ)

2021-10-13 08:21:52

Java websocket Java 基礎(chǔ)

2021-11-15 14:02:27

RPCSpringBootRabbitMQ

2024-11-11 00:00:10

2020-11-18 19:11:26

iOSFlutterNative

2022-04-02 07:52:47

DubboRPC調(diào)用動態(tài)代理

2014-09-02 10:43:45

RedisRPC

2009-10-20 16:48:30

C#委托

2010-03-01 13:17:46

WCF單向服務(wù)

2010-02-06 09:46:46

C++單向鏈表

2021-04-21 08:01:31

Googleprotobuf嵌入式系統(tǒng)

2023-01-05 08:01:35

SpringRPC框架

2020-01-16 10:48:21

HTTPRPC協(xié)議

2017-03-02 13:31:02

監(jiān)控系統(tǒng)

2024-07-11 16:49:43

同步通信異步通信通信
點贊
收藏

51CTO技術(shù)棧公眾號