來聊聊 Netty 幾個(gè)開箱即用的處理器框架
本篇文章我們將基于Netty幾個(gè)開箱即用的封裝快速落地一個(gè)易于拓展與維護(hù)的客戶端服務(wù)端通信示例,希望對你有所幫助。
基于Netty快速落地自定義協(xié)議消息通信
1.提出需求
我們需要使用Netty快速落地一套客戶端和服務(wù)端系統(tǒng)通信程序,客戶端會在與服務(wù)端建立連接后發(fā)送自定義協(xié)議的登錄包,然后服務(wù)端完成校驗(yàn)之后返回自定義協(xié)議的登錄處理結(jié)果:
2.服務(wù)端設(shè)計(jì)與實(shí)現(xiàn)
按照我們以往的處理器邏輯,對于服務(wù)端我們可能會編寫一個(gè)處理器handler,其內(nèi)部負(fù)責(zé):
- 對收到的數(shù)據(jù)包解碼。
- 根據(jù)數(shù)據(jù)包類型走不同的if-else邏輯。
- 回復(fù)相應(yīng)的加密后的數(shù)據(jù)包。
這種做法將編碼、解碼、數(shù)據(jù)邏輯全部耦合在一個(gè)處理器上,違背了單一職責(zé)的設(shè)計(jì),導(dǎo)致代碼臃腫,后續(xù)的功能的拓展和維護(hù)都十分不便。
對此本文做法是針對不同數(shù)據(jù)包指定相應(yīng)處理器,通過pipeline自帶的責(zé)任鏈模式將這些處理器串聯(lián)起來,并將編碼和解碼的handler單獨(dú)抽離出來維護(hù):
因?yàn)榭蛻舳藭蚍?wù)端發(fā)送登錄包,對應(yīng)文件編碼規(guī)則為:
- 第一個(gè)整形位,設(shè)置為登錄包類型為1。
- 第二個(gè)整型為設(shè)置為登錄包數(shù)據(jù)長度。
- 第三個(gè)字節(jié)數(shù)組設(shè)置為序列化后的數(shù)據(jù)包。
所以我們解碼的邏輯為:
- 獲取4個(gè)字節(jié)知曉類型。
- 獲取4個(gè)字節(jié)解析長度。
- 讀取對應(yīng)長度的字節(jié)數(shù)組將其反序列化為對應(yīng)類型的數(shù)據(jù)包。
而Netty也為我們解碼的邏輯提供了一個(gè)類MessageToMessageDecoder,我們只需繼承并重寫其decode方法,將bytebuf解碼后的結(jié)果傳入out列表中即可傳播到對應(yīng)的處理器上:
對此我們給出解碼器的處理器Handler的邏輯,可以看到該解碼器會按照編碼的要求進(jìn)行解析:
public class ServerDecodeHandler extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//獲取消息類型
int type = msg.readInt();
if (type == 1) {
//獲取實(shí)際消息長度
int length = msg.readInt();
//讀取數(shù)據(jù)并反序列化
byte[] data = new byte[length];
msg.readBytes(data);
LoginPacket loginPacket = JSON.parseObject(data, LoginPacket.class);
out.add(loginPacket);
}
}
}
消息被解碼器解碼之后,就可以傳播到對應(yīng)業(yè)務(wù)處理器上,為了保證讀取到不同的消息被不同業(yè)務(wù)處理器處理,Netty提供了一個(gè)開箱即用的讀消息處理器,它會根據(jù)我們的指定的泛型為數(shù)據(jù)包進(jìn)行匹配,只有與泛型類一致才會進(jìn)行處理:
所以我們的認(rèn)證處理器AuthHandler 繼承SimpleChannelInboundHandler并指明泛型LoginPacket專門處理讀取到的LoginPacket:
public class AuthHandler extends SimpleChannelInboundHandler<LoginPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginPacket msg) throws Exception {
//如果用戶名和密碼一致則通過loginResp發(fā)送一個(gè)hello包,反之回復(fù)發(fā)送失敗
if (msg.getUserName().equals("user") && msg.getPassword().equals("123456")) {
LoginRespPacket loginRespPacket = new LoginRespPacket();
loginRespPacket.setType(2);
loginRespPacket.setMessage("hello netty client");
ctx.writeAndFlush(loginRespPacket);
} else {
LoginRespPacket loginRespPacket = new LoginRespPacket();
loginRespPacket.setType(2);
loginRespPacket.setMessage("login failed");
ctx.writeAndFlush(loginRespPacket);
}
}
}
該處理器匹配消息包的邏輯我們可以通過源碼進(jìn)行簡單介紹,當(dāng)解碼后的數(shù)據(jù)包通過pipeline傳播來到AuthHandler 時(shí),它會調(diào)用繼承自SimpleChannelInboundHandler的channelRead方法并通過acceptInboundMessage查看這個(gè)消息類型和泛型是否一致,如果一致則會調(diào)用channelRead0最終回調(diào)到我們的channelRead0方法,而且相較于channelHandler,我們的SimpleChannelInboundHandler還會在finally語句自動按需檢查并釋放bytebuf內(nèi)存:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
//判斷當(dāng)前消息類類型和指明的泛型是否匹配
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
//如果匹配則直接調(diào)用我們重寫的channelRead0
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
//調(diào)用結(jié)束后還會檢查按需釋放bytebuf內(nèi)存
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
認(rèn)證處理器確定登錄包正確,則發(fā)送loginResp響應(yīng),對應(yīng)的數(shù)據(jù)包也需要按照類型、長度、序列化包字符串的格式進(jìn)行編碼,所以我們還需要編寫一個(gè)編碼器,同理我們還是使用Netty開箱即用的MessageToByteEncoder將編碼后數(shù)據(jù)寫到out這個(gè)bytebuf中:
public class ServerEncodeHandler extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { //如果是Resp類型,則依次寫入類型、長度、序列化包到ByteBuf中
if (msg.getType() == 2) {
LoginRespPacket loginRespPacket = (LoginRespPacket) msg;
out.writeInt(loginRespPacket.getType());
byte[] bytes = JSON.toJSONBytes(loginRespPacket);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
}
3.客戶端設(shè)計(jì)與實(shí)現(xiàn)
而客戶端也和上文類型,我們先編寫一個(gè)連接激活后發(fā)送登錄包的處理器:
public class LoginHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LoginPacket loginPacket = new LoginPacket();
loginPacket.setUserName("user");
loginPacket.setPassword("123456");
ctx.writeAndFlush(loginPacket).
}
}
然后就是編碼器,同樣是繼承MessageToByteEncoder實(shí)現(xiàn):
public class ClientEncodeHandler extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
//按照類型、長度、序列化包進(jìn)行編碼
if (msg.getType() == 1) {
LoginPacket loginPacket = (LoginPacket) msg;
byte[] jsonBytes = JSON.toJSONBytes(loginPacket);
out.writeInt(msg.getType());
out.writeInt(jsonBytes.length);
out.writeBytes(jsonBytes);
}
}
}
收到包后,根據(jù)第一個(gè)整型字節(jié)匹配到LoginRespPacket,將其解碼為LoginRespPacket:
public class ClientDecodeHandler extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
int type = msg.readInt();
//如果type為2則說明是loginResp,按照類型、長度、反序列化包處理器
if (type == 2) {
int length = msg.readInt();
byte[] data = new byte[length];
msg.readBytes(data);
LoginRespPacket loginRespPacket = JSON.parseObject(data, LoginRespPacket.class);
out.add(loginRespPacket);
}
}
}
最終傳播到LoginRespHandler打印輸出:
public class LoginRespHandler extends SimpleChannelInboundHandler<LoginRespPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRespPacket msg) throws Exception {
System.out.println(JSONUtil.toJsonStr(msg));
}
}
4.最終成果驗(yàn)收
按照上述解耦的處理器完成開發(fā)之后,我們分別啟動服務(wù)端和客戶端,最終客戶端就會得到如下輸出:
{"message":"hello netty client","type":2}
由此基于Netty開箱即用的客戶端服務(wù)端通信模型完成。