揭開 Raft 的神秘面紗,和ApacheRatis 了解Raft 組件的使用
相比 Paxos, Raft 一直以來就是以易于理解著稱。今天我們以一年 Raft 使用者的角度,來看一下,別人根據(jù) Raft 論文實(shí)現(xiàn)了之后,我們一般要怎么樣使用。
俗話說,要想知道梨子的味道,就要親口嘗一嘗,沒吃過豬肉,也要見一見豬跑。否則別人再怎么樣形容,你可能還以為是像貓狗一類毛茸茸。
在 Raft 官網(wǎng)里長(zhǎng)長(zhǎng)的列表就能發(fā)現(xiàn),實(shí)現(xiàn) Raft 的框架目前不少。Java 里我大概看了螞蟻的 SOFARaft 和 Apache 的 Ratis。這次我們以 Ratis 為例,揭開面紗,來看看到底要怎樣使用。
當(dāng)然,下面具體提到的例子,也是這些組件中自帶的 example。
一、編譯
github下載 Ratis 直接 mvn clean package 即可,如果編譯過程中出錯(cuò),可以先clean install ratis-proto
二、示例
Ratis 自帶的示例有三個(gè):
- arithmetic
- counter
- filestore
在 ratis-examples 模塊中,對(duì)于 arithmetic 和 filestore比較方便,可以通過main/bin目錄下的 shell 腳本快速啟動(dòng) Server 和 Client 來進(jìn)行測(cè)試。
對(duì)于Raft,咱們都知道是需要多實(shí)例組成集群才能測(cè)試,你啟動(dòng)一個(gè)實(shí)例沒啥用,連選主都成問題。Bin 目錄下的 start-all 支持 example 的名稱以及對(duì)應(yīng)的命令。比如 filestore server 代表是啟動(dòng) filestore 這個(gè)應(yīng)用的server。對(duì)應(yīng)的命令參數(shù)會(huì)在相應(yīng)example里的 cli 中解析。同時(shí)會(huì)一次性啟動(dòng)三個(gè)server,組成一個(gè)集群并在周期內(nèi)完成選舉。
而對(duì)于 counter 這個(gè)示例,并沒有相應(yīng)的腳本來快速啟動(dòng)三個(gè)server,這個(gè)我們可以通過命令行或者在IDE里以參數(shù)的形式啟動(dòng)。
三、分析
下面我們來示例里看下 Raft Server 是怎樣工作的。
對(duì)于 counter 示例來說,我們啟動(dòng)的時(shí)候,需要傳入一個(gè)參數(shù),代表當(dāng)前的server是第幾個(gè),目的在于,要從 peers 列表中得知該用哪個(gè)IP + 端口去啟動(dòng)它。這里我們能發(fā)現(xiàn),這個(gè) peers 列表,是在代碼內(nèi)提前設(shè)置好的。當(dāng)然你說動(dòng)態(tài)配置啥的,也沒啥問題,另外兩個(gè)示例是通過shell 腳本里common 中的配置傳入的。
所以,第一步我們看到, Raft Server 在啟動(dòng)的時(shí)候,會(huì)通過「配置」的形式,來知道 peer 之間的存在,這樣才能彼此通信,讓別人給自己投票或者給別人投票,完成 Term 內(nèi)的選舉。另外,才能接收到 Leader 傳過來的 Log ,并且應(yīng)用到本地。
第二步,我們來看下 Client 和 集群之間是如何通信的。整個(gè) Raft 集群可能有多個(gè)實(shí)例,我們知道必須通過 Leader 來完成寫操作。那怎樣知道誰是Leader?有什么辦法?
一般常見的思路有:
- 在寫之前,先去集群內(nèi)查一下,誰是 Leader,然后再寫
- 隨機(jī)拿一個(gè)寫,不行再換一個(gè),不停的試,總會(huì)有一個(gè)成功。
當(dāng)然方式二這樣試下去效率不太高。所以會(huì)在這個(gè)隨機(jī)試一次之后,集群會(huì)將當(dāng)前的 Leader 信息返回給 Client,然后 Client 直接通過這個(gè)建立連接進(jìn)行通信即可。
在 Ratis 里, Client 調(diào)用非 Leader 節(jié)點(diǎn)會(huì)收到 Server 拋出的一個(gè)異常,異常中會(huì)包含一個(gè)稱為 suggestLeader 的信息,表示當(dāng)前正確的 Leader,按這個(gè)連上去就行。當(dāng)然,如果如果在此過程中發(fā)生的 Leader 的變更,那就會(huì)有一個(gè)新的suggestLeader 返回來,再次重試。
我們來看 Counter 這個(gè)示例中的實(shí)現(xiàn)。
Server 和 Client 的共用的Common 代碼中,包含 peers 的聲明
- public final class CounterCommon {
- public static final List<RaftPeer> PEERS = new ArrayList<>(3);
- static {
- PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n1"), "127.0.0.1:6000"));
- PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n2"), "127.0.0.1:6001"));
- PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n3"), "127.0.0.1:6002"));
- }
這里聲明了三個(gè)節(jié)點(diǎn)。
通過命令行啟動(dòng)時(shí),會(huì)直接把index 傳進(jìn)來, index 取值1-3。
- java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}
然后在Server 啟動(dòng)的時(shí)候,拿到對(duì)應(yīng)的配置信息。
- //find current peer object based on application parameter
- RaftPeer currentPeer =
- CounterCommon.PEERS.get(Integer.parseInt(args[0]) - 1);
再設(shè)置存儲(chǔ)目錄
- //set the storage directory (different for each peer) in RaftProperty object
- File raftStorageDir = new File("./" + currentPeer.getId().toString());
- RaftServerConfigKeys.setStorageDir(properties,
- Collections.singletonList(raftStorageDir))
重點(diǎn)看這里,每個(gè) Server 都會(huì)有一個(gè)狀態(tài)機(jī)「CounterStateMachine」,平時(shí)我們的「業(yè)務(wù)邏輯」都放到這里
- //create the counter state machine which hold the counter value
- CounterStateMachine counterStateMachine = new CounterStateMachine();
客戶端發(fā)送的命令,會(huì)在這個(gè)狀態(tài)機(jī)中被執(zhí)行,同時(shí)這些命令又以Log 的形式復(fù)制給其它節(jié)點(diǎn),各個(gè)節(jié)點(diǎn)的Log 又會(huì)在它自己的狀態(tài)機(jī)里執(zhí)行,從而保證各個(gè)節(jié)點(diǎn)狀態(tài)的一致。
最后根據(jù)這些配置,生成 Raft Server 實(shí)例并啟動(dòng)。
- //create and start the Raft server
- RaftServer server = RaftServer.newBuilder()
- .setGroup(CounterCommon.RAFT_GROUP)
- .setProperties(properties)
- .setServerId(currentPeer.getId())
- .setStateMachine(counterStateMachine)
- .build();
- server.start();
CounterStateMachine 里,應(yīng)用計(jì)數(shù)的這一小段代碼,我們看先檢查了命令是否合法,然后執(zhí)行命令
- //check if the command is valid
- String logData = entry.getStateMachineLogEntry().getLogData()
- .toString(Charset.defaultCharset());
- if (!logData.equals("INCREMENT")) {
- return CompletableFuture.completedFuture(
- Message.valueOf("Invalid Command"));
- }
- //update the last applied term and index
- final long index = entry.getIndex();
- updateLastAppliedTermIndex(entry.getTerm(), index);
- //actual execution of the command: increment the counter
- counter.incrementAndGet();
- //return the new value of the counter to the client
- final CompletableFuture<Message> f =
- CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
- //if leader, log the incremented value and it's log index
- if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {
- LOG.info("{}: Increment to {}", index, counter.toString());
- }
我們?cè)賮砜?Client 的實(shí)現(xiàn)。
和 Server 類似,通過配置屬性,創(chuàng)建一個(gè)實(shí)例
- private static RaftClient buildClient() {
- RaftProperties raftProperties = new RaftProperties();
- RaftClient.Builder builder = RaftClient.newBuilder()
- .setProperties(raftProperties)
- .setRaftGroup(CounterCommon.RAFT_GROUP)
- .setClientRpc(
- new GrpcFactory(new Parameters())
- .newRaftClientRpc(ClientId.randomId(), raftProperties));
- return builder.build();
- }
然后就可以向Server發(fā)送命令開工了。
- raftClient.send(Message.valueOf("INCREMENT"));
Counter 的狀態(tài)機(jī)支持INCREMENT 和 GET 兩個(gè)命令。所以example 最后執(zhí)行了一個(gè) GET 的命令來獲取最終的計(jì)數(shù)結(jié)果
- RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));
四、內(nèi)部部分實(shí)現(xiàn)
RaftClientImpl 里,初期會(huì)從peers列表中選一個(gè),當(dāng)成leader 去請(qǐng)求。
- RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
- RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
- this.clientId = clientId;
- this.clientRpc = clientRpc;
- this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
- this.groupId = group.getGroupId();
- this.leaderId = leaderId != null? leaderId
- : !peers.isEmpty()? peers.iterator().next().getId(): null;
- ...
- }
之后,會(huì)根據(jù)server 返回的不同異常分別處理。
- private RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
- RaftClientReply reply;
- try {
- reply = clientRpc.sendRequest(request);
- } catch (GroupMismatchException gme) {
- throw gme;
- } catch (IOException ioe) {
- handleIOException(request, ioe);
- }
- reply = handleLeaderException(request, reply, null);
- reply = handleRaftException(reply, Function.identity());
- return reply;
- }
比如在 handleLeaderException 中,又分幾種情況,因?yàn)橥ㄟ^Client 來和 Server 進(jìn)行通訊的時(shí)候,會(huì)隨機(jī)從peers里選擇一個(gè),做為leader去請(qǐng)求,如果 Server 返回異常,說它不是leader,就用下面的代碼,隨機(jī)從另外的peer里選擇一個(gè)再去請(qǐng)求。
- final RaftPeerId oldLeader = request.getServerId();
- final RaftPeerId curLeader = leaderId;
- final boolean stillLeader = oldLeader.equals(curLeader);
- if (newLeader == null && stillLeader) {
- newLeader = CollectionUtils.random(oldLeader,
- CollectionUtils.as(peers, RaftPeer::getId));
- }
- static <T> T random(final T given, Iterable<T> iteration) {
- Objects.requireNonNull(given, "given == null");
- Objects.requireNonNull(iteration, "iteration == null");
- final List<T> list = StreamSupport.stream(iteration.spliterator(), false)
- .filter(e -> !given.equals(e))
- .collect(Collectors.toList());
- final int size = list.size();
- return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size));
- }
是不是感覺很低效。如果這個(gè)時(shí)候,server 返回的信息里,告訴client 誰是 leader,那client 直接連上去就可以了是吧。
- /**
- * @return null if the reply is null or it has
- * {@link NotLeaderException} or {@link LeaderNotReadyException}
- * otherwise return the same reply.
- */
- RaftClientReply handleLeaderException(RaftClientRequest request, RaftClientReply reply,
- Consumer<RaftClientRequest> handler) {
- if (reply == null || reply.getException() instanceof LeaderNotReadyException) {
- return null;
- }
- final NotLeaderException nle = reply.getNotLeaderException();
- if (nle == null) {
- return reply;
- }
- return handleNotLeaderException(request, nle, handler);
- }
- RaftClientReply handleNotLeaderException(RaftClientRequest request, NotLeaderException nle,
- Consumer<RaftClientRequest> handler) {
- refreshPeers(nle.getPeers());
- final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
- : nle.getSuggestedLeader().getId();
- handleIOException(request, nle, newLeader, handler);
- return null;
- }
我們會(huì)看到,在異常的信息中,如果能夠提取出一個(gè) suggestedLeader,這時(shí)候就會(huì)做為新的leaderId來使用,下次直接連接了。
本文轉(zhuǎn)載自微信公眾號(hào)「Tomcat那些事兒」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Tomcat那些事兒公眾號(hào)。