反問(wèn)面試官:如何實(shí)現(xiàn)集群內(nèi)選主
面試官經(jīng)常喜歡問(wèn)什么zookeeper選主原理、什么CAP理論、什么數(shù)據(jù)一致性。經(jīng)常都被問(wèn)煩了,我就想問(wèn)問(wèn)面試官,你自己還會(huì)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的集群內(nèi)選主呢?估計(jì)大部分面試官自己也寫(xiě)不出來(lái)。
本篇使用 Java 和 Netty 實(shí)現(xiàn)簡(jiǎn)單的集群選主過(guò)程的示例。
這個(gè)示例展示了多個(gè)節(jié)點(diǎn)通過(guò)投票選舉一個(gè)新的主節(jié)點(diǎn)的過(guò)程。Netty 用于節(jié)點(diǎn)間的通信,而每個(gè)節(jié)點(diǎn)則負(fù)責(zé)發(fā)起和響應(yīng)選舉消息。
集群選主流程
1.選主流程
咱們且不說(shuō)zookeeper如何選主,單說(shuō)人類選主,也是采用少數(shù)服從多數(shù)的原則。人類選主時(shí),中間會(huì)經(jīng)歷如下過(guò)程:
- 如果我沒(méi)有熟悉的或者沒(méi)找到能力比我強(qiáng)的,首先投給自己一票。
- 隨著時(shí)間推移,可能后面的人介紹了各自的特點(diǎn)和實(shí)力,那我可能會(huì)改投給別人。
- 所有人將投票信息放入到統(tǒng)計(jì)箱中。
- 最終票數(shù)最多的人是領(lǐng)導(dǎo)者。
同樣的,zookeeper在選主時(shí),也是這樣的流程。假設(shè)有五大服務(wù)器:
- 服務(wù)器1先給自身投票
- 后續(xù)起來(lái)的服務(wù)器2也會(huì)投自身一票,然后服務(wù)器1觀察到服務(wù)器2的id比較大,則會(huì)改投服務(wù)器2
- 后續(xù)起來(lái)的服務(wù)器3也會(huì)投自身一票,然后服務(wù)1和服務(wù)器2發(fā)現(xiàn)服務(wù)器3的id比較大,則都會(huì)改投服務(wù)器3。服務(wù)器3被確定為領(lǐng)導(dǎo)者。
- 服務(wù)器4起來(lái)后也會(huì)投自身一票,然后發(fā)現(xiàn)服務(wù)器3已經(jīng)有3票了,立馬改投服務(wù)器3。
- 服務(wù)器5與服務(wù)器4的操作一樣。
2.選主協(xié)議
在選主過(guò)程中采用的是超過(guò)半數(shù)的協(xié)議。在選主過(guò)程中,會(huì)需要如下幾類消息:
- 投票請(qǐng)求:節(jié)點(diǎn)發(fā)出自己的投票請(qǐng)求。
- 接受投票:其余節(jié)點(diǎn)作出判斷,如果覺(jué)得id較大,則接受投票。
- 選舉勝出:當(dāng)選主節(jié)點(diǎn)后,廣播勝出消息。
代碼實(shí)現(xiàn)
下面模擬3個(gè)節(jié)點(diǎn)的選主過(guò)程,核心步驟如下:
(1) 定義消息類型、消息對(duì)象、節(jié)點(diǎn)信息
public enum MessageType {
VOTE_REQUEST, // 投票請(qǐng)求
VOTE, // 投票
ELECTED // 選舉完成后的勝出消息
}
public class ElectionMessage implements Serializable {
private MessageType type;
private int nodeId; // 節(jié)點(diǎn)ID
private long zxId; // ZXID:類似于ZooKeeper中的邏輯時(shí)鐘,用于比較
private int voteFor; // 投票給的節(jié)點(diǎn)ID
}
public class ElectionNode {
private int nodeId; // 當(dāng)前節(jié)點(diǎn)ID
private long zxId; // 當(dāng)前節(jié)點(diǎn)的ZXID
private volatile int leaderId; // 當(dāng)前選舉的Leader ID
private String host;
private int port;
private ConcurrentHashMap<Integer, Integer> voteMap = new ConcurrentHashMap<>(); // 此節(jié)點(diǎn)對(duì)每個(gè)節(jié)點(diǎn)的投票情況
private int totalNodes; // 集群總節(jié)點(diǎn)數(shù)
}
(2) 每個(gè)節(jié)點(diǎn)利用Netty啟動(dòng)Server
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEncoder(),
new ElectionHandler(ElectionNode.this));
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("Node " + nodeId + " started on port " + port);
// 啟動(dòng)后開(kāi)始選舉過(guò)程
startElection();
// future.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
// bossGroup.shutdownGracefully();
// workerGroup.shutdownGracefully();
}
}
(3) 啟動(dòng)后利用Netty發(fā)送投票請(qǐng)求
public void sendVoteRequest(String targetHost, int targetPort) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEncoder(),
new ElectionHandler(ElectionNode.this));
}
});
ChannelFuture future = bootstrap.connect(targetHost, targetPort).sync();
ElectionMessage voteRequest = new ElectionMessage(ElectionMessage.MessageType.VOTE_REQUEST, nodeId, zxId, nodeId);
future.channel().writeAndFlush(voteRequest);
// future.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
// group.shutdownGracefully();
}
}
(4) 節(jié)點(diǎn)接受到投票請(qǐng)求后,做相關(guān)處理
節(jié)點(diǎn)在收到消息后,做相關(guān)邏輯處理:處理投票請(qǐng)求、處理確認(rèn)投票、處理選主結(jié)果。
處理投票請(qǐng)求:判斷是否是否接受投票信息。只有在主節(jié)點(diǎn)沒(méi)確定并且zxId較大時(shí),才發(fā)送投票消息。如果接受了投票請(qǐng)求的話,則更新本地的投票邏輯,然后給投票節(jié)點(diǎn)發(fā)送接受投票的消息
處理確認(rèn)投票:如果投票消息被接受了,則更新本地的投票邏輯。
處理選主結(jié)果:如果收到了選主結(jié)果的消息,則更新本地的主節(jié)點(diǎn)。
public class ElectionHandler extends ChannelInboundHandlerAdapter {
private final ElectionNode node;
public ElectionHandler(ElectionNode node) {
this.node = node;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ElectionMessage electionMessage = (ElectionMessage) msg;
System.out.println("Node " + node.getNodeId() + " received: " + electionMessage);
if (electionMessage.getType() == ElectionMessage.MessageType.VOTE_REQUEST) {
// 判斷是否是否接受投票信息。只有在主節(jié)點(diǎn)沒(méi)確定并且zxId較大時(shí),才發(fā)送投票消息
// 如果接受了投票請(qǐng)求的話,則更新本地的投票邏輯,然后給投票節(jié)點(diǎn)發(fā)送接受投票的消息
if (electionMessage.getZxId() >= node.getZxId() && node.getLeaderId() == 0) {
node.receiveVote(electionMessage.getNodeId());
ElectionMessage voteMessage = new ElectionMessage(ElectionMessage.MessageType.VOTE, electionMessage.getNodeId(), electionMessage.getZxId(), electionMessage.getNodeId());
ctx.writeAndFlush(voteMessage);
} else {
// 如果已經(jīng)確定主節(jié)點(diǎn)了,直接發(fā)送ELECTED消息
sendLeaderInfo(ctx);
}
} else if (electionMessage.getType() == ElectionMessage.MessageType.VOTE) {
// 如果投票消息被接受了,則更新本地的投票邏輯。
if (electionMessage.getZxId() >= node.getZxId() && node.getLeaderId() == 0) {
node.receiveVote(electionMessage.getNodeId());
} else {
// 如果已經(jīng)確定主節(jié)點(diǎn)了,直接發(fā)送ELECTED消息
sendLeaderInfo(ctx);
}
} else if (electionMessage.getType() == ElectionMessage.MessageType.ELECTED) {
if (node.getLeaderId() == 0) {
node.setLeaderId(electionMessage.getVoteFor());
}
}
}
(5) 接受別的節(jié)點(diǎn)的投票
這里是比較關(guān)鍵的一步,當(dāng)確定接受某個(gè)節(jié)點(diǎn)時(shí),則更新本地的投票數(shù),然后判斷投票數(shù)是否超過(guò)半數(shù),超過(guò)半數(shù)則確定主節(jié)點(diǎn)。同時(shí),再將主節(jié)點(diǎn)廣播出去。
此時(shí),其余節(jié)點(diǎn)接收到選主確認(rèn)的消息后,都會(huì)更新自己的本地的主節(jié)點(diǎn)信息。
public void receiveVote(int nodeId) {
voteMap.merge(nodeId, 1, Integer::sum);
// 比較出votes里值,取出最大的那個(gè)對(duì)應(yīng)的key
int currentVotes = voteMap.values().stream().max(Integer::compareTo).get();
if (currentVotes > totalNodes / 2 && leaderId == 0) {
setLeaderId(nodeId);
broadcastElected();
}
}
(6) 廣播選主結(jié)果
/**
* 廣播選舉結(jié)果
*/
private void broadcastElected() {
for (int i = 1; i <= totalNodes; i++) {
if (i != nodeId) {
sendElectedMessage(host, 9000 + i);
}
}
}
/**
* 發(fā)送選舉結(jié)果
*
* @param targetHost
* @param targetPort
*/
public void sendElectedMessage(String targetHost, int targetPort) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEncoder(),
new ElectionHandler(ElectionNode.this));
}
});
ChannelFuture future = bootstrap.connect(targetHost, targetPort).sync();
ElectionMessage electedMessage = new ElectionMessage(ElectionMessage.MessageType.ELECTED, leaderId, zxId, leaderId);
future.channel().writeAndFlush(electedMessage);
// future.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
// group.shutdownGracefully();
}
}
(7) 完整代碼
完整代碼:https://gitee.com/yclxiao/specialty/blob/master/javacore/src/main/java/com/ycl/election/ElectionHandler.java
總結(jié)
本文主要演示了一個(gè)簡(jiǎn)易的多Server的選主過(guò)程,以上代碼是一個(gè)簡(jiǎn)單的基于Netty實(shí)現(xiàn)的集群選舉過(guò)程的示例。在實(shí)際場(chǎng)景中,選舉邏輯遠(yuǎn)比這個(gè)復(fù)雜,需要處理更多的網(wǎng)絡(luò)異常、重復(fù)消息、并發(fā)問(wèn)題等。