自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java 從零開(kāi)始手寫(xiě) RPC -序列化

開(kāi)發(fā) 后端
序列化 (Serialization)是將對(duì)象的狀態(tài)信息轉(zhuǎn)換為可以存儲(chǔ)或傳輸?shù)男问降倪^(guò)程。在序列化期間,對(duì)象將其當(dāng)前狀態(tài)寫(xiě)入到臨時(shí)或持久性存儲(chǔ)區(qū)。

[[429947]]

前面幾節(jié)我們實(shí)現(xiàn)了最基礎(chǔ)的客戶(hù)端調(diào)用服務(wù)端,這一節(jié)來(lái)學(xué)習(xí)一下通訊中的對(duì)象序列化。

為什么需要序列化

netty 底層都是基于 ByteBuf 進(jìn)行通訊的。

前面我們通過(guò)編碼器/解碼器專(zhuān)門(mén)為計(jì)算的入?yún)?出參進(jìn)行處理,這樣方便我們直接使用 pojo。

但是有一個(gè)問(wèn)題,如果想把我們的項(xiàng)目抽象為框架,那就需要為所有的對(duì)象編寫(xiě)編碼器/解碼器。

顯然,直接通過(guò)每一個(gè)對(duì)象寫(xiě)一對(duì)的方式是不現(xiàn)實(shí)的,而且用戶(hù)如何使用,也是未知的。

序列化的方式

基于字節(jié)的實(shí)現(xiàn),性能好,可讀性不高。

基于字符串的實(shí)現(xiàn),比如 json 序列化,可讀性好,性能相對(duì)較差。

ps: 可以根據(jù)個(gè)人還好選擇,相關(guān)序列化可參考下文,此處不做展開(kāi)。

json 序列化框架簡(jiǎn)介[1]

實(shí)現(xiàn)思路

可以將我們的 Pojo 全部轉(zhuǎn)化為 byte,然后 Byte 轉(zhuǎn)換為 ByteBuf 即可。

反之亦然。

代碼實(shí)現(xiàn)

maven

引入序列化包:

  1. <dependency> 
  2.     <groupId>com.github.houbb</groupId> 
  3.     <artifactId>json</artifactId> 
  4.     <version>0.1.1</version> 
  5. </dependency> 

服務(wù)端

核心

服務(wù)端的代碼可以大大簡(jiǎn)化:

  1. serverBootstrap.group(workerGroup, bossGroup) 
  2.     .channel(NioServerSocketChannel.class) 
  3.     // 打印日志 
  4.     .handler(new LoggingHandler(LogLevel.INFO)) 
  5.     .childHandler(new ChannelInitializer<Channel>() { 
  6.         @Override 
  7.         protected void initChannel(Channel ch) throws Exception { 
  8.             ch.pipeline() 
  9.                     .addLast(new RpcServerHandler()); 
  10.         } 
  11.     }) 
  12.     // 這個(gè)參數(shù)影響的是還沒(méi)有被accept 取出的連接 
  13.     .option(ChannelOption.SO_BACKLOG, 128) 
  14.     // 這個(gè)參數(shù)只是過(guò)一段時(shí)間內(nèi)客戶(hù)端沒(méi)有響應(yīng),服務(wù)端會(huì)發(fā)送一個(gè) ack 包,以判斷客戶(hù)端是否還活著。 
  15.     .childOption(ChannelOption.SO_KEEPALIVE, true); 

這里只需要一個(gè)實(shí)現(xiàn)類(lèi)即可。

RpcServerHandler

服務(wù)端的序列化/反序列化調(diào)整為直接使用 JsonBs 實(shí)現(xiàn)。

  1. package com.github.houbb.rpc.server.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.common.model.CalculateRequest; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9. import com.github.houbb.rpc.common.service.Calculator; 
  10. import com.github.houbb.rpc.server.service.CalculatorService; 
  11.  
  12.  
  13. import io.netty.buffer.ByteBuf; 
  14. import io.netty.buffer.Unpooled; 
  15. import io.netty.channel.ChannelHandlerContext; 
  16. import io.netty.channel.SimpleChannelInboundHandler; 
  17.  
  18.  
  19. /** 
  20.  * @author binbin.hou 
  21.  * @since 0.0.1 
  22.  */ 
  23. public class RpcServerHandler extends SimpleChannelInboundHandler { 
  24.  
  25.  
  26.     private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
  27.  
  28.  
  29.     @Override 
  30.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  31.         final String id = ctx.channel().id().asLongText(); 
  32.         log.info("[Server] channel {} connected " + id); 
  33.     } 
  34.  
  35.  
  36.     @Override 
  37.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  38.         final String id = ctx.channel().id().asLongText(); 
  39.  
  40.  
  41.         ByteBuf byteBuf = (ByteBuf)msg; 
  42.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  43.         byteBuf.readBytes(bytes); 
  44.         CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class); 
  45.         log.info("[Server] receive channel {} request: {} from ", id, request); 
  46.  
  47.  
  48.         Calculator calculator = new CalculatorService(); 
  49.         CalculateResponse response = calculator.sum(request); 
  50.  
  51.  
  52.         // 回寫(xiě)到 client 端 
  53.         byte[] responseBytes = JsonBs.serializeBytes(response); 
  54.         ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes); 
  55.         ctx.writeAndFlush(responseBuffer); 
  56.         log.info("[Server] channel {} response {}", id, response); 
  57.     } 
  58.  
  59.  

客戶(hù)端

核心

客戶(hù)端可以簡(jiǎn)化如下:

  1. channelFuture = bootstrap.group(workerGroup) 
  2.     .channel(NioSocketChannel.class) 
  3.     .option(ChannelOption.SO_KEEPALIVE, true
  4.     .handler(new ChannelInitializer<Channel>(){ 
  5.         @Override 
  6.         protected void initChannel(Channel ch) throws Exception { 
  7.             channelHandler = new RpcClientHandler(); 
  8.             ch.pipeline() 
  9.                     .addLast(new LoggingHandler(LogLevel.INFO)) 
  10.                     .addLast(channelHandler); 
  11.         } 
  12.     }) 
  13.     .connect(RpcConstant.ADDRESS, port) 
  14.     .syncUninterruptibly(); 

RpcClientHandler

客戶(hù)端的序列化/反序列化調(diào)整為直接使用 JsonBs 實(shí)現(xiàn)。

  1. package com.github.houbb.rpc.client.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.client.core.RpcClient; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9.  
  10.  
  11. import io.netty.buffer.ByteBuf; 
  12. import io.netty.channel.ChannelHandlerContext; 
  13. import io.netty.channel.SimpleChannelInboundHandler; 
  14.  
  15.  
  16. /** 
  17.  * <p> 客戶(hù)端處理類(lèi) </p> 
  18.  * 
  19.  * <pre> Created: 2019/10/16 11:30 下午  </pre> 
  20.  * <pre> Project: rpc  </pre> 
  21.  * 
  22.  * @author houbinbin 
  23.  * @since 0.0.2 
  24.  */ 
  25. public class RpcClientHandler extends SimpleChannelInboundHandler { 
  26.  
  27.  
  28.     private static final Log log = LogFactory.getLog(RpcClient.class); 
  29.  
  30.  
  31.     /** 
  32.      * 響應(yīng)信息 
  33.      * @since 0.0.4 
  34.      */ 
  35.     private CalculateResponse response; 
  36.  
  37.  
  38.     @Override 
  39.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  40.         ByteBuf byteBuf = (ByteBuf)msg; 
  41.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  42.         byteBuf.readBytes(bytes); 
  43.  
  44.  
  45.         this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class); 
  46.         log.info("[Client] response is :{}", response); 
  47.     } 
  48.  
  49.  
  50.     @Override 
  51.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  52.         // 每次用完要關(guān)閉,不然拿不到response,我也不知道為啥(目測(cè)得了解netty才行) 
  53.         // 個(gè)人理解:如果不關(guān)閉,則永遠(yuǎn)會(huì)被阻塞。 
  54.         ctx.flush(); 
  55.         ctx.close(); 
  56.     } 
  57.  
  58.  
  59.     public CalculateResponse getResponse() { 
  60.         return response; 
  61.     } 
  62.  
  63.  

 

責(zé)任編輯:姜華 來(lái)源: 今日頭條
相關(guān)推薦

2021-10-13 08:21:52

Java websocket Java 基礎(chǔ)

2021-10-29 08:07:30

Java timeout Java 基礎(chǔ)

2021-10-19 08:58:48

Java 語(yǔ)言 Java 基礎(chǔ)

2021-10-21 08:21:10

Java Reflect Java 基礎(chǔ)

2019-09-23 19:30:27

reduxreact.js前端

2021-10-14 08:39:17

Java Netty Java 基礎(chǔ)

2017-02-10 09:30:33

數(shù)據(jù)化運(yùn)營(yíng)流量

2020-07-02 15:32:23

Kubernetes容器架構(gòu)

2024-12-06 17:02:26

2015-11-17 16:11:07

Code Review

2018-04-18 07:01:59

Docker容器虛擬機(jī)

2019-01-18 12:39:45

云計(jì)算PaaS公有云

2022-08-06 08:41:18

序列化反序列化Hessian

2018-03-19 10:20:23

Java序列化反序列化

2023-03-06 07:28:57

RPC框架序列化

2009-06-14 22:01:27

Java對(duì)象序列化反序列化

2021-10-27 08:10:15

Java 客戶(hù)端 Java 基礎(chǔ)

2013-03-11 13:55:03

JavaJSON

2024-05-15 14:29:45

2010-05-26 17:35:08

配置Xcode SVN
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)