自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

最簡最快了解RPC核心流程

開發(fā)
本文主要以Dubbo為例介紹了RPC調(diào)用核心流程,同時,寫了個簡易的RPC調(diào)用代碼。

本文主要以最簡易最快速的方式介紹RPC調(diào)用核心流程,文中以Dubbo為例。同時,會寫一個簡易的RPC調(diào)用代碼,方便理解和記憶核心組件和核心流程。

一、核心思想

RPC調(diào)用過程中,最粗礦的核心組件3個:Registry、Provider、Consumer。最粗礦的流程4個:注冊、訂閱、通知、調(diào)用。最簡單的流程圖就1個:

本文會繼續(xù)細(xì)粒度地拆解以上流程,拆解之前,請牢記這段話:

RPC調(diào)用,不管中間流程多么復(fù)雜,不管代碼多么復(fù)雜,所有的努力也只為做2件事情:

  • 在Consumer端,將ReferenceConfig配置的類轉(zhuǎn)換成Proxy代理。
  • 在Provider端,將ServiceConfig配置的類轉(zhuǎn)換成Proxy代理。

二、核心組件

為了能在Consumer端和Provider端生成各自的Proxy代理,并且發(fā)起調(diào)用和響應(yīng),需要如下核心組件:

(1) Registry:注冊中心,主要是為了實現(xiàn)  Provider接口注冊、Consumer訂閱接口、接口變更通知、接口查找等功能。

(2) Proxy:服務(wù)代理,核心中的核心,一切的努力都是為了生成合適的Proxy服務(wù)代理。

  • Consumer的Proxy:Consumer端根據(jù)ReferenceConfig生成Proxy,此Proxy主要用于找到合適的Provider接口,然后發(fā)起網(wǎng)絡(luò)調(diào)用。
  • Provider的Proxy:Provider端根據(jù)ServiceConfig生成Proxy,此Proxy主要作用是通過類似反射的方法調(diào)用本地代碼,再將結(jié)果返回給Consumer。

(3) Protocol:服務(wù)協(xié)議,它相當(dāng)于一個中間層,用于與注冊中心打交道 和 封裝 RPC 調(diào)用。它在初始化時會創(chuàng)建Client模塊 與 服務(wù)端建立連接,也會生成用于遠(yuǎn)程調(diào)用的Invoker。

(4) Cluster:服務(wù)集群,主要用于路由、負(fù)載均衡、服務(wù)容錯等。

(5) Invoker:服務(wù)調(diào)用者。

  • Consumer的服務(wù)調(diào)用者主要是利用Client模塊發(fā)起遠(yuǎn)程調(diào)用,然后等待Provider返回結(jié)果。
  • Provider的服務(wù)調(diào)用者主要是根據(jù)接收到的消息利用反射生成本地代理,然后執(zhí)行方法,再將結(jié)果返回到Consumer。

(6) Client:客戶端模塊,默認(rèn)是Netty實現(xiàn),主要用于客戶端和服務(wù)端通訊(主要是服務(wù)調(diào)用),比如將請求的接口、參數(shù)、請求ID等封裝起來發(fā)給Server端。

(7) Server:服務(wù)端模擬,默認(rèn)是Netty實現(xiàn)。主要是用于客戶端和服務(wù)端通訊。

三、核心流程

1.Consumer流程

(1) 流程

Consumer的流程實際上就是一個從ReferenceConfig 生成Proxy代理的過程。核心事情由Protocol完成。

  • 根據(jù)ReferenceConfig生成代理
  • 注冊到注冊中心、訂閱注冊中心事件
  • 建立NettyClient,并且與NettyServer建立連接
  • 生成客戶端的ClientInvoker
  • 選擇負(fù)載均衡和集群容錯
  • ClientInvoker發(fā)起網(wǎng)絡(luò)調(diào)用和等待結(jié)果

(2) 流程圖:

2.Provider流程

(1) 流程

Provider的流程實際上就是個從ServiceConfig生成Proxy代理的過程。核心事情由PorxyProtocol完成。

  • 根據(jù)ServiceConfig生成本地代理
  • 注冊到注冊中心
  • 啟動NettyServer等待客戶端連接
  • 生成服務(wù)端Invoker
  • Invoker監(jiān)聽調(diào)用請求
  • 接收到請求后新建任務(wù)丟入到線程池去執(zhí)行
  • 執(zhí)行時會生成本地代理執(zhí)行(比如通過反射去調(diào)用具體的方法),再將返回結(jié)果寫出去

(2) 流程圖:

3.整體流程圖

四、簡易代碼實現(xiàn)

1.核心代碼介紹

(1) 客戶端Proxy:

/**
 * 獲取代理Service
 */
@SuppressWarnings("unchecked")
public <T> T getService(Class clazz) throws Exception {

    return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();

            if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
                throw new IllegalAccessException("不能訪問" + methodName + "方法");
            }
            if ("toString".equals(methodName)) {
                return clazz.getName() + "#" + methodName;
            }

            List<RegistryInfo> registryInfoList = interfaceMethodsRegistryInfoMap.get(clazz);
            if (registryInfoList == null) {
                throw new RuntimeException("無法找到對應(yīng)的服務(wù)提供者");
            }

            LoadBalancer loadBalancer = new RandomLoadBalancer();
            RegistryInfo registryInfo = loadBalancer.choose(registryInfoList);

            ChannelHandlerContext ctx = registryChannelMap.get(registryInfo);

            String identity = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
            String requestId;

            synchronized (ProxyProtocol.this) {
                requestIdWorker.increment();
                requestId = String.valueOf(requestIdWorker.longValue());
            }

            ClientInvoker clientInvoker = new DefaultClientInvoker(method.getReturnType(), ctx, requestId, identity);

            inProcessInvokerMap.put(identity + "#" + requestId, clientInvoker);

            return clientInvoker.invoke(args);
        }
    });
}

(2) 服務(wù)端Proxy

private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity組成:接口類+方法+參數(shù)類型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪個類
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪個方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射執(zhí)行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法參數(shù)參數(shù)可能有多個,用,號隔開
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //將結(jié)果封裝成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回執(zhí)行結(jié)果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("響應(yīng)給客戶端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

(3) Protocol

public ProxyProtocol(String registryUrl, List<ServiceConfig> serviceConfigList, List<ReferenceConfig> referenceConfigList, int port) throws Exception {
    this.serviceConfigList = serviceConfigList == null ? new ArrayList<>() : serviceConfigList;
    this.registryUrl = registryUrl;
    this.port = port;
    this.referenceConfigList = referenceConfigList == null ? new ArrayList<>() : referenceConfigList;

    //1、初始化注冊中心
    initRegistry(this.registryUrl);

    //2、將服務(wù)注冊到注冊中心
    InetAddress addr = InetAddress.getLocalHost();
    String hostName = addr.getHostName();
    String hostAddr = addr.getHostAddress();
    registryInfo = new RegistryInfo(hostName, hostAddr, this.port);
    doRegistry(registryInfo);

    //3、初始化nettyServer,啟動nettyServer
    if (!this.serviceConfigList.isEmpty()) {
        nettyServer = new NettyServer(this.serviceConfigList, this.interfaceMethodMap);
        nettyServer.init(this.port);
    }

    //如果是客戶端引用啟動,則初始化處理線程
    if (!this.referenceConfigList.isEmpty()) {
        initProcessor();
    }
}

(4) 客戶端Invoker

@Override
public T invoke(Object[] args) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("interfaces", identity);

    JSONObject param = new JSONObject();
    if (args != null) {
        for (Object obj : args) {
            param.put(obj.getClass().getName(), obj);
        }
    }
    jsonObject.put("parameter", param);
    jsonObject.put("requestId", requestId);
    String msg = jsonObject.toJSONString() + Constants.DELIMITER_STR;
    System.out.println("發(fā)送給服務(wù)端JSON為:" + msg);

    ByteBuf byteBuf = Unpooled.copiedBuffer(msg.getBytes());
    ctx.writeAndFlush(byteBuf);

    wait4Result();

    return result;
}

private void wait4Result() {
    synchronized (this) {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void setResult(String result) {
    synchronized (this) {
        this.result = (T) JSONObject.parseObject(result, returnType);
        notifyAll();
    }
}

(5) 服務(wù)端Invoker

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String message = (String) msg;
    System.out.println("提供者收到消息:" + message);
    //解析消費者發(fā)來的消息
    RpcRequest rpcRequest = RpcRequest.parse(message, ctx);
    //接受到消息,啟動線程池處理消費者發(fā)過來的請求
    threadPoolExecutor.execute(new RpcInvokerTask(rpcRequest));
}

/**
 * 處理消費者發(fā)過來的請求
 */
private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity組成:接口類+方法+參數(shù)類型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪個類
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪個方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射執(zhí)行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法參數(shù)參數(shù)可能有多個,用,號隔開
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //將結(jié)果封裝成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回執(zhí)行結(jié)果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("響應(yīng)給客戶端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

(6) Client

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Constants.DELIMITER));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new NettyClientHandler());

                    System.out.println("initChannel - " + Thread.currentThread().getName());
                }
            });
    ChannelFuture cf = bootstrap.connect(ip, port).sync();
//            cf.channel().closeFuture().sync();
    System.out.println("客戶端啟動成功");
} catch (Exception e) {
    e.printStackTrace();
    group.shutdownGracefully();
}

(7) Server

public NettyServer(List<ServiceConfig> serviceConfigList, Map<String, Method> interfaceMethodMap) {
    this.serviceConfigList = serviceConfigList;
    this.interfaceMethodMap = interfaceMethodMap;
}

public int init(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, DELIMITER));
                    channel.pipeline().addLast(new StringDecoder());
                    channel.pipeline().addLast(new RpcInvokeHandler(serviceConfigList, interfaceMethodMap));
                }
            });
    ChannelFuture cf = bootstrap.bind(port).sync();
    System.out.println("啟動NettyServer,端口為:" + port);
    return port;
}

2.項目地址

https://github.com/yclxiao/rpc-demo.git

五、總結(jié)

本文主要以Dubbo為例介紹了RPC調(diào)用核心流程,同時,寫了個簡易的RPC調(diào)用代碼。

記住以上的流程圖即可搞明白整個調(diào)用流程。然后再記住最核心的2句話:

所有的努力都是為了能在Consumer端和Provider端生成功能豐富的Proxy。核心事情由Protocol完成。

核心的5個部件:Registry、Proxy、Protocol、Invoker、Client、Server。

責(zé)任編輯:趙寧寧 來源: 不焦躁程序員
相關(guān)推薦

2021-07-08 12:32:58

2021-07-12 12:03:32

EPaxos分布式協(xié)議流程

2011-01-04 11:02:08

程序員

2009-07-16 17:42:47

WebWork配置

2024-07-08 23:03:13

2021-02-03 16:22:43

新基建SAP

2020-12-31 12:16:49

SAP云計算SAP產(chǎn)品

2021-04-28 10:13:58

zookeeperZNode核心原理

2024-04-25 12:59:31

2011-11-25 14:01:05

VPNVPN設(shè)置IPsec VPN

2011-11-11 10:00:41

NVIDIATegra 3

2012-05-14 17:22:38

ibmdw

2010-06-18 14:06:03

AMF協(xié)議

2021-04-06 09:22:47

云計算云計算產(chǎn)業(yè)云應(yīng)用

2022-02-09 23:02:53

Vuex開發(fā)管理模式

2010-03-31 09:20:28

Windows 7網(wǎng)絡(luò)互訪

2020-11-19 15:12:56

程序員技能開發(fā)者

2020-04-15 17:10:58

VirtualBoxKali LinuxLinux

2013-02-22 13:55:37

CSSWeb

2022-09-21 13:53:15

C++移動語義
點贊
收藏

51CTO技術(shù)棧公眾號