Analysis of the Hadoop RPC Client-Side Communication Flow
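Hadoop's RPC communication is not quite like the RPC in other systems: its developers designed a dedicated RPC framework around Hadoop's usage patterns, and personally I find it somewhat complex. So I plan to analyze it in two parts, the Client side and the Server side. If you already understand the overall RPC flow well, you will certainly pick up Hadoop's RPC very quickly. OK, let's get to the point.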
All of Hadoop's RPC code lives in the org.apache.hadoop.ipc package. RPC communication has to follow a number of protocols, the most basic of which is the following:
- /**
- * Superclass of all protocols that use Hadoop RPC.
- * Subclasses of this interface are also supposed to have
- * a static final long versionID field.
- * Base interface of all Hadoop RPC protocols; reports the protocol version
- */
- public interface VersionedProtocol {
- /**
- * Return protocol version corresponding to protocol interface.
- * @param protocol The classname of the protocol interface
- * @param clientVersion The version of the protocol that the client speaks
- * @return the version that the server will speak
- */
- public long getProtocolVersion(String protocol,
- long clientVersion) throws IOException;
- }
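It is the base interface of all protocols and has many subinterfaces, each covering communication between a different pair of components. Below is a diagram of this parent/child hierarchy: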
As the name suggests, a client and a server can communicate only if they agree on the same protocol version.
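To make the version rule concrete, here is a minimal sketch in plain Java with no Hadoop dependency. SimpleVersionedProtocol mirrors the method shape of VersionedProtocol above; EchoProtocol, EchoServer and the checkVersion helper are hypothetical names introduced for illustration, and the server object is called directly instead of through an RPC proxy.

```java
import java.io.IOException;

// Simplified stand-in for VersionedProtocol (same method shape as the interface above).
interface SimpleVersionedProtocol {
  long getProtocolVersion(String protocol, long clientVersion) throws IOException;
}

// Hypothetical protocol: like Hadoop protocols, it carries a static final versionID.
interface EchoProtocol extends SimpleVersionedProtocol {
  long versionID = 3L; // interface fields are implicitly public static final
  String echo(String s);
}

// Hypothetical server implementation that speaks version 3.
class EchoServer implements EchoProtocol {
  @Override
  public long getProtocolVersion(String protocol, long clientVersion) {
    return EchoProtocol.versionID;
  }

  @Override
  public String echo(String s) {
    return s;
  }
}

class VersionCheck {
  // Hypothetical helper: ask the server which version it speaks and refuse on mismatch.
  static void checkVersion(SimpleVersionedProtocol server, String protocol, long clientVersion)
      throws IOException {
    long serverVersion = server.getProtocolVersion(protocol, clientVersion);
    if (serverVersion != clientVersion) {
      throw new IOException("Protocol " + protocol + " version mismatch (client = "
          + clientVersion + ", server = " + serverVersion + ")");
    }
  }
}
```

Checking with version 3 passes, while a client built against version 2 gets an IOException before any real call is made.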
All client-side RPC operations are encapsulated in a single file, Client.java:
- /** A client for an IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
- * a port and is defined by a parameter class and a value class.
- * RPC client class
- * @see Server
- */
- public class Client {
- public static final Log LOG =
- LogFactory.getLog(Client.class);
- //connections from this client to servers, keyed by ConnectionId
- private Hashtable<ConnectionId, Connection> connections =
- new Hashtable<ConnectionId, Connection>();
- //class of the values returned by calls
- private Class<? extends Writable> valueClass; // class of call values
- //counter used to generate call ids
- private int counter; // counter for call ids
- //atomic flag telling whether the client is still running
- private AtomicBoolean running = new AtomicBoolean(true); // if client runs
- final private Configuration conf;
- //socket factory used to create sockets
- private SocketFactory socketFactory; // how to create sockets
- private int refCount = 1;
- ......
The code clearly shows something like a connection pool, connections, which implies that connections can be reused. In the Hashtable, each Connection is keyed by a ConnectionId, which is clearly not a simple numeric value like a Long:
- /**
- * This class holds the address and the user ticket. The client connections
- * to servers are uniquely identified by <remoteAddress, protocol, ticket>
- * Unique identifier of a connection, mainly <remote address, protocol type, user group information>
- */
- static class ConnectionId {
- //remote socket address
- InetSocketAddress address;
- //user group information
- UserGroupInformation ticket;
- //protocol type
- Class<?> protocol;
- private static final int PRIME = 16777619;
- private int rpcTimeout;
- private String serverPrincipal;
- private int maxIdleTime; //connections will be culled if it was idle for
- //maxIdleTime msecs
- private int maxRetries; //the max. no. of retries for socket connections
- private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
- private int pingInterval; // how often sends ping to the server in msecs
- ....
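Conceptually, three attributes (address, protocol, and ticket) form the unique identity, although the equals method below also compares the remaining settings (maxIdleTime, maxRetries, pingInterval, rpcTimeout, serverPrincipal, tcpNoDelay). So that equivalent IDs can be reused, the author overrode ConnectionId's equals and hashCode methods: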
- /**
- * Overridden equals: two IDs are equal when all of their member fields are equal
- */
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj instanceof ConnectionId) {
- ConnectionId that = (ConnectionId) obj;
- return isEqual(this.address, that.address)
- && this.maxIdleTime == that.maxIdleTime
- && this.maxRetries == that.maxRetries
- && this.pingInterval == that.pingInterval
- && isEqual(this.protocol, that.protocol)
- && this.rpcTimeout == that.rpcTimeout
- && isEqual(this.serverPrincipal, that.serverPrincipal)
- && this.tcpNoDelay == that.tcpNoDelay
- && isEqual(this.ticket, that.ticket);
- }
- return false;
- }
- /**
- * Overridden hashCode: equal IDs must produce equal hash codes (different IDs may still collide)
- */
- @Override
- public int hashCode() {
- int result = 1;
- result = PRIME * result + ((address == null) ? 0 : address.hashCode());
- result = PRIME * result + maxIdleTime;
- result = PRIME * result + maxRetries;
- result = PRIME * result + pingInterval;
- result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
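- result = PRIME * result + rpcTimeout; // some Hadoop versions have PRIME * rpcTimeout here, which discards the fields hashed so far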
- result = PRIME * result
- + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
- result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
- result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
- return result;
- }
This way, connections with the same identity can be fully reused, instead of equality being judged purely by object reference, which is a nice piece of design.
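The effect of value-based equals/hashCode on connection reuse can be sketched with a plain HashMap. ConnKey below is a hypothetical, trimmed-down ConnectionId that keeps only the <address, protocol, user> triple (a String stands in for UserGroupInformation), and ConnectionPool is a hypothetical cache in the spirit of Client.connections.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for Client.ConnectionId: only the three identity fields.
final class ConnKey {
  final InetSocketAddress address;
  final Class<?> protocol;
  final String user; // stands in for the UserGroupInformation ticket

  ConnKey(InetSocketAddress address, Class<?> protocol, String user) {
    this.address = address;
    this.protocol = protocol;
    this.user = user;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == this) return true;
    if (!(obj instanceof ConnKey)) return false;
    ConnKey that = (ConnKey) obj;
    return Objects.equals(address, that.address)
        && Objects.equals(protocol, that.protocol)
        && Objects.equals(user, that.user);
  }

  @Override
  public int hashCode() {
    // Hash every field that equals() compares, so equal keys get equal hash codes.
    return Objects.hash(address, protocol, user);
  }
}

// Hypothetical pool: one "connection" per distinct key value.
class ConnectionPool {
  private final Map<ConnKey, Object> connections = new HashMap<>();
  int created = 0;

  // Return the cached connection for an equal key, creating one only on a miss.
  synchronized Object get(ConnKey key) {
    Object conn = connections.get(key);
    if (conn == null) {
      conn = new Object(); // placeholder for a real Connection
      created++;
      connections.put(key, conn);
    }
    return conn;
  }
}
```

Two separately constructed but equal keys hit the same entry; with identity-based equality, every new key object would have opened a new connection. Objects.hash replaces the hand-written PRIME multiplication here; either works as long as every field compared in equals feeds into the hash.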
The counterpart of a ConnectionId is a Connection, which maintains the following fields:
- /** Thread that reads responses and notifies callers. Each connection owns a
- * socket connected to a remote address. Calls are multiplexed through this
- * socket: responses may be delivered out of order. */
- private class Connection extends Thread {
- //address of the server to connect to
- private InetSocketAddress server; // server ip:port
- //server's Kerberos (krb5) principal name, used for security
- private String serverPrincipal; // server's krb5 principal name
- //connection header: the protocol in use, the client's user group information, and the authentication method
- private ConnectionHeader header; // connection header
- //remote connection ID
- private final ConnectionId remoteId; // connection id
- //authentication method of the connection
- private AuthMethod authMethod; // authentication method
- //the next three fields are security-related (SASL)
- private boolean useSasl;
- private Token<? extends TokenIdentifier> token;
- private SaslRpcClient saslRpcClient;
- //socket communication fields
- private Socket socket = null; // connected socket
- private DataInputStream in;
- private DataOutputStream out;
- private int rpcTimeout;
- private int maxIdleTime; //connections will be culled if it was idle for
- //maxIdleTime msecs
- private int maxRetries; //the max. no. of retries for socket connections
- //if true, disable Nagle's algorithm so small packets are sent immediately
- private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
- private int pingInterval; // how often sends ping to the server in msecs
- // currently active calls; one connection may carry many calls
- private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
- //time of the last I/O activity
- private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
- //whether the connection should be closed
- private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
- private IOException closeException; // close reason
- .....
It maintains many fields related to connection communication. One interesting piece here is ConnectionHeader, the connection header, whose data is used at the very beginning of the communication:
- class ConnectionHeader implements Writable {
- public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);
- //name of the protocol the client and server communicate with
- private String protocol;
- //user group information of the client
- private UserGroupInformation ugi = null;
- //authentication method; it affects the format in which data is written
- private AuthMethod authMethod;
- .....
It serves to identify and authenticate the client. We can now sketch the basic structure of the Client class; the complete class diagram is below:
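You have surely noticed that I left a key class out of the diagram above: Call. Call objects appear frequently in asynchronous communication. When one object sends a request to another over the network, doing it fully synchronously would block the sender, which hurts both efficiency and user experience, so a callback-style design is often used instead, letting the user carry on with other work in the meantime. The same idea applies to Hadoop RPC.

Simply put, the core of a Hadoop RPC call is this: the parameters are serialized into an object, that object is passed in to make the RPC, and the result computed by the server comes back into the Call object and is returned to the client. In other words, both client and server work through Call objects, and a Call holds two things: the request parameter and the resulting value. Thanks to this encapsulation, the client can make a call without knowing any of the details. Note that in Client.call(), shown later, the calling thread still waits on its own Call until the result arrives; what runs asynchronously is the connection thread, which receives the responses for many outstanding Calls and completes each one. Here is the Call class: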
- /** A call waiting for a value. */
- //one call on the client side
- private class Call {
- //call ID
- int id; // call id
- //serialized parameter
- Writable param; // parameter
- //return value
- Writable value; // value, null if error
- //exception returned on error
- IOException error; // exception, null if value
- //whether the call has completed
- boolean done; // true when call is done
- ....
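The Call pattern (one thread blocks on a holder object until another thread fills it in) can be sketched on its own. SimpleCall is a hypothetical simplification of the Call class above; setValue and setException echo the method names that receiveResponse uses further down, and the waiting side follows the same synchronized wait loop as Client.call().

```java
// Hypothetical sketch of the Call pattern: a holder one thread waits on and another completes.
class SimpleCall<V> {
  final int id;
  private V value;         // result, null on error
  private Exception error; // failure, null on success
  private boolean done;    // true once a result or an error has arrived

  SimpleCall(int id) {
    this.id = id;
  }

  // Called by the receiving thread: store the result and wake up the waiter.
  synchronized void setValue(V v) {
    value = v;
    done = true;
    notifyAll();
  }

  // Called by the receiving thread when the call failed.
  synchronized void setException(Exception e) {
    error = e;
    done = true;
    notifyAll();
  }

  // Called by the caller: block until done, then return the value or rethrow the error.
  synchronized V waitForResult() throws Exception {
    while (!done) {
      wait(); // the loop also guards against spurious wakeups
    }
    if (error != null) {
      throw error;
    }
    return value;
  }
}
```

notifyAll is used so the sketch stays correct even if several threads wait on the same object; in the client, only the caller waits on its own Call.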
Looking at this Call class, the basic shape of Hadoop RPC should start to become clear. Calls naturally live inside a connection, and one connection may carry many calls, so Connection maintains a calls table:
- private class Connection extends Thread {
- ....
- // currently active calls; one connection may carry many calls
- private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
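Why a table keyed by call id? Responses on one connection may come back out of order (the Connection Javadoc above says as much), so each response carries the id of the call it answers. Here is a hypothetical minimal sketch of that demultiplexing, with a String standing in for the Writable value and MultiplexDemo/PendingCall as illustrative names:

```java
import java.util.Hashtable;
import java.util.Map;

// Hypothetical sketch of call multiplexing on one connection: pending calls are kept by id,
// and each response names the id it answers, so arrival order does not matter.
class MultiplexDemo {
  static final class PendingCall {
    final int id;
    String value; // filled in when the response arrives

    PendingCall(int id) {
      this.id = id;
    }
  }

  private final Map<Integer, PendingCall> calls = new Hashtable<>();
  private int counter = 0; // source of call ids, like Client.counter

  // Register a new call before its request is written to the socket.
  synchronized PendingCall newCall() {
    PendingCall call = new PendingCall(counter++);
    calls.put(call.id, call);
    return call;
  }

  // Invoked for each response read from the socket, in whatever order they arrive.
  void onResponse(int id, String value) {
    PendingCall call = calls.remove(id);
    if (call != null) {
      call.value = value;
    }
  }

  int pending() {
    return calls.size();
  }
}
```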
When designing Call, the author also cleverly accounted for calls issued in parallel, and added the following subclass of Call for a batch of calls that are sent together and whose results are collected as a group:
- /** Call implementation used for parallel calls. */
- /** Extends Call so calls can be issued in parallel; an index distinguishes the calls */
- private class ParallelCall extends Call {
- //each ParallelCall refers to the results collector shared by its batch
- private ParallelResults results;
- //index that distinguishes this call within the batch
- private int index;
- ....
Values are looked up through the ParallelCall: each result is stored at, and found by, the call's index:
- /** Result collector for parallel calls. */
- private static class ParallelResults {
- //the parallel results collector holds an array of return values, indexed by each ParallelCall's index
- private Writable[] values;
- //number of results expected
- private int size;
- //number of values received so far
- private int count;
- .....
- /** Collect a result. */
- public synchronized void callComplete(ParallelCall call) {
- //store the call's value in the results array
- values[call.index] = call.value; // store the value
- count++; // count it
- //once the count reaches the expected size, notify the waiting caller
- if (count == size) // if all values are in
- notify(); // then notify waiting caller
- }
- }
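The collector logic of ParallelResults can be exercised in isolation. ParallelCollector below is a hypothetical simplification: it stores values by index and wakes the caller only once all expected results have arrived. Unlike the quoted code it waits in a loop and uses notifyAll, a defensive choice rather than something the original requires.

```java
// Hypothetical sketch of the ParallelResults idea: N calls share one collector,
// each stores its value at its own index, and the caller wakes once all N have arrived.
class ParallelCollector {
  private final Object[] values;
  private int count; // number of results received so far

  ParallelCollector(int size) {
    values = new Object[size];
  }

  // Called once per completed call, possibly from different threads.
  synchronized void callComplete(int index, Object value) {
    values[index] = value;
    count++;
    if (count == values.length) {
      notifyAll(); // all results are in
    }
  }

  // Called by the caller: block until every slot has been filled.
  synchronized Object[] awaitAll() throws InterruptedException {
    while (count < values.length) {
      wait();
    }
    return values.clone();
  }
}
```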
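Because the result set is shared by all the calls in one parallel batch, those calls hold a reference to a single ParallelResults object and store their values in its values array. (ParallelResults is a static nested class, which only means it holds no reference to an enclosing Client instance; its fields belong to each instance and are not static.) Only when every parallel call has delivered its value is the batch complete and the waiting caller notified. This is a very small helper design that some books barely mention.

Now let's look at the flow of an ordinary call. As just mentioned, what the client ultimately sees is: pass in the parameter, get the result, and ignore all the internal logic. How is that achieved? The answer follows.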
Before the call executes, a ConnectionId is obtained first:
- public Writable call(Writable param, InetSocketAddress addr,
- Class<?> protocol, UserGroupInformation ticket,
- int rpcTimeout)
- throws InterruptedException, IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
- ticket, rpcTimeout, conf);
- return call(param, remoteId);
- }
Then comes the main flow:
- public Writable call(Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
- //build a Call from the parameter
- Call call = new Call(param);
- //get a connection for the remote ID
- Connection connection = getConnection(remoteId, call);
- //send the parameter
- connection.sendParam(call); // send the parameter
- boolean interrupted = false;
- synchronized (call) {
- //call.done == false means the call has not completed yet
- while (!call.done) {
- try {
- //wait until the remote side has finished
- call.wait(); // wait for the result
- } catch (InterruptedException ie) {
- // save the fact that we were interrupted
- interrupted = true;
- }
- }
- //if we were interrupted while waiting, restore the thread's interrupt flag
- if (interrupted) {
- // set the interrupt flag now that we are done waiting
- Thread.currentThread().interrupt();
- }
- //if the call failed, throw its error
- if (call.error != null) {
- if (call.error instanceof RemoteException) {
- call.error.fillInStackTrace();
- throw call.error;
- } else { // local exception
- // use the connection because it will reflect an ip change, unlike
- // the remoteId
- throw wrapException(connection.getRemoteAddress(), call.error);
- }
- } else {
- //on success, return the value produced by the call
- return call.value;
- }
- }
- }
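One detail of call() worth isolating is its interrupt handling: it keeps waiting for the result, remembers that an interrupt happened, and re-sets the thread's interrupt flag at the end instead of swallowing it. A hypothetical standalone sketch of that idiom (UninterruptibleWait is an illustrative name):

```java
// Hypothetical sketch of the "wait uninterruptibly, then restore the interrupt flag" idiom.
class UninterruptibleWait {
  private boolean done;
  private String value;

  // Called by the thread that produces the result.
  synchronized void complete(String v) {
    value = v;
    done = true;
    notifyAll();
  }

  // An interrupt does not abandon the pending result, but it is not lost either.
  synchronized String await() {
    boolean interrupted = false;
    while (!done) {
      try {
        wait();
      } catch (InterruptedException ie) {
        interrupted = true; // remember it and keep waiting for the result
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // re-assert so callers can still see it
    }
    return value;
  }
}
```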
Two functions in these steps deserve close attention. The first is getConnection, which shows how connections are reused:
- private Connection getConnection(ConnectionId remoteId,
- Call call)
- throws IOException, InterruptedException {
- .....
- /* we could avoid this allocation for each RPC by having a
- * connectionsId object and with set() method. We need to manage the
- * refs for keys in HashMap properly. For now its ok.
- */
- do {
- synchronized (connections) {
- //get the connection from the connections pool, so the same ConnectionId reuses the same connection
- connection = connections.get(remoteId);
- if (connection == null) {
- connection = new Connection(remoteId);
- connections.put(remoteId, connection);
- }
- }
- } while (!connection.addCall(call));
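The do/while around addCall exists because a cached connection may already be shutting down and refuse new calls. The sketch below is a hypothetical, simplified version of that loop (Conn, ConnCache and the String key are illustrative names, not Hadoop classes). To keep the loop from spinning, it evicts the stale entry itself, a simplification of whatever cleanup the real connection performs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the getConnection()/addCall() retry loop.
class ConnCache {
  static final class Conn {
    private boolean closing;
    private int activeCalls;

    // Like addCall(): accept a call only while the connection is not closing.
    synchronized boolean addCall() {
      if (closing) {
        return false;
      }
      activeCalls++;
      return true;
    }

    synchronized void markClosing() {
      closing = true;
    }

    synchronized int activeCalls() {
      return activeCalls;
    }
  }

  private final Map<String, Conn> connections = new HashMap<>();

  Conn getConnection(String key) {
    while (true) {
      Conn conn;
      synchronized (connections) {
        conn = connections.get(key);
        if (conn == null) {
          conn = new Conn();
          connections.put(key, conn);
        }
      }
      if (conn.addCall()) {
        return conn; // reused or freshly created, and it accepted the call
      }
      // The cached connection is closing: evict it (only if still mapped) and retry.
      synchronized (connections) {
        connections.remove(key, conn);
      }
    }
  }
}
```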
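This has a bit of a singleton-pattern flavor (more precisely, one Connection is cached per ConnectionId). The loop exists because addCall returns false when the connection is already being closed; the loop then tries again until it gets a connection that accepts the call. The second function is sendParam, which sends the parameter: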
- public void sendParam(Call call) {
- if (shouldCloseConnection.get()) {
- return;
- }
- DataOutputBuffer d=null;
- try {
- synchronized (this.out) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " sending #" + call.id);
- //for serializing the
- //data to be written
- //write the call's id and parameter to the output stream, to be sent to the server
- d = new DataOutputBuffer();
- d.writeInt(call.id);
- call.param.write(d);
- byte[] data = d.getData();
- int dataLength = d.getLength();
- out.writeInt(dataLength); //first put the data length
- out.write(data, 0, dataLength);//write the data
- out.flush();
- }
- ....
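The wire format sendParam produces, a 4-byte length followed by the call id and the serialized parameter, can be reproduced with the standard DataOutputStream/DataInputStream classes. In this sketch a UTF-8 string stands in for Writable.write(), and readFrame is a hypothetical receiver written only to show why the length prefix matters: it reads exactly one frame, even when several frames sit back to back on the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the sendParam() framing: [int length][int callId][param bytes].
class Framing {
  static final class Frame {
    final int callId;
    final String param;

    Frame(int callId, String param) {
      this.callId = callId;
      this.param = param;
    }
  }

  static void writeFrame(DataOutputStream out, int callId, String param) throws IOException {
    // Serialize into a temporary buffer first, so the total length is known up front.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream d = new DataOutputStream(buf);
    d.writeInt(callId);
    d.write(param.getBytes(StandardCharsets.UTF_8)); // stands in for param.write(d)
    d.flush();
    byte[] data = buf.toByteArray();
    out.writeInt(data.length);       // first the length...
    out.write(data, 0, data.length); // ...then the data itself
    out.flush();
  }

  // The length prefix tells the receiver exactly how many bytes belong to this frame.
  static Frame readFrame(DataInputStream in) throws IOException {
    int length = in.readInt();
    byte[] data = new byte[length];
    in.readFully(data);
    int callId = new DataInputStream(new ByteArrayInputStream(data)).readInt();
    String param = new String(data, 4, length - 4, StandardCharsets.UTF_8);
    return new Frame(callId, param);
  }
}
```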
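The code sends only the Call's id and the request parameter rather than the whole Call, presumably to reduce the amount of data transferred. It also writes the data length first, so that the server can read the variable-length data exactly. What the server does with it is not today's focus. That is how a call is sent. So how does a Call get its result back? For that we turn to the Connection class: Connection extends Thread, and its run() method is where response handling starts: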
- public void run() {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": starting, having connections "
- + connections.size());
- //wait for work: outstanding calls whose responses need to be read
- while (waitForWork()) {//wait here for work - read or close connection
- //read the next response
- receiveResponse();
- }
- close();
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": stopped, remaining connections "
- + connections.size());
- }
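The logic is simple: the thread keeps running; while there are outstanding calls it reads their responses, and when there are none it waits. The wait is not unbounded, though: waitForWork waits at most until maxIdleTime has passed since the last I/O activity, after which an idle connection is closed.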
- private synchronized boolean waitForWork() {
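- //wait only while idle: no outstanding calls, connection open, client running
- if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
- //remaining idle time: maxIdleTime minus the time since the last I/O activity
- long timeout = maxIdleTime-
- (System.currentTimeMillis()-lastActivity.get());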
- if (timeout>0) {
- try {
- wait(timeout);
- } catch (InterruptedException e) {}
- }
- }
- ....
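The timeout arithmetic in waitForWork is easy to misread, so here it is as a tiny sketch. IdleTimer is hypothetical, and it takes the current time as a parameter (instead of calling System.currentTimeMillis()) so its behavior is deterministic; the 10 000 ms value in the usage note is just an example.

```java
// Hypothetical sketch of the waitForWork() idle-timeout arithmetic.
class IdleTimer {
  final long maxIdleTime; // ms a connection may stay idle
  long lastActivity;      // ms timestamp of the last I/O

  IdleTimer(long maxIdleTime, long now) {
    this.maxIdleTime = maxIdleTime;
    this.lastActivity = now;
  }

  // Like touch(): record I/O activity.
  void touch(long now) {
    lastActivity = now;
  }

  // Time still left to wait; <= 0 means the connection has been idle for maxIdleTime.
  long remaining(long now) {
    return maxIdleTime - (now - lastActivity);
  }

  boolean isIdle(long now) {
    return remaining(now) <= 0;
  }
}
```

For example, with maxIdleTime = 10 000 ms and the last I/O at t = 1 000, a check at t = 4 000 leaves 7 000 ms to wait, while a check at t = 11 000 finds the connection idle.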
The response is received as follows:
- /* Receive a response.
- * Because only one receiver, so no synchronization on in.
- * Receive the response value
- */
- private void receiveResponse() {
- if (shouldCloseConnection.get()) {
- return;
- }
- //update the time of the most recent activity
- touch();
- try {
- int id = in.readInt(); // try to read an id
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + id);
- //look up the matching call by its id
- Call call = calls.get(id);
- //read the status of the result
- int state = in.readInt(); // read call status
- if (state == Status.SUCCESS.state) {
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // read value
- call.setValue(value);
- calls.remove(id);
- } else if (state == Status.ERROR.state) {
- call.setException(new RemoteException(WritableUtils.readString(in),
- WritableUtils.readString(in)));
- calls.remove(id);
- } else if (state == Status.FATAL.state) {
- // Close the connection
- markClosed(new RemoteException(WritableUtils.readString(in),
- WritableUtils.readString(in)));
- }
- .....
- } catch (IOException e) {
- markClosed(e);
- }
- }
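The matching Call is taken from the previously maintained calls table and, depending on the status, given a value or an exception; a FATAL status closes the whole connection. The client's own execution flow is fairly simple overall.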
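The dispatch in receiveResponse can be sketched end to end with in-memory streams. Assumptions: the response layout is [int id][int status][payload], the status codes are the constants below (chosen to mirror SUCCESS/ERROR/FATAL, not taken from the quoted code), the payload is a single writeUTF string instead of a Writable or the two strings of a RemoteException, and on FATAL the sketch fails every pending call, its own simplification of closing the connection.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of receiveResponse(): match the response to a pending call by id,
// then act on the status. Wire layout assumed: [int id][int status][UTF payload].
class ResponseDispatcher {
  static final int SUCCESS = 0, ERROR = 1, FATAL = -1; // assumed status codes

  static final class PendingCall {
    String value; // set on SUCCESS
    String error; // set on ERROR, or on every pending call after FATAL
  }

  final Map<Integer, PendingCall> calls = new HashMap<>();
  boolean closed;

  void receiveResponse(DataInputStream in) throws IOException {
    int id = in.readInt();
    int state = in.readInt();
    if (state == SUCCESS || state == ERROR) {
      String payload = in.readUTF();
      PendingCall call = calls.remove(id); // the call is finished either way
      if (call == null) {
        return; // unknown id: drop the response (a choice made for this sketch)
      }
      if (state == SUCCESS) {
        call.value = payload;
      } else {
        call.error = payload;
      }
    } else if (state == FATAL) {
      // A fatal error affects the whole connection: fail all pending calls and close.
      String reason = in.readUTF();
      for (PendingCall c : calls.values()) {
        c.error = reason;
      }
      calls.clear();
      closed = true;
    } else {
      throw new IOException("unknown status " + state);
    }
  }
}
```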
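That is roughly the flow of the Hadoop RPC client's communication module. Many details were left out along the way; reading the source code while you study will help you understand it better. The server side of Hadoop RPC is even more complex, so studying it module by module is probably the better approach.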
Source: http://blog.csdn.net/Androidlushangderen/article/details/41751133