Apache Flink 掃雷系列 - PyFlink如何解決多JAR包依賴問題
《Apache Flink 掃雷系列》簡介
本篇是《Apache Flink 掃雷系列》的第一篇,所以簡單介紹一下這一系列的特點,本系列所定義的”雷”是指由于目前Apache Flink目前的設計問題導致的用戶非便利性問題的臨時解決辦法。那么為什么明知道有設計問題還不進行設計重構,避免這些”雷”的存在呢?其實社區(qū)的發(fā)展和我們各個公司內部產品發(fā)展一樣,都有一些客觀因素導致實際存在的問題無法及時得到解決,比如,社區(qū)的Release或者內部產品發(fā)布的的周期問題,在沒有新的Release之前的一些對用戶非友好的問題就需要有一些“非正規(guī)”的解決方式,或者說是臨時解決方案,這種方案的特點就是,能解決問題,但不是通用性解決手段,只能民間流傳,不能官方宣揚。所以《Apache Flink 掃雷系列》就是為大家提供能夠解決大家現(xiàn)實問題,但是可能不是最佳實踐,大家在這系列中可以有更大的反哺社區(qū)的機會:)
開篇說”雷”
本篇的”雷”是目前針對Apache Flink 1.10集以前版本中,在利用CLI提交作業(yè)時候只能提交一個JAR的功能問題解決,也就是針對命令參數(shù)-j,--jarfile
掃雷難度
面對合并多個JAR包,也許Java用戶還好(雖然不便利,但應該都會操作),但對于Python用戶,在沒有涉及過Java開發(fā)的情況下,可能要花費一些時間來完成JARs的合并,甚至有可能有種無從下手的感覺。所以本篇主要針對的是不了解Java的Flink Python用戶。
案例選取
為了大家能夠實際的體驗實際效果,我們選取一個具體的案例來說明如果進行多JARs的合并。我們就選取我在2020年3月17日直播中所說的《PyFlink 場景案例 - PyFlink實現(xiàn)CDN日志實時分析》來進行說明。
案例回顧
《PyFlink 場景案例 - PyFlink實現(xiàn)CDN日志實時分析》核心是針對灌入Kafka的CDN日志數(shù)據(jù)經過PyFlink進行按地區(qū)的下載量,下載速度的統(tǒng)計,最終將統(tǒng)計數(shù)據(jù)寫入到MySql中。同時放入到Kafka的數(shù)據(jù)格式是CSV('format.type' = 'csv')。所以我們依賴的JARs如下:
- flink-sql-connector-kafka_2.11-1.10.0.jar
- flink-jdbc_2.11-1.10.0.jar
- flink-csv-1.10.0-sql-jar.jar
- mysql-connector-java-8.0.19.jar
我們可以用如下命令下載:
- $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
- $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
- $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
- $ curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar
我們將如上4個JARs下載到某個目錄,我這里下載到本機的temp目錄:
“雷”存在的場景說明
為啥在博客《PyFlink 場景案例 - PyFlink實現(xiàn)CDN日志實時分析》并沒有提到要合并JARs的問題? 是的,這個“雷”的存在是有一定的條件的:
作業(yè)提交的集群環(huán)境沒有預先安裝你所有需要的JARs(大部分情況都是不會安裝的)
上面條件是必須成立,才會存在掃雷的問題。那么在博客中我在集群環(huán)境預安裝了說需要的JARs,也就是博客中提到的下載JARs到集群lib目錄
- PYFLINK_LIB=python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')")
的操作。
合并JARs的注意點
合并JARs的一個很重要的點是涉及到了JAR包的Service Provider機制,詳細規(guī)范詳見。這是讓Python人員是很難注意到的合并重點。JAR包的Service Provider機制會允許在JAR包的META-INF/services目錄下保存Service Provider的配置文件。簡單說就是他為開發(fā)者提供了一種擴展機制,在開發(fā)階段只是定義接口,然后在包含實現(xiàn)的JAR包進行實現(xiàn)配置,就可以調用到實際接口的實現(xiàn)類。關于JAR包META-INF目錄結構簡單說明如下:
- META-INF - 目錄中的下列文件和目錄獲得Java 2平臺的認可與解釋,用來配置應用程序、擴展程序、類加載器和服務:
- MANIFEST.MF - 清單文件,用來定義與擴展和數(shù)據(jù)包相關的數(shù)據(jù)。
- INDEX.LIST - 這個文件由JAR工具的新“-i”選項生成,其中包含在一個應用程序或擴展中定義的數(shù)據(jù)包的地址信息。它是JarIndex的一部分,被類加載器用來加速類加載過程。
- x.SF - JAR文件的簽名文件。x代表基礎文件名。
- x.DSA - 這個簽名塊文件與同名基礎簽名文件有關。此文件存儲對應簽名文件的數(shù)字簽名。
- services - 這個目錄存儲所有服務提供程序配置文件。
注意:provider配置文件必須是以UTF-8編碼。
合并操作
1. 解壓JARs
- $ mkdir jobjar csv jdbc kafka mysql
其中jobjar存放最終我們打包的JAR內容, csv jdbc kafka mysql存放對應的JAR所解壓的內容。具體命令如下:
- $ unzip flink-csv-1.10.0-sql-jar.jar -d csv/
- $ unzip flink-sql-connector-kafka_2.11-1.10.0.jar -d kafka/
- $ unzip flink-jdbc_2.11-1.10.0.jar -d jdbc/
- $ unzip mysql-connector-java-8.0.19.jar -d mysql
解壓之后我們會在剛才的目錄得到如下文件內容:
我們核心要處理的是class文件夾和 META-INF/services文件夾,如圖csv和kafka的JAR解壓之后的內容。其中,Class文件夾可以直接拷貝。但是services要進行同名的合并,比如上用于Flink的Connector的服務發(fā)現(xiàn)配置org.apache.flink.table.factories.TableFactory是需要將文件內容進行合并的。
2. 合并JARs
首先我們創(chuàng)建META-INF和META-INF/services目錄,目錄結構如下:
- jincheng:jobjar jincheng.sunjc$ tree -L 2
- .
- └── META-INF
- └── services
- 2 directories, 0 files
(1) class文件合并
將csv jdbc kafka mysql的class直接copy到jobjar目錄,如下:
- $ cp -rf ../csv/org .
- $ cp -rf ../jdbc/org .
- $ cp -rf ../kafka/org .
- $ cp -rf ../mysql/com .
- $ tree -L 2
- .
- ├── META-INF
- │ └── services
- ├── com
- │ └── mysql
- └── org
- └── apache
詳細的目錄結構如下:
(2) services合并
Service Provider是JAR的一個標準,不僅僅Flink的Connector使用了Service Provider機制,同時Kafka使用了配置的服務發(fā)現(xiàn)。所以我們要將所有的services里面的內容按文件名進行合并。以csv和kafka為例:
在CSV里面的META-INF/services里面只有一個和Flink的connector相關的配置,內容如下:
在Kafka里面的META-INF/services里面有Flink的connector相關的配置和Kafka內部使用的配置,內容如下:
所以我們需要將Kafka相關的直接copy到jobjar/META-INF/services/目錄,然后將csv和Kafka關于org.apache.flink.table.factories.TableFactory的配置進行內容合并。合并的內容如下:
- # Licensed to the Apache Software Foundation (ASF) under
- ...
- ...
- # limitations under the License.
- org.apache.flink.formats.csv.CsvRowFormatFactory
- org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
我們最終將4個JARs的services配置進行合并之后的最終代碼如下:
- # Licensed to the Apache Software Foundation (ASF) under
- ...
- ...
- # limitations under the License.
- org.apache.flink.formats.csv.CsvRowFormatFactory
- org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
- org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
大家可以嘗試使用的命令如下:
- $ cat ../csv/META-INF/services/org.apache.flink.table.factories.TableFactory | grep ^[^#] >> META-INF/services/org.apache.flink.table.factories.TableFactory
- $ cat ../kafka/META-INF/services/org.apache.flink.table.factories.TableFactory | grep ^[^#] >> META-INF/services/org.apache.flink.table.factories.TableFactory
- $ cat ../kafka/META-INF/services/org.apache.flink.kafka.shaded.org.apache.kafka.common.config.provider.ConfigProvider | grep ^[^#] >> META-INF/services/org.apache.flink.kafka.shaded.org.apache.kafka.common.config.provider.ConfigProvider
- $ cat ../jdbc/META-INF/services/org.apache.flink.table.factories.TableFactory | grep ^[^#] >> META-INF/services/org.apache.flink.table.factories.TableFactory
3. 創(chuàng)建JAR
這一步驟沒有特別強調的內容,直接用用zip或者jar命令進行打包就好了。
- $ jincheng:jobjar jincheng.sunjc$ jar -cf myjob.jar META-INF com org
我最終產生的JAR可以在這里下載,用于對比你自己打包的是否和我的一樣:)
OK,到這里我們就完成了多JARs的合并工作。我們可以嘗試應用CLI進行提交命令了。
CLI提交作業(yè)
- 啟動集群(我修改了flink-conf,將端口更改到4000了)
- /usr/local/lib/python3.7/site-packages/pyflink/bin/start-cluster.sh local
- Starting cluster.
- Starting standalonesession daemon on host jincheng.local.
- Starting taskexecutor daemon on host jincheng.local.
當沒有添加-j選項時候,提交作業(yè)如下:
- $PYFLINK_LIB/../bin/flink run -m localhost:4000 -py cdn_demo.py
報錯如下:
提供正確的-j參數(shù),將我們打包的JAR提交到集群的情況,如下:
- $PYFLINK_LIB/../bin/flink run -j ~/temp/jobjar/myjob.jar -m localhost:4000 -py cdn_demo.py
同時Web控制臺可以查看提交的作業(yè):
小結
本篇核心介紹了PyFlink的用戶如何解決多JARs依賴作業(yè)提交問題,也許這不是最Nice的解決方法,但至少是你解決多JARs依賴作業(yè)提交的方法之一,祝你 “掃雷” 順利,也期望如果你有更好的解決辦法,留言或者郵件與我分享哦:)!
【本文為51CTO專欄作者“金竹”原創(chuàng)稿件,轉載請聯(lián)系原作者】