Kafka如何修改分區(qū)Leader
前幾天有個(gè)群友問我: kafka如何修改優(yōu)先副本? 他們有個(gè)需求是, 想指定某個(gè)分區(qū)中的其中一個(gè)副本為L(zhǎng)eader
在這里插入圖片描述
需求分析
對(duì)于這么一個(gè)問題,在我們生產(chǎn)環(huán)境還是挺常見的,經(jīng)常有需要修改某個(gè)Topic中某分區(qū)的Leader
比如 topic1-0這個(gè)分區(qū)有3個(gè)副本[0,1,2], 按照「優(yōu)先副本」的規(guī)則
那么 0 號(hào)副本肯定就是Leader了 我們都知道分區(qū)中的只有Leader副本才會(huì)提供讀寫副本
其他副本作為備份 假如在某些情況下,「0」 號(hào)副本性能資源不夠,或者網(wǎng)絡(luò)不太好,或者IO壓力比較大
那么肯定對(duì)Topic的整體讀寫性能有很大影響, 這個(gè)時(shí)候切換一臺(tái)壓力較小副本作為L(zhǎng)eader就顯得很重要;
優(yōu)先副本: 分區(qū)中的AR(所有副本)信息, 優(yōu)先選擇排在第一位的副本作為L(zhǎng)eader
Leader機(jī)制: 分區(qū)中只有一個(gè)Leader來承擔(dān)讀寫,其他副本只是作為備份
那么如何實(shí)現(xiàn)這樣一個(gè)需求呢?
解決方案
知道了原理之后,我們就能想到對(duì)應(yīng)的解決方案了 只要將 分區(qū)的 AR 中的第一個(gè)位置,替換成你指定副本就行了;AR = { 0,1,2 } ==> AR = {2,1,0}
一般能夠達(dá)到這個(gè)目的有兩種方案,下面我們來分析一下
方案一: 分區(qū)副本重分配
之前關(guān)于分區(qū)副本重分配 我已經(jīng)寫過很多文章了,如果想詳細(xì)了解 分區(qū)副本重分配、數(shù)據(jù)遷移、副本擴(kuò)縮容 可以看看鏈接的文章, 這里我就簡(jiǎn)單說一下;
一般分區(qū)副本重分配主要有三個(gè)流程
- 生成推薦的遷移Json文件
- 執(zhí)行遷移Json文件
- 驗(yàn)證遷移流程是否完成
這里我們主要看第2步驟, 來看看遷移文件一般是什么樣子的
- {
- "version": 1,
- "partitions": [{
- "topic": "topic1",
- "partition": 0,
- "replicas": [0,1,2]
- }]
- }
這個(gè)遷移Json意思是, 把topic1的「0」號(hào)分區(qū)的副本分配成[0,1,2] ,也就是說 topic1-0號(hào)分區(qū)最終有3個(gè)副本分別在 {brokerId-0,brokerId-1,brokerId-2} ;
如果你有看過我之前寫的 分區(qū)副本重分配原理源碼分析 ,那么肯定就知道
不管你之前的分配方式是什么樣子的, 最終副本分配都是 [0,1,2] , 之前副本多的,會(huì)被刪掉,少的會(huì)被新增;
那么我們想要實(shí)現(xiàn) 我們的需求 是不是把這個(gè)Json文件 中的 "replicas": [0,1,2] 改一下就行了
比如改成 "replicas": [2,1,0] 改完Json后執(zhí)行,執(zhí)行execute, 正式開始重分配流程! 遷移完成之后, 就會(huì)發(fā)現(xiàn),Leader已經(jīng)變成上面的第一個(gè)位置的副本「2」 了
優(yōu)缺點(diǎn)
優(yōu)點(diǎn): 實(shí)現(xiàn)了需求, 并且主動(dòng)切換了Leader
缺點(diǎn): 操作比較復(fù)雜容易出錯(cuò),需要先獲取原先的分區(qū)分配數(shù)據(jù),然后手動(dòng)修改Json文件,這里比較容易出錯(cuò),影響會(huì)比較大,當(dāng)然這些都可以通過校驗(yàn)接口來做好限制, 最重要的一點(diǎn)是 副本重分配當(dāng)前只能有一個(gè)任務(wù) ! 假如你當(dāng)前有一個(gè)「副本重分配」的任務(wù)在,那么這里就不能夠執(zhí)行了, 「副本重分配」是一個(gè)比較「重」 了的操作,出錯(cuò)對(duì)集群的影響比較大
方案二: 手動(dòng)修改AR順序
首先,我們知道分區(qū)副本的分配數(shù)據(jù)是保存在zookeeper中的節(jié)點(diǎn)brokers/topics/{topicName} 中; 我們看個(gè)Topic1的節(jié)點(diǎn)數(shù)據(jù)例子;
- {
- "version": 2,
- "partitions": {
- "2": [3, 2, 1],
- "1": [2, 1, 3],
- "4": [2, 3, 1],
- "0": [1, 3, 2],
- "3": [1, 2, 3]
- },
- "adding_replicas": {},
- "removing_replicas": {}
- }
數(shù)據(jù)解釋:version:版本信息, 現(xiàn)在有 「1」、「2」 兩個(gè)版本
removing_replicas:需要?jiǎng)h除的副本數(shù)據(jù), 在進(jìn)行分區(qū)副本重分配過程中, 多余的副本會(huì)在數(shù)據(jù)遷移快完成的時(shí)候被刪除掉,刪除成功這里的數(shù)據(jù)會(huì)被清除
adding_replicas:需要新增的副本數(shù)據(jù),在進(jìn)行分區(qū)副本重分配過程中, 新增加的副本將會(huì)被新增,新增完成這里的數(shù)據(jù)會(huì)清除;
partitions:Topic的所有分區(qū)副本分配方式; 上面表示總共有5個(gè)分區(qū),以及對(duì)應(yīng)的副本位置;
知道了這些之后,想要修改優(yōu)先副本,是不是可以通過直接修改zookeeper中的節(jié)點(diǎn)數(shù)據(jù)就行了; 比如 我們把 「1」號(hào)分區(qū)的副本位置改成 [2,1,3]
改成這樣之后, 還需要 執(zhí)行 重新進(jìn)行優(yōu)先副本選舉操作 ,例如通過kafka的命令執(zhí)行
- sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1--election-type PREFERRED --partition 1
--election-type : PREFERRED 這個(gè)表示的以優(yōu)先副本的方式進(jìn)行重新選舉
那么做完這兩步之后, 我們的修改優(yōu)先副本的目的就達(dá)成了.........嗎 ?
實(shí)則并沒有, 因?yàn)檫@里僅僅只是修改了 zookeeper節(jié)點(diǎn)的數(shù)據(jù)
而bin/kafka-leader-election.sh 重選舉的操作是Controller來進(jìn)行的; 如果你對(duì)Controller的作用和源碼足夠了解
肯定知道Controller里面保存了每個(gè)Topic的分區(qū)副本信息, 是保存在JVM內(nèi)存中的, 然后我們手動(dòng)修改Zookeeper中的節(jié)點(diǎn),并沒有觸發(fā) Controller更新自身的內(nèi)存
也就是說 就算我們執(zhí)行了kafka-leader-election.sh, 它也不會(huì)有任何變化,因?yàn)閮?yōu)先副本沒有被感知到修改了;
解決這個(gè)問題也很簡(jiǎn)單,讓Controller感知到數(shù)據(jù)的變更就行了 最簡(jiǎn)單的方法, 讓Controller發(fā)生重新選舉, 數(shù)據(jù)重新加載!
總結(jié)
手動(dòng)修改zookeeper中的「AR」順序
Controller 重新選舉
執(zhí)行 分區(qū)副本重選舉操作(優(yōu)先副本策略)
簡(jiǎn)單代碼
當(dāng)然上面功能,肯定是要集成到LogiKM中的咯; 簡(jiǎn)單代碼如下
- // 這里轉(zhuǎn)換成HashMap類型,切勿自定義類型,以防kafka節(jié)點(diǎn)數(shù)據(jù)后續(xù)新增數(shù)據(jù)節(jié)點(diǎn),導(dǎo)致數(shù)據(jù)丟失
- HashMap partitionMap = zkConfig.get(ZkPathUtil.getBrokerTopicRoot(topicName), HashMap.class);
- JSONObject partitionJson = (JSONObject)partitionMap.get("partitions");
- JSONArray partitions = (JSONArray)partitionJson.get(partition);
- //部分代碼省略
- //調(diào)換序列 優(yōu)先副本
- Integer first = partitions.getInteger(0);
- partitions.set(0,targetBroker);
- partitions.set(index,first);
- zkUtils = ZookeeperUtils.getKafkaZkUtils(clusterDO.getZookeeper());
- String json = JSON.toJSONString(partitionMap);
- zkUtils.updatePersistentPath(ZkPathUtil.getBrokerTopicRoot(topicName), json,null);
- //寫入成功之后觸發(fā)一下 異步去優(yōu)先副本選舉
- new Thread(()->{
- try {
- // 1. 先讓Controller重新選舉 (不然上面修改的還沒有生效) (TODO.. 待優(yōu)化 -> 頻繁的Controller重選舉對(duì)集群性能會(huì)有影響)
- zkConfig.deletePath(ZkPathUtil.CONTROLLER_ROOT_NODE);
- // 等待 Controller 選舉一下
- Thread.sleep(1000);
- //2. 然后再發(fā)起副本重新選舉
- preferredReplicalElectCommand.preferredReplicaElection(clusterId,topicName,partition,"");
- } catch (ConfigException | InterruptedException e) {
- LOGGER.error("重新選舉異常.e:{}",e);
- e.printStackTrace();
- }
- }).start();
優(yōu)缺點(diǎn)
優(yōu)點(diǎn): 實(shí)現(xiàn)了目標(biāo)需求, 簡(jiǎn)單, 操作方便
缺點(diǎn): 頻繁的Controller重選舉對(duì)生產(chǎn)環(huán)境來說會(huì)有一些影響;
優(yōu)化與改進(jìn)
第二種方案中,需要Controller 重選舉, 頻繁的選舉肯定是對(duì)生產(chǎn)環(huán)境有影響的;Controller承擔(dān)了非常多的責(zé)任,比如分區(qū)副本重分配、刪除topic、Leader選舉 等等還有很多都是它在干!
那么如何不進(jìn)行Controller的重選舉,也能達(dá)到我們的需求呢?
我們的需求是,當(dāng)我們 修改了zookeeper中的節(jié)點(diǎn)數(shù)據(jù)的時(shí)候,能夠迅速的讓Controller感知到,并更新自己的內(nèi)存數(shù)據(jù)就行了;
對(duì)于這個(gè)問題,我會(huì)在下一期文章中介紹
問題
看完這篇文章,提幾個(gè)相關(guān)的問題給大家思考一下;
如果我在修改zk中的「AR」信息時(shí)候不僅僅是調(diào)換順序,而是有新增或者刪除副本會(huì)發(fā)生什么情況呢?
如果手動(dòng)修改brokers/topics/{topicName}/partitions/{分區(qū)號(hào)}/state 節(jié)點(diǎn)里面的leader信息,能不能直接更新Leader?
副本選舉的整個(gè)流程是什么樣子的?
大家可以思考一下, 問題答案我會(huì)在后面的文章中一一講解!