Pulsar升級(jí)自動(dòng)化:一鍵搞定集群升級(jí)與測(cè)試
背景
由于我在公司內(nèi)部負(fù)責(zé)維護(hù) Pulsar,需要時(shí)不時(shí)的升級(jí) Pulsar 版本從而和社區(qū)保持一致。
而每次升級(jí)過程都需要做相同的步驟:
- 安裝一個(gè)新版本的集群
- 觸發(fā)功能性測(cè)試
- 觸發(fā)性能測(cè)試
- 查看監(jiān)控是否正常
- 應(yīng)用有無異常日志
- 流量是否正常
- 各個(gè)組件的內(nèi)存占用是否正常
- 寫入延遲是否正常
命令行工具
以上的流程步驟最好是全部一鍵完成,我們只需要人工檢測(cè)下監(jiān)控是否正常即可。
于是我便寫了一個(gè)命令行工具,執(zhí)行流程如下:
pulsar-upgrade-cli -h ok | at 10:33:18
A cli app for upgrading Pulsar
Usage:
pulsar-upgrade-cli [command]
Available Commands:
completion Generate the autocompletion script for the specified shell
help Help about any command
install install a target version
scale scale statefulSet of the cluster
Flags:
--burst-limit int client-side default throttling limit (default 100)
--debug enable verbose output
-h, --help help for pulsar-upgrade-cli
--kube-apiserver string the address and the port for the Kubernetes API server
--kube-as-group stringArray group to impersonate for the operation, this flag can be repeated to specify multiple groups.
--kube-as-user string username to impersonate for the operation
真實(shí)使用的 example 如下:
pulsar-upgrade-cli install \
--values ./charts/pulsar/values.yaml \
--set namespace=pulsar-test \
--set initialize=true \
--debug \
--test-case-schema=http \
--test-case-host=127.0.0.1 \
--test-case-port=9999 \
pulsar-test ./charts/pulsar -n pulsar-test
它的安裝命令非常類似于 helm,也是直接使用 helm 的 value.yaml 進(jìn)行安裝;只是在安裝成功后(等待所有的 Pod 都處于 Running 狀態(tài))會(huì)再觸發(fā) test-case 測(cè)試,也就是請(qǐng)求一個(gè) endpoint。
這個(gè) endpoint 會(huì)在內(nèi)部處理所有的功能測(cè)試和性能測(cè)試,具體細(xì)節(jié)就在后文分析。
同時(shí)還提供了一個(gè) scale(擴(kuò)、縮容) 命令,可以用修改集群規(guī)模:
# 縮容集群規(guī)模為0
./pulsar-upgrade-cli scale --replicase 0 -n pulsar-test
# 縮容為最小集群
./pulsar-upgrade-cli scale --replicase 1 -n pulsar-test
# 恢復(fù)為最滿集群
./pulsar-upgrade-cli scale --replicase 2 -n pulsar-test
這個(gè)需求是因?yàn)槲覀兊?nbsp;Pulsar 測(cè)試集群部署在了一個(gè) servless 的 kubernetes 集群里,它是按照使用量收費(fèi)的,所以在我不需要的使用的時(shí)候可以通過這個(gè)命令將所有的副本數(shù)量修改為 0,從而減少使用成本。
當(dāng)只需要做簡(jiǎn)單的功能測(cè)試時(shí)便回將集群修改為最小集群,將副本數(shù)修改為只可以提供服務(wù)即可。
而當(dāng)需要做性能測(cè)試時(shí)就需要將集群修改為最高配置。
這樣可以避免每次都安裝新集群,同時(shí)也可以有效的減少測(cè)試成本。
實(shí)現(xiàn)原理
require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
helm.sh/helm/v3 v3.10.2
)
這個(gè)命令行工具本質(zhì)上是參考了 helm 的命令行實(shí)現(xiàn)的,所有主要也是依賴了 helm 和 cobra。
下面以最主要的安裝命令為例,核心的是以下的步驟:
- 執(zhí)行 helm 安裝(這里是直接使用的 helm 的源碼邏輯進(jìn)行安裝)
- 等待所有的 Pod 成功運(yùn)行
- 觸發(fā) test-case 執(zhí)行
- 等待測(cè)試用例執(zhí)行完畢
- 檢測(cè)是否需要卸載安裝的集群
func (e *installEvent) FinishInstall(cfg *action.Configuration, name string) error {
bar.Increment()
bar.Finish()
clientSet, err := cfg.KubernetesClientSet()
if err != nil {
return err
}
ctx := context.Background()
ip, err := GetServiceExternalIp(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-proxy", name))
if err != nil {
return err
}
token, err := GetPulsarProxyToken(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-token-proxy-admin", name))
if err != nil {
return err
}
// trigger testcase
err = e.client.Trigger(context.Background(), ip, token)
return err
}
這里的 FinishInstall 需要獲取到新安裝的 Pulsar 集群的 proxy IP 地址和鑒權(quán)所使用的 token(GetServiceExternalIp()/GetPulsarProxyToken())。
將這兩個(gè)參數(shù)傳遞給 test-case 才可以構(gòu)建出 pulsar-client.
這個(gè)命令的核心功能就是安裝集群和觸發(fā)測(cè)試,以及一些集群的基本運(yùn)維能力。
測(cè)試框架
而關(guān)于這里的測(cè)試用例也有一些小伙伴咨詢過,如何對(duì) Pulsar 進(jìn)行功能測(cè)試。
其實(shí) Pulsar 源碼中已經(jīng)包含了幾乎所有我們會(huì)使用到的測(cè)試代碼,理論上只要新版本的官方鏡像已經(jīng)推送了那就是跑了所有的單測(cè),質(zhì)量是可以保證的。
那為什么還需要做功能測(cè)試呢?
其實(shí)很很簡(jiǎn)單,Pulsar 這類基礎(chǔ)組件官方都有提供基準(zhǔn)測(cè)試,但我們想要用于生產(chǎn)環(huán)境依然需要自己做壓測(cè)得出一份屬于自己環(huán)境下的性能測(cè)試報(bào)告。
根本目的是要看在自己的業(yè)務(wù)場(chǎng)景下是否可以滿足(包括公司的軟硬件,不同的業(yè)務(wù)代碼)。
所以這里的功能測(cè)試代碼有一個(gè)很重要的前提就是:需要使用真實(shí)的業(yè)務(wù)代碼進(jìn)行測(cè)試。
也就是業(yè)務(wù)在線上使用與 Pulsar 相關(guān)的代碼需要參考功能測(cè)試?yán)锏拇a實(shí)現(xiàn),不然有些問題就無法在測(cè)試環(huán)節(jié)覆蓋到。
這里我就踩過坑,因?yàn)樵诠δ軠y(cè)試?yán)镉玫氖枪俜降?example 代碼進(jìn)行測(cè)試的,自然是沒有問題;但業(yè)務(wù)在實(shí)際使用時(shí),使用到了一個(gè) Schema 的場(chǎng)景,并沒有在功能測(cè)試?yán)锔采w到(官方的測(cè)試用例里也沒有??),就導(dǎo)致升級(jí)到某個(gè)版本后業(yè)務(wù)功能無法正常使用(雖然用法確實(shí)是有問題),但應(yīng)該在我測(cè)試階段就暴露出來。
實(shí)現(xiàn)原理
以上是一個(gè)集群的功能測(cè)試報(bào)告,這里我只有 8 個(gè)測(cè)試場(chǎng)景(結(jié)合實(shí)際業(yè)務(wù)使用),考慮到未來可能會(huì)有新的測(cè)試用例,所以在設(shè)計(jì)這個(gè)測(cè)試框架時(shí)就得考慮到擴(kuò)展性。
AbstractJobDefine job5 =
new FailoverConsumerTest(event, "故障轉(zhuǎn)移消費(fèi)測(cè)試", pulsarClient, 20, admin);
CompletableFuture<Void> c5 = CompletableFuture.runAsync(job5::start, EXECUTOR);
AbstractJobDefine job6 = new SchemaTest(event,"schema測(cè)試",pulsarClient,20,prestoService);
CompletableFuture<Void> c6 = CompletableFuture.runAsync(job6::start, EXECUTOR);
AbstractJobDefine job7 = new VlogsTest(event,"vlogs test",pulsarClient,20, vlogsUrl);
CompletableFuture<Void> c7 = CompletableFuture.runAsync(job7::start, EXECUTOR);
CompletableFuture<Void> all = CompletableFuture.allOf(c1, c2, c3, c4, c5, c6, c7);
all.whenComplete((___, __) -> {
event.finishAll();
pulsarClient.closeAsync();
admin.close();
}).get();
對(duì)外提供的 trigger 接口就不貼代碼了,重點(diǎn)就是在這里構(gòu)建測(cè)試任務(wù),然后等待他們?nèi)繄?zhí)行完畢。
@Data
public abstract class AbstractJobDefine {
private Event event;
private String jobName;
private PulsarClient pulsarClient;
private int timeout;
private PulsarAdmin admin;
public AbstractJobDefine(Event event, String jobName, PulsarClient pulsarClient, int timeout, PulsarAdmin admin) {
this.event = event;
this.jobName = jobName;
this.pulsarClient = pulsarClient;
this.timeout = timeout;
this.admin = admin;
}
public void start() {
event.addJob();
try {
CompletableFuture.runAsync(() -> {
StopWatch watch = new StopWatch();
try {
watch.start(jobName);
run(pulsarClient, admin);
} catch (Exception e) {
event.oneException(this, e);
} finally {
watch.stop();
event.finishOne(jobName, StrUtil.format("cost: {}s", watch.getTotalTimeSeconds()));
}
}, TestCase.EXECUTOR).get(timeout, TimeUnit.SECONDS);
} catch (Exception e) {
event.oneException(this, e);
}
}
/** run busy code
* @param pulsarClient pulsar client
* @param admin pulsar admin client
* @throws Exception e
*/
public abstract void run(PulsarClient pulsarClient, PulsarAdmin admin) throws Exception;
}
核心代碼就是這個(gè)抽象的任務(wù)定義類,其中的 start 函數(shù)用于定義任務(wù)執(zhí)行的模版:
- 添加任務(wù):具體實(shí)現(xiàn)是任務(wù)計(jì)數(shù)器+1
- 開始計(jì)時(shí)
- 執(zhí)行抽血的 run 函數(shù),具體實(shí)現(xiàn)交給子類
- 異常時(shí)記錄事件
- 正常執(zhí)行完畢后也記錄事件
下面來看一個(gè)普通用例的實(shí)現(xiàn)情況:
就是重寫了 run() 函數(shù),然后在其中實(shí)現(xiàn)具體的測(cè)試用例,斷言測(cè)試結(jié)果。
這樣當(dāng)我們需要再添加用例的時(shí)候只需要再新增一個(gè)子類實(shí)現(xiàn)即可。
同時(shí)還需要定義一個(gè)事件接口,用于處理一些關(guān)鍵的節(jié)點(diǎn):
public interface Event {
/**
* 新增一個(gè)任務(wù)
*/
void addJob();
/** 獲取運(yùn)行中的任務(wù)數(shù)量
* @return 獲取運(yùn)行中的任務(wù)數(shù)量
*/
TestCaseRuntimeResponse getRuntime();
/**
* 單個(gè)任務(wù)執(zhí)行完畢
*
* @param jobName 任務(wù)名稱
* @param finishCost 任務(wù)完成耗時(shí)
*/
void finishOne(String jobName, String finishCost);
/**單個(gè)任務(wù)執(zhí)行異常
* @param jobDefine 任務(wù)
* @param e 異常
*/
void oneException(AbstractJobDefine jobDefine, Exception e);
/**
* 所有任務(wù)執(zhí)行完畢
*/
void finishAll();
}
其中 getRuntime 接口是用于在 cli 那邊查詢?nèi)蝿?wù)是否執(zhí)行完畢的接口,只有任務(wù)執(zhí)行完畢之后才能退出 cli。
監(jiān)控指標(biāo)
當(dāng)這些任務(wù)運(yùn)行完畢后我們需要重點(diǎn)查看應(yīng)用客戶端和 Pulsar broker 端是否有異常日志。
同時(shí)還需要觀察一些關(guān)鍵的監(jiān)控面板:
包含但不限于:
- 消息吞吐量
- broker 寫入延遲
- Bookkeeper 的寫入、讀取成功率,以及延遲。
當(dāng)然還有 zookeeper 的運(yùn)行情況也需要監(jiān)控,限于篇幅就不一一粘貼了。
以上就是測(cè)試整個(gè) Pulsar 集群的流程,當(dāng)然還有一些需要優(yōu)化的地方。
比如使用命令行還是有些不便,后續(xù)可能會(huì)切換到網(wǎng)頁上就可以操作。