自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

《一起玩Dubbo》系列四之服務如何被調用

開發(fā) 架構
了解過rpc的大概都聽過,rpc就是為了解決遠程方法的本地調用的難題的,其實說穿了,就是為了解決方法在被調用到遠程服被執(zhí)行的流程問題,那么這個流程到底是怎么樣的呢?

[[411936]]

了解過rpc的大概都聽過,rpc就是為了解決遠程方法的本地調用的難題的,其實說穿了,就是為了解決方法在被調用到遠程服被執(zhí)行的流程問題,那么這個流程到底是怎么樣的呢?

同樣的,我繼續(xù)在 dubbo流程圖 中繼續(xù)繪畫我的流程

首先是根據(jù)文章一起玩dubbo,先入個門搭建起demo,包括注冊中心、服務消費方和服務提供方,接下來來擼擼整個過程

這邊為了方便解說,先直接給個demo

這是服務提供方

  1. public class DemoServiceImpl implements DemoService { 
  2.  
  3.     @Override 
  4.     public String sayHello(String name) { 
  5.         System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); 
  6.         return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress(); 
  7.     } 

這是服務消費方

  1. public class Consumer { 
  2.  
  3.     public static void main(String[] args) { 
  4.         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring" + 
  5.                 "/dubbo-demo-consumer.xml"}); 
  6.         context.start(); 
  7.         DemoService demoService = (DemoService) context.getBean("demoService"); 
  8.         while (true) { 
  9.             try { 
  10.                 Thread.sleep(1000); 
  11.                 String hello = demoService.sayHello("world"); 
  12.                 System.out.println(hello); 
  13.  
  14.             } catch (Throwable throwable) { 
  15.                 throwable.printStackTrace(); 
  16.             } 
  17.         } 
  18.     } 

我斷點了下這里

image-20210714015553406

走到服務消費方的最底層可以看到

在開始分析細節(jié)之前我們先在大腦風暴下大致流程

一次調用過程需要經歷哪些步驟?

不用看dubbo代碼都可以大概猜到:

  • 要知道遠程服務的地址,
  • 把要調用的方法的具體信息告訴遠程服務,讓遠程服務解析這些信息
  • 遠程服務根據(jù)這些信息找到對應的實現(xiàn)類,進行調用,調用完了
  • 調用結果原路返回,然后客戶端解析響應

第一點,我們通過前幾篇文章已經知道,消費方在發(fā)起調用的時候已經知曉了遠程服務的地址

那么要調用的方法的具體信息包括哪些呢?

客戶端肯定要告訴服務方調用的哪個接口,所以需要方法名、方法的參數(shù)類型、方法的參數(shù)值,然后有可能存在多個版本的情況,所以還得帶上版本號,有這些數(shù)據(jù)后,服務方就可以精準的調用具體的方法了。

我這邊將上面調用的例子先貼出來

mdata也就是我上面說的那些數(shù)據(jù)。

看到這個Request這里,應該就清楚了遠程調用的基本原理了。

這個時候很容易就想到另一個問題,消費方和提供方是如何通信的?

消費方和提供方如何通信?

其實很簡單,就是消費方和提供方通過協(xié)議進行了通信罷了,dubbo的協(xié)議屬于很常見的header+body 形式,而且也有特殊的字符 0xdabb,用來解決 TCP 網(wǎng)絡粘包問題的。這種header是固定長度的,然后header里面填寫 body 的長度是比較常見的做法,包括我司的游戲框架也是用這種模式。

我們可以看看dubbo協(xié)議的鬼樣

可以看到,協(xié)議分為協(xié)議頭和協(xié)議體,16 字節(jié)的頭部主要攜帶了魔法數(shù),也就是之前說的 0xdabb,然后一些請求的設置,消息體的長度等等,16 字節(jié)之后就是協(xié)議體了,包括協(xié)議版本、接口名字、接口版本、方法名字等等。

看到這里又很容易的引申出另一個問題了,協(xié)議是如何序列化的?

協(xié)議的序列化?

序列化的概念其實也簡答, 在消費方先把Java對象轉換為字節(jié)序列,這個過程也被稱為對象的序列化,然后在服務方又把字節(jié)序列恢復為Java對象,這個過程稱為對象的反序列化。

dubbo默認使用的是 hessian2 序列化協(xié)議,hessian2是阿里對于hessian進行行了修改的版本,應該還不錯。

大致總結下,消費方發(fā)起調用,在那一刻,實際調用的是代理類,代理類最終調用的是Client,Client將 Java 的對象序列化生成協(xié)議體,然后通過網(wǎng)絡傳輸給服務方,服務方Server接到這個請求之后,分發(fā)給業(yè)務線程池,由業(yè)務線程調用具體的實現(xiàn)方法。

先see see官網(wǎng)圖吧

分析下消費方的調用鏈路

我們先看看服務消費方的調用邏輯,大家可以對著我這張圖來

好了,我繼續(xù)說

可以看到調用的接口生成的代理類是

而在invoke的時候會先釋放掉部分不需要攔截的方法啦,比如toString什么的,這樣正常吧,這些方法確實不需要攔截的嘛

看看RpcInvocation是什么

可以看到生成的 RpcInvocation 包含了方法名、參數(shù)類和參數(shù)值什么的。

接下來往里進一步看看MockClusterInvoker#invoke 代碼,先解釋下為啥會進來了MockClusterInvoker,看過文章 想學dubbo的看過來,2萬字整理服務引入流程 應該可以理解這個過程,這個過程可以認為是套娃吧,A套B,B套C,一直套到最外層的invoker就是MockClusterInvoker,如果不理解這個過程可以往回看我的文章,很肝卻很實用

這里可以看到就是判斷配置里面有沒有配置mock,mock 的話后續(xù)再展開說,繼續(xù)看看this.invoker.invoke 的實現(xiàn),實際上會調用 AbstractClusterInvoker#invoker

這里倒是涉及到了一個模板方法的設計模式,其實很簡單,就是在抽象類中定好代碼的執(zhí)行骨架,之后將具體的實現(xiàn)延遲到子類中,由子類來決定邏輯,這樣可以在不改變整體執(zhí)行步驟的情況下修改步驟里面的實現(xiàn),減少了重復的代碼,也利于擴展,符合了開閉原則。

接下來看看

做了啥,這一步算是比較重要吧,單獨拎出來講講

這里其實就是先由路由過濾一波,然后返回invoker

繼續(xù)看看doInvoke的流程,我們默認使用的是 FailoverClusterInvoker,也就是失敗自動切換的容錯方式,

這里說說為啥默認是這個哦,其實從實際應用上來說,失敗后自動切換下個服務實例還是比較符合場景的,如果想替換其他模式可以在xml里邊配置

那我們繼續(xù)看看那doInvoke的實現(xiàn)

  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { 
  2.     List<Invoker<T>> copyinvokers = invokers; 
  3.     checkInvokers(copyinvokers, invocation); 
  4.     String methodName = RpcUtils.getMethodName(invocation); 
  5.     int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; 
  6.     if (len <= 0) { 
  7.         len = 1; 
  8.     } 
  9.     // retry loop. 
  10.     RpcException le = null; // last exception. 
  11.     List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. 
  12.     Set<String> providers = new HashSet<String>(len); 
  13.     // 重試次數(shù) 
  14.     for (int i = 0; i < len; i++) { 
  15.         //Reselect before retry to avoid a change of candidate `invokers`. 
  16.         //NOTE: if `invokers` changed, then `invoked` also lose accuracy. 
  17.         if (i > 0) { 
  18.             checkWhetherDestroyed(); 
  19.             copyinvokers = list(invocation); 
  20.             // check again 
  21.             checkInvokers(copyinvokers, invocation); 
  22.         } 
  23.         // 通過負載選擇了一個invoker 
  24.         Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); 
  25.         invoked.add(invoker); 
  26.         // 上下文保留了調用過的invoker 
  27.         RpcContext.getContext().setInvokers((List) invoked); 
  28.         try { 
  29.             // 發(fā)起調用 
  30.             Result result = invoker.invoke(invocation); 
  31.             if (le != null && logger.isWarnEnabled()) { 
  32.                 logger.warn("Although retry the method " + methodName 
  33.                         + " in the service " + getInterface().getName() 
  34.                         + " was successful by the provider " + invoker.getUrl().getAddress() 
  35.                         + ", but there have been failed providers " + providers 
  36.                         + " (" + providers.size() + "/" + copyinvokers.size() 
  37.                         + ") from the registry " + directory.getUrl().getAddress() 
  38.                         + " on the consumer " + NetUtils.getLocalHost() 
  39.                         + " using the dubbo version " + Version.getVersion() + ". Last error is: " 
  40.                         + le.getMessage(), le); 
  41.             } 
  42.             return result; 
  43.         } catch (RpcException e) { 
  44.             if (e.isBiz()) { // biz exception. 
  45.                 throw e; 
  46.             } 
  47.             le = e; 
  48.         } catch (Throwable e) { 
  49.             le = new RpcException(e.getMessage(), e); 
  50.         } finally { 
  51.             providers.add(invoker.getUrl().getAddress()); 
  52.         } 
  53.     } 
  54.     throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " 
  55.             + methodName + " in the service " + getInterface().getName() 
  56.             + ". Tried " + len + " times of the providers " + providers 
  57.             + " (" + providers.size() + "/" + copyinvokers.size() 
  58.             + ") from the registry " + directory.getUrl().getAddress() 
  59.             + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " 
  60.             + Version.getVersion() + ". Last error is: " 
  61.             + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); 

這個調用稍微總結一下就是FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表,并且經過路由之后,通過LoadBalance 從 Invoker 列表中選擇一個 Invoker,也就是負載均衡啦,最后FailoverClusterInvoker會將參數(shù)傳給選擇出的那個 Invoker 實例的 invoke 方法,進行真正的遠程調用。

后面發(fā)起調用的這個 invoke 又是調用抽象類中的 invoke 然后再調用子類的 doInvoker,抽象類中的方法很簡單我就不展示了,我們直接看子類 DubboInvoker 的 doInvoke 方法。

  1. protected Result doInvoke(final Invocation invocation) throws Throwable { 
  2.     RpcInvocation inv = (RpcInvocation) invocation; 
  3.     final String methodName = RpcUtils.getMethodName(invocation); 
  4.     inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); 
  5.     inv.setAttachment(Constants.VERSION_KEY, version); 
  6.  
  7.     ExchangeClient currentClient; 
  8.     // 選擇client 
  9.     if (clients.length == 1) { 
  10.         currentClient = clients[0]; 
  11.     } else { 
  12.         currentClient = clients[index.getAndIncrement() % clients.length]; 
  13.     } 
  14.     try { 
  15.         // 是否異步 
  16.         boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); 
  17.         // 是否oneway方式發(fā)送,也就是需不需要返回值 
  18.         boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); 
  19.         // 超時時間 
  20.         int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 
  21.        // 不需要返回值 
  22.         if (isOneway) { 
  23.             boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); 
  24.             // 協(xié)議發(fā)送 
  25.             currentClient.send(inv, isSent); 
  26.             // future直接是Null 
  27.             RpcContext.getContext().setFuture(null); 
  28.             // 返回空的結果 
  29.             return new RpcResult(); 
  30.         } else if (isAsync) { 
  31.             // 異步發(fā)送 
  32.             ResponseFuture future = currentClient.request(inv, timeout); 
  33.             // 設置future 
  34.             RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 
  35.             // 返回空結果 
  36.             return new RpcResult(); 
  37.         } else { 
  38.             // 同步發(fā)送 
  39.             RpcContext.getContext().setFuture(null); 
  40.             // 直接調用了future.get去等待 
  41.             return (Result) currentClient.request(inv, timeout).get(); 
  42.         } 
  43.     } catch (TimeoutException e) { 
  44.         throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 
  45.     } catch (RemotingException e) { 
  46.         throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 
  47.     } 

這里可以看到調用的方式有三種,分別是 oneway、異步、同步,我分別說說

  • oneway是比較常見的方式了,就是當我們不關心請求是否發(fā)送成功的情況下,就用 oneway 的方式發(fā)送,這種方式消耗最小。
  • 異步調用,我們可以看到其實 Dubbo 天然支持異步的,client 發(fā)送請求之后會得到一個 ResponseFuture,然后把 future 包裝一下塞到上下文中,這樣用戶就可以從上下文中拿到這個 future,然后調用方可以做了一波操作之后再調用 future.whenComplete什么的異步做點什么。
  • 同步調用,Dubbo 底層也幫我們做了,可以看到在 Dubbo 源碼中就調用了 future.get,所以給我們的感覺就是我調用了這個接口的方法之后就阻塞住了,必須要等待結果到了之后才能返回,所以就是同步的。

那么這個回調是怎么做的?

其實很簡單的,就是在調用的時候生成一個唯一的id,將回調和這個id緩存起來,然后將這個id傳遞到服務方,服務方在處理好業(yè)務后將結果和這個id重新發(fā)回到消費方,消費方拿到回調觸發(fā)即可。

我們看看代碼層面的

看看DefaultFuture是什么

看到啦,里邊生成了唯一id,然后放到FUTURES這個并發(fā)容器里邊,我們看看用的地方

這里比較清楚了吧,在收到返回的協(xié)議后將future拿出來去觸發(fā),基于這種思路,很多做回調的都可以用這種設計思路。

到這里服務消費方怎么去觸發(fā)rpc的這個行為基本上就到這了,其實還是很清晰的,先是起服訂閱的時候層層封裝了invoker,然后搞出了一個代理對象注入到我們的接口中,然后在調用接口的時候就一個個調用invoker啦,最后就是發(fā)協(xié)議給服務提供方。

愛了愛了,簡單清晰的邏輯。

接下來說說服務提供方的調用流程。

分析下提供方的調用電路

同樣的,我們先看看服務提供方的調用鏈

這個流程也是特別長的,我這邊只拎幾個重點出來,先看下HeaderExchangeHandler,handleRequest

這里很容易理解啦,就是把request對象中的data取出來傳到DubboProtocol.requestHandler中,這個data就是前面的解碼后的DecodeableRpcInvocation對象它是Invocation接口的一個實現(xiàn),我們可以看看里邊有啥

可以看到調用信息都在這里啦,接下來就簡單了,根據(jù)這些參數(shù)拿到對應的對象反射調用下就可以了,接下來看看DubboProtocol比較核心的reply方法

  1. @Override 
  2. public Object reply(ExchangeChannel channel, Object message) throws RemotingException { 
  3.     if (message instanceof Invocation) { 
  4.         Invocation inv = (Invocation) message; 
  5.         // 根據(jù)調用的參數(shù)拿到對應的invoker,其實就是之前服務暴露的時候有說過的Exporter里邊取的 
  6.         Invoker<?> invoker = getInvoker(channel, inv); 
  7.         // 這里邊是對callback回來的一些處理,先不管 
  8.         if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { 
  9.             String methodsStr = invoker.getUrl().getParameters().get("methods"); 
  10.             boolean hasMethod = false
  11.             if (methodsStr == null || methodsStr.indexOf(",") == -1) { 
  12.                 hasMethod = inv.getMethodName().equals(methodsStr); 
  13.             } else { 
  14.                 String[] methods = methodsStr.split(","); 
  15.                 for (String method : methods) { 
  16.                     if (inv.getMethodName().equals(method)) { 
  17.                         hasMethod = true
  18.                         break; 
  19.                     } 
  20.                 } 
  21.             } 
  22.             if (!hasMethod) { 
  23.                 logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() 
  24.                         + " not found in callback service interface ,invoke will be ignored." 
  25.                         + " please update the api interface. url is:" 
  26.                         + invoker.getUrl()) + " ,invocation is :" + inv); 
  27.                 return null
  28.             } 
  29.         } 
  30.         RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); 
  31.         // 最后invoke一下啦 
  32.         return invoker.invoke(inv); 
  33.     } 
  34.     throw new RemotingException(channel, "Unsupported request: " 
  35.             + (message == null ? null : (message.getClass().getName() + ": " + message)) 
  36.             + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); 

getInvoker的邏輯也簡單,之前的文章服務暴露有說過這個過程啦,其實就是從一個DubboProtocol.exporterMap內找到一個Exporter,再從里邊取出invoker,那么key是啥呢,key其實是由URL生成的serviceKey,此時通過Invocation中的信息就可還原該serviceKey并且找到對應的Exporter和Invoker。再看看之前提過的 JavassistProxyFactory,這是一個給提供方的服務對象生成代理的工廠類

這個也說過啦,調用invoker.invoke時,通過反射調用最終的服務實現(xiàn)執(zhí)行相關邏輯,入口就是這里了。因為這塊之前的文章比較詳細的說過,這里就不重復了。

到了這一步,調用就已經技術了,我們再看看調用結束后怎么將結果返回給服務消費方。

調用結束后,服務提供方方就會創(chuàng)建一個Response對象返回給服務消費方,那么自然在執(zhí)行服務實現(xiàn)時會出現(xiàn)兩種結果:成功和失敗

如果成功的話,則把返回值設置到Response的result中,Response的status設置成OK

如果失敗,把失敗異常設置到Response的errorMessage中,status設置成SERVICE_ERROR

我們會回到HeaderExchangeHandler.received中的代碼來看看,在handleRequest之后,調用channel.send把Response發(fā)送到客戶端,這個channel封裝客戶端-服務端通信鏈路,最終會調用Netty框架,把響應寫回到客戶端。

慣例總結下

終于將調用這個過程說完啦,其實思路還是比較清晰的,不過最好是自己全程斷點細看下啦,可以學到很多東西的。

說說后續(xù)安排:

  • SPI
  • dubbo中的AOP機制
  • 服務治理
  • ....

 

 

 

等好幾個模塊,最后就是帶大家擼一個RPC框架了,還是那句話,想學dubbo的可以持續(xù)關注這一系列。

 

責任編輯:武曉燕 來源: 稀飯下雪
相關推薦

2021-05-13 06:21:26

Dubbo框架RPC

2021-06-01 08:29:08

dubbo線程池服務暴露

2021-08-27 07:06:09

DubboDocker技術

2023-06-28 07:49:02

2023-03-02 07:44:39

pixijsWebGL

2023-11-13 22:27:53

Mapping數(shù)據(jù)庫

2022-02-23 14:43:50

索引數(shù)據(jù)庫mysql

2013-11-21 10:52:34

NVIDIAIBM超級計算機

2012-11-08 17:33:53

智慧云

2023-11-13 12:48:32

語言DSL

2023-11-30 15:23:07

聚合查詢數(shù)據(jù)分析

2024-07-29 08:24:43

2021-03-04 08:06:17

Redis面試模型

2021-10-20 16:13:05

鴻蒙HarmonyOS應用

2022-02-02 21:17:19

Eslint字段配置

2024-06-07 14:54:55

2022-05-20 12:14:50

ZuulSpringClou

2011-09-07 22:59:07

聯(lián)想一體機

2021-05-17 10:50:15

系統(tǒng)調用內核

2012-06-01 10:39:06

天翼空間Windows Pho
點贊
收藏

51CTO技術棧公眾號