使用 SQL 的方式查詢消息隊(duì)列數(shù)據(jù)以及踩坑指南
背景
為了讓業(yè)務(wù)團(tuán)隊(duì)可以更好的跟蹤自己消息的生產(chǎn)和消費(fèi)狀態(tài),需要一個(gè)類似于表格視圖的消息列表,用戶可以直觀的看到發(fā)送的消息;同時(shí)點(diǎn)擊詳情后也能查到消息的整個(gè)軌跡。
消息列表
點(diǎn)擊詳情后查看軌跡
原理介紹
由于 Pulsar 并沒有關(guān)系型數(shù)據(jù)庫(kù)中表的概念,所有的數(shù)據(jù)都是存儲(chǔ)在 Bookkeeper 中,為了模擬使用 SQL 查詢的效果 Pulsar 提供了 Presto (現(xiàn)在已經(jīng)更名為 Trino)的插件。
Trino 是一個(gè)分布式的 SQL 查詢引擎,它也提供了插件能力,如果我們想通過 SQL 從自定義數(shù)據(jù)源查詢數(shù)據(jù)時(shí),基于它的 SPI 編寫一個(gè)插件是很方便的。
這樣便可以類似于查詢數(shù)據(jù)庫(kù)一樣查詢 Pulsar 數(shù)據(jù):
Pulsar 插件的運(yùn)行流程如上圖所示:
- 啟動(dòng)的時(shí)候通過 Pulsar-Admin 接口獲取一些元數(shù)據(jù),比如 Scheme,topic 分區(qū)信息等。
- 然后會(huì)創(chuàng)建一個(gè)只讀的 Bookkeeper 客戶端,用于獲取數(shù)據(jù)。
- 之后根據(jù) SQL 條件過濾數(shù)據(jù)即可。
相關(guān)代碼:
使用 Pulsar-SQL
使用起來也很簡(jiǎn)單,官方提供了兩個(gè)命令:
- sql-worker: 會(huì)啟動(dòng)一個(gè) trino 服務(wù)端同時(shí)運(yùn)行了 Pulsar 插件.
- sql: 就是一個(gè) SQL 命令行終端。
遇到的問題
自己在本地運(yùn)行的時(shí)候自然是沒問題,可是一旦想在生產(chǎn)運(yùn)行,同時(shí)如果你的 Pulsar 集群是運(yùn)行再 k8s 環(huán)境中時(shí)就會(huì)碰到一些問題。
無法使用現(xiàn)有 Trino 集群
首先第一個(gè)問題是如果生產(chǎn)環(huán)境已經(jīng)有了一個(gè) Trino 集群想要復(fù)用的時(shí)候就會(huì)碰到問題,常規(guī)流程是將 Pulsar 的插件復(fù)制到 Trino 的 Plugin 目錄,然后重啟 Trino 后就能使用該插件。
當(dāng)然社區(qū)也是支持這么做的:
但是當(dāng)我將 Pulsar-plugin 復(fù)制到 Trino 中運(yùn)行的時(shí)候卻失敗了,整體的流程可以參考這個(gè) issue:https://github.com/apache/pulsar/discussions/20941
簡(jiǎn)單來說 Trino 的官方鏡像和 pulsar-plugin 并不能兼容,這個(gè)問題直接影響到我們是否可以在生產(chǎn)環(huán)境使用它。
但是手動(dòng)編譯出來的 Trino 服務(wù)和插件是兼容的,可以直接運(yùn)行。
因此我只能在本地編譯出 Trino 服務(wù)端和 pulsar-plugin 然后打包成一個(gè)鏡像來運(yùn)行了,當(dāng)然這樣的壞處就是無法利用到我們現(xiàn)有的 Trino 集群,又得重新部署一個(gè)了。
流程也比較麻煩:
- 首先是本地編譯 Pulsar-SQL 模塊
- 將生成物復(fù)制到當(dāng)前目錄
- 執(zhí)行 make docker 打出 docker 鏡像并上傳到私服
- 再執(zhí)行 kubectl 將 trino 部署到 k8s 環(huán)境中
整個(gè)流程做下來加上和社區(qū)的溝通,更加確定這個(gè)功能應(yīng)該是很少有人在生產(chǎn)環(huán)境使用的,畢竟第一個(gè)坑就很麻煩,更別提后續(xù)的問題了??。
Presto 插件不支持 AuthToken
第二個(gè)問題也是個(gè)深坑,當(dāng)我把 Trino 部署好查詢數(shù)據(jù)的時(shí)候直接拋了一個(gè)調(diào)用 pulsar-admin 接口連接超時(shí)的異常。
結(jié)果排查了半天發(fā)現(xiàn)原來是 pulsar-plugin 里沒有提供 JWT 的驗(yàn)證方式,而我們的 Pulsar 集群恰好是打開了 JWT 驗(yàn)證的。
為此我只能先在本地修復(fù)了這個(gè)問題,同時(shí)也提交了 PR,預(yù)計(jì)會(huì)在下一個(gè)大版本合并吧:https://github.com/apache/pulsar/pull/20860。
新創(chuàng)建的 topic 查詢失敗
第二個(gè)問題是當(dāng)查詢一個(gè)新創(chuàng)建的 topic 時(shí),客戶端會(huì)直接 block,相關(guān)的復(fù)現(xiàn)流程在這里:https://github.com/apache/pulsar/issues/20910
這個(gè)問題還好,不是很致命,是我在本地測(cè)試的時(shí)候無意間發(fā)現(xiàn)的。
本地我已經(jīng)修復(fù)了,后面也提交了一個(gè) PR,目前還在討論中:https://github.com/apache/pulsar/pull/20911
查詢消息會(huì)丟失最后一條
這個(gè)問題也不是很嚴(yán)重,數(shù)據(jù)量少的時(shí)候會(huì)發(fā)現(xiàn),就是在指定了消息發(fā)送時(shí)間的查詢條件時(shí),最后一條消息會(huì)被過濾掉,相關(guān) issue 在這里:https://github.com/apache/pulsar/issues/20919。
這個(gè)我只是定位到了原因,但不太清楚 為什么要這么做(-1),影響也不是很大,就放在這里擱置了。
Schema 不兼容
最后發(fā)現(xiàn)的一個(gè)問題是我們線上某些 topic 查詢數(shù)據(jù)的時(shí)候會(huì)拋出 Not a record: "string"的異常,但只是部分 topic,也排查了很久,整個(gè)源碼中沒有任何一個(gè)地方有這個(gè)異常。
https://github.com/apache/pulsar/issues/20945
根本原因是生產(chǎn)者生成的 schema 有問題,類型已經(jīng)是 JSON 了,但是 schema 卻是 string,這樣導(dǎo)致 pulsar-plugin 在反序列化 schema 的時(shí)候拋出了異常,由于是 pb 反序列化拋出的異常,所以源碼中都搜索不到。
沒有問題的 topic 使用了正確的 schema
后續(xù)我也在本地修復(fù)了這個(gè)問題,當(dāng)拋出異常后就將 schema 降級(jí)為基本類型進(jìn)行解析。
不過本質(zhì)問題還是客戶端使用有誤,如果對(duì) schema 理解不準(zhǔn)確的話還是建議使用 byte[] 吧,這樣至少兼容性不會(huì)有問題。相關(guān) PR:https://github.com/apache/pulsar/pull/20955。
總結(jié)
Pulsar-SQL 是一個(gè)非常有用的功能,只是我們使用過程中確實(shí)發(fā)現(xiàn)了一些問題,大部分都已經(jīng)修復(fù)了;希望對(duì)后續(xù)使用該功能的朋友有所幫助。