自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

介紹下Netty中常用的編碼器和解碼器

網(wǎng)絡(luò) 通信技術(shù)
這篇文章,介紹什么是編碼器、解碼器,也講述了如何實(shí)戰(zhàn)中運(yùn)用編碼器和解碼器。希望能對(duì)有所幫助。

[[359182]]

前面文章介紹Netty相關(guān)知識(shí)點(diǎn)。接下來將介紹下在通信過程中用的編碼器和解碼器。這里會(huì)不會(huì)聯(lián)想到諜戰(zhàn)戲里面。發(fā)送情報(bào)者怕情報(bào)泄露,所以對(duì)情報(bào)行加密然后傳給接收者。接收者對(duì)情報(bào)進(jìn)行解密,得到情報(bào)。這里講的編碼器和解碼器是和情報(bào)傳遞很相似?一起查看這篇文章,來揭秘!!!
1一 編解碼器

1 1.1 什么叫編解碼器

在網(wǎng)絡(luò)傳輸?shù)倪^程中,數(shù)據(jù)都是以字節(jié)流的方式進(jìn)行傳遞??蛻舳嗽谙蚍?wù)端發(fā)送數(shù)據(jù)的時(shí)候,將業(yè)務(wù)中其他類型數(shù)據(jù)轉(zhuǎn)化為字節(jié),叫編碼。服務(wù)端接收到數(shù)據(jù)為字節(jié)流,將字節(jié)流轉(zhuǎn)化為原來的格式,叫解碼。統(tǒng)稱codec。

編解碼器分為兩部分-編碼器和解碼器,編碼器負(fù)責(zé)出站,解碼器負(fù)責(zé)入站。

2 1.2 解碼器

1.2.1 概述

解碼器負(fù)責(zé)入站操作,那么也一定要實(shí)現(xiàn)ChannelInboundHandler接口,所以解碼器本質(zhì) 上也是ChannelHandler。我們自定義編解碼器只需要繼承ByteToMessageDecoder(Netty提供抽象類,繼承 ChannelInboundHandlerAdapter),實(shí)現(xiàn)decode()。Netty提供一些常用的解碼器實(shí)現(xiàn), 開箱即用。如下:

  1. 1 RedisDecoder 基于Redis協(xié)議的解碼器 
  2. 2 XmlDecoder 基于XML格式的解碼器 
  3. 3 JsonObjectDecoder 基于json數(shù)據(jù)格式的解碼器 
  4. 4 HttpObjectDecoder 基于http協(xié)議的解碼器 

Netty也提供了MessageToMessageDecoder,將⼀種格式轉(zhuǎn)化為另⼀種格式的解碼器,也提供了⼀些 實(shí)現(xiàn),如下:

  1. 1 StringDecoder 將接收到ByteBuf轉(zhuǎn)化為字符串 
  2. 2 ByteArrayDecoder 將接收到ByteBuf轉(zhuǎn)化字節(jié)數(shù)組 
  3. 3 Base64Decoder 將由ByteBuf或US-ASCII字符串編碼的Base64解碼為ByteBuf。 

1.2.2 將字節(jié)流轉(zhuǎn)化為Intger類型(案例)

1. 字節(jié)解碼器

  1. package com.haopt.netty.codec; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.channel.ChannelHandlerContext; 
  4. import io.netty.handler.codec.ByteToMessageDecoder; 
  5.  
  6. import java.util.List; 
  7. public class ByteToIntegerDecoder extends ByteToMessageDecoder { 
  8.     /** 
  9.     * 
  10.     * @param ctx 上下⽂ 
  11.     * @param in 輸⼊的ByteBuf消息數(shù)據(jù) 
  12.     * @param out 轉(zhuǎn)化后輸出的容器 
  13.     * @throws Exception 
  14.     */ 
  15.     @Override 
  16.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
  17.         if(in.readableBytes() >= 4){ //int類型占⽤4個(gè)字節(jié),所以需要判斷是否存在有4個(gè)字節(jié),再進(jìn)⾏讀取 
  18.             out.add(in.readInt()); //讀取到int類型數(shù)據(jù),放⼊到輸出,完成數(shù)據(jù)類型的轉(zhuǎn)化 
  19.         } 
  20.     } 

2. Handler

  1. package com.haopt.netty.codec; 
  2. import io.netty.channel.ChannelHandlerContext; 
  3. import io.netty.channel.ChannelInboundHandlerAdapter; 
  4. public class ServerHandler extends ChannelInboundHandlerAdapter { 
  5.     @Override 
  6.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
  7.         Integer i = (Integer) msg; //這⾥可以直接拿到Integer類型的數(shù)據(jù) 
  8.         System.out.println("服務(wù)端接收到的消息為:" + i); 
  9.     } 

3 在pipeline中添加解碼器

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.     ch.pipeline() 
  4.     .addLast(new ByteToIntegerDecoder()) 
  5.     .addLast(new ServerHandler()); 

可以將代碼復(fù)制到IDEA運(yùn)行下,查看下運(yùn)行效果。

3 1.3 編碼器

1.3.1 概述

將原來的格式轉(zhuǎn)化為字節(jié)。我們要實(shí)現(xiàn)自定義解碼器只要繼承MessageToByteEncoder(實(shí)現(xiàn)了ChannelOutboundHandler接⼝),本質(zhì)上也是ChannelHandler。Netty中一些實(shí)現(xiàn)的編碼器,如下:

  1. 1 ObjectEncoder 將對(duì)象(需要實(shí)現(xiàn)Serializable接⼝)編碼為字節(jié)流 
  2. 2 SocksMessageEncoder 將SocksMessage編碼為字節(jié)流 
  3. 3 HAProxyMessageEncoder 將HAProxyMessage編碼成字節(jié)流 

Netty也提供了MessageToMessageEncoder,將⼀種格式轉(zhuǎn)化為另⼀種格式的編碼器,也提供了⼀些 實(shí)現(xiàn):

  1. 1 RedisEncoder 將Redis協(xié)議的對(duì)象進(jìn)⾏編碼 
  2. 2 StringEncoder 將字符串進(jìn)⾏編碼操作 
  3. 3 Base64Encoder 將Base64字符串進(jìn)⾏編碼操作 

1.3.2 將Integer類型編碼為字節(jié)進(jìn)⾏傳遞(案例)

1. 自定義編碼器

  1. package com.haopt.netty.codec.client; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.channel.ChannelHandlerContext; 
  4. import io.netty.handler.codec.MessageToByteEncoder; 
  5. public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { 
  6.     @Override 
  7.     protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { 
  8.        out.writeInt(msg); 
  9.     } 

2. Handler

  1. package com.haopt.netty.codec.client; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.SimpleChannelInboundHandler; 
  6. import io.netty.util.CharsetUtil; 
  7. public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  8.     @Override 
  9.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  10.         System.out.println("接收到服務(wù)端的消息:" + 
  11.         msg.toString(CharsetUtil.UTF_8)); 
  12.     } 
  13.     @Override 
  14.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  15.      ctx.writeAndFlush(123); 
  16.     } 
  17.     @Override 
  18.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  19.      cause.printStackTrace(); 
  20.      ctx.close(); 
  21.     } 

3. pipeline

  1. @Override 
  2. protected void initChannel(SocketChannel ch) throws Exception { 
  3.     ch.pipeline().addLast(new IntegerToByteEncoder()); 
  4.     ch.pipeline().addLast(new ClientHandler()); 

 2 二 開發(fā)Http服務(wù)器

通過Netty中提供的http的解碼器,進(jìn)行http服務(wù)器開發(fā)。建議代碼復(fù)制下來,執(zhí)行下看看效果。

4 2.1 Netty配置

1. server

  1. package com.haopt.netty.codec.http; 
  2. import io.netty.bootstrap.ServerBootstrap; 
  3. import io.netty.channel.ChannelFuture; 
  4. import io.netty.channel.ChannelInitializer; 
  5. import io.netty.channel.EventLoopGroup; 
  6. import io.netty.channel.nio.NioEventLoopGroup; 
  7. import io.netty.channel.socket.SocketChannel; 
  8. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  9. import io.netty.handler.codec.http.HttpObjectAggregator; 
  10. import io.netty.handler.codec.http.HttpRequestDecoder; 
  11. import io.netty.handler.codec.http.HttpResponseEncoder; 
  12. import io.netty.handler.stream.ChunkedWriteHandler; 
  13. public class NettyHttpServer { 
  14.     public static void main(String[] args) throws Exception { 
  15.         // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請(qǐng)求 
  16.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  17.         // ⼯作線程,線程數(shù)默認(rèn)是:cpu*2 
  18.         EventLoopGroup worker = new NioEventLoopGroup(); 
  19.         try { 
  20.         // 服務(wù)器啟動(dòng)類 
  21.         ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  22.         serverBootstrap.group(boss, worker); 
  23.         //配置server通道 
  24.         serverBootstrap.channel(NioServerSocketChannel.class); 
  25.         serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { 
  26.             @Override 
  27.             protected void initChannel(SocketChannel ch) throws Exception { 
  28.                 ch.pipeline() 
  29.                 //http請(qǐng)求的解碼器 
  30.                 //將http請(qǐng)求中的uri以及請(qǐng)求體聚合成⼀個(gè)完整的FullHttpRequest對(duì)象 
  31.                 .addLast(new HttpRequestDecoder())  
  32.                 .addLast(new HttpObjectAggregator(1024 * 128)) 
  33.                 .addLast(new HttpResponseEncoder()) //http響應(yīng)的編碼器 
  34.                 .addLast(new ChunkedWriteHandler()) //⽀持異步的⼤⽂件傳輸,防⽌內(nèi)存溢出 
  35.                 .addLast(new ServerHandler()); 
  36.             } 
  37.           }); //worker線程的處理器 
  38.           ChannelFuture future = serverBootstrap.bind(8080).sync(); 
  39.           System.out.println("服務(wù)器啟動(dòng)完成。。。。。"); 
  40.           //等待服務(wù)端監(jiān)聽端⼝關(guān)閉 
  41.           future.channel().closeFuture().sync(); 
  42.         } finally { 
  43.           //優(yōu)雅關(guān)閉 
  44.           boss.shutdownGracefully(); 
  45.           worker.shutdownGracefully(); 
  46.       } 
  47.      } 

2. ServerHandler

  1. package com.haopt.netty.codec.http; 
  2. import io.netty.buffer.Unpooled; 
  3. import io.netty.channel.ChannelFutureListener; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.channel.ChannelInboundHandlerAdapter; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7. import io.netty.handler.codec.http.*; 
  8. import io.netty.util.CharsetUtil; 
  9. import java.util.Map; 
  10. public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest>{ 
  11.     @Override 
  12.     public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { 
  13.         //解析FullHttpRequest,得到請(qǐng)求參數(shù) 
  14.         Map<String, String> paramMap = new RequestParser(request).parse(); 
  15.         String name = paramMap.get("name"); 
  16.         //構(gòu)造響應(yīng)對(duì)象 
  17.         FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); 
  18.         httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8"); 
  19.         StringBuilder sb = new StringBuilder(); 
  20.         sb.append("<h1>"); 
  21.         sb.append("你好," + name); 
  22.         sb.append("</h1>"); 
  23.         httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,CharsetUtil.UTF_8)); 
  24.         //操作完成后,將channel關(guān)閉 
  25.         ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);  
  26.     } 

 3. RequestParser

  1. package com.haopt.netty.codec.http; 
  2. import io.netty.handler.codec.http.FullHttpRequest; 
  3. import io.netty.handler.codec.http.HttpMethod; 
  4. import io.netty.handler.codec.http.QueryStringDecoder; 
  5. import io.netty.handler.codec.http.multipart.Attribute; 
  6. import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; 
  7. import io.netty.handler.codec.http.multipart.InterfaceHttpData; 
  8. import java.io.IOException; 
  9. import java.util.HashMap; 
  10. import java.util.List; 
  11. import java.util.Map; 
  12. /** 
  13. * HTTP請(qǐng)求參數(shù)解析器, ⽀持GET, POST 
  14. */ 
  15. public class RequestParser { 
  16.     private FullHttpRequest fullReq; 
  17.     /** 
  18.     * 構(gòu)造⼀個(gè)解析器 
  19.     * @param req 
  20.     */ 
  21.     public RequestParser(FullHttpRequest req) { 
  22.      this.fullReq = req; 
  23.     } 
  24.     /** 
  25.     * 解析請(qǐng)求參數(shù) 
  26.     * @return 包含所有請(qǐng)求參數(shù)的鍵值對(duì), 如果沒有參數(shù), 則返回空Map 
  27.     * 
  28.     * @throws IOException 
  29.     */ 
  30.     public Map<String, String> parse() throws Exception { 
  31.         HttpMethod method = fullReq.method(); 
  32.         Map<String, String> parmMap = new HashMap<>(); 
  33.         if (HttpMethod.GET == method) { 
  34.           // 是GET請(qǐng)求 
  35.           QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri()); 
  36.           decoder.parameters().entrySet().forEach( entry -> { 
  37.           // entry.getValue()是⼀個(gè)List, 只取第⼀個(gè)元素 
  38.           parmMap.put(entry.getKey(), entry.getValue().get(0)); 
  39.           }); 
  40.         } else if (HttpMethod.POST == method) { 
  41.           // 是POST請(qǐng)求 
  42.           HttpPostRequestDecoder decoder = new 
  43.           HttpPostRequestDecoder(fullReq); 
  44.           decoder.offer(fullReq); 
  45.           List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas(); 
  46.           for (InterfaceHttpData parm : parmList) { 
  47.           Attribute data = (Attribute) parm; 
  48.           parmMap.put(data.getName(), data.getValue()); 
  49.           } 
  50.         } else { 
  51.           // 不⽀持其它⽅法 
  52.           throw new RuntimeException("不⽀持其它⽅法"); // 可以用自定義異常來替代 
  53.         } 
  54.         return parmMap; 
  55.     } 

4. 對(duì)象

  1. package com.haopt.netty.codec.obj; 
  2. public class User implements java.io.Serializable { 
  3.     private static final long serialVersionUID = -89217070354741790L; 
  4.     private Long id; 
  5.     private String name
  6.     private Integer age; 
  7.     public Long getId() { 
  8.      return id; 
  9.     } 
  10.     public void setId(Long id) { 
  11.      this.id = id; 
  12.     } 
  13.     public String getName() { 
  14.      return name
  15.     } 
  16.     public void setName(String name) { 
  17.      this.name = name
  18.     } 
  19.     public Integer getAge() { 
  20.      return age; 
  21.     } 
  22.     public void setAge(Integer age) { 
  23.      this.age = age; 
  24.     } 
  25.     @Override 
  26.     public String toString() { 
  27.       return "User{" + 
  28.         "id=" + id + 
  29.         ", name='" + name + '\'' + 
  30.         ", age=" + age + 
  31.         '}'
  32.       } 

5 2.2 服務(wù)端

1. NettyObjectServer

  1. package com.haopt.netty.codec.obj; 
  2. import io.netty.bootstrap.ServerBootstrap; 
  3. import io.netty.channel.ChannelFuture; 
  4. import io.netty.channel.ChannelInitializer; 
  5. import io.netty.channel.EventLoopGroup; 
  6. import io.netty.channel.nio.NioEventLoopGroup; 
  7. import io.netty.channel.socket.SocketChannel; 
  8. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  9. import io.netty.handler.codec.serialization.ClassResolvers; 
  10. import io.netty.handler.codec.serialization.ObjectDecoder; 
  11. public class NettyObjectServer { 
  12.     public static void main(String[] args) throws Exception { 
  13.         // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請(qǐng)求 
  14.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  15.         // ⼯作線程,線程數(shù)默認(rèn)是:cpu*2 
  16.         EventLoopGroup worker = new NioEventLoopGroup(); 
  17.         try { 
  18.         // 服務(wù)器啟動(dòng)類 
  19.         ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  20.         serverBootstrap.group(boss, worker); 
  21.         //配置server通道 
  22.         serverBootstrap.channel(NioServerSocketChannel.class); 
  23.         serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () { 
  24.             @Override 
  25.             protected void initChannel(SocketChannel ch) throws Exception { 
  26.                 ch.pipeline() 
  27.                 .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver( 
  28.                 this.getClass().getClassLoader() 
  29.                 ))) 
  30.                 .addLast(new ServerHandler()); 
  31.             } 
  32.         }); //worker線程的處理器 
  33.         ChannelFuture future = serverBootstrap.bind(6677).sync(); 
  34.         System.out.println("服務(wù)器啟動(dòng)完成。。。。。"); 
  35.         //等待服務(wù)端監(jiān)聽端⼝關(guān)閉 
  36.         future.channel().closeFuture().sync(); 
  37.         } finally { 
  38.         //優(yōu)雅關(guān)閉 
  39.         boss.shutdownGracefully(); 
  40.         worker.shutdownGracefully(); 
  41.         } 
  42.     } 

2. ServerHandler

  1. package com.haopt.netty.codec.obj; 
  2. import io.netty.buffer.Unpooled; 
  3. import io.netty.channel.ChannelHandlerContext; 
  4. import io.netty.channel.SimpleChannelInboundHandler; 
  5. import io.netty.util.CharsetUtil; 
  6. public class ServerHandler extends SimpleChannelInboundHandler<User> { 
  7.     @Override 
  8.     public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception { 
  9.         //獲取到user對(duì)象 
  10.         System.out.println(user); 
  11.         ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  12.     } 

6 2.3 客戶端

1. NettyObjectClient

  1. package com.haopt.netty.codec.obj; 
  2. import io.netty.bootstrap.Bootstrap; 
  3. import io.netty.channel.ChannelFuture; 
  4. import io.netty.channel.ChannelInitializer; 
  5. import io.netty.channel.EventLoopGroup; 
  6. import io.netty.channel.nio.NioEventLoopGroup; 
  7. import io.netty.channel.socket.SocketChannel; 
  8. import io.netty.channel.socket.nio.NioSocketChannel; 
  9. import io.netty.handler.codec.serialization.ObjectEncoder; 
  10. public class NettyObjectClient { 
  11. public static void main(String[] args) throws Exception{ 
  12.     EventLoopGroup worker = new NioEventLoopGroup(); 
  13.         try { 
  14.             // 服務(wù)器啟動(dòng)類 
  15.             Bootstrap bootstrap = new Bootstrap(); 
  16.             bootstrap.group(worker); 
  17.             bootstrap.channel(NioSocketChannel.class); 
  18.             bootstrap.handler(new ChannelInitializer<SocketChannel>() { 
  19.                 @Override 
  20.                 protected void initChannel(SocketChannel ch) throws Exception { 
  21.                     ch.pipeline().addLast(new ObjectEncoder()); 
  22.                     ch.pipeline().addLast(new ClientHandler()); 
  23.                 } 
  24.             }); 
  25.             ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync(); 
  26.             future.channel().closeFuture().sync(); 
  27.         } finally { 
  28.             worker.shutdownGracefully(); 
  29.         } 
  30.     } 

2. ClientHandler

  1. package com.haopt.netty.codec.obj; 
  2. import io.netty.buffer.ByteBuf; 
  3. import io.netty.channel.ChannelHandlerContext; 
  4. import io.netty.channel.SimpleChannelInboundHandler; 
  5. import io.netty.util.CharsetUtil; 
  6. public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  7.     @Override 
  8.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
  9.         System.out.println("接收到服務(wù)端的消息:" + 
  10.         msg.toString(CharsetUtil.UTF_8)); 
  11.     } 
  12.     @Override 
  13.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  14.         User user = new User(); 
  15.         user.setId(1L); 
  16.         user.setName("張三"); 
  17.         user.setAge(20); 
  18.         ctx.writeAndFlush(user); 
  19.     } 
  20.     @Override 
  21.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
  22.         cause.printStackTrace(); 
  23.         ctx.close(); 
  24.     } 

7 2.4 JDK序列化的優(yōu)化

JDK序列化使⽤是⽐較⽅便,但是性能較差,序列化后的字節(jié)⽐較⼤,所以⼀般在項(xiàng)⽬中不 會(huì)使⽤⾃帶的序列化,⽽是會(huì)采⽤第三⽅的序列化框架Hessian編解碼。

1. 導(dǎo)入依賴

  1. <dependency> 
  2.   <groupId>com.caucho</groupId> 
  3.   <artifactId>hessian</artifactId> 
  4.   <version>4.0.63</version> 
  5. </dependency> 

2. User對(duì)象

  1. package com.haopt.netty.codec.hessian; 
  2. public class User implements java.io.Serializable
  3.     private static final long serialVersionUID = -8200798627910162221L; 
  4.     private Long id; 
  5.     private String name
  6.     private Integer age; 
  7.     public Long getId() { 
  8.      return id; 
  9.     } 
  10.     public void setId(Long id) { 
  11.      this.id = id; 
  12.     } 
  13.     public String getName() { 
  14.      return name
  15.     } 
  16.     public void setName(String name) { 
  17.      this.name = name
  18.     } 
  19.     public Integer getAge() { 
  20.      return age; 
  21.     } 
  22.     public void setAge(Integer age) { 
  23.      this.age = age; 
  24.     } 
  25.     @Override 
  26.     public String toString() { 
  27.       return "User{" + 
  28.       "id=" + id + 
  29.       ", name='" + name + '\'' + 
  30.       ", age=" + age + 
  31.       '}'
  32.     } 

3. Hessian序列化⼯具類

  1. package com.haopt.netty.codec.hessian.codec; 
  2. import com.caucho.hessian.io.HessianInput; 
  3. import com.caucho.hessian.io.HessianOutput; 
  4. import java.io.ByteArrayInputStream; 
  5. import java.io.ByteArrayOutputStream; 
  6. import java.io.IOException; 
  7. /** 
  8. * Hessian序列化⼯具類 
  9. */ 
  10. public class HessianSerializer { 
  11.     public <T> byte[] serialize(T obj) { 
  12.         ByteArrayOutputStream os = new ByteArrayOutputStream(); 
  13.         HessianOutput ho = new HessianOutput(os); 
  14.         try { 
  15.           ho.writeObject(obj); 
  16.           ho.flush(); 
  17.           return os.toByteArray(); 
  18.         } catch (IOException e) { 
  19.          throw new RuntimeException(e); 
  20.         } finally { 
  21.           try { 
  22.            ho.close(); 
  23.           } catch (IOException e) { 
  24.            throw new RuntimeException(e); 
  25.           } 
  26.           try { 
  27.            os.close(); 
  28.           } catch (IOException e) { 
  29.           throw new RuntimeException(e); 
  30.           } 
  31.         } 
  32.      } 
  33.       
  34.      public <T> Object deserialize(byte[] bytes, Class<T> clazz) { 
  35.         ByteArrayInputStream is = new ByteArrayInputStream(bytes); 
  36.         HessianInput hi = new HessianInput(is); 
  37.         try { 
  38.           return (T) hi.readObject(clazz); 
  39.         } catch (IOException e) { 
  40.           throw new RuntimeException(e); 
  41.         } finally { 
  42.             try { 
  43.                hi.close(); 
  44.             } catch (Exception e) { 
  45.             throw new RuntimeException(e); 
  46.         } 
  47.         try { 
  48.           is.close(); 
  49.         } catch (IOException e) { 
  50.           throw new RuntimeException(e); 
  51.         } 
  52.       } 
  53.     } 

4. 編碼器

  1. package com.haopt.netty.codec.hessian.codec; 
  2. import cn.itcast.netty.coder.hessian.User
  3. import io.netty.buffer.ByteBuf; 
  4. import io.netty.channel.ChannelHandlerContext; 
  5. import io.netty.handler.codec.MessageToByteEncoder; 
  6. public class HessianEncoder extends MessageToByteEncoder<User> { 
  7.     private HessianSerializer hessianSerializer = new HessianSerializer(); 
  8.     protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception { 
  9.           byte[] bytes = hessianSerializer.serialize(msg); 
  10.           out.writeBytes(bytes); 
  11.     } 

5. 解碼器

  1. public class HessianDecoder extends ByteToMessageDecoder { 
  2.     private HessianSerializer hessianSerializer = new HessianSerializer(); 
  3.  
  4.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
  5.             out) throws Exception { 
  6.         //復(fù)制⼀份ByteBuf數(shù)據(jù),輕復(fù)制,⾮完全拷⻉ 
  7.         //避免出現(xiàn)異常:did not read anything but decoded a message 
  8.         //Netty檢測(cè)沒有讀取任何字節(jié)就會(huì)拋出該異常 
  9.         ByteBuf in2 = in.retainedDuplicate(); 
  10.         byte[] dst; 
  11.         if (in2.hasArray()) {//堆緩沖區(qū)模式 
  12.             dst = in2.array(); 
  13.         } else { 
  14.             dst = new byte[in2.readableBytes()]; 
  15.             in2.getBytes(in2.readerIndex(), dst); 
  16.         } 
  17.         //跳過所有的字節(jié),表示已經(jīng)讀取過了 
  18.         in.skipBytes(in.readableBytes()); 
  19.         //反序列化 
  20.         Object obj = hessianSerializer.deserialize(dst, User.class); 
  21.         out.add(obj); 
  22.     } 

6. 服務(wù)端

  1. public class NettyHessianServer { 
  2.     public static void main(String[] args) throws Exception { 
  3.         // System.setProperty("io.netty.noUnsafe""true"); 
  4.         // 主線程,不處理任何業(yè)務(wù)邏輯,只是接收客戶的連接請(qǐng)求 
  5.         EventLoopGroup boss = new NioEventLoopGroup(1); 
  6.         // ⼯作線程,線程數(shù)默認(rèn)是:cpu*2 
  7.         EventLoopGroup worker = new NioEventLoopGroup(); 
  8.         try { 
  9.             // 服務(wù)器啟動(dòng)類 
  10.             ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  11.             serverBootstrap.group(boss, worker); 
  12.             //配置server通道 
  13.             serverBootstrap.channel(NioServerSocketChannel.class); 
  14.             serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> 
  15.                     () { 
  16.                 @Override 
  17.                 protected void initChannel(SocketChannel ch) throws Exception { 
  18.                     ch.pipeline() 
  19.                             .addLast(new HessianDecoder()) 
  20.                             .addLast(new ServerHandler()); 
  21.                 } 
  22.             }); //worker線程的處理器 
  23.             // serverBootstrap.childOption(ChannelOption.ALLOCATOR, 
  24.             UnpooledByteBufAllocator.DEFAULT); 
  25.             ChannelFuture future = serverBootstrap.bind(6677).sync(); 
  26.             System.out.println("服務(wù)器啟動(dòng)完成。。。。。"); 
  27.             //等待服務(wù)端監(jiān)聽端⼝關(guān)閉 
  28.             future.channel().closeFuture().sync(); 
  29.         } finally { 
  30.             //優(yōu)雅關(guān)閉 
  31.             boss.shutdownGracefully(); 
  32.             worker.shutdownGracefully(); 
  33.         } 
  34.     } 

  1. public class ServerHandler extends SimpleChannelInboundHandler<User> { 
  2.     @Override 
  3.     public void channelRead0(ChannelHandlerContext ctx, User user) throws 
  4.             Exception { 
  5.         //獲取到user對(duì)象 
  6.         System.out.println(user); 
  7.         ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); 
  8.     } 

7. 客戶端(配置類)

  1. public class NettyHessianClient { 
  2.     public static void main(String[] args) throws Exception { 
  3.         EventLoopGroup worker = new NioEventLoopGroup(); 
  4.         try { 
  5.             // 服務(wù)器啟動(dòng)類 
  6.             Bootstrap bootstrap = new Bootstrap(); 
  7.             bootstrap.group(worker); 
  8.             bootstrap.channel(NioSocketChannel.class); 
  9.             bootstrap.handler(new ChannelInitializer<SocketChannel>() { 
  10.                 @Override 
  11.                 protected void initChannel(SocketChannel ch) throws Exception { 
  12.                     ch.pipeline().addLast(new HessianEncoder()); 
  13.                     ch.pipeline().addLast(new ClientHandler()); 
  14.                 } 
  15.             }); 
  16.             ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync(); 
  17.             future.channel().closeFuture().sync(); 
  18.         } finally { 
  19.             worker.shutdownGracefully(); 
  20.         } 
  21.     } 

  1. public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
  2.     @Override 
  3.     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws 
  4.             Exception { 
  5.         System.out.println("接收到服務(wù)端的消息:" + 
  6.                 msg.toString(CharsetUtil.UTF_8)); 
  7.     } 
  8.  
  9.     @Override 
  10.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  11.         User user = new User(); 
  12.         user.setId(1L); 
  13.         user.setName("張三"); 
  14.         user.setAge(20); 
  15.         ctx.writeAndFlush(user); 
  16.     } 
  17.  
  18.     @Override 
  19.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
  20.             throws Exception { 
  21.         cause.printStackTrace(); 
  22.         ctx.close(); 
  23.     } 

這篇文章,介紹了什么是編碼器、解碼器,也講述了如何實(shí)戰(zhàn)中運(yùn)用編碼器和解碼器。希望能對(duì)有所幫助。在開頭提到的我們本文的編碼器解碼器和情報(bào)信息交互是否相似?在我看來,是相似的。發(fā)報(bào)人將自己看的懂得信息,按照某種規(guī)則進(jìn)行加密。收?qǐng)?bào)人接收到信息是加密后的數(shù)據(jù),需要進(jìn)行按照規(guī)則進(jìn)行解密才能看懂。我們客戶端在進(jìn)行發(fā)送數(shù)據(jù),需要將程序中的數(shù)據(jù)變?yōu)槎M(jìn)制流發(fā)送。服務(wù)端接收到數(shù)據(jù),需要將二進(jìn)制流轉(zhuǎn)化為程序可以操作數(shù)據(jù)類型。

 

責(zé)任編輯:姜華 來源: 花花和Java
相關(guān)推薦

2021-08-03 08:38:21

Netty解碼器使用

2025-04-25 09:00:00

Transforme模型代碼

2024-08-29 09:18:55

2024-02-07 12:33:00

AI訓(xùn)練

2010-05-07 16:15:46

Windows Med

2025-04-10 06:30:00

2025-04-10 11:52:55

2021-03-22 10:52:13

人工智能深度學(xué)習(xí)自編碼器

2021-03-29 11:37:50

人工智能深度學(xué)習(xí)

2021-11-02 20:44:47

數(shù)字化

2009-12-15 15:00:00

Fedora Linu

2023-06-25 10:01:29

2009-12-14 13:37:52

linuxFedora播放器

2017-03-21 07:54:43

解碼器軟件程序

2012-04-10 16:55:22

PowerSmart編碼器

2012-04-01 16:40:45

編碼器

2023-04-25 21:36:07

火山引擎

2025-03-10 10:20:00

TransformeDecoder自然語言處理

2022-09-06 11:13:16

接口PipelineHandler
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)