Hadoop RPC遠(yuǎn)程過(guò)程調(diào)用源碼解析及實(shí)例
什么是RPC?
1、RPC(Remote Procedure Call)遠(yuǎn)程過(guò)程調(diào)用,它允許一臺(tái)計(jì)算機(jī)程序遠(yuǎn)程調(diào)用另外一臺(tái)計(jì)算機(jī)的子程序,而不用去關(guān)心底層的網(wǎng)絡(luò)通信細(xì)節(jié),對(duì)我們來(lái)說(shuō)是透明的。經(jīng)常用于分布式網(wǎng)絡(luò)通信中。
2、Hadoop的進(jìn)程間交互都是通過(guò)RPC來(lái)進(jìn)行的,比如Namenode與Datanode之間,Jobtracker與Tasktracker之間等。
RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡(luò)通信模型中, RPC跨越了傳輸層和應(yīng)用層。 RPC使得開(kāi)發(fā)包括網(wǎng)絡(luò)分布式多程序在內(nèi)的應(yīng)用程序更加容易。
RPC采用客戶機(jī)/服務(wù)器模式。請(qǐng)求程序就是一個(gè)客戶機(jī),而服務(wù)提供程序就是一個(gè)服務(wù)器。
首先,客戶機(jī)調(diào)用進(jìn)程發(fā)送一個(gè)有進(jìn)程參數(shù)的調(diào)用信息到服務(wù)進(jìn)程,然后等待應(yīng)答信息,在服務(wù)器端,進(jìn)程保持睡眠狀態(tài)直到調(diào)用信息的到達(dá)為止。當(dāng)一個(gè)調(diào)用信息到達(dá),服務(wù)器獲得進(jìn)程參數(shù),計(jì)算結(jié)果,發(fā)送答復(fù)信息給client,然后等待下一個(gè)調(diào)用信息,最后,客戶端調(diào)用進(jìn)程接收答復(fù)信息,獲得進(jìn)程結(jié)果,然后調(diào)用執(zhí)行繼續(xù)進(jìn)行。
RPC特點(diǎn)
1、透明性:遠(yuǎn)程調(diào)用其他機(jī)器上的程序,對(duì)用戶來(lái)說(shuō)就像是調(diào)用本地方法一樣。
2、高性能:RPC server能夠并發(fā)處理多個(gè)來(lái)自Client的請(qǐng)求(請(qǐng)求隊(duì)列)。3、可控性:jdk中已經(jīng)提供了一個(gè)RPC框架–RMI,但是該RPC框架過(guò)于重量級(jí)并且可控之處比較少,所以Hadoop RPC實(shí)現(xiàn)了自定義的RPC框架。
Hadoop RPC通信
1、序列化層:Client與Server端 通信傳遞的信息采用了Hadoop里提供的序列化類或自定義Writable類型。
2、函數(shù)調(diào)用層:Hadoop RPC通過(guò)動(dòng)態(tài)代理以及Java反射機(jī)制實(shí)現(xiàn)函數(shù)調(diào)用。
3、網(wǎng)絡(luò)傳輸層:Hadoop RPC采用了基于TCP/IP的socket機(jī)制。
4、服務(wù)器端框架層:RPC Server利用Java NIO以及采用了事件驅(qū)動(dòng)的I/O模型,提高RPC Server的并發(fā)處理能力。
Hadoop的整個(gè)體系結(jié)構(gòu)就是構(gòu)建在RPC之上(org.apache.hadoop.ipc)。
Hadoop RPC設(shè)計(jì)技術(shù)
1、動(dòng)態(tài)代理
2、反射3、序列化4、非阻塞的異步IO(NIO)
動(dòng)態(tài)代理
1、動(dòng)態(tài)代理可以提供對(duì)另一個(gè)對(duì)象的訪問(wèn),同時(shí)隱藏實(shí)際對(duì)象的具體事實(shí),代理對(duì)象對(duì)客戶隱藏了實(shí)際對(duì)象。
2、動(dòng)態(tài)代理可以對(duì)請(qǐng)求進(jìn)行其他的一些處理,在不允許直接訪問(wèn)某些類,或需要對(duì)訪問(wèn)做一些特殊處理等,這時(shí)候可以考慮使用代理。3)目前Java開(kāi)發(fā)包中提供了對(duì)動(dòng)態(tài)代理的支持,但現(xiàn)在只支持對(duì)接口的實(shí)現(xiàn)。相關(guān)的類與接口:java.lang.reflect.Proxy--類 java.lang.reflect.InvocationHandler--接口
動(dòng)態(tài)代理創(chuàng)建對(duì)象過(guò)程:
InvocationHandler handler = new InvocationHandlerImpl(...) Proxy.newInstance(...)
具體實(shí)現(xiàn)可參考如下:
根據(jù)上圖查看hadoop2.6.0源碼
Client
Server
RPC
幾個(gè)重要的協(xié)議
ClientProtocol是客戶端(FileSystem)與NameNode通信的接口。
DatanodeProtocol是DataNode與NameNode通信的接口NamenodeProtocol是SecondaryNameNode與NameNode通信的接口。DFSClient是直接調(diào)用NameNode接口的對(duì)象。用戶代碼是通過(guò)DistributedFileSystem調(diào)用DFSClient對(duì)象,才能與NameNode打交道。
模擬Hadoop RPC通信
- package MyRPC;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.VersionedProtocol;
- public interface MyRPCProtocal extends VersionedProtocol{
- public static long versionID = 23234l;//很重要很重要,搞了一下午才解決掉。
- public Text test(Text t);
- }
- package MyRPC;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.ProtocolSignature;
- import org.apache.hadoop.ipc.RPC;
- import org.apache.hadoop.ipc.RPC.Server;
- public class RPCServer implements MyRPCProtocal{
- Server server = null;
- public RPCServer() throws IOException, InterruptedException{
- //server = RPC.getServer(this,"localhost",8888,new Configuration());
- //相對(duì)于以前的版本有略微的改動(dòng)
- RPC.Builder ins = new RPC.Builder(new Configuration());
- ins.setInstance(this);
- ins.setBindAddress("localhost");
- ins.setPort(9999);
- ins.setProtocol(MyRPCProtocal.class);
- //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class);
- server = ins.build();//獲得一個(gè)server實(shí)例
- server.start();
- server.join();
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- new RPCServer();
- }
- @Override
- public long getProtocolVersion(String protocol, long clientVersion)
- throws IOException {
- return MyRPCProtocal.versionID;
- }
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- return new ProtocolSignature();
- }
- @Override
- public Text test(Text t) {
- if(t.toString().equals("RPC")){
- return new Text("ok");
- }
- return new Text("false");
- }
- }
- package MyRPC;
- import java.net.InetSocketAddress;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.RPC;
- public class RPCClient {
- private MyRPCProtocal protocal;
- public RPCClient() throws Exception{
- InetSocketAddress address = new InetSocketAddress("localhost",9999);
- protocal = (MyRPCProtocal)RPC.waitForProxy
- (MyRPCProtocal.class,MyRPCProtocal.versionID, address, new Configuration());
- //RPC.setProtocolEngine(new Configuration(), MyRPCProtocal.class, RpcEngine.class);
- }
- public void call(String s){
- final Text string = protocal.test(new Text(s));
- System.out.println(string.toString());
- }
- public static void main(String[] args) throws Exception {
- RPCClient client = new RPCClient();
- client.call("RPC");
- }
- }