《一起玩Dubbo》系列四之服務如何被調用
了解過rpc的大概都聽過,rpc就是為了解決遠程方法的本地調用的難題的,其實說穿了,就是為了解決方法在被調用到遠程服被執(zhí)行的流程問題,那么這個流程到底是怎么樣的呢?
同樣的,我繼續(xù)在 dubbo流程圖 中繼續(xù)繪畫我的流程
首先是根據(jù)文章一起玩dubbo,先入個門搭建起demo,包括注冊中心、服務消費方和服務提供方,接下來來擼擼整個過程
這邊為了方便解說,先直接給個demo
這是服務提供方
- public class DemoServiceImpl implements DemoService {
- @Override
- public String sayHello(String name) {
- System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
- return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
- }
- }
這是服務消費方
- public class Consumer {
- public static void main(String[] args) {
- ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring" +
- "/dubbo-demo-consumer.xml"});
- context.start();
- DemoService demoService = (DemoService) context.getBean("demoService");
- while (true) {
- try {
- Thread.sleep(1000);
- String hello = demoService.sayHello("world");
- System.out.println(hello);
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
- }
- }
- }
我斷點了下這里
image-20210714015553406
走到服務消費方的最底層可以看到
在開始分析細節(jié)之前我們先在大腦風暴下大致流程
一次調用過程需要經歷哪些步驟?
不用看dubbo代碼都可以大概猜到:
- 要知道遠程服務的地址,
- 把要調用的方法的具體信息告訴遠程服務,讓遠程服務解析這些信息
- 遠程服務根據(jù)這些信息找到對應的實現(xiàn)類,進行調用,調用完了
- 調用結果原路返回,然后客戶端解析響應
第一點,我們通過前幾篇文章已經知道,消費方在發(fā)起調用的時候已經知曉了遠程服務的地址
那么要調用的方法的具體信息包括哪些呢?
客戶端肯定要告訴服務方調用的哪個接口,所以需要方法名、方法的參數(shù)類型、方法的參數(shù)值,然后有可能存在多個版本的情況,所以還得帶上版本號,有這些數(shù)據(jù)后,服務方就可以精準的調用具體的方法了。
我這邊將上面調用的例子先貼出來
mdata也就是我上面說的那些數(shù)據(jù)。
看到這個Request這里,應該就清楚了遠程調用的基本原理了。
這個時候很容易就想到另一個問題,消費方和提供方是如何通信的?
消費方和提供方如何通信?
其實很簡單,就是消費方和提供方通過協(xié)議進行了通信罷了,dubbo的協(xié)議屬于很常見的header+body 形式,而且也有特殊的字符 0xdabb,用來解決 TCP 網(wǎng)絡粘包問題的。這種header是固定長度的,然后header里面填寫 body 的長度是比較常見的做法,包括我司的游戲框架也是用這種模式。
我們可以看看dubbo協(xié)議的鬼樣
可以看到,協(xié)議分為協(xié)議頭和協(xié)議體,16 字節(jié)的頭部主要攜帶了魔法數(shù),也就是之前說的 0xdabb,然后一些請求的設置,消息體的長度等等,16 字節(jié)之后就是協(xié)議體了,包括協(xié)議版本、接口名字、接口版本、方法名字等等。
看到這里又很容易的引申出另一個問題了,協(xié)議是如何序列化的?
協(xié)議的序列化?
序列化的概念其實也簡答, 在消費方先把Java對象轉換為字節(jié)序列,這個過程也被稱為對象的序列化,然后在服務方又把字節(jié)序列恢復為Java對象,這個過程稱為對象的反序列化。
dubbo默認使用的是 hessian2 序列化協(xié)議,hessian2是阿里對于hessian進行行了修改的版本,應該還不錯。
大致總結下,消費方發(fā)起調用,在那一刻,實際調用的是代理類,代理類最終調用的是Client,Client將 Java 的對象序列化生成協(xié)議體,然后通過網(wǎng)絡傳輸給服務方,服務方Server接到這個請求之后,分發(fā)給業(yè)務線程池,由業(yè)務線程調用具體的實現(xiàn)方法。
先see see官網(wǎng)圖吧
分析下消費方的調用鏈路
我們先看看服務消費方的調用邏輯,大家可以對著我這張圖來
好了,我繼續(xù)說
可以看到調用的接口生成的代理類是
而在invoke的時候會先釋放掉部分不需要攔截的方法啦,比如toString什么的,這樣正常吧,這些方法確實不需要攔截的嘛
看看RpcInvocation是什么
可以看到生成的 RpcInvocation 包含了方法名、參數(shù)類和參數(shù)值什么的。
接下來往里進一步看看MockClusterInvoker#invoke 代碼,先解釋下為啥會進來了MockClusterInvoker,看過文章 想學dubbo的看過來,2萬字整理服務引入流程 應該可以理解這個過程,這個過程可以認為是套娃吧,A套B,B套C,一直套到最外層的invoker就是MockClusterInvoker,如果不理解這個過程可以往回看我的文章,很肝卻很實用
這里可以看到就是判斷配置里面有沒有配置mock,mock 的話后續(xù)再展開說,繼續(xù)看看this.invoker.invoke 的實現(xiàn),實際上會調用 AbstractClusterInvoker#invoker
這里倒是涉及到了一個模板方法的設計模式,其實很簡單,就是在抽象類中定好代碼的執(zhí)行骨架,之后將具體的實現(xiàn)延遲到子類中,由子類來決定邏輯,這樣可以在不改變整體執(zhí)行步驟的情況下修改步驟里面的實現(xiàn),減少了重復的代碼,也利于擴展,符合了開閉原則。
接下來看看
做了啥,這一步算是比較重要吧,單獨拎出來講講
這里其實就是先由路由過濾一波,然后返回invoker
繼續(xù)看看doInvoke的流程,我們默認使用的是 FailoverClusterInvoker,也就是失敗自動切換的容錯方式,
這里說說為啥默認是這個哦,其實從實際應用上來說,失敗后自動切換下個服務實例還是比較符合場景的,如果想替換其他模式可以在xml里邊配置
那我們繼續(xù)看看那doInvoke的實現(xiàn)
- public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
- List<Invoker<T>> copyinvokers = invokers;
- checkInvokers(copyinvokers, invocation);
- String methodName = RpcUtils.getMethodName(invocation);
- int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
- if (len <= 0) {
- len = 1;
- }
- // retry loop.
- RpcException le = null; // last exception.
- List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
- Set<String> providers = new HashSet<String>(len);
- // 重試次數(shù)
- for (int i = 0; i < len; i++) {
- //Reselect before retry to avoid a change of candidate `invokers`.
- //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
- if (i > 0) {
- checkWhetherDestroyed();
- copyinvokers = list(invocation);
- // check again
- checkInvokers(copyinvokers, invocation);
- }
- // 通過負載選擇了一個invoker
- Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
- invoked.add(invoker);
- // 上下文保留了調用過的invoker
- RpcContext.getContext().setInvokers((List) invoked);
- try {
- // 發(fā)起調用
- Result result = invoker.invoke(invocation);
- if (le != null && logger.isWarnEnabled()) {
- logger.warn("Although retry the method " + methodName
- + " in the service " + getInterface().getName()
- + " was successful by the provider " + invoker.getUrl().getAddress()
- + ", but there have been failed providers " + providers
- + " (" + providers.size() + "/" + copyinvokers.size()
- + ") from the registry " + directory.getUrl().getAddress()
- + " on the consumer " + NetUtils.getLocalHost()
- + " using the dubbo version " + Version.getVersion() + ". Last error is: "
- + le.getMessage(), le);
- }
- return result;
- } catch (RpcException e) {
- if (e.isBiz()) { // biz exception.
- throw e;
- }
- le = e;
- } catch (Throwable e) {
- le = new RpcException(e.getMessage(), e);
- } finally {
- providers.add(invoker.getUrl().getAddress());
- }
- }
- throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
- + methodName + " in the service " + getInterface().getName()
- + ". Tried " + len + " times of the providers " + providers
- + " (" + providers.size() + "/" + copyinvokers.size()
- + ") from the registry " + directory.getUrl().getAddress()
- + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
- + Version.getVersion() + ". Last error is: "
- + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
- }
這個調用稍微總結一下就是FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表,并且經過路由之后,通過LoadBalance 從 Invoker 列表中選擇一個 Invoker,也就是負載均衡啦,最后FailoverClusterInvoker會將參數(shù)傳給選擇出的那個 Invoker 實例的 invoke 方法,進行真正的遠程調用。
后面發(fā)起調用的這個 invoke 又是調用抽象類中的 invoke 然后再調用子類的 doInvoker,抽象類中的方法很簡單我就不展示了,我們直接看子類 DubboInvoker 的 doInvoke 方法。
- protected Result doInvoke(final Invocation invocation) throws Throwable {
- RpcInvocation inv = (RpcInvocation) invocation;
- final String methodName = RpcUtils.getMethodName(invocation);
- inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
- inv.setAttachment(Constants.VERSION_KEY, version);
- ExchangeClient currentClient;
- // 選擇client
- if (clients.length == 1) {
- currentClient = clients[0];
- } else {
- currentClient = clients[index.getAndIncrement() % clients.length];
- }
- try {
- // 是否異步
- boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
- // 是否oneway方式發(fā)送,也就是需不需要返回值
- boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
- // 超時時間
- int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
- // 不需要返回值
- if (isOneway) {
- boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
- // 協(xié)議發(fā)送
- currentClient.send(inv, isSent);
- // future直接是Null
- RpcContext.getContext().setFuture(null);
- // 返回空的結果
- return new RpcResult();
- } else if (isAsync) {
- // 異步發(fā)送
- ResponseFuture future = currentClient.request(inv, timeout);
- // 設置future
- RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
- // 返回空結果
- return new RpcResult();
- } else {
- // 同步發(fā)送
- RpcContext.getContext().setFuture(null);
- // 直接調用了future.get去等待
- return (Result) currentClient.request(inv, timeout).get();
- }
- } catch (TimeoutException e) {
- throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- } catch (RemotingException e) {
- throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
這里可以看到調用的方式有三種,分別是 oneway、異步、同步,我分別說說
- oneway是比較常見的方式了,就是當我們不關心請求是否發(fā)送成功的情況下,就用 oneway 的方式發(fā)送,這種方式消耗最小。
- 異步調用,我們可以看到其實 Dubbo 天然支持異步的,client 發(fā)送請求之后會得到一個 ResponseFuture,然后把 future 包裝一下塞到上下文中,這樣用戶就可以從上下文中拿到這個 future,然后調用方可以做了一波操作之后再調用 future.whenComplete什么的異步做點什么。
- 同步調用,Dubbo 底層也幫我們做了,可以看到在 Dubbo 源碼中就調用了 future.get,所以給我們的感覺就是我調用了這個接口的方法之后就阻塞住了,必須要等待結果到了之后才能返回,所以就是同步的。
那么這個回調是怎么做的?
其實很簡單的,就是在調用的時候生成一個唯一的id,將回調和這個id緩存起來,然后將這個id傳遞到服務方,服務方在處理好業(yè)務后將結果和這個id重新發(fā)回到消費方,消費方拿到回調觸發(fā)即可。
我們看看代碼層面的
看看DefaultFuture是什么
看到啦,里邊生成了唯一id,然后放到FUTURES這個并發(fā)容器里邊,我們看看用的地方
這里比較清楚了吧,在收到返回的協(xié)議后將future拿出來去觸發(fā),基于這種思路,很多做回調的都可以用這種設計思路。
到這里服務消費方怎么去觸發(fā)rpc的這個行為基本上就到這了,其實還是很清晰的,先是起服訂閱的時候層層封裝了invoker,然后搞出了一個代理對象注入到我們的接口中,然后在調用接口的時候就一個個調用invoker啦,最后就是發(fā)協(xié)議給服務提供方。
愛了愛了,簡單清晰的邏輯。
接下來說說服務提供方的調用流程。
分析下提供方的調用電路
同樣的,我們先看看服務提供方的調用鏈
這個流程也是特別長的,我這邊只拎幾個重點出來,先看下HeaderExchangeHandler,handleRequest
這里很容易理解啦,就是把request對象中的data取出來傳到DubboProtocol.requestHandler中,這個data就是前面的解碼后的DecodeableRpcInvocation對象它是Invocation接口的一個實現(xiàn),我們可以看看里邊有啥
可以看到調用信息都在這里啦,接下來就簡單了,根據(jù)這些參數(shù)拿到對應的對象反射調用下就可以了,接下來看看DubboProtocol比較核心的reply方法
- @Override
- public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
- if (message instanceof Invocation) {
- Invocation inv = (Invocation) message;
- // 根據(jù)調用的參數(shù)拿到對應的invoker,其實就是之前服務暴露的時候有說過的Exporter里邊取的
- Invoker<?> invoker = getInvoker(channel, inv);
- // 這里邊是對callback回來的一些處理,先不管
- if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
- String methodsStr = invoker.getUrl().getParameters().get("methods");
- boolean hasMethod = false;
- if (methodsStr == null || methodsStr.indexOf(",") == -1) {
- hasMethod = inv.getMethodName().equals(methodsStr);
- } else {
- String[] methods = methodsStr.split(",");
- for (String method : methods) {
- if (inv.getMethodName().equals(method)) {
- hasMethod = true;
- break;
- }
- }
- }
- if (!hasMethod) {
- logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
- + " not found in callback service interface ,invoke will be ignored."
- + " please update the api interface. url is:"
- + invoker.getUrl()) + " ,invocation is :" + inv);
- return null;
- }
- }
- RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
- // 最后invoke一下啦
- return invoker.invoke(inv);
- }
- throw new RemotingException(channel, "Unsupported request: "
- + (message == null ? null : (message.getClass().getName() + ": " + message))
- + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
- }
getInvoker的邏輯也簡單,之前的文章服務暴露有說過這個過程啦,其實就是從一個DubboProtocol.exporterMap內找到一個Exporter,再從里邊取出invoker,那么key是啥呢,key其實是由URL生成的serviceKey,此時通過Invocation中的信息就可還原該serviceKey并且找到對應的Exporter和Invoker。再看看之前提過的 JavassistProxyFactory,這是一個給提供方的服務對象生成代理的工廠類
這個也說過啦,調用invoker.invoke時,通過反射調用最終的服務實現(xiàn)執(zhí)行相關邏輯,入口就是這里了。因為這塊之前的文章比較詳細的說過,這里就不重復了。
到了這一步,調用就已經技術了,我們再看看調用結束后怎么將結果返回給服務消費方。
調用結束后,服務提供方方就會創(chuàng)建一個Response對象返回給服務消費方,那么自然在執(zhí)行服務實現(xiàn)時會出現(xiàn)兩種結果:成功和失敗
如果成功的話,則把返回值設置到Response的result中,Response的status設置成OK
如果失敗,把失敗異常設置到Response的errorMessage中,status設置成SERVICE_ERROR
我們會回到HeaderExchangeHandler.received中的代碼來看看,在handleRequest之后,調用channel.send把Response發(fā)送到客戶端,這個channel封裝客戶端-服務端通信鏈路,最終會調用Netty框架,把響應寫回到客戶端。
慣例總結下
終于將調用這個過程說完啦,其實思路還是比較清晰的,不過最好是自己全程斷點細看下啦,可以學到很多東西的。
說說后續(xù)安排:
- SPI
- dubbo中的AOP機制
- 服務治理
- ....
等好幾個模塊,最后就是帶大家擼一個RPC框架了,還是那句話,想學dubbo的可以持續(xù)關注這一系列。