Java 從零開始手寫 RPC基于 Websocket 實現(xiàn)
RPC
解決的問題
RPC 主要是為了解決的兩個問題:
(1)解決分布式系統(tǒng)中,服務(wù)之間的調(diào)用問題。
(2)遠(yuǎn)程調(diào)用時,要能夠像本地調(diào)用一樣方便,讓調(diào)用者感知不到遠(yuǎn)程調(diào)用的邏輯。
這一節(jié)我們來學(xué)習(xí)下如何基于 websocket 實現(xiàn)最簡單的 rpc 調(diào)用,后續(xù)會實現(xiàn)基于 netty4 的版本。
開源地址: https://github.com/houbb/rpc
完整流程

其中左邊的Client,對應(yīng)的就是前面的Service A,而右邊的Server,對應(yīng)的則是Service B。
下面一步一步詳細(xì)解釋一下。
(1)Service A的應(yīng)用層代碼中,調(diào)用了Calculator的一個實現(xiàn)類的add方法,希望執(zhí)行一個加法運算;
(2)這個Calculator實現(xiàn)類,內(nèi)部并不是直接實現(xiàn)計算器的加減乘除邏輯,而是通過遠(yuǎn)程調(diào)用Service B的RPC接口,來獲取運算結(jié)果,因此稱之為Stub;
(3)Stub怎么和Service B建立遠(yuǎn)程通訊呢?這時候就要用到遠(yuǎn)程通訊工具了,也就是圖中的Run-time Library,這個工具將幫你實現(xiàn)遠(yuǎn)程通訊的功能,比如Java的Socket,就是這樣一個庫,當(dāng)然,你也可以用基于Http協(xié)議的HttpClient,或者其他通訊工具類,都可以,RPC并沒有規(guī)定說你要用何種協(xié)議進(jìn)行通訊;
(4)Stub通過調(diào)用通訊工具提供的方法,和Service B建立起了通訊,然后將請求數(shù)據(jù)發(fā)給Service B。需要注意的是,由于底層的網(wǎng)絡(luò)通訊是基于二進(jìn)制格式的,因此這里Stub傳給通訊工具類的數(shù)據(jù)也必須是二進(jìn)制,比如calculator.add(1,2),你必須把參數(shù)值1和2放到一個Request對象里頭(這個Request對象當(dāng)然不只這些信息,還包括要調(diào)用哪個服務(wù)的哪個RPC接口等其他信息),然后序列化為二進(jìn)制,再傳給通訊工具類,這一點也將在下面的代碼實現(xiàn)中體現(xiàn);
(5)二進(jìn)制的數(shù)據(jù)傳到Service B這一邊了,Service B當(dāng)然也有自己的通訊工具,通過這個通訊工具接收二進(jìn)制的請求;
(6)既然數(shù)據(jù)是二進(jìn)制的,那么自然要進(jìn)行反序列化了,將二進(jìn)制的數(shù)據(jù)反序列化為請求對象,然后將這個請求對象交給Service B的Stub處理;
(7)和之前的Service A的Stub一樣,這里的Stub也同樣是個“假玩意”,它所負(fù)責(zé)的,只是去解析請求對象,知道調(diào)用方要調(diào)的是哪個RPC接口,傳進(jìn)來的參數(shù)又是什么,然后再把這些參數(shù)傳給對應(yīng)的RPC接口,也就是Calculator的實際實現(xiàn)類去執(zhí)行。很明顯,如果是Java,那這里肯定用到了反射。
(8)RPC接口執(zhí)行完畢,返回執(zhí)行結(jié)果,現(xiàn)在輪到Service B要把數(shù)據(jù)發(fā)給Service A了,怎么發(fā)?一樣的道理,一樣的流程,只是現(xiàn)在Service B變成了Client,Service A變成了Server而已:Service B反序列化執(zhí)行結(jié)果->傳輸給Service A->Service A反序列化執(zhí)行結(jié)果 -> 將結(jié)果返回給Application,完畢。
簡單實現(xiàn)
假設(shè)服務(wù) A,想調(diào)用服務(wù) B 的一個方法。
因為不在同一個內(nèi)存中,無法直接使用。如何可以實現(xiàn)類似 Dubbo 的功能呢?
這里不需要使用 HTTP 級別的通信,使用 TCP 協(xié)議即可。
common
公用模塊,定義通用對象。
- Rpc 常量
- public interface RpcConstant {
- /**
- * 地址
- */
- String ADDRESS = "127.0.0.1";
- /**
- * 端口號
- */
- int PORT = 12345;
- }
- 請求入?yún)?/li>
- public class RpcCalculateRequest implements Serializable {
- private static final long serialVersionUID = 6420751004355300996L;
- /**
- * 參數(shù)一
- */
- private int one;
- /**
- * 參數(shù)二
- */
- private int two;
- //getter & setter & toString()
- }
- 服務(wù)接口
- public interface Calculator {
- /**
- * 計算加法
- * @param one 參數(shù)一
- * @param two 參數(shù)二
- * @return 返回結(jié)果
- */
- int add(int one, int two);
- }
server
- 服務(wù)接口的實現(xiàn)
- public class CalculatorImpl implements Calculator {
- @Override
- public int add(int one, int two) {
- return one + two;
- }
- }
- 啟動服務(wù)
- public static void main(String[] args) throws IOException {
- Calculator calculator = new CalculatorImpl();
- try (ServerSocket listener = new ServerSocket(RpcConstant.PORT)) {
- System.out.println("Server 端啟動:" + RpcConstant.ADDRESS + ":" + RpcConstant.PORT);
- while (true) {
- try (Socket socket = listener.accept()) {
- // 將請求反序列化
- ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
- Object object = objectInputStream.readObject();
- System.out.println("Request is: " + object);
- // 調(diào)用服務(wù)
- int result = 0;
- if (object instanceof RpcCalculateRequest) {
- RpcCalculateRequest calculateRpcRequest = (RpcCalculateRequest) object;
- result = calculator.add(calculateRpcRequest.getOne(), calculateRpcRequest.getTwo());
- }
- // 返回結(jié)果
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
- objectOutputStream.writeObject(result);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
啟動日志:
- Server 端啟動:127.0.0.1:12345
client
•客戶端調(diào)用
- public static void main(String[] args) {
- Calculator calculator = new CalculatorProxy();
- int result = calculator.add(1, 2);
- System.out.println(result);
- }
- 計算的代理類
- public class CalculatorProxy implements Calculator {
- @Override
- public int add(int one, int two) {
- try {
- Socket socket = new Socket(RpcConstant.ADDRESS, RpcConstant.PORT);
- // 將請求序列化
- RpcCalculateRequest calculateRpcRequest = new RpcCalculateRequest(one, two);
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
- // 將請求發(fā)給服務(wù)提供方
- objectOutputStream.writeObject(calculateRpcRequest);
- // 將響應(yīng)體反序列化
- ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
- Object response = objectInputStream.readObject();
- if (response instanceof Integer) {
- return (Integer) response;
- } else {
- throw new RuntimeException();
- }
- } catch (IOException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- }
- 調(diào)用日志
client 端
- 3
server 端
- Server 端啟動:127.0.0.1:12345
- Request is: RpcCalculateRequest{one=1, two=2}