Netty是如何解析Redis的RESP協(xié)議—響應(yīng)篇
這篇是響應(yīng)篇,一起來(lái)看看 RedisDecoderTest 中,是怎么模擬 client-cli 接受處理 server 響應(yīng)的??
RedisDecoderTest
public class RedisDecoderTest {
public static void main(String[] args) {
EmbeddedChannel channel = newChannel(false);
System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
System.out.println(channel.writeInbound(byteBufOf("\n")));
RedisMessage msg = channel.readInbound();
System.out.println(msg instanceof FullBulkStringRedisMessage);
String bytes = stringOf(((FullBulkStringRedisMessage) msg).content());
System.out.println(bytes);
ReferenceCountUtil.release(msg);
channel.finish();
}
private static EmbeddedChannel newChannel(boolean decodeInlineCommands) {
return new EmbeddedChannel(
new RedisDecoder(decodeInlineCommands),
new RedisBulkStringAggregator(),
new RedisArrayAggregator());
}
}
圖解
這里的重點(diǎn)就是這 3 個(gè) ChannelInboundHandler 了。
圖片
具備 decode 能力 ??
圖片
下面進(jìn)入源碼解讀:
何時(shí)調(diào)用到 decode 方法
當(dāng)進(jìn)行 channelRead 時(shí)進(jìn)行 decode,比如 MessageToMessageDecoder ??
圖片
RedisDecoder
里面定義了 5 種 State
圖片
比如上面例子中,傳輸?shù)? $6\r\nfoobar\r\n ,就屬于 RESP 協(xié)議中的 Bulk strings 大字符串,需要解析出 length 和 content,格式如下 :
$<length>\r\n<data>\r\n
比如
$5\r\nhello\r\n
$0\r\n\r\n
關(guān)鍵步驟
圖片
decode 時(shí),由于默認(rèn)的 state 都是 DECODE_TYPE ,所以會(huì)先調(diào)用 decodeType 方法。
圖片
decodeType
看看是不是 inline 的,默認(rèn)是 false,我們也是設(shè)置了 false。
圖片
decodeLength
圖片
這里可以看到官網(wǎng) Fast to parse 的影子。
圖片
圖片
decodeBulkString
創(chuàng)建 BulkStringHeaderRedisMessage,再把 state 切換到 DECODE_BULK_STRING_CONTENT ,最后調(diào)用 decodeBulkStringContent 。
圖片
decodeBulkStringContent
創(chuàng)建 DefaultBulkStringRedisContent,并添加到 out 這個(gè) list 中(2個(gè))
圖片
接著,就來(lái)到第二個(gè) handler 了 ,RedisBulkStringAggregator
RedisBulkStringAggregator
起到一個(gè)聚合的作用,將消息包裝成 FullBulkStringRedisMessage。
圖片
這個(gè) decode 方法超過(guò) 100 行了,就粗略講一下。
在上面的方法中,我們往 out 中添加了 BulkStringHeaderRedisMessage 和 DefaultBulkStringRedisContent 這兩個(gè)。
圖片
消息頭處理
先處理 BulkStringHeaderRedisMessage ,
圖片
包裝成 FullBulkStringRedisMessage 。
圖片
消息體處理
圖片
appendPartialContent,把這個(gè) ByteBuf 整合到 CompositeByteBuf 中。
圖片
aggregate,擴(kuò)展方法,目前是空實(shí)現(xiàn)。
最后,判斷是不是消息尾
圖片
到了這里,handler 就處理完了,因?yàn)檫@個(gè)消息不是數(shù)組類(lèi)型的,用不到 RedisArrayAggregator 。
第二次 writeInbound
上面代碼中共調(diào)用了兩次 writeInbound
System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
System.out.println(channel.writeInbound(byteBufOf("\n")));
第二次時(shí),會(huì)把之前的 bytebuf 拿出來(lái)計(jì)算下。
圖片
可以看到,oldBytes 是 \r ,newBytes 則是 \n ,重新組合成新的 ByteBuf。
圖片
這樣才能去創(chuàng)建這個(gè) DefaultLastBulkStringRedisContent
圖片
進(jìn)而完成 RedisBulkStringAggregator 中的 last 條件分支。
圖片
最后消息被包裝成 FullBulkStringRedisMessage。
尾節(jié)點(diǎn) TailContext
經(jīng)過(guò)上面的層層處理,foobar 這個(gè) FullBulkStringRedisMessage 消息是怎么存到 EmbeddedChannel 中呢?
可以看到這里繼承了 DefaultChannelPipeline,并重寫(xiě)了 onUnhandledInboundMessage 方法。
圖片
DefaultChannelPipeline 中有尾節(jié)點(diǎn) TailContext,它會(huì)去調(diào)用這個(gè) onUnhandledInboundMessage 。
圖片
進(jìn)而將消息存到隊(duì)列中。
圖片
最后,readInbound 就是從里面 poll 出來(lái)這個(gè)消息,再進(jìn)行打印等操作即可。
圖片
官方例子
我從 Netty 的 example 里 CV 了一份,大家可以快速上手。
使用時(shí),主要還是注意這個(gè) inbound ,outbound 的順序問(wèn)題(如圖)。
圖片
/**
* Simple Redis client that demonstrates Redis commands against a Redis server.
*/
public class RedisClient {
private static final String HOST = System.getProperty("host", "192.168.200.128");
private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new RedisDecoder());
p.addLast(new RedisBulkStringAggregator());
p.addLast(new RedisArrayAggregator());
p.addLast(new RedisEncoder());
p.addLast(new RedisClientHandler());
}
});
// Start the connection attempt.
Channel ch = b.connect(HOST, PORT).sync().channel();
// Read commands from the stdin.
System.out.println("Enter Redis commands (quit to end)");
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
final String input = in.readLine();
final String line = input != null ? input.trim() : null;
if (line == null || "quit".equalsIgnoreCase(line)) { // EOF or "quit"
ch.close().sync();
break;
} else if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
continue;
}
// Sends the received line to the server.
lastWriteFuture = ch.writeAndFlush(line);
lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.err.print("write failed: ");
future.cause().printStackTrace(System.err);
}
}
});
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} finally {
group.shutdownGracefully();
}
}
}
/**
* An example Redis client handler. This handler read input from STDIN and write output to STDOUT.
*/
public class RedisClientHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
String[] commands = ((String) msg).split("\\s+");
List<RedisMessage> children = new ArrayList<RedisMessage>(commands.length);
for (String cmdString : commands) {
children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
}
RedisMessage request = new ArrayRedisMessage(children);
ctx.write(request, promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
RedisMessage redisMessage = (RedisMessage) msg;
printAggregatedRedisResponse(redisMessage);
ReferenceCountUtil.release(redisMessage);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.print("exceptionCaught: ");
cause.printStackTrace(System.err);
ctx.close();
}
private static void printAggregatedRedisResponse(RedisMessage msg) {
if (msg instanceof SimpleStringRedisMessage) {
System.out.println(((SimpleStringRedisMessage) msg).content());
} else if (msg instanceof ErrorRedisMessage) {
System.out.println(((ErrorRedisMessage) msg).content());
} else if (msg instanceof IntegerRedisMessage) {
System.out.println(((IntegerRedisMessage) msg).value());
} else if (msg instanceof FullBulkStringRedisMessage) {
System.out.println(getString((FullBulkStringRedisMessage) msg));
} else if (msg instanceof ArrayRedisMessage) {
for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
printAggregatedRedisResponse(child);
}
} else {
throw new CodecException("unknown message type: " + msg);
}
}
private static String getString(FullBulkStringRedisMessage msg) {
if (msg.isNull()) {
return "(null)";
}
return msg.content().toString(CharsetUtil.UTF_8);
}
}
結(jié)尾
這篇比請(qǐng)求篇稍微復(fù)雜些,還有 TailContext 這個(gè)隱藏的細(xì)節(jié)。