如何使用數(shù)據(jù)版本控制管理數(shù)據(jù)湖中的模式驗(yàn)證
譯文譯者 | 李睿
審校 | 重樓
數(shù)據(jù)團(tuán)隊(duì)依賴許多其他“第三方”發(fā)送數(shù)據(jù)的情況并不少見,他們經(jīng)常在沒有進(jìn)行任何溝通或讓數(shù)據(jù)團(tuán)隊(duì)知道太晚的情況下更改數(shù)據(jù)的模式。
每當(dāng)發(fā)生這種情況時(shí),數(shù)據(jù)管道就會(huì)遭到破壞,數(shù)據(jù)團(tuán)隊(duì)需要修復(fù)數(shù)據(jù)湖。這是一個(gè)充滿繁重任務(wù)的人工過(guò)程。在通常情況下,數(shù)據(jù)團(tuán)隊(duì)可能會(huì)推卸責(zé)任,試圖證明模式已經(jīng)改變。
但是隨著發(fā)展和進(jìn)步,數(shù)據(jù)團(tuán)隊(duì)意識(shí)到,以自動(dòng)持續(xù)集成(CI)/持續(xù)交付(CD)的方式簡(jiǎn)單地阻止模式一起更改是更明智的。
模式更改和模式驗(yàn)證給數(shù)據(jù)團(tuán)隊(duì)帶來(lái)了很多痛苦,但是市場(chǎng)上有一些解決方案可以幫助解決這個(gè)問(wèn)題——幸運(yùn)的是,其中一些是開源的。
以下是一個(gè)循序漸進(jìn)的教程,介紹如何使用開源數(shù)據(jù)版本控制工具lakeFS解決模式驗(yàn)證問(wèn)題。
什么是模式驗(yàn)證?
模式驗(yàn)證允許用戶為數(shù)據(jù)湖創(chuàng)建驗(yàn)證規(guī)則,例如允許的數(shù)據(jù)類型和值范圍。它保證保存在數(shù)據(jù)湖中的數(shù)據(jù)遵循已建立的模式,該模式描述了數(shù)據(jù)的結(jié)構(gòu)、格式和限制。
由于用戶的數(shù)據(jù)湖可以填充來(lái)自具有不同模式定義的各種來(lái)源的數(shù)據(jù),因此在數(shù)據(jù)湖中的所有數(shù)據(jù)上強(qiáng)制使用統(tǒng)一的模式是一個(gè)挑戰(zhàn)。
這是一個(gè)需要解決的問(wèn)題——如果不快速采取行動(dòng),就會(huì)在數(shù)據(jù)處理過(guò)程中看到不一致和錯(cuò)誤。
為什么需要處理模式驗(yàn)證?
花費(fèi)一些時(shí)間正確地管理模式是值得的,有以下四個(gè)原因:
- 一致性——數(shù)據(jù)湖通常包含來(lái)自多個(gè)來(lái)源的大量數(shù)據(jù)。如果沒有模式驗(yàn)證,最終可能會(huì)以不一致或不正確的形式存儲(chǔ)在數(shù)據(jù)湖中,從而導(dǎo)致處理過(guò)程中的問(wèn)題。
- 質(zhì)量——模式驗(yàn)證通過(guò)施加數(shù)據(jù)限制和標(biāo)準(zhǔn),有助于保持?jǐn)?shù)據(jù)湖中數(shù)據(jù)的良好質(zhì)量。它可以幫助用戶識(shí)別和標(biāo)記數(shù)據(jù)質(zhì)量問(wèn)題,例如丟失或不準(zhǔn)確的信息,在它們導(dǎo)致下游出現(xiàn)問(wèn)題之前。
- 效率——模式驗(yàn)證通過(guò)確保數(shù)據(jù)湖中所有數(shù)據(jù)的統(tǒng)一模式來(lái)加快數(shù)據(jù)處理和分析。這反過(guò)來(lái)又減少了清理、轉(zhuǎn)換和分析數(shù)據(jù)所需的時(shí)間和精力,并提高了數(shù)據(jù)管道的總體效率。
- 合規(guī)性——許多企業(yè)必須滿足嚴(yán)格的監(jiān)管和合規(guī)性要求。模式驗(yàn)證有助于確保存儲(chǔ)在數(shù)據(jù)湖中的數(shù)據(jù)符合這些標(biāo)準(zhǔn),從而提供對(duì)數(shù)據(jù)沿襲和質(zhì)量的清晰審計(jì)跟蹤。
處理數(shù)據(jù)湖中的模式并非一帆風(fēng)順
在數(shù)據(jù)倉(cāng)庫(kù)中,用戶處理的是嚴(yán)格的數(shù)據(jù)模型和嚴(yán)格的模式。數(shù)據(jù)湖與之相反。大多數(shù)情況下,它們最終包含廣泛的數(shù)據(jù)源。
為什么這很重要?因?yàn)樵跀?shù)據(jù)湖中,模式的定義可以在數(shù)據(jù)源之間發(fā)生變化,并且當(dāng)添加新數(shù)據(jù)時(shí),模式可能會(huì)隨著時(shí)間的推移而變化。這使得在數(shù)據(jù)湖中的所有數(shù)據(jù)上實(shí)施統(tǒng)一的模式成為一個(gè)巨大的挑戰(zhàn)。如果不能解決這個(gè)問(wèn)題,將不得不解決數(shù)據(jù)處理問(wèn)題。
但這還不是全部。由于構(gòu)建在數(shù)據(jù)湖之上的數(shù)據(jù)管道的復(fù)雜性不斷增加,無(wú)法擁有一個(gè)一致的模式。數(shù)據(jù)管道可以包括多個(gè)流程和轉(zhuǎn)換,每個(gè)流程和轉(zhuǎn)換都需要一個(gè)唯一的模式定義。
模式可能隨著數(shù)據(jù)的處理和修改而變化,因此很難確??缯麄€(gè)管道進(jìn)行模式驗(yàn)證。
這就是版本控制系統(tǒng)可以派上用場(chǎng)的地方。
在數(shù)據(jù)湖中實(shí)現(xiàn)模式驗(yàn)證的數(shù)據(jù)版本控制
lakeFS是一個(gè)開源工具,它可以將數(shù)據(jù)湖轉(zhuǎn)換為類似Git的存儲(chǔ)庫(kù),讓用戶像軟件工程師管理代碼一樣管理它。這就是數(shù)據(jù)版本控制的意義所在。
與其他源代碼控制系統(tǒng)一樣,lakeFS有一個(gè)稱為hook的特性,它是定制的腳本或程序,lakeFS平臺(tái)可以運(yùn)行這些腳本或程序來(lái)響應(yīng)指定的事件或操作。
這些事件可以包括提交更改、合并分支、創(chuàng)建新分支、添加或刪除標(biāo)記等等。例如,當(dāng)合并發(fā)生時(shí),在合并完成之前,在源分支上運(yùn)行一個(gè)預(yù)合并掛鉤。
它如何應(yīng)用于模式驗(yàn)證呢? 用戶可以創(chuàng)建一個(gè)預(yù)合并掛鉤來(lái)驗(yàn)證Parquet文件的模式與當(dāng)前模式是否相同。
需要準(zhǔn)備什么
- lakeFS服務(wù)器(可以免費(fèi)安裝或在云中啟動(dòng))。
- 可選:可以使用sample-repo來(lái)啟動(dòng)一個(gè)筆記本(notebook),筆記本可以配置為連接到lakeFS服務(wù)器。
在這個(gè)場(chǎng)景中,將在一個(gè)攝取分支中創(chuàng)建一個(gè)delta表,并將其合并到生產(chǎn)中。接下來(lái)將更改表的模式,并嘗試再次合并它,模擬將數(shù)據(jù)提升到生產(chǎn)的過(guò)程。
1.設(shè)置
首先,將設(shè)置一些全局變量并安裝將在本例中使用的包,這些包將在Python筆記本中運(yùn)行。
在設(shè)置好lakeFS憑證后,可以開始創(chuàng)建一些包含存儲(chǔ)庫(kù)和分支名稱的全局變量:
Python
repo = "schema-validation-example-repo"
mainBranch = "main"
ingestionBranch = "ingestion_branch"
每個(gè)lakeFS存儲(chǔ)庫(kù)都需要有自己的存儲(chǔ)命名空間,所以也需要?jiǎng)?chuàng)建一個(gè):
Python
storageNamespace = 's3://' # e.g. "s3://username-lakefs-cloud/"
在本例中,使用AWS S3存儲(chǔ)。為了使一切順利進(jìn)行,用戶的存儲(chǔ)需要配置為與lakeFS一起運(yùn)行,lakeFS與AWS、Azure、Google Cloud或內(nèi)部部署對(duì)象存儲(chǔ)(如MinIO)一起工作。
如果在云中運(yùn)行l(wèi)akeFS,則可以通過(guò)復(fù)制示例存儲(chǔ)庫(kù)的存儲(chǔ)名稱空間并將字符串附加到其上,將其鏈接到存儲(chǔ)。所以,如果lakeFS Cloud提供了這個(gè)sample-repo:
可以通過(guò)以下方式進(jìn)行配置:
Python
storageNamespace = 's3://lakefs-sample-us-east-1-production/AROA5OU4KHZHHFCX4PTOM:2ae87b7718e5bb16573c021e542dd0ec429b7ccc1a4f9d0e3f17d6ee99253655/my_random_string'
在筆記本中,將使用Python代碼,因此也必須導(dǎo)入lakeFS Python客戶端包:
Python
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient
import os
from pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField
接下來(lái),配置客戶端:
Python
%xmode Minimal
if not 'client' in locals():
# lakeFS credentials and endpoint
configuration = lakefs_client.Configuration()
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
configuration.host = lakefsEndPoint
client = LakeFSClient(configuration)
print("Created lakeFS client.")
以下將在本例中創(chuàng)建delta表,因此需要包括以下包:
Python
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensinotallow=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'
lakeFS公開了一個(gè)S3網(wǎng)關(guān),它允許應(yīng)用程序以與S3通信的方式與lakeFS進(jìn)行接口。要配置網(wǎng)關(guān),并執(zhí)行以下步驟:
Python
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", lakefsAccessKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", lakefsSecretKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", lakefsEndPoint)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
現(xiàn)在已經(jīng)準(zhǔn)備好在筆記本中大規(guī)模使用lakeFS版本控制。
2.創(chuàng)建存儲(chǔ)庫(kù)和掛鉤
以下將使用Python客戶端創(chuàng)建存儲(chǔ)庫(kù):
Python
client.repositories.create_repository(
repository_creatinotallow=models.RepositoryCreation(
name=repo,
storage_namespace=storageNamespace,
default_branch=mainBranch))
在這種情況下,將使用預(yù)合并掛鉤來(lái)確保架構(gòu)沒有更改。操作文件應(yīng)提交到lakeFS存儲(chǔ)庫(kù),前綴為_lakeFS_actions/。未能分析操作文件將導(dǎo)致運(yùn)行失敗。
將提交以下鉤子配置操作文件,pre-merge-schema-validation.yaml:
Python
#Parquet schema Validator
#Args:
# - locations (list of strings): locations to look for parquet files under
# - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
#Example hook declaration: (_lakefs_actions/pre-merge-schema-validation.yaml):
name: pre merge checks on main branch
on:
、
pre-merge:
branches:
- main
hooks:
- id: check_schema_changes
type: lua
properties:
script_path: scripts/parquet_schema_change.lua # location of this script in the repository
args:
sample: false
locations:
- tables/customers/
這個(gè)文件(pre-merge-schema-validation.yaml)存儲(chǔ)在example repo中的子文件夾LuaHooks中。必須將文件提交到文件夾_lakeFS_actions下的lakeFS存儲(chǔ)庫(kù):
Python
hooks_config_yaml = "pre-merge-schema-validation.yaml"
hooks_prefix = "_lakefs_actions"
with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:
client.objects.upload_object(repository=repo,
branch=mainBranch,
path=f'{hooks_prefix}/{hooks_config_yaml}',
cnotallow=f
)
只是設(shè)置了一個(gè)動(dòng)作腳本,在合并到main之前運(yùn)行scripts/parquet_schema_che.lua。
然后將創(chuàng)建腳本本身(parquet_schema_che.lua)并將其上載到腳本目錄中。正如人們所看到的,使用嵌入式LuaVM來(lái)運(yùn)行鉤子,而不依賴于其他組件。
此文件也位于ample-repo中的LuaHooks子文件夾中:
Python
--[[
Parquet schema validator
Args:
- locations (list of strings): locations to look for parquet files under
- sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
]]
lakefs = require("lakefs")
strings = require("strings")
parquet = require("encoding/parquet")
regexp = require("regexp")
path = require("path")
visited_directories = {}
for _, location in ipairs(args.locations) do
after = ""
has_more = true
need_more = true
print("checking location: " .. location)
while has_more do
print("running diff, location = " .. location .. " after = " .. after)
local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, location)
if code ~= 200 then
error("could not diff: " .. resp.message)
end
for _, result in pairs(resp.results) do
p = path.parse(result.path)
print("checking: '" .. result.path .. "'")
if not args.sample or (p.parent and not visited_directories[p.parent]) then
if result.path_type == "object" and result.type ~= "removed" then
if strings.has_suffix(p.base_name, ".parquet") then
-- check it!
code, content = lakefs.get_object(action.repository_id, action.source_ref, result.path)
if code ~= 200 then
error("could not fetch data file: HTTP " .. tostring(code) .. "body:\n" .. content)
end
schema = parquet.get_schema(content)
for _, column in ipairs(schema) do
for _, pattern in ipairs(args.column_block_list) do
if regexp.match(pattern, column.name) then
error("Column is not allowed: '" .. column.name .. "': type: " .. column.type .. " in path: " .. result.path)
end
end
end
print("\t all columns are valid")
visited_directories[p.parent] = true
end
end
else
print("\t skipping path, directory already sampled")
end
end
-- pagination
has_more = resp.pagination.has_more
after = resp.pagination.next_offset
end
end
把文件(這次是parquet_schema_che.lua)從LuaHooks目錄上傳到lakeFS存儲(chǔ)庫(kù)中操作配置文件中指定的位置(即腳本文件夾內(nèi)):
Python
hooks_config_yaml = "pre-merge-schema-validation.yaml"
hooks_prefix = "_lakefs_actions"
with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:
client.objects.upload_object(repository=repo,
branch=mainBranch,
path=f'{hooks_prefix}/{hooks_config_yaml}',
content=f
)
必須在提交操作文件后提交更改才能生效:
Python
client.commits.commit(
repository=repo,
branch=mainBranch,
commit_creatinotallow=models.CommitCreation(
message='Added hook config file and schema validation scripts'))
如果切換到lakeFS UI,應(yīng)該會(huì)在主目錄下看到以下目錄結(jié)構(gòu)和文件:
LakeFS UI的目錄結(jié)構(gòu)
lakeFS UI中顯示的合并前架構(gòu)驗(yàn)證
lakeFS UI中的架構(gòu)驗(yàn)證腳本
3.使用原始模式運(yùn)行第一個(gè)ETL
在lakeFS中,可以在與生產(chǎn)(主要)分支不同的分支上進(jìn)行攝取和轉(zhuǎn)化。
以下將建立一個(gè)攝取分支:
Python
client.branches.create_branch(
repository=repo,
branch_creatinotallow=models.BranchCreation(
name=ingestionBranch, source=mainBranch))
接下來(lái),將使用Kaggle數(shù)據(jù)集Orion Star——運(yùn)動(dòng)和戶外RDBMS數(shù)據(jù)集。使用Customer.csv,可以從data/samples/OrionStar/將其上傳到示例存儲(chǔ)庫(kù)。
首先,需要定義表模式:
Python
customersSchema = StructType([
StructField("User_ID", IntegerType(), False),
StructField("Country", StringType(), False),
StructField("Gender", StringType(), False),
StructField("Personal_ID", IntegerType(), True),
StructField("Customer_Name", StringType(), False),
StructField("Customer_FirstName", StringType(), False),
StructField("Customer_LastName", StringType(), False),
StructField("Birth_Date", StringType(), False),
StructField("Customer_Address", StringType(), False),
StructField("Street_ID", LongType(), False),
StructField("Street_Number", IntegerType(), False),
StructField("Customer_Type_ID", IntegerType(), False)
])
然后,從CSV文件中,將創(chuàng)建一個(gè)delta表,并將其提交到存儲(chǔ)庫(kù):
Python
customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").save(customersTablePath)
在這里需要做出改變:
Python
client.commits.commit(
repository=repo,
branch=ingestionBranch,
commit_creatinotallow=models.CommitCreation(
message='Added customers Delta table',
metadata={'using': 'python_api'}))
然后,使用合并將數(shù)據(jù)發(fā)送到生產(chǎn):
Python
client.refs.merge_into_branch(
repository=repo,
source_ref=ingestionBranch,
destination_branch=mainBranch)
已經(jīng)完成的架構(gòu)驗(yàn)證序列:
4. 修改模式并嘗試將表移動(dòng)到生產(chǎn)環(huán)境
為了簡(jiǎn)化操作,將重命名其中一列。以下將Country_name替換為Country_name:
Python
customersSchema = StructType([
StructField("User_ID", IntegerType(), False),
StructField("Country_Name", StringType(), False), # Column name changes from Country to Country_name
StructField("Gender", StringType(), False),
StructField("Personal_ID", IntegerType(), True),
StructField("Customer_Name", StringType(), False),
StructField("Customer_FirstName", StringType(), False),
StructField("Customer_LastName", StringType(), False),
StructField("Birth_Date", StringType(), False),
StructField("Customer_Address", StringType(), False),
StructField("Street_ID", LongType(), False),
StructField("Street_Number", IntegerType(), False),
StructField("Customer_Type_ID", IntegerType(), False)
])
在攝取分支中,重新創(chuàng)建delta表:
Python
customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(customersTablePath)
])
需要進(jìn)行修改:
Python
client.commits.commit(
repository=repo,
branch=ingestionBranch,
commit_creatinotallow=models.CommitCreation(
message='Added customers table with schema changes',
metadata={'using': 'python_api'}))
然后,可以嘗試將數(shù)據(jù)投入生產(chǎn):
Python
client.commits.commit(
repository=repo,
branch=ingestionBranch,
commit_creatinotallow=models.CommitCreation(
message='Added customer tables with schema changes!',
metadata={'using': 'python_api'}))
由于模式修改,得到了一個(gè)先決條件Failed錯(cuò)誤。合并前的掛鉤阻礙了晉升。因此,這些數(shù)據(jù)不會(huì)在生產(chǎn)中使用:
從lakeFS UI中,可以導(dǎo)航到存儲(chǔ)庫(kù)并選擇“Actions”選項(xiàng)。接下來(lái),單擊失敗操作的Run ID,選擇“主分支上的合并前檢查”,展開check_schema_changes,并查看錯(cuò)誤消息。
結(jié)語(yǔ)
由于存儲(chǔ)數(shù)據(jù)的異構(gòu)性和原始性,數(shù)據(jù)湖上的模式驗(yàn)證至關(guān)重要,但也很困難。管理模式演變、數(shù)據(jù)轉(zhuǎn)換和跨多種格式的兼容性檢查意味著每個(gè)數(shù)據(jù)從業(yè)者都需要一些非常強(qiáng)大的方法和工具。
數(shù)據(jù)湖的去中心化性質(zhì),許多用戶和系統(tǒng)可以在其中編輯數(shù)據(jù),使模式驗(yàn)證更加復(fù)雜。模式的驗(yàn)證對(duì)于數(shù)據(jù)治理、集成和可靠的分析至關(guān)重要。
像上面展示的預(yù)合并掛鉤這樣的解決方案有助于在將模式文件合并到生產(chǎn)分支之前驗(yàn)證它們。它在保證數(shù)據(jù)完整性和防止不兼容的模式更改合并到主分支時(shí)非常方便。它還增加了一層額外的質(zhì)量控制,使數(shù)據(jù)更加一致。
原文標(biāo)題:Managing Schema Validation in a Data Lake Using Daa Version Control,作者:Iddo Avneri