最簡最快了解RPC核心流程
本文主要以最簡易最快速的方式介紹RPC調(diào)用核心流程,文中以Dubbo為例。同時,會寫一個簡易的RPC調(diào)用代碼,方便理解和記憶核心組件和核心流程。
一、核心思想
RPC調(diào)用過程中,最粗礦的核心組件3個:Registry、Provider、Consumer。最粗礦的流程4個:注冊、訂閱、通知、調(diào)用。最簡單的流程圖就1個:
本文會繼續(xù)細(xì)粒度地拆解以上流程,拆解之前,請牢記這段話:
RPC調(diào)用,不管中間流程多么復(fù)雜,不管代碼多么復(fù)雜,所有的努力也只為做2件事情:
- 在Consumer端,將ReferenceConfig配置的類轉(zhuǎn)換成Proxy代理。
- 在Provider端,將ServiceConfig配置的類轉(zhuǎn)換成Proxy代理。
二、核心組件
為了能在Consumer端和Provider端生成各自的Proxy代理,并且發(fā)起調(diào)用和響應(yīng),需要如下核心組件:
(1) Registry:注冊中心,主要是為了實現(xiàn) Provider接口注冊、Consumer訂閱接口、接口變更通知、接口查找等功能。
(2) Proxy:服務(wù)代理,核心中的核心,一切的努力都是為了生成合適的Proxy服務(wù)代理。
- Consumer的Proxy:Consumer端根據(jù)ReferenceConfig生成Proxy,此Proxy主要用于找到合適的Provider接口,然后發(fā)起網(wǎng)絡(luò)調(diào)用。
- Provider的Proxy:Provider端根據(jù)ServiceConfig生成Proxy,此Proxy主要作用是通過類似反射的方法調(diào)用本地代碼,再將結(jié)果返回給Consumer。
(3) Protocol:服務(wù)協(xié)議,它相當(dāng)于一個中間層,用于與注冊中心打交道 和 封裝 RPC 調(diào)用。它在初始化時會創(chuàng)建Client模塊 與 服務(wù)端建立連接,也會生成用于遠(yuǎn)程調(diào)用的Invoker。
(4) Cluster:服務(wù)集群,主要用于路由、負(fù)載均衡、服務(wù)容錯等。
(5) Invoker:服務(wù)調(diào)用者。
- Consumer的服務(wù)調(diào)用者主要是利用Client模塊發(fā)起遠(yuǎn)程調(diào)用,然后等待Provider返回結(jié)果。
- Provider的服務(wù)調(diào)用者主要是根據(jù)接收到的消息利用反射生成本地代理,然后執(zhí)行方法,再將結(jié)果返回到Consumer。
(6) Client:客戶端模塊,默認(rèn)是Netty實現(xiàn),主要用于客戶端和服務(wù)端通訊(主要是服務(wù)調(diào)用),比如將請求的接口、參數(shù)、請求ID等封裝起來發(fā)給Server端。
(7) Server:服務(wù)端模擬,默認(rèn)是Netty實現(xiàn)。主要是用于客戶端和服務(wù)端通訊。
三、核心流程
1.Consumer流程
(1) 流程
Consumer的流程實際上就是一個從ReferenceConfig 生成Proxy代理的過程。核心事情由Protocol完成。
- 根據(jù)ReferenceConfig生成代理
- 注冊到注冊中心、訂閱注冊中心事件
- 建立NettyClient,并且與NettyServer建立連接
- 生成客戶端的ClientInvoker
- 選擇負(fù)載均衡和集群容錯
- ClientInvoker發(fā)起網(wǎng)絡(luò)調(diào)用和等待結(jié)果
(2) 流程圖:
2.Provider流程
(1) 流程
Provider的流程實際上就是個從ServiceConfig生成Proxy代理的過程。核心事情由PorxyProtocol完成。
- 根據(jù)ServiceConfig生成本地代理
- 注冊到注冊中心
- 啟動NettyServer等待客戶端連接
- 生成服務(wù)端Invoker
- Invoker監(jiān)聽調(diào)用請求
- 接收到請求后新建任務(wù)丟入到線程池去執(zhí)行
- 執(zhí)行時會生成本地代理執(zhí)行(比如通過反射去調(diào)用具體的方法),再將返回結(jié)果寫出去
(2) 流程圖:
3.整體流程圖
四、簡易代碼實現(xiàn)
1.核心代碼介紹
(1) 客戶端Proxy:
/**
* 獲取代理Service
*/
@SuppressWarnings("unchecked")
public <T> T getService(Class clazz) throws Exception {
return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
throw new IllegalAccessException("不能訪問" + methodName + "方法");
}
if ("toString".equals(methodName)) {
return clazz.getName() + "#" + methodName;
}
List<RegistryInfo> registryInfoList = interfaceMethodsRegistryInfoMap.get(clazz);
if (registryInfoList == null) {
throw new RuntimeException("無法找到對應(yīng)的服務(wù)提供者");
}
LoadBalancer loadBalancer = new RandomLoadBalancer();
RegistryInfo registryInfo = loadBalancer.choose(registryInfoList);
ChannelHandlerContext ctx = registryChannelMap.get(registryInfo);
String identity = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
String requestId;
synchronized (ProxyProtocol.this) {
requestIdWorker.increment();
requestId = String.valueOf(requestIdWorker.longValue());
}
ClientInvoker clientInvoker = new DefaultClientInvoker(method.getReturnType(), ctx, requestId, identity);
inProcessInvokerMap.put(identity + "#" + requestId, clientInvoker);
return clientInvoker.invoke(args);
}
});
}
(2) 服務(wù)端Proxy
private class RpcInvokerTask implements Runnable {
private RpcRequest rpcRequest;
public RpcInvokerTask(RpcRequest rpcRequest) {
this.rpcRequest = rpcRequest;
}
@Override
public void run() {
try {
ChannelHandlerContext ctx = rpcRequest.getCtx();
String interfaceIdentity = rpcRequest.getInterfaceIdentity();
String requestId = rpcRequest.getRequestId();
Map<String, Object> parameterMap = rpcRequest.getParameterMap();
//interfaceIdentity組成:接口類+方法+參數(shù)類型
Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);
//拿出是哪個類
String interfaceName = interfaceIdentityMap.get("interface");
Class interfaceClass = Class.forName(interfaceName);
Object o = interfaceInstanceMap.get(interfaceClass);
//拿出是哪個方法
Method method = interfaceMethodMap.get(interfaceIdentity);
//反射執(zhí)行
Object result = null;
String parameterStr = interfaceIdentityMap.get("parameter");
if (parameterStr != null && parameterStr.length() > 0) {
String[] parameterTypeClasses = parameterStr.split(",");//接口方法參數(shù)參數(shù)可能有多個,用,號隔開
Object[] parameterInstance = new Object[parameterTypeClasses.length];
for (int i = 0; i < parameterTypeClasses.length; i++) {
parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
}
result = method.invoke(o, parameterInstance);
} else {
result = method.invoke(o);
}
//將結(jié)果封裝成rcpResponse
RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);
//ctx返回執(zhí)行結(jié)果
String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;
ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
ctx.writeAndFlush(byteBuf);
System.out.println("響應(yīng)給客戶端:" + resultStr);
} catch (Exception e) {
e.printStackTrace();
}
}
}
(3) Protocol
public ProxyProtocol(String registryUrl, List<ServiceConfig> serviceConfigList, List<ReferenceConfig> referenceConfigList, int port) throws Exception {
this.serviceConfigList = serviceConfigList == null ? new ArrayList<>() : serviceConfigList;
this.registryUrl = registryUrl;
this.port = port;
this.referenceConfigList = referenceConfigList == null ? new ArrayList<>() : referenceConfigList;
//1、初始化注冊中心
initRegistry(this.registryUrl);
//2、將服務(wù)注冊到注冊中心
InetAddress addr = InetAddress.getLocalHost();
String hostName = addr.getHostName();
String hostAddr = addr.getHostAddress();
registryInfo = new RegistryInfo(hostName, hostAddr, this.port);
doRegistry(registryInfo);
//3、初始化nettyServer,啟動nettyServer
if (!this.serviceConfigList.isEmpty()) {
nettyServer = new NettyServer(this.serviceConfigList, this.interfaceMethodMap);
nettyServer.init(this.port);
}
//如果是客戶端引用啟動,則初始化處理線程
if (!this.referenceConfigList.isEmpty()) {
initProcessor();
}
}
(4) 客戶端Invoker
@Override
public T invoke(Object[] args) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("interfaces", identity);
JSONObject param = new JSONObject();
if (args != null) {
for (Object obj : args) {
param.put(obj.getClass().getName(), obj);
}
}
jsonObject.put("parameter", param);
jsonObject.put("requestId", requestId);
String msg = jsonObject.toJSONString() + Constants.DELIMITER_STR;
System.out.println("發(fā)送給服務(wù)端JSON為:" + msg);
ByteBuf byteBuf = Unpooled.copiedBuffer(msg.getBytes());
ctx.writeAndFlush(byteBuf);
wait4Result();
return result;
}
private void wait4Result() {
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void setResult(String result) {
synchronized (this) {
this.result = (T) JSONObject.parseObject(result, returnType);
notifyAll();
}
}
(5) 服務(wù)端Invoker
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println("提供者收到消息:" + message);
//解析消費者發(fā)來的消息
RpcRequest rpcRequest = RpcRequest.parse(message, ctx);
//接受到消息,啟動線程池處理消費者發(fā)過來的請求
threadPoolExecutor.execute(new RpcInvokerTask(rpcRequest));
}
/**
* 處理消費者發(fā)過來的請求
*/
private class RpcInvokerTask implements Runnable {
private RpcRequest rpcRequest;
public RpcInvokerTask(RpcRequest rpcRequest) {
this.rpcRequest = rpcRequest;
}
@Override
public void run() {
try {
ChannelHandlerContext ctx = rpcRequest.getCtx();
String interfaceIdentity = rpcRequest.getInterfaceIdentity();
String requestId = rpcRequest.getRequestId();
Map<String, Object> parameterMap = rpcRequest.getParameterMap();
//interfaceIdentity組成:接口類+方法+參數(shù)類型
Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);
//拿出是哪個類
String interfaceName = interfaceIdentityMap.get("interface");
Class interfaceClass = Class.forName(interfaceName);
Object o = interfaceInstanceMap.get(interfaceClass);
//拿出是哪個方法
Method method = interfaceMethodMap.get(interfaceIdentity);
//反射執(zhí)行
Object result = null;
String parameterStr = interfaceIdentityMap.get("parameter");
if (parameterStr != null && parameterStr.length() > 0) {
String[] parameterTypeClasses = parameterStr.split(",");//接口方法參數(shù)參數(shù)可能有多個,用,號隔開
Object[] parameterInstance = new Object[parameterTypeClasses.length];
for (int i = 0; i < parameterTypeClasses.length; i++) {
parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
}
result = method.invoke(o, parameterInstance);
} else {
result = method.invoke(o);
}
//將結(jié)果封裝成rcpResponse
RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);
//ctx返回執(zhí)行結(jié)果
String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;
ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
ctx.writeAndFlush(byteBuf);
System.out.println("響應(yīng)給客戶端:" + resultStr);
} catch (Exception e) {
e.printStackTrace();
}
}
}
(6) Client
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Constants.DELIMITER));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new NettyClientHandler());
System.out.println("initChannel - " + Thread.currentThread().getName());
}
});
ChannelFuture cf = bootstrap.connect(ip, port).sync();
// cf.channel().closeFuture().sync();
System.out.println("客戶端啟動成功");
} catch (Exception e) {
e.printStackTrace();
group.shutdownGracefully();
}
(7) Server
public NettyServer(List<ServiceConfig> serviceConfigList, Map<String, Method> interfaceMethodMap) {
this.serviceConfigList = serviceConfigList;
this.interfaceMethodMap = interfaceMethodMap;
}
public int init(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, DELIMITER));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new RpcInvokeHandler(serviceConfigList, interfaceMethodMap));
}
});
ChannelFuture cf = bootstrap.bind(port).sync();
System.out.println("啟動NettyServer,端口為:" + port);
return port;
}
2.項目地址
https://github.com/yclxiao/rpc-demo.git
五、總結(jié)
本文主要以Dubbo為例介紹了RPC調(diào)用核心流程,同時,寫了個簡易的RPC調(diào)用代碼。
記住以上的流程圖即可搞明白整個調(diào)用流程。然后再記住最核心的2句話:
所有的努力都是為了能在Consumer端和Provider端生成功能豐富的Proxy。核心事情由Protocol完成。
核心的5個部件:Registry、Proxy、Protocol、Invoker、Client、Server。