Pulsar3.0 升級指北,你學會些什么?
Pulsar3.0 介紹
Pulsar3.0 是 Pulsar 社區(qū)推出的第一個 LTS 長期支持版本。
圖片
如圖所示,LTS 版本會最長支持到 36 個月,而 Feature 版本最多只有六個月;類似于我們使用的 JDK11,17,21 都是可以長期使用的;所以也推薦大家都升級到 LTS 版本。
作為首個 LTS 版本,3.0 自然也是自帶了許多新特性,這個會在后續(xù)介紹。
升級指南
先來看看升級指南:
圖片
在官方的兼容表中會發(fā)現(xiàn):不推薦跨版本升級。
也就是說如果你現(xiàn)在還在使用的是 2.10.x,那么推薦是先升級到 2.11.x 然后再升級到 3.0.x.
而且根據(jù)我們的使用經(jīng)驗來看,首個版本是不保險的,即便是 LTS 版本;所以不推薦直接升級到 3.0.0,而是更推薦 3.0.1+,這個小版本會修復(fù) 3.0 所帶來的一些 bug。
先講一下我們的升級流程,大家可以用做參考。
升級前準備
根據(jù)我們的使用場景,為了以防萬一,首先需要將我們的插件依賴升級到對應(yīng)的版本。
圖片
其實簡單來說就是更新下依賴,然后再重新打包,在后續(xù)的流程進行測試。
預(yù)熱鏡像
之后是預(yù)熱鏡像,我們使用 harbor 搭建了自己的 docker 鏡像倉庫,這樣在升級重啟鏡像的時候可以更快的從內(nèi)網(wǎng)拉取鏡像。
畢竟一個 pulsar-all 的鏡像也不小,盡量的縮短啟動時間。
預(yù)熱的過程也很簡單:
docker pull apachepulsar/pulsar-all:3.0.1
docker tag apachepulsar/pulsar-all:3.0.1 harbor-private.xx.com/pulsar/pulsar-all:3.0.1
docker image push harbor-private.xx.com/pulsar/pulsar-all:3.0.1
之后升級的時候就可以使用私服的鏡像了。
功能測試
我這邊有寫了一個 cli 可以幫我快速創(chuàng)建或升級一個集群,然后觸發(fā)我所編寫的功能測試。
./pulsar-upgrade-cli upgrade pulsar-test ./charts/pulsar --version x.x.x -f charts/pulsar/values.yaml -n pulsar-test
這個 cli 很簡單,一共就做三件事:
- 使用 helm 接口升級集群
- 等待所有的 Pod 都升級成功
- 觸發(fā)功能測試
之后的效果如下:
圖片
主要就是覆蓋了我們的使用場景,都跑通過之后才會走后續(xù)的流程。
運行監(jiān)控
圖片
之后會啟動一個 200 左右的并發(fā)生產(chǎn)和消費數(shù)據(jù),模擬線上的使用情況,會一直讓這個任務(wù)跑著,大概一晚上就可以了,第二天通過監(jiān)控查看:
- 應(yīng)用有無異常日志
- 流量是否正常
- 各個組件的內(nèi)存占用
- 寫入延遲等信息
升級步驟
組件的升級步驟這里參考了官方指南:https://pulsar.apache.org/docs/3.1.x/administration-upgrade/#upgrade-zookeeper-optional
圖片
- 升級ZK
- 關(guān)閉auto recovery
- 升級Bookkeeper
- 升級Broker
- 升級Proxy
- 開啟auto recovery
只要一步步按照這個流程走,問題不大,哪一步出現(xiàn)問題后需要及時回滾,回滾流程參考下面的回滾部分。
同時在升級過程中需要一直查看 broker 的 error 日志,如果有明顯的不符合預(yù)期的日志一定要注意。
在升級 bookkeeper 的時候,broker 可能會出現(xiàn) bk 連接失敗的異常,這個可以不用在意。
線上驗證
都升級完后就是線上業(yè)務(wù)驗證環(huán)節(jié)了:
- [x] 查看監(jiān)控面板,是否有明顯的流量、內(nèi)存、延遲的異常指標。? 2023-12-24
- [x] topic 元數(shù)據(jù)完整性驗證:這個是因為我們這次升級出了一個 topic 被刪除的 bug,所以需要重點驗證下;這部分會在下次詳細分析。? 2023-12-24
- [x] 查看業(yè)務(wù)消息收發(fā)有無異常 ? 2023-12-24
- [x] 鏈路查詢是否正常,我們有一個消息鏈路查詢的頁面,主要是使用 Pulsar-SQL 和 broker-interceptor 實現(xiàn)的。? 2023-12-24
異常回滾
當出現(xiàn)異常的時候需要立即回滾,這里的異常一般就是消息收發(fā)異常,客戶端掉線等。
經(jīng)過我的測試 3.0.x 的存儲和之前的版本是兼容的,所以 bookkeeper 都能降級其他的組件就沒啥可擔心的了。
需要降級時直接將所有組件降級為上一個版本即可。
災(zāi)難恢復(fù)
因為是從 2.x 升級到 3.x 也是涉及到了跨大版本,所以也準備了災(zāi)難恢復(fù)的方案。
比如極端情況下升級失敗,所有數(shù)據(jù)丟失的情況。
整個災(zāi)難恢復(fù)的主要目的就是恢復(fù)后的集群對外提供的域名不發(fā)生變化,同時所有的客戶端可以自動重連上來,也就是最壞的情況下所有的數(shù)據(jù)丟了可以接受,但不能影響業(yè)務(wù)正常使用。
所以我們的流程如下:
備份 topic
@SneakyThrows
@Test
void backup(){
List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList("tenant/namespace");
log.info("topic size={}",topicList.size());
// create a custom thread pool
CopyOnWriteArrayList<TopicMeta> dataList = new CopyOnWriteArrayList<>();
ExecutorService customThreadPool = Executors.newFixedThreadPool(10);
for (String topicName : topicList) {
customThreadPool.execute(()-> {
PartitionedTopicMetadata metadata;
try {
metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topicName);
TopicMeta topicMeta = new TopicMeta();
// backup topic
topicMeta.setName(topicName);
topicMeta.setPartition(metadata.partitions);
// backup permission
Map<String, Set<AuthAction>> permissions = pulsarAdmin.topics().getPermissions(topicName);
topicMeta.setPermissions(permissions);
// back sub
List<String> subscriptions = new ArrayList<>();
PartitionedTopicStats topicStats = pulsarAdmin.topics().getPartitionedStats(topicName, true);
topicStats.getSubscriptions().forEach((k,v)-> subscriptions.add(k));
topicMeta.setSubscriptions(subscriptions);
dataList.add(topicMeta);
} catch (PulsarAdminException e) {
throw new RuntimeException(e);
} }); }
customThreadPool.shutdown();
while (!customThreadPool.isTerminated()) {
}
log.info("{}",dataList.size());
log.info("{}",JSONUtil.toJsonStr(dataList));
}
// TopicMetaData
@Data
public class TopicMeta {
private String name;
private int partition;
Map<String, Set<AuthAction>> permissions;
List<String> subscriptions = new ArrayList<>();
}
第一步是備份 topic:
- topic 主要是名稱和分區(qū)數(shù)量
- 備份權(quán)限
- 備份 topic 的訂閱者
公私鑰備份
因為我們客戶端使用了 JWT 驗證,所有為了使得恢復(fù)的 Pulsar 集群可以讓客戶端無縫切換到新集群,因此必須得使用相同的公私鑰。
這個其實比較簡單,我們使用的是 helm 安裝的集群,所以只需要備份好 Secret 即可。
apiVersion: v1
data:
PRIVATEKEY: XXX
PUBLICKEY: XXX
kind: Secret
metadata:
name: pulsar-token-asymmetric-key
namespace: pulsar
type: Opaque
# 還有幾個 superUser 的 Secret
數(shù)據(jù)恢復(fù)
創(chuàng)建新集群
首先使用 helm 重新創(chuàng)建一個新集群:
./scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsar
helm install \ --values charts/pulsar/values.yaml \ --set namespace=pulsar\
--set initialize=true \
pulsar ./charts/pulsar -n pulsar
恢復(fù)公私鑰
直接使用剛才備份的公私鑰覆蓋到新集群即可。
恢復(fù)namespace
進入 toolset pod 創(chuàng)建需要使用的 tenant/namespace
k exec -it pulsar-toolset-0 -n pulsar bash
bin/pulsar-admin tenants create tenant
bin/pulsar-admin namespaces create tenant/namespace
元數(shù)據(jù)恢復(fù)
之后便是最重要的元數(shù)據(jù)恢復(fù)了:
@SneakyThrows
@Test
void restore() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://url:8080")
.authentication(AuthenticationFactory.token(token))
.build();
Path filePath = Path.of("restore-ns.json");
String fileContent = Files.readString(filePath);
List<TopicMeta> topicMetaList = JSON.parseArray(fileContent, TopicMeta.class);
ExecutorService customThreadPool = Executors.newFixedThreadPool(50);
for (TopicMeta topicMeta : topicMetaList) {
customThreadPool.execute(() -> {
// Create topic
try {
pulsarAdmin.topics().createPartitionedTopic(topicMeta.getName(), topicMeta.getPartition());
} catch (PulsarAdminException e) {
log.error("Create topic error");
}
// Create sub
for (String subscription : topicMeta.getSubscriptions()) {
try {
pulsarAdmin.topics().createSubscription(topicMeta.getName(), subscription, MessageId.latest);
} catch (PulsarAdminException e) {
log.error("createSubscription error");
} }
// Grant permission
topicMeta.getPermissions().forEach((role, authActions) -> {
permission(pulsarAdmin, topicMeta.getName(), role, authActions);
});
log.info("topic:{} restore success", topicMeta.getName());
}); }
customThreadPool.shutdown();
while (!customThreadPool.isTerminated()) {
} log.info("restore success");
}
private synchronized void permission(PulsarAdmin pulsarAdmin, String topic, String role, Set<AuthAction> authActions) {
try {
pulsarAdmin.topics().grantPermission(topic, role, authActions);
} catch (PulsarAdminException e) {
log.error("grantPermission error", e);
}
}
流程和備份類似:
- 創(chuàng)建分區(qū) topic
- 創(chuàng)建訂閱者
- 授權(quán)角色信息
因為授權(quán)接口限制了并發(fā)調(diào)用,所有需要加鎖,導(dǎo)致整個恢復(fù)的流程就會比較慢。
8000 topic 的 namespace 大概恢復(fù)時間為 40min 左右。
之后依次恢復(fù)其他 namespace 即可。
恢復(fù) police
admin.namespaces().setNamespaceMessageTTL("tenant/namespace", 3600 * 6);
admin.namespaces().setBacklogQuota("tenant/namespace", BacklogQuota)
如果之前的集群有設(shè)置 TTL 或者是 backlogQuota 時都需要手動恢復(fù)。
總結(jié)
以上就是整個升級和災(zāi)難恢復(fù)的流程,當然災(zāi)難恢復(fù)希望大家不要碰到。
我會在下一篇詳細介紹 Pulsar 3.0 的新功能以及所碰到的一些坑。