Dubbo框架的一個核心設(shè)計點
Java領(lǐng)域要說讓我最服氣的RPC框架當屬Dubbo,原因有許多,但是最吸引我的還是它把遠程調(diào)用這個事情設(shè)計得很有藝術(shù)。
一、Dubbo優(yōu)點較多,我只鐘情其一
1.優(yōu)點
業(yè)內(nèi)對于微服務(wù)之間調(diào)用的框架選擇較多,主流是Spring Cloud的Rest方式 和 Dubbo方式,我使用Dubbo方式居多。Dubbo工業(yè)級可用,穩(wěn)定又高效,深受各大公司研發(fā)同學的喜愛。
Dubbo的優(yōu)點較多,比如:
- 高性能:Dubbo 使用的是基于 Netty 的自定義通信協(xié)議,提供了高效的二進制數(shù)據(jù)傳輸,使得遠程服務(wù)調(diào)用性能大幅提升。
- 模塊化設(shè)計:Dubbo 的架構(gòu)非常模塊化,主要由五大核心模塊組成:遠程調(diào)用模塊(RPC)、集群模塊、負載均衡模塊、容錯模塊和注冊中心模塊。
- 每個部件都支持多協(xié)議:每個部件都支持多種協(xié)議,比如注冊中心,支持ZK、Redis、Nacos等等。
- 負載均衡和容錯:Dubbo 提供了多種容錯機制,比如失敗重試、失敗轉(zhuǎn)移等。還支持多種負載均衡,比如隨機、輪詢、一致性哈希等。
- 服務(wù)注冊和發(fā)現(xiàn):Dubbo引入了注冊中心的概念,實現(xiàn)了服務(wù)的自動注冊和發(fā)現(xiàn)。
- SPI 擴展機制:在背八股文場景下,Dubbo被提及最多的就是使用了類似Java的SPI機制,提高了擴展性,這一點仁者見仁智者見智吧。
2.鐘情其一
但是,Dubbo最吸引人的,半支煙覺得反而倒是它的RPC調(diào)用。Dubbo的定位是一個RPC框架,這是它的核心和立足之地,所以Dubbo將RPC的調(diào)用過程透明化,使得開發(fā)者可以專注于業(yè)務(wù)邏輯,而不用關(guān)注底層通信問題。
一個RPC框架只有聚焦于先做好它的RPC調(diào)用過程這個模塊,才會有人關(guān)注,其余的優(yōu)點都是在這之后,慢慢迭代而來。
作者將RPC調(diào)用的這個過程,抽象成一種協(xié)議消息的傳輸機制,再通過控制好線程的等待和喚醒,來實現(xiàn)遠程方法調(diào)用。這一設(shè)計思路真是美妙,充分體驗了作者的智慧。
二、RPC簡易示例
學Dubbo,首先就是要學習作者這種設(shè)計理念和思路。基于此,來實現(xiàn)一個簡易的遠程方法調(diào)用,將Dubbo的RPC過程簡易化。
1.示例步驟
簡易的RPC過程步驟如下,大致分5步,依舊使用Netty作用Socket通訊工具。
- 使用2個Java進程來模擬2個系統(tǒng)之間的調(diào)用,A進程 和 B進程。
- A進程的某個方法,使用網(wǎng)絡(luò)請求調(diào)用B進程的某個方法。
- 然后A進程的方法就處于等待狀態(tài)。
- 等B進程的方法執(zhí)行完之后,在利用網(wǎng)絡(luò)通知到A進程。
- 然后A進程的方法被喚醒,繼續(xù)往下執(zhí)行。
2.示例代碼
B進程作為服務(wù)端,啟動網(wǎng)絡(luò)服務(wù):
public class BProcessServer {
private final int port;
public BProcessServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new BProcessServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("B啟動了服務(wù),端口號: " + port);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new BProcessServer(8088).start();
}
}
B進程接受網(wǎng)絡(luò)請求參數(shù),反序列化之后,執(zhí)行對應(yīng)的方法,再將執(zhí)行結(jié)果返回:
public class BProcessServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String reqData = msg.toString(CharsetUtil.UTF_8);
System.out.println("B進程接受到了請求數(shù)據(jù): " + reqData);
executeMethod(ctx);
}
/**
* 執(zhí)行方法
*
* @param ctx
* @throws InterruptedException
*/
private void executeMethod(ChannelHandlerContext ctx) throws InterruptedException {
// TODO 將請求消息按照某種規(guī)則解析成方法名、方法參數(shù)等,其實就是反序列化的過程。
System.out.println("對接受的數(shù)據(jù)做反序列化,然后開始執(zhí)行 消息體里指定的方法...");
// 模擬方法執(zhí)行
Thread.sleep(2000);
System.out.println("執(zhí)行完畢,返回結(jié)果...");
// 將結(jié)果 通知給 A 進程
ByteBuf dataByteBuf = ctx.alloc().buffer().writeBytes("Task completed".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(dataByteBuf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
A進程啟動Netty客戶端,建立與B進程的通信,然后發(fā)起遠程調(diào)用,處于等待狀態(tài)。
public class AProcessClient {
private final String host;
private final int port;
private final Object lock = new Object(); // 監(jiān)視器對象
public AProcessClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AProcessClientHandler(lock));
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
System.out.println("A進程與B進程建立了通信連接");
Channel channel = future.channel();
// 發(fā)起遠程調(diào)用
callRemoteMethod(channel);
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
/**
* 執(zhí)行方法
*
* @param channel
* @throws InterruptedException
*/
private void callRemoteMethod(Channel channel) throws InterruptedException {
//TODO 此處需要將調(diào)用的方法和參數(shù),按照協(xié)議進行序列化。這次暫且省去此過程。
System.out.println("A進程將 請求的方法和參數(shù) 進行序列化,然后向B進程發(fā)起網(wǎng)絡(luò)調(diào)用...");
ByteBuf dataByteBuf = channel.alloc().buffer().writeBytes("Start call method".getBytes(CharsetUtil.UTF_8));
channel.writeAndFlush(dataByteBuf);
// 使用wait等待B進程通知
synchronized (lock) {
System.out.println("A進程等待B進程的響應(yīng)...");
lock.wait(); // 等待通知
}
System.out.println("A進程收到了B進程的響應(yīng)通知,繼續(xù)往下...");
}
public static void main(String[] args) throws InterruptedException {
new AProcessClient("localhost", 8088).start();
}
}
A進程接受B進程的響應(yīng),同時被喚醒,然后以上lock.wait()以后的代碼得以繼續(xù)執(zhí)行。
public class AProcessClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final Object lock;
public AProcessClientHandler(Object lock) {
this.lock = lock;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String resData = msg.toString(CharsetUtil.UTF_8);
System.out.println("A進程接受到了響應(yīng)數(shù)據(jù): " + resData);
// B 進程任務(wù)完成,使用 notify 喚醒等待的線程
synchronized (lock) {
lock.notify(); // 喚醒 A 進程
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
三、總結(jié)
Dubbo的優(yōu)秀設(shè)計思路有許多,我只鐘情其一,那就是RPC的調(diào)用過程。以上是一個簡易的RPC遠程調(diào)用的示例,用于理解Dubbo的原理和源碼,希望對你有幫助!