Calcite: Opening a New Chapter in Heterogeneous Data Processing
Introduction
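Apache Calcite is a dynamic data management framework. It provides the building blocks of SQL handling (parsing, validation, optimization and execution planning) but intentionally stays out of storing and processing the data itself. This design makes it very flexible: it can sit on top of many kinds of data sources, whether relational databases, file systems or NoSQL stores.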
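Its main capabilities are:
- SQL parsing and validation
  - Parses standard SQL and several SQL dialects into an abstract syntax tree (AST)
  - Performs semantic validation: checks that tables and columns exist, that data types match, and so on
- Query optimization
  - Rule-based optimization (RBO, e.g. Calcite's HepPlanner): rewrites the query with predefined rules such as predicate pushdown
  - Cost-based optimization (CBO, e.g. Calcite's VolcanoPlanner): estimates the cost of alternative execution plans from statistics and picks the cheapest one
- Data source adapters
  - Connects to many kinds of data sources: relational databases (MySQL, Oracle, etc.), files (CSV, JSON), NoSQL databases and more
  - Supports custom adapters for your own data sources
- Cross-source queries
  - Hides the operations of different data sources behind a common adapter abstraction
  - Splits a query into sub-queries that each source can handle, then combines the results
- Execution plan generation
  - Turns the optimized plan into an executable physical plan
  - Exposes a JDBC interface, so it can be queried through standard JDBC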
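To make the parsing step concrete, here is a minimal sketch that uses Calcite's SqlParser (part of calcite-core, which the example below adds as a dependency) to turn a SQL string into an AST. The class name ParseDemo and the sample query are illustrative only; Lex.JAVA matches the lex setting the service class uses later.

```java
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

// Illustrative class: parses SQL text into Calcite's AST (SqlNode) without touching any data source.
public class ParseDemo {

    /** Parses one query; Lex.JAVA means case-sensitive identifiers and back-tick quoting. */
    public static SqlNode parse(String sql) {
        SqlParser parser = SqlParser.create(sql, SqlParser.config().withLex(Lex.JAVA));
        try {
            return parser.parseQuery();
        } catch (SqlParseException e) {
            // Syntax errors surface here, before any validation or planning happens
            throw new IllegalArgumentException("Invalid SQL: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        SqlNode node = parse("SELECT id, user_name FROM mysql.user_table WHERE id > 10");
        System.out.println("kind:   " + node.getKind());
        if (node instanceof SqlSelect) {
            SqlSelect select = (SqlSelect) node;
            System.out.println("select: " + select.getSelectList());
            System.out.println("from:   " + select.getFrom());
            System.out.println("where:  " + select.getWhere());
        }
    }
}
```

Parsing only checks syntax. Whether mysql.user_table actually exists is decided later by the validator, against the schemas registered from the model file.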
Example
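Adding the dependencies
Besides the two Calcite artifacts below, the code in this example also uses Spring Boot (web, JDBC, Thymeleaf), the MySQL JDBC driver (com.mysql.cj.jdbc.Driver) and Lombok, which are assumed to be in the project already.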
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.35.0</version>
</dependency>
<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-elasticsearch</artifactId>
    <version>1.35.0</version>
</dependency>
Model configuration (model.json)
{
  "version": "1.0",
  "defaultSchema": "mysql",
  "schemas": [
    {
      "name": "mysql",
      "type": "custom",
      "factory": "org.example.calcite.CalciteSchemaFactory",
      "operand": {
        "jdbcUrl": "jdbc:mysql://localhost:3306/scp",
        "jdbcUser": "root",
        "jdbcPassword": "root"
      }
    },
    {
      "name": "es",
      "type": "custom",
      "factory": "org.example.calcite.CalciteSchemaFactory",
      "operand": {
        "jdbcUrl": "localhost:9200",
        "jdbcUser": "root",
        "jdbcPassword": "root"
      }
    }
  ]
}
Calcite schema factory class
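package org.example.calcite;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.calcite.adapter.elasticsearch.ElasticsearchSchema;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.boot.jdbc.DataSourceBuilder;

public class CalciteSchemaFactory implements SchemaFactory {
    // One or more comma-separated host:port pairs, e.g. "localhost:9200" or "10.0.0.1:9200,10.0.0.2:9200"
    private static final String ES_HOSTS = "[\\w.-]+:\\d+(,[\\w.-]+:\\d+)*";

    @Override
    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
        String jdbcUrl = (String) operand.get("jdbcUrl");
        String jdbcUser = (String) operand.get("jdbcUser");
        String jdbcPassword = (String) operand.get("jdbcPassword");
        if (jdbcUrl == null) {
            throw new IllegalArgumentException("Missing operand 'jdbcUrl' for schema " + name);
        }
        try {
            if (jdbcUrl.startsWith("jdbc:mysql")) {
                // MySQL: Calcite's JDBC adapter exposes the database tables
                DataSource dataSource = createDataSource(jdbcUrl, jdbcUser, jdbcPassword);
                return JdbcSchema.create(parentSchema, name, dataSource, null, null);
            } else if (jdbcUrl.matches(ES_HOSTS)) {
                // Elasticsearch: each index becomes a table; index == null means "all indices"
                HttpHost[] httpHosts = Arrays.stream(jdbcUrl.split(","))
                        .map(HttpHost::create)
                        .toArray(HttpHost[]::new);
                RestClient restClient = RestClient.builder(httpHosts).build();
                // No parentSchema.add(...): when loading model.json, Calcite registers the returned schema under `name`
                return new ElasticsearchSchema(restClient, new ObjectMapper(), null);
            } else {
                throw new IllegalArgumentException("Unsupported data source type: " + jdbcUrl);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to create Calcite schema " + name, e);
        }
    }

    // Spring Boot picks the DataSource implementation (e.g. HikariCP) found on the classpath
    private DataSource createDataSource(String url, String user, String password) {
        return DataSourceBuilder.create()
                .driverClassName("com.mysql.cj.jdbc.Driver")
                .url(url)
                .username(user)
                .password(password)
                .build();
    }
}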
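The routing decision in the factory can be checked without a running MySQL or Elasticsearch. Below is a JDK-only sketch that pulls the rule out into a hypothetical helper, SchemaRouting (not part of Calcite or the original project). It also shows why a pattern that only accepts a dotted IPv4 address, such as `\d+\.\d+\.\d+\.\d+:\d+`, rejects the `localhost:9200` value used in model.json.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the factory's routing rule; not part of Calcite.
public final class SchemaRouting {

    public enum SourceKind { JDBC_MYSQL, ELASTICSEARCH }

    // Dotted IPv4 address only: "127.0.0.1:9200" passes, "localhost:9200" does not
    public static final String IPV4_ONLY = "\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+";

    // One or more comma-separated host:port pairs, the host being a name or an IP address
    public static final String ES_HOSTS = "[\\w.-]+:\\d+(,[\\w.-]+:\\d+)*";

    private SchemaRouting() {}

    /** Decides which adapter a model.json "jdbcUrl" operand should be routed to. */
    public static SourceKind classify(String url) {
        if (url == null) {
            throw new IllegalArgumentException("jdbcUrl is missing");
        }
        if (url.startsWith("jdbc:mysql")) {
            return SourceKind.JDBC_MYSQL;
        }
        if (url.matches(ES_HOSTS)) {
            return SourceKind.ELASTICSEARCH;
        }
        throw new IllegalArgumentException("Unsupported data source type: " + url);
    }

    /** Splits the Elasticsearch operand into the host:port strings handed to HttpHost.create. */
    public static List<String> esHosts(String url) {
        return Arrays.asList(url.split(","));
    }

    public static void main(String[] args) {
        System.out.println(classify("jdbc:mysql://localhost:3306/scp"));
        System.out.println(classify("localhost:9200"));
        System.out.println("localhost:9200".matches(IPV4_ONLY));
        System.out.println(esHosts("10.0.0.1:9200,10.0.0.2:9200"));
    }
}
```

Note that a URL with a scheme, such as `http://localhost:9200`, falls through to the "unsupported" branch under this rule.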
Calcite service class
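import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.springframework.stereotype.Service;

@Service
public class CalciteQueryService {
    public List<String> executeQuery(String sql) throws SQLException {
        Properties info = new Properties();
        // lex=JAVA: case-sensitive identifiers, back-tick quoting
        info.setProperty("lex", "JAVA");
        // Path to model.json on the author's machine; change it to match your project
        info.setProperty("model", "D:\\gitee\\self-learn\\01_springboot\\test-demo\\src\\main\\resources\\model.json");
        List<String> results = new ArrayList<>();
        // Each call opens a new Calcite connection, which loads model.json and builds the schemas
        try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (resultSet.next()) {
                // Render each row as "column: value, column: value, ..."
                StringBuilder row = new StringBuilder();
                for (int i = 1; i <= columnCount; i++) {
                    if (i > 1) row.append(", ");
                    row.append(metaData.getColumnLabel(i)).append(": ").append(resultSet.getString(i));
                }
                results.add(row.toString());
            }
        }
        return results;
    }
}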
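The service points at an absolute path on the author's disk, so it breaks as soon as the project moves. One option, sketched here under the assumption that model.json sits in src/main/resources, is to read the file from the classpath and hand it to Calcite as an inline model: Calcite treats a `model` value that starts with `inline:` as the model JSON itself rather than as a file path. The class CalciteConnectionProps is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical helper: builds the Properties passed to DriverManager.getConnection("jdbc:calcite:", info).
public final class CalciteConnectionProps {

    private CalciteConnectionProps() {}

    /** Uses the model JSON text directly, via Calcite's "inline:" prefix. */
    public static Properties fromModelJson(String modelJson) {
        Properties info = new Properties();
        info.setProperty("lex", "JAVA");
        info.setProperty("model", "inline:" + modelJson);
        return info;
    }

    /** Reads the model from the classpath, e.g. "/model.json" for src/main/resources/model.json. */
    public static Properties fromClasspath(String resource) {
        try (InputStream in = CalciteConnectionProps.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalArgumentException("Model not found on classpath: " + resource);
            }
            return fromModelJson(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In executeQuery, `Properties info = CalciteConnectionProps.fromClasspath("/model.json");` would then replace the two setProperty calls, and the code keeps working when packaged as a jar.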
Controller class
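import java.sql.SQLException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

@RestController
@RequiredArgsConstructor(onConstructor_ = {@Lazy, @Autowired})
public class DemoController {
    private final CalciteQueryService calciteQueryService;

    // Returns the rows as a JSON array of strings (demo only: runs whatever SQL the caller sends)
    @GetMapping("/query")
    public List<String> query(@RequestParam String sql) throws SQLException {
        return calciteQueryService.executeQuery(sql);
    }

    // Renders the rows with the Thymeleaf template "queryResult"
    @GetMapping("/queryPage")
    public ModelAndView queryPage(@RequestParam String sql) throws SQLException {
        List<String> results = calciteQueryService.executeQuery(sql);
        ModelAndView mav = new ModelAndView("queryResult");
        mav.addObject("results", results);
        return mav;
    }
}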
Page (src/main/resources/templates/queryResult.html)
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<title>Calcite Query Results</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}
h1 {
background-color: #007BFF;
color: white;
padding: 20px;
text-align: center;
margin: 0;
}
ul {
list-style-type: none;
padding: 0;
margin: 20px;
background-color: white;
border-radius: 5px;
box-shadow: 0 0 5px rgba(0, 0, 0, 0.1);
}
li {
padding: 10px 20px;
border-bottom: 1px solid #e0e0e0;
}
li:last-child {
border-bottom: none;
}
</style>
</head>
<body>
<h1>Calcite Query Results</h1>
<ul>
<li th:each="result : ${results}" th:text="${result}"></li>
</ul>
</body>
</html>
Testing
Query the MySQL table:
http://localhost:10001/queryPage?sql=SELECT%20*%20FROM%20mysql.user_table
Query an Elasticsearch index (first 5 rows):
http://localhost:10001/queryPage?sql=SELECT%20*%20FROM%20es.authlogfb%20limit%205
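The ES result is hard to read because the Elasticsearch adapter returns each document as a single _MAP column. To get one readable column per field, cast the individual _MAP entries. That query contains special characters, so it must be URL-encoded before it goes into the URL: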
- Open the browser developer tools (usually F12 or Ctrl + Shift + I) and switch to the Console tab
- Run encodeURIComponent("SELECT CAST(_MAP['rtime'] AS varchar(20)) AS rtime, CAST(_MAP['user_name'] AS varchar(20)) AS user_name, CAST(_MAP['vpdndomain'] AS varchar(20)) AS vpdndomain, CAST(_MAP['apnname'] AS varchar(20)) AS apnname, CAST(_MAP['imsi'] AS varchar(20)) AS imsi, CAST(_MAP['mdn'] AS varchar(20)) AS mdn, CAST(_MAP['called_id'] AS varchar(20)) AS called_id, CAST(_MAP['resulCode'] AS varchar(20)) AS resulCode, CAST(_MAP['sourceType'] AS varchar(20)) AS sourceType FROM es.authlogfb LIMIT 3")
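The console step can also be done in Java, which is handy when the query is generated rather than typed. The sketch below (hypothetical class EsQueryUrl; JDK only, Java 10 or later) builds the same kind of `CAST(_MAP['field'] AS varchar(20)) AS field` projection for a list of field names and percent-encodes it with URLEncoder. URLEncoder encodes spaces as `+`, so they are swapped for `%20` to match encodeURIComponent; a few other characters (such as `'`) are encoded differently from the browser, but both forms decode to the same SQL on the server.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds the _MAP projection query and the encoded /queryPage URL.
public final class EsQueryUrl {

    private EsQueryUrl() {}

    /** Produces "SELECT CAST(_MAP['f'] AS varchar(w)) AS f, ... FROM table LIMIT n". */
    public static String projection(List<String> fields, String table, int width, int limit) {
        String columns = fields.stream()
                .map(f -> "CAST(_MAP['" + f + "'] AS varchar(" + width + ")) AS " + f)
                .collect(Collectors.joining(", "));
        return "SELECT " + columns + " FROM " + table + " LIMIT " + limit;
    }

    /** Percent-encodes a query-string value; a literal '+' was already turned into %2B. */
    public static String encode(String sql) {
        return URLEncoder.encode(sql, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        String sql = projection(List.of("rtime", "user_name", "imsi"), "es.authlogfb", 20, 3);
        System.out.println("http://localhost:10001/queryPage?sql=" + encode(sql));
        // The server decodes the parameter back to the original SQL
        System.out.println(URLDecoder.decode(encode(sql), StandardCharsets.UTF_8).equals(sql));
    }
}
```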
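Append the encoded result to http://localhost:10001/queryPage?sql= and run the query again. Each field now comes back as its own column.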
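Apache Calcite is very capable at integrating heterogeneous data sources and running complex queries over them. In this example a single Calcite JDBC connection queries both MySQL and Elasticsearch with the same SQL, which greatly simplifies building cross-source queries.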