自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

十分鐘了解Flink SQL使用

大數(shù)據(jù)
本文將介紹Flink SQL的基本原理、使用方法、流批統(tǒng)一,并通過(guò)幾個(gè)例子進(jìn)行實(shí)踐。

Flink 是一個(gè)流處理和批處理統(tǒng)一的大數(shù)據(jù)框架,專(zhuān)門(mén)為高吞吐量和低延遲而設(shè)計(jì)。開(kāi)發(fā)者可以使用SQL進(jìn)行流批統(tǒng)一處理,大大簡(jiǎn)化了數(shù)據(jù)處理的復(fù)雜性。本文將介紹Flink SQL的基本原理、使用方法、流批統(tǒng)一,并通過(guò)幾個(gè)例子進(jìn)行實(shí)踐。

一、Flink SQL基本原理

Flink SQL建立在Apache Flink之上,利用Flink的強(qiáng)大處理能力,使得用戶(hù)可以使用SQL語(yǔ)句進(jìn)行流數(shù)據(jù)和批數(shù)據(jù)處理。Flink SQL既支持實(shí)時(shí)的流數(shù)據(jù)處理,也支持有界的批數(shù)據(jù)處理。

Flink SQL用SQL作為處理數(shù)據(jù)的接口語(yǔ)言,將SQL語(yǔ)句轉(zhuǎn)換成數(shù)據(jù)流圖(Dataflow Graph),再由Flink引擎執(zhí)行。

二、Flink SQL固定編碼套路

使用Flink SQL時(shí),我們通常會(huì)遵循如下編碼套路,這些套路和使用Flink API的套路是一樣的:

  • 環(huán)境準(zhǔn)備:初始化一個(gè)TableEnvironment對(duì)象,它是執(zhí)行Flink SQL語(yǔ)句的核心。這個(gè)環(huán)境可以是流數(shù)據(jù)環(huán)境,也可以是批數(shù)據(jù)環(huán)境。
  • 數(shù)據(jù)源定義:通過(guò)CREATE TABLE語(yǔ)句定義輸入數(shù)據(jù)源(source),可以是Kafka、CSV文件等。
  • 數(shù)據(jù)處理:編寫(xiě)SQL語(yǔ)句對(duì)數(shù)據(jù)進(jìn)行處理,如查詢(xún)、過(guò)濾、聚合等。
  • 數(shù)據(jù)輸出:通過(guò)CREATE TABLE定義輸出數(shù)據(jù)源(sink),并將處理結(jié)果輸出。

三、Flink SQL代碼示例

以下是一個(gè)從CSV文件讀取數(shù)據(jù),通過(guò)SQL查詢(xún),再將數(shù)據(jù)輸出到CSV的完整例子。

(1) 先準(zhǔn)備input.csv文件內(nèi)容,如下:

1,product_A,10.5
2,product_B,20.3
3,product_C,15.8
1,product_D,12.2
2,product_A,18.7

(2) 編寫(xiě)demo代碼

編寫(xiě)代碼之前先在pom.xml中添加依賴(lài):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

示例代碼如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlDemo {
    public static void main(String[] args) throws Exception {
        // 設(shè)置環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //為了方便測(cè)試看效果,這里并行度設(shè)置為1
        // 使用EnvironmentSettings創(chuàng)建StreamTableEnvironment,明確設(shè)置為批處理模式
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode() // 設(shè)置為批處理模式,這樣后續(xù)才能一次性的輸出到csv中
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 定義輸入數(shù)據(jù)源
        String createSourceTableDdl = "CREATE TABLE csv_source (" +
                " user_id INT," +
                " product STRING," +
                " order_amount DOUBLE" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///path/input.csv'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createSourceTableDdl);

//        // 編寫(xiě) SQL 查詢(xún)
//        String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";
//        // 執(zhí)行查詢(xún)并打印
//        tableEnv.executeSql(query).print();
//        env.execute("Flink SQL Demo");

        // 定義輸出數(shù)據(jù)源
        String createSinkTableDdl = "CREATE TABLE csv_sink (" +
                " user_id INT," +
                " total_amount DOUBLE" +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'file:///path/output.csv'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createSinkTableDdl);

        // 執(zhí)行查詢(xún)并將結(jié)果輸出到csv_sink
        String query = "INSERT INTO csv_sink " +
                "SELECT user_id, SUM(order_amount) as total_amount " +
                "FROM csv_source " +
                "GROUP BY user_id";
        tableEnv.executeSql(query);
//        env.execute("Flink SQL Job");
    }
}

(3) 執(zhí)行結(jié)果如下:

四、Flink SQL做流批統(tǒng)一

1.什么是流批統(tǒng)一?

流批統(tǒng)一是大數(shù)據(jù)處理領(lǐng)域的一個(gè)概念,它指的是使用一套代碼來(lái)同時(shí)處理流數(shù)據(jù)(Streaming)和批數(shù)據(jù)(Batching)。

流處理和批處理的區(qū)別如下:

(1) 批處理(Batch Processing):

  • 批處理是指在某一時(shí)間點(diǎn)處理大量數(shù)據(jù)的手段。
  • 它通常涉及到對(duì)大量靜止的(不再變化的)數(shù)據(jù)集進(jìn)行一次性的處理。
  • 批處理作業(yè)通常在數(shù)據(jù)集完整可用后開(kāi)始執(zhí)行,并且經(jīng)常是在數(shù)據(jù)倉(cāng)庫(kù)中進(jìn)行。
  • 例如,一個(gè)電商平臺(tái)可能在一天結(jié)束時(shí)運(yùn)行一個(gè)批處理作業(yè)來(lái)處理當(dāng)天所有的交易記錄。

(2) 流處理(Stream Processing):

  • 流處理是指對(duì)數(shù)據(jù)實(shí)時(shí)進(jìn)行處理,通常是數(shù)據(jù)生成或接收的同時(shí)立即進(jìn)行。
  • 流處理適用于連續(xù)的數(shù)據(jù)輸入,這些數(shù)據(jù)一直在變化,需要立即響應(yīng)。
  • 例如,社交媒體平臺(tái)在接收到新的帖子時(shí),可能會(huì)實(shí)時(shí)分析這些帖子的內(nèi)容和流行趨勢(shì)。

在早期,流處理和批處理通常需要不同的系統(tǒng)來(lái)執(zhí)行。對(duì)于批處理,可能使用如Hadoop這樣的框架;而對(duì)于流處理,可能使用如Apache Storm這樣的框架。這就導(dǎo)致開(kāi)發(fā)者要同時(shí)學(xué)習(xí)多種框架才能處理不同類(lèi)型的數(shù)據(jù)作業(yè)。

流批統(tǒng)一的概念,就是將這兩種數(shù)據(jù)處理方式合并到一個(gè)平臺(tái)中,這樣一個(gè)系統(tǒng)既可以處理靜止的大批量數(shù)據(jù)集,也可以處理實(shí)時(shí)的數(shù)據(jù)流。這樣做的優(yōu)點(diǎn)是顯而易見(jiàn)的:

  • 統(tǒng)一的API:開(kāi)發(fā)人員只需要學(xué)習(xí)和使用一套工具和API,可以共享更多的代碼和邏輯。
  • 維護(hù)簡(jiǎn)便:只需維護(hù)一個(gè)系統(tǒng),可以減少學(xué)習(xí)成本,減輕運(yùn)維壓力,減少故障點(diǎn)。
  • 靈活的數(shù)據(jù)處理:可以根據(jù)不同的業(yè)務(wù)需求靈活選擇數(shù)據(jù)處理方式。

2.Flink SQL流批一體的實(shí)現(xiàn)原理

Flink很好的實(shí)現(xiàn)了流批統(tǒng)一,可以讓開(kāi)發(fā)人員用相同的方式來(lái)編寫(xiě)批處理和流處理程序。不論是對(duì)有界(批處理)還是無(wú)界(流處理)的數(shù)據(jù)源,F(xiàn)link都可以使用相同的API和處理邏輯來(lái)處理數(shù)據(jù)。

Flink 通過(guò)內(nèi)置的表抽象來(lái)實(shí)現(xiàn)流批一體,這里的"表"可以是動(dòng)態(tài)變化的(例如,來(lái)自實(shí)時(shí)數(shù)據(jù)流的表)或是靜態(tài)的(例如,存儲(chǔ)在文件或數(shù)據(jù)庫(kù)中的批量數(shù)據(jù)表)。Flink SQL引擎會(huì)根據(jù)數(shù)據(jù)的實(shí)際來(lái)源自動(dòng)優(yōu)化執(zhí)行計(jì)劃。

Flink SQL的流批統(tǒng)一核心在于三點(diǎn):

  • 統(tǒng)一的API和SQL語(yǔ)義:Flink SQL提供一致的查詢(xún)構(gòu)建塊(如窗口、時(shí)間處理函數(shù)),這些在流處理和批處理中語(yǔ)義一致,確保不同模式下行為的統(tǒng)一性。
  • 透明的狀態(tài)處理:無(wú)論是流處理還是批處理,F(xiàn)link都能夠保持和恢復(fù)狀態(tài),為開(kāi)發(fā)者提供一致的高容錯(cuò)性體驗(yàn)。
  • 多模態(tài)存儲(chǔ)和處理能力:Flink SQL能夠訪(fǎng)問(wèn)不同存儲(chǔ)介質(zhì)的數(shù)據(jù),這意味著相同的SQL語(yǔ)句可以無(wú)縫在流數(shù)據(jù)和存儲(chǔ)的批量數(shù)據(jù)上執(zhí)行。

3.Flink SQL流批統(tǒng)一的代碼示例

以下是一個(gè)完整的代碼示例,用Flink來(lái)實(shí)現(xiàn)流批統(tǒng)一處理。Flink同時(shí)從Kafka 和 CSV讀取數(shù)據(jù),然后合并查詢(xún)?cè)佥敵鼋Y(jié)果:

(1) 代碼示例

代碼中,先配置了Flink的流處理環(huán)境和表環(huán)境,然后用DDL語(yǔ)句在Flink中注冊(cè)了Kafka和文件系統(tǒng)數(shù)據(jù)源。接著執(zhí)行了一個(gè)SQL查詢(xún)來(lái)合并來(lái)自這兩種數(shù)據(jù)源的數(shù)據(jù),并計(jì)算總金額。最后,打印出查詢(xún)結(jié)果并開(kāi)始執(zhí)行Flink作業(yè)。


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamBatchUnifiedDemo {
    public static void main(String[] args) throws Exception {
        // 設(shè)置流處理的環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Kafka 流處理表
        String createKafkaSourceDDL = "CREATE TABLE kafka_stream_orders (" +
                "order_id STRING," +
                "amount DOUBLE)" +
                "WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'topic_test'," +
                "'properties.bootstrap.servers' = '10.20.1.26:9092'," +
                "'format' = 'json'," +
                "'scan.startup.mode' = 'latest-offset'" +
                ")";
        tableEnv.executeSql(createKafkaSourceDDL);

        // 文件系統(tǒng)批處理表
        String createFilesystemSourceDDL = "CREATE TABLE file_batch_orders (" +
                "order_id STRING," +
                "amount DOUBLE)" +
                "WITH (" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///Users/yclxiao/Project/bigdata/flink-blog/doc/input_order.csv'," +
                "'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createFilesystemSourceDDL);

        // 執(zhí)行統(tǒng)一查詢(xún),計(jì)算總金額
        Table resultTable = tableEnv.sqlQuery("SELECT SUM(amount) FROM (" +
                "SELECT amount FROM kafka_stream_orders " +
                "UNION ALL " +
                "SELECT amount FROM file_batch_orders)");

        // 打印結(jié)果
        tableEnv.toRetractStream(resultTable, Row.class).print();

        // 開(kāi)始執(zhí)行程序
        env.execute("Stream-Batch Unified Job");
    }
}

(2) 執(zhí)行效果

通過(guò)以上示例代碼,可以看出Flink SQL的流批一體設(shè)計(jì):相同的SQL語(yǔ)句可以用在流處理和批處理中,而不需要做任何修改。Flink背后的執(zhí)行引擎會(huì)自動(dòng)根據(jù)數(shù)據(jù)的特性(流或者批)來(lái)進(jìn)行相應(yīng)的優(yōu)化執(zhí)行。

這就是Flink SQL非常強(qiáng)大的地方,它減少了開(kāi)發(fā)者需要寫(xiě)不同代碼邏輯的需求,簡(jiǎn)化了復(fù)雜的數(shù)據(jù)處理流程。

五、總結(jié)

Flink SQL是一個(gè)非常強(qiáng)大的數(shù)據(jù)處理工具,可以應(yīng)對(duì)多種復(fù)雜的數(shù)據(jù)處理場(chǎng)景。

本文主要介紹了Flink SQL的基本原理、編碼套路、流批統(tǒng)一,再結(jié)合正確的代碼示例進(jìn)行實(shí)踐。希望對(duì)你有幫助。

完整代碼地址:https://github.com/yclxiao/flink-blog

責(zé)任編輯:趙寧寧 來(lái)源: 不焦躁程序員
相關(guān)推薦

2020-12-17 06:48:21

SQLkafkaMySQL

2024-06-19 09:58:29

2023-07-15 18:26:51

LinuxABI

2024-11-07 16:09:53

2015-11-06 11:03:36

2021-07-29 08:57:23

ViteReact模塊

2009-11-03 11:01:45

VB.NET遠(yuǎn)程事件

2025-03-18 12:20:00

編程

2024-12-13 15:29:57

SpringSpringBeanJava

2024-10-08 11:12:12

2019-04-01 14:59:56

負(fù)載均衡服務(wù)器網(wǎng)絡(luò)

2009-10-09 14:45:29

VB程序

2024-10-06 12:50:25

2020-12-09 16:41:22

LinuxIT開(kāi)發(fā)

2021-09-07 09:40:20

Spark大數(shù)據(jù)引擎

2022-06-16 07:31:41

Web組件封裝HTML 標(biāo)簽

2023-04-12 11:18:51

甘特圖前端

2015-09-06 09:22:24

框架搭建快速高效app

2012-07-10 01:22:32

PythonPython教程

2023-11-30 10:21:48

虛擬列表虛擬列表工具庫(kù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)