FlinkSQL 中 Catalog 的使用場景及案例詳解
Catalog 在 Flink SQL 中是一個元數(shù)據(jù)管理組件,用于存儲和管理數(shù)據(jù)庫、表、視 圖、函數(shù)等元數(shù)據(jù)對象的抽象接口。它類似于傳統(tǒng)數(shù)據(jù)庫系統(tǒng)中的元數(shù)據(jù)倉庫,為 Flink SQL 提供了元數(shù)據(jù)管理能力。
Catalog 使 Flink 能夠:
- 以統(tǒng)一的方式訪問不同的外部系統(tǒng)
- 減少代碼中的硬編 碼配置
- 實現(xiàn)表元數(shù)據(jù)的持久化
- 支持跨會話的元數(shù)據(jù)共享。
1. Catalog的作用
(1) 管理元數(shù)據(jù)對象
Catalog 可以管理以下元數(shù)據(jù)對象: - 數(shù)據(jù)庫(Database) - 表(Table) - 視圖(View) - 函數(shù)(Function) - 分區(qū)(Partition)等
(2) 支持多樣化的元數(shù)據(jù)存儲
Flink 支持多種Catalog 實現(xiàn),可以連接各種外部元數(shù)據(jù)系統(tǒng): - 內(nèi)存Catalog(默認) - Hive Catalog - JDBC Catalog - 自定義 Catalog
(3) 提供統(tǒng)一的數(shù)據(jù)訪問接口
無論底層元數(shù)據(jù)存儲在哪里,都可以通過統(tǒng)一的接口訪問和操作
(4) 簡化元數(shù)據(jù)管理
開發(fā)者可以通過Catalog 注冊永久表,而不是在代碼中重復定義表結構
2. Flink內(nèi)置的catalog類型
(1) GenericInMemoryCatalog
默認的內(nèi)存 Catalog,元數(shù)據(jù)只在單個 Flink 會話中有效,會話結束數(shù)據(jù)就會丟失。
// 創(chuàng)建內(nèi)存Catalog
Catalog inmemory = new GenericInMemoryCatalog("in_memory_catalog");
tableEnv.registerCatalog("in_memory_catalog", inmemory); -- SQL 中創(chuàng)建和使用內(nèi)存Catalog
CREATE CATALOG my_memory_catalog WITH (
'type' = 'generic_in_memory'
);
USE CATALOG my_memory_catalog;
(2) HiveCatalog
使用Hive Metastore 存儲元數(shù)據(jù),支持持久化,適合生產(chǎn)環(huán)境。
// 創(chuàng)建Hive Catalog
String name = "my_hive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf";
String version = "2.3.6";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
version);
tableEnv.registerCatalog("my_hive_catalog", hive); -- SQL 中創(chuàng)建和使用Hive Catalog
CREATE CATALOG my_hive_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/path/to/hive/conf',
'hive-version' = '2.3.6'
);
USE CATALOG my_hive_catalog;
3. JdbcCatalog
使用關系型數(shù)據(jù)庫存儲元數(shù)據(jù)。
// 創(chuàng)建JDBC Catalog
String name = "my_jdbc_catalog";
String defaultDatabase = "default";
String username = "username";
String password = "password";
String baseUrl = "jdbc:mysql://localhost:3306";
JdbcCatalog jdbc = new JdbcCatalog(name, defaultDatabase, username, pas
sword, baseUrl);
tableEnv.registerCatalog("my_jdbc_catalog", jdbc); -- SQL中創(chuàng)建和使用JDBC Catalog(Flink 1.15+)
CREATE CATALOG my_jdbc_catalog WITH (
'type' = 'jdbc',
'default-database' = 'default',
'username' = 'username',
'password' = 'password',
'base-url' = 'jdbc:mysql://localhost:3306'
);
USE CATALOG my_jdbc_catalog;
(3) 使用Catalog的SQL操作
1. 創(chuàng)建和切換Catalog -- 創(chuàng)建Catalog
CREATE CATALOG my_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/path/to/hive/conf'
);
-- 查看所有Catalog
SHOW CATALOGS;
-- 切換當前Catalog
USE CATALOG my_catalog;
2. 創(chuàng)建和使用數(shù)據(jù)庫 -- 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE my_database;
-- 查看所有數(shù)據(jù)庫
SHOW DATABASES;
-- 切換當前數(shù)據(jù)庫
USE my_database;
3. 管理表和視圖 -- 創(chuàng)建表
CREATE TABLE my_table (
id INT,
name STRING,
create_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'my_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
); -- 查看所有表
SHOW TABLES; -- 查看表結構
DESCRIBE my_table;
4. 管理函數(shù) -- 創(chuàng)建自定義函數(shù)
CREATE FUNCTION my_function AS 'com.example.MyFunction'; -- 查看所有函數(shù)
SHOW FUNCTIONS;
(4) Catalog 的實際應用示
// 跨會話持久化元數(shù)據(jù)
// 會話1:注冊Hive Catalog和表
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecuti
onEnvironment();
StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(env1);
tEnv1.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')");
tEnv1.executeSql("USE CATALOG hive_catalog");
tEnv1.executeSql("CREATE DATABASE IF NOT EXISTS db1");
tEnv1.executeSql("USE db1");
tEnv1.executeSql(
"CREATE TABLE IF NOT EXISTS orders (" +
" order_id STRING, " +
" price DECIMAL(10, 2)" +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'orders'" +
")");
// 會話2:直接使用之前注冊的表
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecuti
onEnvironment();
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env2);
tEnv2.executeSql("USE CATALOG hive_catalog");
tEnv2.executeSql("USE db1");
// 不需要重新定義表結構,可以直接查詢
tEnv2.executeSql("SELECT * FROM orders");
使用不同類型的Catalog實現(xiàn)多源數(shù)據(jù)集成
// 注冊多個Catalog訪問不同系統(tǒng)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutio
nEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 注冊Hive Catalog
tEnv.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')");
// 注冊JDBC Catalog
tEnv.executeSql(
"CREATE CATALOG jdbc_catalog WITH (" +
" 'type' = 'jdbc', " +
" 'default-database' = 'default', " +
" 'username' = 'user', " +
" 'password' = 'password', " +
" 'base-url' = 'jdbc:mysql://localhost:3306'" +
")");
// 從不同Catalog中的表關聯(lián)查詢
tEnv.executeSql(
"SELECT o.order_id, o.price, c.name " +
"FROM hive_catalog.db1.orders o " +
"JOIN jdbc_catalog.default.customers c " +
"ON o.customer_id = c.id"
);