Java 從零開始手寫 RPC-Netty4 實現(xiàn)客戶端和服務(wù)端
說明
上一篇代碼基于 socket 的實現(xiàn)非常簡單,但是對于實際生產(chǎn),一般使用 netty。
至于 netty 的優(yōu)點可以參考:
為什么選擇 netty?[1]
http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty

代碼實現(xiàn)
maven 引入
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>${netty.version}</version>
- </dependency>
引入 netty 對應(yīng)的 maven 包,此處為 4.1.17.Final。
服務(wù)端代碼實現(xiàn)
netty 的服務(wù)端啟動代碼是比較固定的。
- package com.github.houbb.rpc.server.core;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.server.constant.RpcServerConst;
- import com.github.houbb.rpc.server.handler.RpcServerHandler;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- /**
- * rpc 服務(wù)端
- * @author binbin.hou
- * @since 0.0.1
- */
- public class RpcServer extends Thread {
- private static final Log log = LogFactory.getLog(RpcServer.class);
- /**
- * 端口號
- */
- private final int port;
- public RpcServer() {
- this.port = RpcServerConst.DEFAULT_PORT;
- }
- public RpcServer(int port) {
- this.port = port;
- }
- @Override
- public void run() {
- // 啟動服務(wù)端
- log.info("RPC 服務(wù)開始啟動服務(wù)端");
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(workerGroup, bossGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(new RpcServerHandler());
- }
- })
- // 這個參數(shù)影響的是還沒有被accept 取出的連接
- .option(ChannelOption.SO_BACKLOG, 128)
- // 這個參數(shù)只是過一段時間內(nèi)客戶端沒有響應(yīng),服務(wù)端會發(fā)送一個 ack 包,以判斷客戶端是否還活著。
- .childOption(ChannelOption.SO_KEEPALIVE, true);
- // 綁定端口,開始接收進來的鏈接
- ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
- log.info("RPC 服務(wù)端啟動完成,監(jiān)聽【" + port + "】端口");
- channelFuture.channel().closeFuture().syncUninterruptibly();
- log.info("RPC 服務(wù)端關(guān)閉完成");
- } catch (Exception e) {
- log.error("RPC 服務(wù)異常", e);
- } finally {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- }
為了簡單,服務(wù)端啟動端口號固定,RpcServerConst 常量類內(nèi)容如下:
- public final class RpcServerConst {
- private RpcServerConst(){}
- /**
- * 默認端口
- * @since 0.0.1
- */
- public static final int DEFAULT_PORT = 9627;
- }
RpcServerHandler
當然,還有一個比較核心的類就是 RpcServerHandler
- public class RpcServerHandler extends SimpleChannelInboundHandler {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- // do nothing now
- }
- }
目前是空實現(xiàn),后續(xù)可以添加對應(yīng)的日志輸出及邏輯處理。
測試
啟動測試的代碼非常簡單:
- /**
- * 服務(wù)啟動代碼測試
- * @param args 參數(shù)
- */
- public static void main(String[] args) {
- new RpcServer().start();
- }
說明
上面我們實現(xiàn)了服務(wù)端的實現(xiàn),這一節(jié)來一起看一下 client 客戶端代碼實現(xiàn)。
代碼實現(xiàn)
RpcClient
- /*
- * Copyright (c) 2019. houbinbin Inc.
- * rpc All rights reserved.
- */
- package com.github.houbb.rpc.client.core;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.handler.RpcClientHandler;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- /**
- * <p> rpc 客戶端 </p>
- *
- * <pre> Created: 2019/10/16 11:21 下午 </pre>
- * <pre> Project: rpc </pre>
- *
- * @author houbinbin
- * @since 0.0.2
- */
- public class RpcClient extends Thread {
- private static final Log log = LogFactory.getLog(RpcClient.class);
- /**
- * 監(jiān)聽端口號
- */
- private final int port;
- public RpcClient(int port) {
- this.port = port;
- }
- public RpcClient() {
- this(9527);
- }
- @Override
- public void run() {
- // 啟動服務(wù)端
- log.info("RPC 服務(wù)開始啟動客戶端");
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- ChannelFuture channelFuture = bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<Channel>(){
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- .addLast(new LoggingHandler(LogLevel.INFO))
- .addLast(new RpcClientHandler());
- }
- })
- .connect("localhost", port)
- .syncUninterruptibly();
- log.info("RPC 服務(wù)啟動客戶端完成,監(jiān)聽端口:" + port);
- channelFuture.channel().closeFuture().syncUninterruptibly();
- log.info("RPC 服務(wù)開始客戶端已關(guān)閉");
- } catch (Exception e) {
- log.error("RPC 客戶端遇到異常", e);
- } finally {
- workerGroup.shutdownGracefully();
- }
- }
- }
.connect("localhost", port) 聲明了客戶端需要連接的服務(wù)端,此處和服務(wù)端的端口保持一致。
RpcClientHandler
客戶端處理類也比較簡單,暫時留空。
- /*
- * Copyright (c) 2019. houbinbin Inc.
- * rpc All rights reserved.
- */
- package com.github.houbb.rpc.client.handler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * <p> 客戶端處理類 </p>
- *
- * <pre> Created: 2019/10/16 11:30 下午 </pre>
- * <pre> Project: rpc </pre>
- *
- * @author houbinbin
- * @since 0.0.2
- */
- public class RpcClientHandler extends SimpleChannelInboundHandler {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- // do nothing.
- }
- }
啟動測試
服務(wù)端
首先啟動服務(wù)端。
客戶端
然后啟動客戶端連接服務(wù)端,實現(xiàn)如下:
- /**
- * 服務(wù)啟動代碼測試
- * @param args 參數(shù)
- */
- public static void main(String[] args) {
- new RpcClient().start();
- }
小結(jié)
為了便于大家學(xué)習,以上源碼已經(jīng)開源:
https://github.com/houbb/rpc
我是老馬,期待與你的下次重逢。
References
[1] 為什么選擇 netty?: http://houbb.github.io/2019/05/10/netty-definitive-gudie-04-why-netty