自己手寫RPC如何實現(xiàn)同步、異步、單向調(diào)用?直接上代碼!
很多好用的RPC框架都支持服務(wù)消費者以同步、異步和單向調(diào)用的方式與服務(wù)提供者進行交互,冰河你開發(fā)的這個RPC框架也可以嗎?
一、前言
在前面的章節(jié)中,實現(xiàn)了服務(wù)消費者屏蔽掉基于Netty連接服務(wù)提供者的實現(xiàn)細節(jié)的前提下,以異步轉(zhuǎn)同步的方式調(diào)用服務(wù)提供者。在外部服務(wù)調(diào)用服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)的方法時,能夠直接獲取到服務(wù)提供者調(diào)用真實方法返回的結(jié)果數(shù)據(jù)。
那RPC框架只支持同步調(diào)用的話,在高并發(fā)環(huán)境下肯定會出現(xiàn)性能問題,我想讓RPC框架支持同步、異步和單向調(diào)用,這也是很多優(yōu)秀的RPC框架都支持的功能,這個有辦法實現(xiàn)嗎?
我:安排。。。
二、目標
在服務(wù)提供者一端實現(xiàn)了按照自定義網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼對接收到的數(shù)據(jù)進行解析,并且能夠?qū)⒔馕龅降臄?shù)據(jù)作為參數(shù)調(diào)用真實方法,并接收真實方法返回的結(jié)果數(shù)據(jù),通過自定義網(wǎng)絡(luò)協(xié)議和數(shù)據(jù)編解碼,將數(shù)據(jù)編碼成二進制字節(jié)流,傳輸給服務(wù)消費者。
在服務(wù)消費者一端實現(xiàn)了按照自定義的網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼,將數(shù)據(jù)編碼成二進制字節(jié)流發(fā)送給服務(wù)提供者,能夠接收到服務(wù)提供者響應(yīng)回來的二進制字節(jié)流數(shù)據(jù),并且能夠根據(jù)自定義網(wǎng)絡(luò)傳輸協(xié)議和數(shù)據(jù)編解碼,將接收到的二進制字節(jié)流數(shù)據(jù)解碼成對應(yīng)的明文數(shù)據(jù),接下來,進行進一步處理。
同時,服務(wù)消費者支持在屏蔽掉基于Netty連接服務(wù)提供者的實現(xiàn)細節(jié)的前提下,使得外部服務(wù)調(diào)用服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)的方法時,能夠直接獲取到服務(wù)提供者調(diào)用真實方法返回的結(jié)果數(shù)據(jù)。
做到這里,已經(jīng)初步實現(xiàn)了RPC框架最基本的功能。這還遠遠不夠,服務(wù)消費者除了能夠以同步的方式調(diào)用服務(wù)提供者,也要支持異步調(diào)用和單向調(diào)用,看看人家Dubbo,做的是真特么牛逼。
好了,不羨慕人家,我們自己踏踏實實手擼吧,接下來,我們就實現(xiàn)服務(wù)消費者以同步、異步、單向調(diào)用的方式與服務(wù)提供者進行交互。
三、設(shè)計
服務(wù)消費者與服務(wù)提供者之間基于同步、異步和單向調(diào)用的設(shè)計圖分別如下圖所示
- 同步調(diào)用
- 異步調(diào)用
- 單向調(diào)用
通過上圖可以看出:
(1)同步調(diào)用的方式,服務(wù)消費者發(fā)起數(shù)據(jù)請求后,會同步等待返回結(jié)果。
(2)異步調(diào)用的方式,服務(wù)消費者發(fā)起數(shù)據(jù)請求后,會立刻返回,后續(xù)會通過異步的方式獲取數(shù)據(jù)。
(3)單向調(diào)用的方式,服務(wù)消費者發(fā)起數(shù)據(jù)請求后,會立刻返回,不必關(guān)注后續(xù)數(shù)據(jù)的處理結(jié)果。
可以看到,從設(shè)計上還是比較簡單的,接下來,我們就一起實現(xiàn)它。
四、實現(xiàn)
1.工程結(jié)構(gòu)
- bhrpc-annotation:實現(xiàn)bhrpc框架的核心注解工程。
- bhrpc-codec:實現(xiàn)bhrpc框架的自定義編解碼功能。
- bhrpc-common:實現(xiàn)bhrpc框架的通用工具類,包含服務(wù)提供者注解與服務(wù)消費者注解的掃描器。
- bhrpc-constants:存放實現(xiàn)bhrpc框架通用的常量類。
- bhrpc-consumer:服務(wù)消費者父工程
- bhrpc-consumer-common:服務(wù)消費者通用工程
- bhrpc-protocol:實現(xiàn)bhrpc框架的自定義網(wǎng)絡(luò)傳輸協(xié)議的工程。
- bhrpc-provider:服務(wù)提供者父工程。
- bhrpc-provider-common:服務(wù)提供者通用工程。
- bhrpc-provider-native:以純Java方式啟動bhrpc框架的工程。
- bhrpc-serialization:實現(xiàn)bhrpc框架序列化與反序列化功能的父工程。
- bhrpc-serialization-api:實現(xiàn)bhrpc框架序列化與反序列化功能的通用接口工程。
- bhrpc-serialization-jdk:以JDK的方式實現(xiàn)序列化與反序列化功能。
- bhrpc-test:測試bhrpc框架的父工程。
- bhrpc-test-consumer-codec:測試服務(wù)消費者基于自定義網(wǎng)絡(luò)協(xié)議與編解碼與服務(wù)提供者進行數(shù)據(jù)交互
- bhrpc-test-consumer-handler:測試屏蔽服務(wù)消費者基于Netty與服務(wù)提供者建立連接的細節(jié)后,與服務(wù)提供者進行數(shù)據(jù)通信
- bhrpc-test-api:測試的通用Servcie接口工程
- bhrpc-test-provider:測試服務(wù)提供者的工程。
- bhrpc-test-consumer:測試服務(wù)消費者的工程
- bhrpc-test-scanner:測試掃描器的工程。
2.核心類實現(xiàn)關(guān)系
服務(wù)消費者與服務(wù)提供者之間基于同步、異步和單向調(diào)用的實現(xiàn)類關(guān)系如下圖所示。
可以看到,核心類之間的實現(xiàn)關(guān)系還是比較清晰的。
3.RPC上下文RpcContext類的實現(xiàn)
RpcContext類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.context.RpcContext,源碼如下所示。
public class RpcContext {
private RpcContext(){
}
/**
* RpcContext實例
*/
private static final RpcContext AGENT = new RpcContext();
/**
* 存放RPCFuture的InheritableThreadLocal
*/
private static final InheritableThreadLocal<RPCFuture> RPC_FUTURE_INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<>();
/**
* 獲取上下文
* @return RPC服務(wù)的上下文信息
*/
public static RpcContext getContext(){
return AGENT;
}
/**
* 將RPCFuture保存到線程的上下文
* @param rpcFuture
*/
public void setRPCFuture(RPCFuture rpcFuture){
RPC_FUTURE_INHERITABLE_THREAD_LOCAL.set(rpcFuture);
}
/**
* 獲取RPCFuture
*/
public RPCFuture getRPCFuture(){
return RPC_FUTURE_INHERITABLE_THREAD_LOCAL.get();
}
/**
* 移除RPCFuture
*/
public void removeRPCFuture(){
RPC_FUTURE_INHERITABLE_THREAD_LOCAL.remove();
}
}
可以看到,在RpcContext類中主要是通過InheritableThreadLocal在維護RPCFuture,并且每個線程維護RPCFuture時,都是相互隔離的。RpcContext類中維護的RPCFuture會在RPC框架全局有效。
4.修改消費者RpcConsumerHandler處理器類
RpcConsumerHandler類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.handler.RpcConsumerHandler,具體修改步驟如下所示。
(1)新增sendRequestSync()方法
sendRequestSync()方法表示同步調(diào)用的方法,源碼如下所示。
private RPCFuture sendRequestSync(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = this.getRpcFuture(protocol);
channel.writeAndFlush(protocol);
return rpcFuture;
}
可以看到,在sendRequestSync()方法中,調(diào)用channel的writeAndFlush()方法發(fā)送數(shù)據(jù)后,會返回RPCFuture對象。
(2)新增sendRequestAsync()方法
sendRequestAsync()方法表示異步調(diào)用的方法,源碼如下所示。
private RPCFuture sendRequestAsync(RpcProtocol<RpcRequest> protocol) {
RPCFuture rpcFuture = this.getRpcFuture(protocol);
//如果是異步調(diào)用,則將RPCFuture放入RpcContext
RpcContext.getContext().setRPCFuture(rpcFuture);
channel.writeAndFlush(protocol);
return null;
}
可以看到,sendRequestAsync()方法中,會將RPCFuture對象放入RpcContext上下文中,最終返回null。外部服務(wù)調(diào)用服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)后,會通過RpcContext獲取到RPCFuture對象,進而通過RPCFuture對象獲取最終結(jié)果數(shù)據(jù)。
(3)新增sendRequestOneway()方法
sendRequestOneway()方法表示單向調(diào)用的方法,源碼如下所示。
private RPCFuture sendRequestOneway(RpcProtocol<RpcRequest> protocol) {
channel.writeAndFlush(protocol);
return null;
}
可以看到,單向調(diào)用方法并不關(guān)心返回結(jié)果。sendRequestOneway()方法直接調(diào)用channel的writeAndFlush()方法,并返回null。
(4)修改sendRequest()方法
在sendRequest()方法的參數(shù)中新增是否是異步調(diào)用的async參數(shù)和是否是單向調(diào)用的oneway參數(shù),以這些參數(shù)來判斷是執(zhí)行同步調(diào)用、異步調(diào)用還是單向調(diào)用,源碼如下所示。
public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol, boolean async, boolean oneway){
logger.info("服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{}", JSONObject.toJSONString(protocol));
return oneway ? this.sendRequestOneway(protocol) : async ?
sendRequestAsync(protocol) : this.sendRequestSync(protocol);
}
5.修改RpcConsumer服務(wù)消費者類
RpcConsumer類位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.RpcConsumer,主要是修改RpcConsumer類中的sendRequest()方法,調(diào)用RpcConsumerHandler處理器類的sendRequest()方法時,需要傳遞是否是異步調(diào)用async的標識和是否是單向調(diào)用oneway的標識,源碼如下所示。
public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception {
//################省略其他代碼################
RpcRequest request = protocol.getBody();
return handler.sendRequest(protocol, request.getAsync(), request.getOneway());
}
至此,整個實現(xiàn)就完畢了。實現(xiàn)起來是不是很簡單呢?
五、測試
1.啟動服務(wù)提供者
整個測試過程不需要修改服務(wù)提供者的代碼,所以,先啟動服務(wù)提供者,啟動bhrpc-test-provider工程下的io.binghe.rpc.test.provider.single.RpcSingleServerTest,輸出的結(jié)果信息如下所示。
INFO BaseServer:82 - Server started on 127.0.0.1:27880
可以看到,服務(wù)提供者啟動成功。
2.測試同步調(diào)用
(1)修改同步調(diào)用的main()方法
修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
RPCFuture future = consumer.sendRequest(getRpcRequestProtocol());
LOGGER.info("從服務(wù)消費者獲取到的數(shù)據(jù)===>>>" + future.get());
consumer.close();
}
可以看到,同步調(diào)用時,會直接回去方法調(diào)用的結(jié)果數(shù)據(jù)。
(2)啟動服務(wù)消費者
啟動bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示。
13:45:12,576 INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:45:12,693 INFO RpcConsumerHandler:90 - 服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:45:12,868 INFO RpcConsumerHandler:77 - 服務(wù)消費者接收到的數(shù)據(jù)===>>>{"body":{"async":false,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}
13:45:12,869 INFO RpcConsumerHandlerTest:38 - 從服務(wù)消費者獲取到的數(shù)據(jù)===>>>hello binghe
可以看到,在服務(wù)消費者輸出的信息中,除了向服務(wù)提供者發(fā)送的數(shù)據(jù)與接收服務(wù)提供者響應(yīng)的數(shù)據(jù)外,還在RpcConsumerHandlerTest類的main()方法中打印出了通過自定義的RPCFuture對象獲取的最終結(jié)果數(shù)據(jù)為hello binghe。符合預(yù)期的效果。
(3)再次查看服務(wù)提供者日志
再次查看服務(wù)提供者輸出的日志信息,如下所示。
13:45:12,748 INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:45:12,748 INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe
可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實的方法。
3.測試異步調(diào)用
(1)修改同步調(diào)用的main()方法
修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
consumer.sendRequest(getRpcRequestProtocol());
RPCFuture future = RpcContext.getContext().getRPCFuture();
LOGGER.info("從服務(wù)消費者獲取到的數(shù)據(jù)===>>>" + future.get());
consumer.close();
}
可以看到,執(zhí)行異步調(diào)用時,并沒有從調(diào)用consumer的sendRequest()方法直接獲取返回的RPCFuture結(jié)果數(shù)據(jù),而是通過RpcContext上下文獲取到RPCFuture對象,再由RPCFuture對象獲取結(jié)果數(shù)據(jù)。
(2)修改構(gòu)建RpcProtocol對象的方法
修改getRpcRequestProtocol()方法中構(gòu)建RpcRequest的方法參數(shù),將是否是異步調(diào)用的參數(shù)設(shè)置為true,源碼如下所示。
private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
//模擬發(fā)送數(shù)據(jù)
RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();
//################省略其他代碼##########################
request.setAsync(true);
request.setOneway(false);
protocol.setBody(request);
return protocol;
}
(3)啟動服務(wù)消費者
啟動bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示
13:47:55,800 INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:47:55,905 INFO RpcConsumerHandler:90 - 服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":true,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:47:55,971 INFO RpcConsumerHandler:77 - 服務(wù)消費者接收到的數(shù)據(jù)===>>>{"body":{"async":true,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}
13:47:55,971 INFO RpcConsumerHandlerTest:40 - 從服務(wù)消費者獲取到的數(shù)據(jù)===>>>hello binghe
可以看到,在服務(wù)消費者輸出的信息中,除了向服務(wù)提供者發(fā)送的數(shù)據(jù)與接收服務(wù)提供者響應(yīng)的數(shù)據(jù)外,還在RpcConsumerHandlerTest類的main()方法中打印出了通過自定義的RPCFuture對象獲取的最終結(jié)果數(shù)據(jù)為hello binghe。符合預(yù)期的效果。
(4)再次查看服務(wù)提供者日志
再次查看服務(wù)提供者輸出的日志信息,如下所示。
13:47:55,948 INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:47:55,948 INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe
可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實的方法。
4.測試單向調(diào)用
(1)修改同步調(diào)用的main()方法
修改bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,源碼如下所示。
public static void main(String[] args) throws Exception {
RpcConsumer consumer = RpcConsumer.getInstance();
consumer.sendRequest(getRpcRequestProtocol());
LOGGER.info("無需獲取返回的結(jié)果數(shù)據(jù)");
consumer.close();
}
可以看到,在單向調(diào)用中,并沒有獲取返回結(jié)果。
(2)修改構(gòu)建RpcProtocol對象的方法
修改getRpcRequestProtocol()方法中構(gòu)建RpcRequest的方法參數(shù),將是否是單向調(diào)用的參數(shù)設(shè)置為true,源碼如下所示。
private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){
//模擬發(fā)送數(shù)據(jù)
//#############省略其他代碼#################
request.setAsync(false);
request.setOneway(true);
protocol.setBody(request);
return protocol;
}
(3)啟動服務(wù)消費者
啟動bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest類的main()方法,輸出的結(jié)果信息如下所示。
13:58:26,417 INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.
13:58:26,524 INFO RpcConsumerHandler:90 - 服務(wù)消費者發(fā)送的數(shù)據(jù)===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":true,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}
13:58:26,531 INFO RpcConsumerHandlerTest:39 - 無需獲取返回的結(jié)果數(shù)據(jù)
可以看到,服務(wù)消費者向服務(wù)提供者發(fā)送數(shù)據(jù)后,并沒有獲取返回的結(jié)果數(shù)據(jù)。
(4)再次查看服務(wù)提供者日志
再次查看服務(wù)提供者輸出的日志信息,如下所示。
13:58:26,565 INFO RpcProviderHandler:132 - use cglib reflect type invoke method...
13:58:26,566 INFO ProviderDemoServiceImpl:33 - 調(diào)用hello方法傳入的參數(shù)為===>>>binghe
可以看到,服務(wù)提供者使用CGLib的方式調(diào)用了真實的方法。
六、總結(jié)
目前實現(xiàn)的RPC框架以Java原生進程的方式啟動后,能夠?qū)崿F(xiàn)服務(wù)消費者以同步、異步和單向調(diào)用的方式與服務(wù)提供者之間進行數(shù)據(jù)交互。至此,我們寫的RPC框架的功能又進一步得到了增強。
我們寫的RPC框架正在一步步實現(xiàn)它該有的功能。