自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

FlinkSQL 中 Catalog 的使用場景及案例詳解

大數(shù)據(jù)
Catalog 在 Flink SQL 中是一個元數(shù)據(jù)管理組件,用于存儲和管理數(shù)據(jù)庫、表、視 圖、函數(shù)等元數(shù)據(jù)對象的抽象接口。

Catalog 在 Flink SQL 中是一個元數(shù)據(jù)管理組件,用于存儲和管理數(shù)據(jù)庫、表、視 圖、函數(shù)等元數(shù)據(jù)對象的抽象接口。它類似于傳統(tǒng)數(shù)據(jù)庫系統(tǒng)中的元數(shù)據(jù)倉庫,為 Flink SQL 提供了元數(shù)據(jù)管理能力。

Catalog 使 Flink 能夠:

  • 以統(tǒng)一的方式訪問不同的外部系統(tǒng)
  • 減少代碼中的硬編 碼配置
  • 實現(xiàn)表元數(shù)據(jù)的持久化
  • 支持跨會話的元數(shù)據(jù)共享。

1. Catalog的作用

(1) 管理元數(shù)據(jù)對象

Catalog 可以管理以下元數(shù)據(jù)對象: - 數(shù)據(jù)庫(Database) - 表(Table) - 視圖(View) - 函數(shù)(Function) - 分區(qū)(Partition)等

(2) 支持多樣化的元數(shù)據(jù)存儲

Flink 支持多種Catalog 實現(xiàn),可以連接各種外部元數(shù)據(jù)系統(tǒng): - 內(nèi)存Catalog(默認) - Hive Catalog - JDBC Catalog - 自定義 Catalog

(3) 提供統(tǒng)一的數(shù)據(jù)訪問接口

無論底層元數(shù)據(jù)存儲在哪里,都可以通過統(tǒng)一的接口訪問和操作

(4) 簡化元數(shù)據(jù)管理

開發(fā)者可以通過Catalog 注冊永久表,而不是在代碼中重復定義表結構

2. Flink內(nèi)置的catalog類型

(1) GenericInMemoryCatalog

默認的內(nèi)存 Catalog,元數(shù)據(jù)只在單個 Flink 會話中有效,會話結束數(shù)據(jù)就會丟失。

// 創(chuàng)建內(nèi)存Catalog 
Catalog inmemory = new GenericInMemoryCatalog("in_memory_catalog"); 
tableEnv.registerCatalog("in_memory_catalog", inmemory); -- SQL 中創(chuàng)建和使用內(nèi)存Catalog 
CREATE CATALOG my_memory_catalog WITH ( 
'type' = 'generic_in_memory' 
); 
USE CATALOG my_memory_catalog;

(2) HiveCatalog

使用Hive Metastore 存儲元數(shù)據(jù),支持持久化,適合生產(chǎn)環(huán)境。

// 創(chuàng)建Hive Catalog 
String name = "my_hive_catalog"; 
String defaultDatabase = "default"; 
String hiveConfDir = "/path/to/hive/conf"; 
String version = "2.3.6"; 
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version); 
tableEnv.registerCatalog("my_hive_catalog", hive); -- SQL 中創(chuàng)建和使用Hive Catalog 
CREATE CATALOG my_hive_catalog WITH ( 
'type' = 'hive', 
'default-database' = 'default', 
'hive-conf-dir' = '/path/to/hive/conf', 
'hive-version' = '2.3.6' 
); 
USE CATALOG my_hive_catalog; 
3. JdbcCatalog 
使用關系型數(shù)據(jù)庫存儲元數(shù)據(jù)。 
// 創(chuàng)建JDBC Catalog 
String name = "my_jdbc_catalog"; 
String defaultDatabase = "default"; 
String username = "username"; 
String password = "password"; 


String baseUrl = "jdbc:mysql://localhost:3306"; 


JdbcCatalog jdbc = new JdbcCatalog(name, defaultDatabase, username, pas
 sword, baseUrl); 
tableEnv.registerCatalog("my_jdbc_catalog", jdbc); -- SQL中創(chuàng)建和使用JDBC Catalog(Flink 1.15+) 
CREATE CATALOG my_jdbc_catalog WITH ( 
    'type' = 'jdbc', 
    'default-database' = 'default', 
    'username' = 'username', 
    'password' = 'password', 
    'base-url' = 'jdbc:mysql://localhost:3306' 
); 


USE CATALOG my_jdbc_catalog;

(3) 使用Catalog的SQL操作

1. 創(chuàng)建和切換Catalog -- 創(chuàng)建Catalog 
CREATE CATALOG my_catalog WITH ( 
    'type' = 'hive', 
    'hive-conf-dir' = '/path/to/hive/conf' 
); 
 -- 查看所有Catalog 
SHOW CATALOGS; 
 -- 切換當前Catalog 
USE CATALOG my_catalog; 
2. 創(chuàng)建和使用數(shù)據(jù)庫 -- 創(chuàng)建數(shù)據(jù)庫 
CREATE DATABASE my_database; 
 -- 查看所有數(shù)據(jù)庫 
SHOW DATABASES; 
 -- 切換當前數(shù)據(jù)庫 
USE my_database; 
3. 管理表和視圖 -- 創(chuàng)建表 
CREATE TABLE my_table ( 
    id INT, 
    name STRING, 
    create_time TIMESTAMP(3) 


) WITH ( 
'connector' = 'kafka', 
'topic' = 'my_topic', 
'properties.bootstrap.servers' = 'localhost:9092', 
'format' = 'json' 
); -- 查看所有表 
SHOW TABLES; -- 查看表結構 
DESCRIBE my_table; 
4. 管理函數(shù) -- 創(chuàng)建自定義函數(shù) 
CREATE FUNCTION my_function AS 'com.example.MyFunction'; -- 查看所有函數(shù) 
SHOW FUNCTIONS;

(4) Catalog 的實際應用示

// 跨會話持久化元數(shù)據(jù) 
// 會話1:注冊Hive Catalog和表 
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(env1); 
tEnv1.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
tEnv1.executeSql("USE CATALOG hive_catalog"); 
tEnv1.executeSql("CREATE DATABASE IF NOT EXISTS db1"); 
tEnv1.executeSql("USE db1"); 
tEnv1.executeSql( 
"CREATE TABLE IF NOT EXISTS orders (" + 
"  order_id STRING, " + 
"  price DECIMAL(10, 2)" + 
") WITH (" + 
"  'connector' = 'kafka', " + 
"  'topic' = 'orders'" + 
")"); 
// 會話2:直接使用之前注冊的表 
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env2); 
tEnv2.executeSql("USE CATALOG hive_catalog"); 
tEnv2.executeSql("USE db1"); 

// 不需要重新定義表結構,可以直接查詢 
tEnv2.executeSql("SELECT * FROM orders"); 
使用不同類型的Catalog實現(xiàn)多源數(shù)據(jù)集成 
// 注冊多個Catalog訪問不同系統(tǒng) 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutio
 nEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); 
// 注冊Hive Catalog 
tEnv.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
// 注冊JDBC Catalog 
tEnv.executeSql( 
"CREATE CATALOG jdbc_catalog WITH (" + 
"  'type' = 'jdbc', " + 
"  'default-database' = 'default', " + 
"  'username' = 'user', " + 
"  'password' = 'password', " + 
"  'base-url' = 'jdbc:mysql://localhost:3306'" + 
")"); 
// 從不同Catalog中的表關聯(lián)查詢 
tEnv.executeSql( 
"SELECT o.order_id, o.price, c.name " + 
"FROM hive_catalog.db1.orders o " + 
"JOIN jdbc_catalog.default.customers c " + 
"ON o.customer_id = c.id" 
); 
責任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
點贊
收藏

51CTO技術棧公眾號