SpringBoot與Calcite整合,實(shí)現(xiàn)多數(shù)據(jù)源統(tǒng)一查詢系統(tǒng)
作者:Java知識(shí)日歷
最近,接到一個(gè)電商系統(tǒng)的兼職小單,其中訂單信息存儲(chǔ)在MySQL數(shù)據(jù)庫(kù),而用戶信息存儲(chǔ)在PostgreSQL數(shù)據(jù)庫(kù)。客戶那邊想有一個(gè)統(tǒng)一查詢接口,可以通過(guò)SQL查詢同時(shí)獲取這兩個(gè)數(shù)據(jù)源的信息。
最近,接到一個(gè)電商系統(tǒng)的兼職小單,其中訂單信息存儲(chǔ)在MySQL數(shù)據(jù)庫(kù),而用戶信息存儲(chǔ)在PostgreSQL數(shù)據(jù)庫(kù)。客戶那邊想有一個(gè)統(tǒng)一查詢接口,可以通過(guò)SQL查詢同時(shí)獲取這兩個(gè)數(shù)據(jù)源的信息。
為什么選擇Apache Calcite?
簡(jiǎn)化開(kāi)發(fā)流程
- 抽象層次高: Apache Calcite 提供了高層次的抽象,使得開(kāi)發(fā)者可以專注于業(yè)務(wù)邏輯,而不必處理底層的數(shù)據(jù)庫(kù)連接和查詢執(zhí)行細(xì)節(jié)。
- 減少重復(fù)工作: 通過(guò)使用Calcite,可以避免重復(fù)造輪子,節(jié)省開(kāi)發(fā)時(shí)間和成本。
強(qiáng)大的SQL解析和優(yōu)化能力
- SQL標(biāo)準(zhǔn)支持: Apache Calcite 支持多種SQL方言(如MySQL、PostgreSQL等),可以無(wú)縫地處理不同數(shù)據(jù)庫(kù)的SQL語(yǔ)句。
- 查詢優(yōu)化: 內(nèi)置的查詢優(yōu)化器可以根據(jù)不同的數(shù)據(jù)源特性進(jìn)行智能優(yōu)化,提高查詢性能。
靈活性和可擴(kuò)展性
- 自定義模式和表: 可以通過(guò)編程方式動(dòng)態(tài)地添加和管理多個(gè)數(shù)據(jù)源,每個(gè)數(shù)據(jù)源可以有不同的模式和表結(jié)構(gòu)。
- 插件機(jī)制: 支持各種插件,可以根據(jù)需求靈活擴(kuò)展功能,例如自定義函數(shù)、聚合操作等。
高性能
- 內(nèi)存計(jì)算: Apache Calcite 支持內(nèi)存中的數(shù)據(jù)處理,減少了I/O開(kāi)銷,提高了查詢速度。
- 分布式計(jì)算: 雖然本項(xiàng)目主要關(guān)注單機(jī)版實(shí)現(xiàn),但Apache Calcite也可以擴(kuò)展到分布式環(huán)境中,支持大規(guī)模數(shù)據(jù)集的處理。
集成性強(qiáng)
- 與其他工具集成: 支持與其他大數(shù)據(jù)工具和技術(shù)棧(如Apache Flink、Presto等)集成,形成完整的數(shù)據(jù)分析解決方案。
哪些公司使用了Apache Calcite?
- Google 在其內(nèi)部的一些數(shù)據(jù)處理系統(tǒng)中使用 Apache Calcite,特別是在需要高性能和靈活性的場(chǎng)景下。
- IBM 在其數(shù)據(jù)倉(cāng)庫(kù)和分析解決方案中使用 Apache Calcite,以提高查詢性能和靈活性。
- Intel 使用 Apache Calcite 來(lái)支持其大數(shù)據(jù)分析工具和解決方案,特別是在內(nèi)存計(jì)算方面。
- Alibaba Cloud: 阿里巴巴云在其大數(shù)據(jù)平臺(tái)中使用 Apache Calcite 提供強(qiáng)大的查詢優(yōu)化和執(zhí)行能力。
- MaxCompute (ODPS): 阿里巴巴的大規(guī)模數(shù)據(jù)計(jì)算服務(wù) MaxCompute 使用 Calcite 進(jìn)行 SQL 查詢處理。
- Elasticsearch 的某些高級(jí)功能,如 Kibana 中的復(fù)雜查詢,依賴于 Apache Calcite 進(jìn)行 SQL 解析和優(yōu)化。
- Netflix 使用 Apache Calcite 來(lái)構(gòu)建其內(nèi)部的數(shù)據(jù)虛擬化層,支持復(fù)雜的查詢和數(shù)據(jù)分析需求。
- Microsoft 在其一些大數(shù)據(jù)產(chǎn)品和服務(wù)中使用 Apache Calcite,例如 Azure Synapse Analytics。
- Teradata 使用 Apache Calcite 來(lái)增強(qiáng)其數(shù)據(jù)庫(kù)系統(tǒng)的查詢優(yōu)化和執(zhí)行性能。
- Uber 使用 Apache Calcite 來(lái)處理其龐大的數(shù)據(jù)集,并支持復(fù)雜的查詢和數(shù)據(jù)分析需求。
應(yīng)用場(chǎng)景
數(shù)據(jù)虛擬化
- 虛擬數(shù)據(jù)層: 創(chuàng)建一個(gè)虛擬的數(shù)據(jù)層,將分散在不同系統(tǒng)中的數(shù)據(jù)集中起來(lái),提供統(tǒng)一的視圖。
- 動(dòng)態(tài)數(shù)據(jù)源管理: 動(dòng)態(tài)地添加和管理數(shù)據(jù)源,支持靈活的數(shù)據(jù)架構(gòu)設(shè)計(jì)。
商業(yè)智能 (BI) 工具
- 報(bào)表生成: 作為 BI 工具的核心組件,支持復(fù)雜的報(bào)表生成和數(shù)據(jù)分析。
- 自助服務(wù)分析: 提供自助服務(wù)分析功能,允許非技術(shù)人員進(jìn)行數(shù)據(jù)探索和分析。
機(jī)器學(xué)習(xí)與人工智能
- 特征工程: 在機(jī)器學(xué)習(xí)管道中使用 Calcite 進(jìn)行特征提取和數(shù)據(jù)準(zhǔn)備。
- 模型訓(xùn)練: 結(jié)合其他 AI 框架,利用 Calcite 進(jìn)行大規(guī)模數(shù)據(jù)集的查詢和處理。
多數(shù)據(jù)源查詢
- 統(tǒng)一接口訪問(wèn)多個(gè)數(shù)據(jù)庫(kù): 允許用戶通過(guò)單一接口查詢存儲(chǔ)在不同數(shù)據(jù)庫(kù)(如 MySQL、PostgreSQL、Oracle 等)中的數(shù)據(jù)。
- 聯(lián)合查詢: 支持跨數(shù)據(jù)源的復(fù)雜 SQL 查詢,例如從不同的數(shù)據(jù)庫(kù)中獲取相關(guān)聯(lián)的數(shù)據(jù)。
大數(shù)據(jù)平臺(tái)集成
- 與 Hadoop 生態(tài)系統(tǒng)集成: 與 Hive、HBase、Druid 等大數(shù)據(jù)工具結(jié)合,提供統(tǒng)一的查詢接口。
- 流處理與批處理: 支持 Apache Flink 和 Apache Beam 等流處理框架,實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)分析。
嵌入式數(shù)據(jù)庫(kù)
- 輕量級(jí)數(shù)據(jù)庫(kù)引擎: 提供一個(gè)輕量級(jí)的 SQL 引擎,適用于嵌入式應(yīng)用程序和內(nèi)存數(shù)據(jù)庫(kù)。
- 內(nèi)存計(jì)算: 利用內(nèi)存計(jì)算加速查詢性能,適合需要快速響應(yīng)的應(yīng)用場(chǎng)景。
數(shù)據(jù)湖解決方案
- 統(tǒng)一元數(shù)據(jù)管理: 提供統(tǒng)一的元數(shù)據(jù)管理和查詢接口,方便數(shù)據(jù)湖的建設(shè)和維護(hù)。
- 多樣化數(shù)據(jù)格式支持: 支持多種數(shù)據(jù)格式(如 JSON、Parquet、ORC 等),滿足不同類型的數(shù)據(jù)存儲(chǔ)需求。
代碼實(shí)操
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache Calcite Core -->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.32.0</version>
</dependency>
<!-- HikariCP Connection Pool -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!-- MySQL Connector Java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- PostgreSQL JDBC Driver -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.yml
spring:
datasource:
order-db:
url:jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
username:root
password:root
driver-class-name:com.mysql.cj.jdbc.Driver
user-db:
url:jdbc:postgresql://localhost:5432/user_db
username:postgres
password:postgres
driver-class-name:org.postgresql.Driver
jpa:
show-sql:true
hibernate:
ddl-auto:update
properties:
hibernate:
dialect:org.hibernate.dialect.MySQL8Dialect
數(shù)據(jù)源配置
package com.example.multids.config;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
publicclass DataSourceConfig {
@Bean(name = "mysqlDataSource")
public DataSource mysqlDataSource() {
// 配置MySQL數(shù)據(jù)源
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC");
config.setUsername("root");
config.setPassword("root");
returnnew HikariDataSource(config);
}
@Bean(name = "postgresDataSource")
public DataSource postgresDataSource() {
// 配置PostgreSQL數(shù)據(jù)源
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/user_db");
config.setUsername("postgres");
config.setPassword("postgres");
returnnew HikariDataSource(config);
}
}
自定義數(shù)據(jù)源工廠
package com.example.multids.factory;
import com.example.multids.schema.MySchemas;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
publicclass DataSourceFactory {
public static CalciteConnection createConnection(DataSource mysqlDataSource, DataSource postgresDataSource) throws SQLException {
// 定義Calcite模型JSON字符串
String modelJson = "{\n" +
" \"version\": \"1.0\",\n" +
" \"defaultSchema\": \"my_schemas\",\n" +
" \"schemas\": [\n" +
" {\n" +
" \"name\": \"my_schemas\",\n" +
" \"type\": \"custom\",\n" +
" \"factory\": \"" + ReflectiveSchema.Factory.class.getName() + "\",\n" +
" \"operand\": {\n" +
" \"class\": \"" + MySchemas.class.getName() + "\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
// 創(chuàng)建Calcite連接
Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + modelJson);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 獲取根模式并添加子模式
SchemaPlus schema = calciteConnection.getRootSchema().getSubSchema("my_schemas");
schema.add("orders", JdbcSchema.create(calciteConnection.getRootSchema(), "orders", mysqlDataSource, null, Lex.MYSQL));
schema.add("users", JdbcSchema.create(calciteConnection.getRootSchema(), "users", postgresDataSource, null, Lex.POSTGRESQL));
return calciteConnection;
}
}
自定義模式
package com.example.multids.schema;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.Map;
public class MySchemas extends AbstractSchema {
@Override
protected Map<String, org.apache.calcite.schema.Table> getTableMap() {
// 返回表映射,這里不需要額外處理
return super.getTableMap();
}
}
查詢控制器
package com.example.multids.controller;
import com.example.multids.factory.DataSourceFactory;
import org.apache.calcite.jdbc.CalciteConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
@RestController
publicclass QueryController {
privatefinal DataSource mysqlDataSource;
privatefinal DataSource postgresDataSource;
@Autowired
public QueryController(@Qualifier("mysqlDataSource") DataSource mysqlDataSource,
@Qualifier("postgresDataSource") DataSource postgresDataSource) {
this.mysqlDataSource = mysqlDataSource;
this.postgresDataSource = postgresDataSource;
}
@GetMapping("/query")
public List<List<String>> query(@RequestParam String sql) throws SQLException {
// 創(chuàng)建Calcite連接
CalciteConnection connection = DataSourceFactory.createConnection(mysqlDataSource, postgresDataSource);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
// 處理查詢結(jié)果
List<List<String>> result = new ArrayList<>();
while (resultSet.next()) {
int columnCount = resultSet.getMetaData().getColumnCount();
List<String> row = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
row.add(resultSet.getString(i));
}
result.add(row);
}
// 關(guān)閉資源
resultSet.close();
statement.close();
connection.close();
return result;
}
}
Application
package com.example.multids;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MultidsApplication {
public static void main(String[] args) {
SpringApplication.run(MultidsApplication.class, args);
}
}
測(cè)試
MySQL orders 表
CREATE TABLE orders (
id INT PRIMARY KEY,
user_id INT,
amount DECIMAL(10, 2),
order_date DATETIME
);
PostgreSQL users 表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
測(cè)試執(zhí)行一個(gè)聯(lián)合查詢,從兩個(gè)不同的數(shù)據(jù)源中獲取數(shù)據(jù),SQL語(yǔ)句是:
SELECT o.id AS order_id, u.name AS user_name, o.amount, o.order_date
FROM orders o
JOIN users u ON o.user_id = u.id;
測(cè)試結(jié)果
$ curl -X GET "http://localhost:8080/query?sql=SELECT%20o.id%20AS%20order_id,%20u.name%20AS%20user_name,%20o.amount,%20o.order_date%20FROM%20orders%20o%20JOIN%20users%20u%20ON%20o.user_id%20=%20u.id"
[
["1", "Alice", "199.99", "2025-04-10 21:30:00"],
["2", "Bob", "250.75", "2025-04-10 20:45:00"]
]
責(zé)任編輯:武曉燕
來(lái)源:
Java知識(shí)日歷