Dubbo 我手寫幾行代碼,就把通信模式給你解釋清楚!
本文的宗旨在于通過簡(jiǎn)單干凈實(shí)踐的方式教會(huì)讀者,為什么要使用Dubbo、怎么使用Dubbo、Dubbo通信的原理是什么。在學(xué)習(xí)本文后,你可以避開很多關(guān)于 Dubbo 使用時(shí)的坑,也能更清楚自己的編碼是在做什么。
本文涉及的工程:
- xfg-dev-tech-dubbo:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-dubbo
- xfg-dev-tech-dubbo-test:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-dubbo-test
一、為什么使用
隨著互聯(lián)網(wǎng)場(chǎng)景中所要面對(duì)的用戶規(guī)模和體量的增加,系統(tǒng)的也需要做相應(yīng)的拆分設(shè)計(jì)和實(shí)現(xiàn)。隨之而來的,以前的一套系統(tǒng),現(xiàn)在成了多個(gè)微服務(wù)。如;電商系統(tǒng),以前就在一個(gè)工程中寫就可以了,現(xiàn)在需要拆分出,用戶、支付、商品、配送、活動(dòng)、風(fēng)控等各個(gè)模塊。那么這些模塊拆分后,如何高效的通信呢?
圖片
- 關(guān)于通信,就引入了 RPC 框架,而 Dubbo 就是其中的一個(gè)實(shí)現(xiàn)方式。
- 那為啥用 Dubbo 呢?其實(shí)核心問題就一個(gè),為了提高通信效率。因?yàn)?Dubbo 的底層通信是 Socket 而不是 HTTP 所以通信的性能會(huì)更好。同時(shí) Dubbo 又有分布式的高可用設(shè)計(jì),在一組部署了交易服務(wù)的實(shí)例宕機(jī)后,會(huì)被從注冊(cè)中心摘除,之后流量會(huì)打到其他服務(wù)上。
二、要怎么使用
圖片
Dubbo 的使用分為2方,一個(gè)是接口的提供方,另外一個(gè)是接口的調(diào)用方。接口的提供方需要提供出被調(diào)用方使用接口的描述性信息。這個(gè)信息包括;接口名稱、接口入?yún)?、接口出參,只有讓調(diào)用方拿到這些信息以后,它才能依托于這樣的接口信息做一個(gè)代理操作,并在代理類中使用 Socket 完成雙方的信息交互。
所以你看上去調(diào)用 RPC 接口好像和使用 HTTP 也沒啥區(qū)別,無非就是引入了 POM 配置,之后再配置了注解就可以使用了。但其實(shí),它是把你的 Jar 當(dāng)做代理的必要參數(shù)使用了。本文也會(huì)介紹,具體是怎么代理的
三、使用的案例
對(duì)于編程的學(xué)習(xí)來說,其實(shí)最開始的那一下,不是搞明白所有原理,而是先讓自己可以看到運(yùn)行出來的效果。哎,之后就去分析原理,這樣會(huì)舒服的多。
所以小傅哥這里提供了一套簡(jiǎn)單的 Dubbo 使用案例,只要你滿足最基本的配置條件,就可以運(yùn)行出效果;
- JDK 1.8
- Maven 3.x - jdk1.8支持的就可以
- Dubbo 3.1.4 - POM 中已經(jīng)配置,與2.x最大的使用上的區(qū)別就是一些注解的使用
- Zookeeper 3.4.x - 如果你只是按照本文中的直連模式測(cè)試,那么不安裝 Zookeeper 也可以
1. 接口提供方
工程案例創(chuàng)建結(jié)構(gòu),采用的是 DDD 結(jié)構(gòu)。但和 DDD 一點(diǎn)關(guān)系沒有。如果你對(duì)工程創(chuàng)建有疑惑,可以參考 《Java 簡(jiǎn)明教程》之 DDD 架構(gòu)
圖片
1.1 接口定義
源碼:cn.bugstack.dev.tech.dubbo.api.IUserService
public interface IUserService {
Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO);
}
- 接口定義平平無奇,但第1個(gè)坑暗藏玄機(jī)!
- 也就是,所有的 Dubbo 接口,出入?yún)?,默認(rèn)都需要繼承 Serializable 接口。也就是 UserReqDTO、UserResDTO、Response 這3個(gè)類,都得繼承 Serializable 序列化接口。
1.2 接口實(shí)現(xiàn)
源碼:cn.bugstack.dev.tech.dubbo.trigger.rpc.UserService
@Slf4j
@DubboService(version = "1.0.0")
public class UserService implements IUserService {
@Override
public Response<UserResDTO> queryUserInfo(UserReqDTO reqDTO) {
log.info("查詢用戶信息 userId: {} reqStr: {}", reqDTO.getUserId(), JSON.toJSONString(reqDTO));
try {
// 1. 模擬查詢【你可以從數(shù)據(jù)庫或者Redis緩存獲取數(shù)據(jù)】
UserResDTO resDTO = UserResDTO.builder()
.userId(reqDTO.getUserId())
.userName("小傅哥")
.userAge(20)
.build();
// 2. 返回結(jié)果
return Response.<UserResDTO>builder()
.code(Constants.ResponseCode.SUCCESS.getCode())
.info(Constants.ResponseCode.SUCCESS.getInfo())
.data(resDTO).build();
} catch (Exception e) {
log.error("查詢用戶信息失敗 userId: {} reqStr: {}", reqDTO.getUserId(), JSON.toJSONString(reqDTO), e);
return Response.<UserResDTO>builder()
.code(Constants.ResponseCode.UN_ERROR.getCode())
.info(Constants.ResponseCode.UN_ERROR.getInfo())
.build();
}
}
}
- 接口實(shí)現(xiàn)平平無奇,但第2個(gè)坑暗藏玄機(jī)!
- Dubbo 的實(shí)現(xiàn)接口,需要被 Dubbo 自己管理。所以 Dubbo 提供了 @DubboService 注解。有些小卡拉米,使用的是不是 Spring 的 @Service 呀?尤其是以前的 Dubbo 版本 2.7.* 它的注解也是 @Service 也不留神就用成了 Spring 的 @Service。一個(gè)小bug,又調(diào)了一上午。
1.3 工程配置
application.yml
dubbo:
application:
name: xfg-dev-tech-dubbo
version: 1.0.0
registry:
address: zookeeper://127.0.0.1:2181 # N/A - 無zookeeper可配置 N/A 走直連模式測(cè)試
protocol:
name: dubbo
port: 20881
scan:
base-packages: cn.bugstack.dev.tech.dubbo.api
- 配置信息平平無奇,但第3個(gè)坑暗藏玄機(jī)!
- base-packages 掃描的是哪里配置了 Dubbo 的 API 入口,給它入口就行,它會(huì)自己找到實(shí)現(xiàn)類。但!你要知道 Java 的 Spring 應(yīng)用能掃描到,能被 Spring 管理,那么 pom 要直接或者間接的引導(dǎo)到定義了 Dubbo 的模塊。
- 再有一個(gè)問題,Spring 應(yīng)用開發(fā),講究約定大于配置。你 Application 應(yīng)用,的包名應(yīng)該是可以覆蓋到其他包名的。比如 Application 都配置到 cn.bugstack.dev.tech.dubbo.a.b.c.d.* 去了,它默認(rèn)就掃不到 cn.bugstack.dev.tech.dubbo.api 了。一個(gè)小bug,一下午又過去了。
- 注意:address:如果配置的是 N/A 就是不走任何注冊(cè)中心,就是個(gè)直連,主要用于本地驗(yàn)證的。如果你配置了 zookeeper://127.0.0.1:2181 就需要先安裝一個(gè) zookeeper 另外,即使你配置了注冊(cè)中心的方式,也可以直連測(cè)試。
1.4 應(yīng)用構(gòu)建
以上信息都準(zhǔn)備了,一群小卡拉米開始掉到第4個(gè)坑里了!
你有2個(gè)應(yīng)用,一個(gè)Dubbo接口提供方、一個(gè)Dubbo接口使用方。那么你在給你另外一個(gè)應(yīng)用使用接口的時(shí)候,你在 InelliJ IDEA 的 Maven 中執(zhí)行 Install 了嗎?
Install 是干啥的?它是為了讓你使用了同一個(gè)本地 Maven 配置的應(yīng)用,可以引入到對(duì)方提供的 Jar 包。你 Install 以后,這個(gè) Jar 包就會(huì)進(jìn)入到本地 Maven 倉庫了。如果是公司里開發(fā),會(huì)有專門的自己家部署的,私有Maven中心倉庫,就可以通過 deploy 把本地 Jar 發(fā)布上去,那么公司里的伙伴,也就都可以引用了。
圖片
- 你要先點(diǎn)擊 root 下的 install 操作,這樣就會(huì)自動(dòng)構(gòu)建了。
- 如果你電腦配置有點(diǎn)低,也會(huì)出現(xiàn)一些氣人怪相,比如就刷不進(jìn)去,install 了也引用不了。記得要 clean 清空下,也可以直接到 maven 文件件去清空。
2. 接口使用方
有些小卡拉米覺得前面的抗都掃干凈了,就完事了。沒有接下來還有坑,讓你一搞搞一天,半夜也睡不好。
2.1 POM 引入
<dependency>
<groupId>cn.bugstack</groupId>
<artifactId>xfg-dev-tech-dubbo-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
- POM 的配置,就是把 Jar 包給引用進(jìn)來。因?yàn)?Dubbo 需要根據(jù)這個(gè)接口,做一個(gè)代理操作。不引入,你代碼就爆紅啦!爆紅啦!??
2.2 消費(fèi)配置
源碼:application.yml
dubbo:
application:
name: xfg-dev-tech-dubbo
version: 1.0.0
registry:
address: zookeeper://127.0.0.1:2181
# address: N/A
protocol:
name: dubbo
port: 20881
- 配置了 zookeeper 你就用第一個(gè),代碼中對(duì)應(yīng) @DubboReference(interfaceClass = IUserService.class, version = "1.0.0")
- 配置了 N/A 你就用第二個(gè),代碼中必須指定直連。@DubboReference(interfaceClass = IUserService.class, url = "dubbo://127.0.0.1:20881", version = "1.0.0")
2.3 代碼配置
源碼:cn.bugstack.dev.tech.dubbo.consumer.test.ApiTest
// 直連模式;@DubboReference(interfaceClass = IUserService.class, url = "dubbo://127.0.0.1:20881", version = "1.0.0")
@DubboReference(interfaceClass = IUserService.class, version = "1.0.0")
private IUserService userService;
@Test
public void test_userService() {
UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build();
Response<UserResDTO> resDTO = userService.queryUserInfo(reqDTO);
log.info("測(cè)試結(jié)果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));
}
測(cè)試結(jié)果
2023-07-08 15:37:22.291 INFO 62481 --- [ main] c.b.d.tech.dubbo.consumer.test.ApiTest : 測(cè)試結(jié)果 req: {"userId":"10001"} res: {"code":"0000","data":{"userAge":20,"userId":"10001","userName":"小傅哥"},"info":"成功"}
2023-07-08 15:37:22.324 INFO 62481 --- [tor-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl : backgroundOperationsLoop exiting
- 如果不出啥意外,到這你就可以直接啟動(dòng)運(yùn)行了。并看到測(cè)試結(jié)果。
- 但別忘記了,你啟動(dòng)的時(shí)候,需要先啟動(dòng) xfg-dev-tech-dubbo 讓接口提供方跑起來。
四、原理的分析
都說 Jar 是提供可描述性信息的,對(duì)方才能代理調(diào)用。那么這個(gè)過程是怎么干的呢,總不能一問這個(gè),就讓小卡拉米們?nèi)ナ謱?Dubbo 呀!所以小傅哥會(huì)通過最簡(jiǎn)單模型結(jié)構(gòu),讓你了解這個(gè) Dubbo 通信的原理,方便小卡拉米們上手。
圖片
- 如果所示,接口使用方,對(duì)接口進(jìn)行代理。什么是代理呢,代理就是用一個(gè)包裝的結(jié)構(gòu),代替原有的操作。在這個(gè)包裝的結(jié)構(gòu)里,你可以自己擴(kuò)展出任意的方法。
- 那么,這里的代理。就是根據(jù)接口的信息,創(chuàng)建出一個(gè)代理對(duì)象,在代理對(duì)象中,提供 Socket 請(qǐng)求。當(dāng)調(diào)用這個(gè)接口的時(shí)候,就可以對(duì)接口提供方的,發(fā)起 Socket 請(qǐng)求了。
- 而 Socket 接收方,也就是接口提供方。他收到信息以后,根據(jù)接口的描述性內(nèi)容,進(jìn)行一個(gè)反射調(diào)用。這下就把信息給請(qǐng)求出來,之后再通過 Socket 返回回去就可以了。
好,核心的原理就這么點(diǎn)。接下來,我們從代碼中看看。
1. 接口代理 - 提供方
源碼:cn.bugstack.dev.tech.dubbo.trigger.socket.RpcServerSocket
@Slf4j
@Service
public class RpcServerSocket implements Runnable {
private ApplicationContext applicationContext;
public RpcServerSocket(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
new Thread(this).start();
}
@Override
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline().addLast(new ObjectEncoder());
channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> request) throws Exception {
// 解析參數(shù)
Class<?> clazz = (Class<?>) request.get("clazz");
String methodName = (String) request.get("methodName");
Class<?>[] paramTypes = (Class<?>[]) request.get("paramTypes");
Object[] args = (Object[]) request.get("args");
// 反射調(diào)用
Method method = clazz.getMethod(methodName, paramTypes);
Object invoke = method.invoke(applicationContext.getBean(clazz), args);
// 封裝結(jié)果
Map<String, Object> response = new HashMap<>();
response.put("data", invoke);
log.info("RPC 請(qǐng)求調(diào)用 clazz:{} methodName:{}, response:{}", clazz.getName(), methodName, JSON.toJSON(response));
// 回寫數(shù)據(jù)
channelHandlerContext.channel().writeAndFlush(response);
}
});
}
});
ChannelFuture f = b.bind(22881).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
這段代碼主要提供的功能包括;
- Netty Socket 啟動(dòng)一個(gè)服務(wù)端
- 注入 ApplicationContext applicationContext 用于在接收到請(qǐng)求接口信息后,獲取對(duì)應(yīng)的 Bean 對(duì)象。
- 根據(jù)請(qǐng)求來的 Bean 對(duì)象,以及參數(shù)的必要信息。進(jìn)行接口的反射調(diào)用。
- 最后一步,就是把接口反射請(qǐng)求的信息,再通過 Socket 返回回去。
2. 接口反射 - 調(diào)用方
打開工程:xfg-dev-tech-dubbo-test
源碼:cn.bugstack.dev.tech.dubbo.consumer.config.RPCProxyBeanFactory
@Slf4j
@Component("rpcProxyBeanFactory")
public class RPCProxyBeanFactory implements FactoryBean<IUserService>, Runnable {
private Channel channel;
// 緩存數(shù)據(jù),實(shí)際RPC會(huì)對(duì)每次的調(diào)用生成一個(gè)ID來標(biāo)記獲取
private Object responseCache;
public RPCProxyBeanFactory() throws InterruptedException {
new Thread(this).start();
while (null == channel) {
Thread.sleep(150);
log.info("Rpc Socket 鏈接等待...");
}
}
@Override
public IUserService getObject() throws Exception {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?>[] classes = {IUserService.class};
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
Map<String, Object> request = new HashMap<>();
request.put("clazz", IUserService.class);
request.put("methodName", method.getName());
request.put("paramTypes", method.getParameterTypes());
request.put("args", args);
channel.writeAndFlush(request);
// 模擬超時(shí)等待,一般RPC接口請(qǐng)求,都有一個(gè)超時(shí)等待時(shí)長(zhǎng)。
Thread.sleep(350);
return responseCache;
}
};
return (IUserService) Proxy.newProxyInstance(classLoader, classes, handler);
}
@Override
public Class<?> getObjectType() {
return IUserService.class;
}
@Override
public void run() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ObjectEncoder());
channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> data) throws Exception {
responseCache = data.get("data");
}
});
}
});
ChannelFuture channelFuture = b.connect("127.0.0.1", 22881).syncUninterruptibly();
this.channel = channelFuture.channel();
channelFuture.channel().closeFuture().syncUninterruptibly();
} finally {
workerGroup.shutdownGracefully();
}
}
}
這段代碼主要提供的功能包括;
- 實(shí)現(xiàn) FactoryBean<IUserService> 為的是把這樣一個(gè)代理對(duì)象,交給 Spring 容器管理。
- 實(shí)現(xiàn) Runnable 接口,并在接口中,創(chuàng)建 Netty 的 Socket 客戶端。客戶端中接收來自服務(wù)端的消息,并臨時(shí)存放到緩存中。注意 Dubbo 中這塊的處理會(huì)復(fù)雜一些,以及請(qǐng)求同步響應(yīng)通信,這樣才能把各個(gè)接口的調(diào)動(dòng)記錄下來
- getObject() 對(duì)象中,提供代理操作。代理里,就可以自己想咋搞咋搞了。而 Dubbo 也是在代理里,提供了如此的操作,對(duì)接口提供方發(fā)送請(qǐng)求消息,并在超時(shí)時(shí)間內(nèi)返回接口信息。因?yàn)榉瓷湔{(diào)用,需要你提供類、方法、入?yún)㈩愋?、入?yún)?nèi)容,所以我們要把這些信息傳遞給接口提供方。
3. 服務(wù)測(cè)試 - 消費(fèi)驗(yàn)證
- 啟動(dòng) xfg-dev-tech-dubbo
- 測(cè)試 xfg-dev-tech-dubbo-test
@Resource(name = "rpcProxyBeanFactory")
private IUserService proxyUserService;
@Test
public void test_proxyUserService(){
UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build();
Response<UserResDTO> resDTO = proxyUserService.queryUserInfo(reqDTO);
log.info("測(cè)試結(jié)果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));
}
測(cè)試結(jié)果
2023-07-08 16:14:51.322 INFO 74498 --- [ main] c.b.d.tech.dubbo.consumer.test.ApiTest : 測(cè)試結(jié)果 req: {"userId":"10001"} res: {"code":"0000","data":{"userAge":20,"userId":"10001","userName":"小傅哥"},"info":"成功"}
- 這里我們給 IUserService 注入一個(gè)自己代理好的對(duì)象,之后就可以調(diào)用驗(yàn)證了。
- 好啦,到這我們就把關(guān)于 Dubbo 的事交代明白了,以上內(nèi)容較多。小卡拉米需要細(xì)細(xì)的品味吸收!