大數(shù)據(jù)實時分析:Flink 連接 Kafka 和 Flink SQL
Flink 連接 Kafka 前的準(zhǔn)備
在使用 Apache Flink 連接 Apache Kafka 之前,需要完成以下準(zhǔn)備工作。具體步驟如下:
從 Maven 官方庫獲取相關(guān)的 jar
選擇合適的 Kafka 連接器版本
- 根據(jù)我們使用的 Flink 版本選擇合適的 Kafka 連接器版本。官方建議的版本可以在 Flink 的官方文檔中找到。
添加 Maven 依賴
- 打開我們的項目的 pom.xml 文件,并添加以下依賴(假設(shè)我們使用的是 Flink 1.13 和 Kafka 2.8.0):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
下載 jar 文件
- 在命令行中運行 mvn clean package 下載依賴的 jar 文件。
將 jar 放到 lib 目錄下
找到下載的 jar 文件
- 運行 Maven 命令后,相關(guān)的 jar 文件會被下載到本地的 Maven 倉庫中,通常位。
- 于 ~/.m2/repository/org/apache/flink/ 下。
復(fù)制 jar 文件到 Flink 的 lib 目錄
- 找到相關(guān)的 jar 文件并將其復(fù)制到 Flink 的 lib 目錄中。假設(shè) Flink 安裝在 /opt/flink 路徑下,執(zhí)行以下命令:
cp ~/.m2/repository/org/apache/flink/flink-connector-kafka_2.12/1.13.0/flink-connector-kafka_2.12-1.13.0.jar /opt/flink/lib/
重啟 Flink
停止 Flink 集群
- 執(zhí)行以下命令停止 Flink 集群:
/opt/flink/bin/stop-cluster.sh
啟動 Flink 集群
- 執(zhí)行以下命令啟動 Flink 集群:
/opt/flink/bin/start-cluster.sh
完成上述步驟后,F(xiàn)link 將能夠連接并消費 Kafka 的消息。
Flink連接Kafka的例子
在 Apache Flink 中,通過 Flink SQL 從 Kafka 中讀取數(shù)據(jù),通常需要以下幾個步驟:
定義 Kafka 數(shù)據(jù)源表
使用 SQL 語句定義一個 Kafka 表,該表描述了如何從 Kafka 主題中讀取數(shù)據(jù)以及數(shù)據(jù)的格式。
執(zhí)行 SQL 查詢
編寫 SQL 查詢來處理從 Kafka 讀取的數(shù)據(jù)。下面是一個詳細(xì)的示例,演示如何通過 Flink SQL 從 Kafka 中讀取數(shù)據(jù):
定義 Kafka 數(shù)據(jù)源表
首先,我們需要定義一個 Kafka 表。假設(shè)我們有一個 Kafka 主題 input_topic,它包含 JSON 格式的數(shù)據(jù)。我們可以使用 CREATE TABLE 語句來定義這張表。
CREATE TABLE input_table (
user_id STRING,
action STRING,
timestamp TIMESTAMP(3),
WATERMARK FOR timestamp AS timestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'input_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink_consumer_group',
'format' = 'json'
);
編寫 SQL 查詢
定義好 Kafka 表后,我們可以編寫 SQL 查詢來處理從 Kafka 中讀取的數(shù)據(jù)。例如,我們可以計算每個用戶的操作次數(shù),并將結(jié)果插入到另一個 Kafka 主題。
CREATE TABLE output_table (
user_id STRING,
action_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'output_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
INSERT INTO output_table
SELECT user_id, COUNT(action) AS action_count
FROM input_table
GROUP BY user_id, TUMBLE(timestamp, INTERVAL '10' MINUTE);
詳細(xì)解釋
- input_table
user_id 和 action 是讀取自 Kafka 消息的字段。
timestamp 是事件時間戳,用于時間語義。
WATERMARK 用于處理遲到的數(shù)據(jù),定義了一個 watermark 策略,表示事件時間戳延遲 5 秒。
WITH 子句定義了 Kafka 連接器的配置,包括 Kafka 主題名、服務(wù)器地址、消費者組 ID 和消息格式。
- output_table
定義了一個輸出表,將結(jié)果寫回 Kafka 的 output_topic 主題。
配置與 input_table 類似,定義了 Kafka 連接器的屬性。
SQL 查詢
使用 INSERT INTO ... SELECT ... 語句從 input_table 讀取數(shù)據(jù),并將處理結(jié)果寫入 output_table。
使用 TUMBLE 函數(shù)定義了一個 10 分鐘的滾動窗口,按 user_id 進(jìn)行分組并計算每個用戶的操作次數(shù)。
運行 SQL 查詢
上述 SQL 查詢可以通過 Flink SQL CLI、Flink SQL 程序或 Flink SQL 任務(wù)提交工具來運行。以下是通過 Flink SQL CLI 運行這些查詢的步驟:
- 啟動 Flink 集群。
- 進(jìn)入 Flink SQL CLI:
./bin/sql-client.sh
- 在 SQL CLI 中執(zhí)行上述 CREATE TABLE 和 INSERT INTO 語句。
這樣,F(xiàn)link 就會開始從 Kafka 的 input_topic 主題中讀取數(shù)據(jù),按定義的 SQL 查詢進(jìn)行處理,并將結(jié)果寫入 output_topic 主題。
Flink連接Kafka-帶有時間屬性
在 Apache Flink SQL 中,可以使用窗口函數(shù)來從 Kafka 中每隔五分鐘取一次數(shù)據(jù)并進(jìn)行分析。下面是一個詳細(xì)的示例,展示了如何定義一個 Kafka 數(shù)據(jù)源表,并使用滾動窗口(Tumbling Window)來每五分鐘進(jìn)行一次數(shù)據(jù)聚合分析。
定義 Kafka 數(shù)據(jù)源表
首先,需要定義一個 Kafka 表,該表描述了如何從 Kafka 主題中讀取數(shù)據(jù)以及數(shù)據(jù)的格式。
CREATE TABLE input_table (
user_id STRING,
action STRING,
timestamp TIMESTAMP(3),
WATERMARK FOR timestamp AS timestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'input_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink_consumer_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
定義結(jié)果表
接下來,需要定義一個輸出表,用于存儲分析結(jié)果。這里假設(shè)我們將結(jié)果寫回到另一個 Kafka 主題。
CREATE TABLE output_table (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
user_id STRING,
action_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'output_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
編寫 SQL 查詢
然后,編寫 SQL 查詢來從 Kafka 表中每隔五分鐘取一次數(shù)據(jù)并進(jìn)行聚合分析。使用 TUMBLE 窗口函數(shù)來定義一個滾動窗口。
INSERT INTO output_table
SELECT
TUMBLE_START(timestamp, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(timestamp, INTERVAL '5' MINUTE) AS window_end,
user_id,
COUNT(action) AS action_count
FROM input_table
GROUP BY
TUMBLE(timestamp, INTERVAL '5' MINUTE),
user_id;
詳細(xì)解釋
- input_table
user_id 和 action 是從 Kafka 消息中讀取的字段。
timestamp 是事件時間戳,用于定義時間窗口。
WATERMARK 定義了一個 watermark 策略,允許事件時間戳延遲 5 秒。
WITH 子句定義了 Kafka 連接器的配置,包括 Kafka 主題名、服務(wù)器地址、消費者組 ID、啟動模式和消息格式。
- output_table
定義了一個輸出表,將結(jié)果寫回 Kafka 的 output_topic 主題。
配置與 input_table 類似,定義了 Kafka 連接器的屬性。
SQL 查詢
使用 INSERT INTO ... SELECT ... 語句從 input_table 讀取數(shù)據(jù),并將處理結(jié)果寫入 output_table。
TUMBLE 函數(shù)定義了一個 5 分鐘的滾動窗口。
TUMBLE_START 和 TUMBLE_END 函數(shù)分別返回窗口的開始時間和結(jié)束時間。
按 user_id 進(jìn)行分組,并計算每個用戶在每個 5 分鐘窗口內(nèi)的操作次數(shù)。
運行 SQL 查詢
這些 SQL 查詢可以通過 Flink SQL CLI、Flink SQL 程序或 Flink SQL 任務(wù)提交工具來運行。以下是通過 Flink SQL CLI 運行這些查詢的步驟:
- 啟動 Flink 集群。
- 進(jìn)入 Flink SQL CLI:
./bin/sql-client.sh
- 在 SQL CLI 中執(zhí)行上述 CREATE TABLE 和 INSERT INTO 語句。
這樣,F(xiàn)link 就會從 Kafka 的 input_topic 主題中讀取數(shù)據(jù),每隔五分鐘按定義的 SQL 查詢進(jìn)行處理,并將結(jié)果寫入 output_topic 主題。