譯者 | 布加迪
審校 | 重樓
提取、轉(zhuǎn)換和加載(ETL)的三個(gè)階段通常涉及多個(gè)任務(wù),每個(gè)任務(wù)都可以獨(dú)立執(zhí)行。你可以將每個(gè)任務(wù)作為微服務(wù)來開發(fā)。
公司企業(yè)每天從各種業(yè)務(wù)運(yùn)營中生成大量數(shù)據(jù)。比如說,每當(dāng)客戶在零售店結(jié)賬時(shí),可以在銷售點(diǎn)(PoS)系統(tǒng)獲取諸如客戶標(biāo)識(shí)符、零售店標(biāo)識(shí)符、結(jié)賬時(shí)間、購買物品列表和總銷售額之類的數(shù)據(jù)。同樣,現(xiàn)場銷售人員可能會(huì)將潛在的銷售機(jī)會(huì)錄入到電子表格中。此外,大多數(shù)商業(yè)通信是通過電子郵件進(jìn)行的,這使得電子郵件成為一個(gè)大有價(jià)值的數(shù)據(jù)源。為了在整個(gè)組織保持信息的一致性,并從這些數(shù)據(jù)中獲得業(yè)務(wù)洞察力,從這些分散的數(shù)據(jù)源中提取必要的細(xì)節(jié)并保持所有的相關(guān)信息集中化就顯得至關(guān)重要。
提取、轉(zhuǎn)換和加載(ETL)技術(shù)側(cè)重于這個(gè)問題:從多個(gè)數(shù)據(jù)源提取數(shù)據(jù),將提取到的數(shù)據(jù)轉(zhuǎn)換成所需的格式,最后將其加載到相關(guān)的數(shù)據(jù)存儲(chǔ)或系統(tǒng)中。然而,由于業(yè)務(wù)和技術(shù)的進(jìn)步,ETL應(yīng)用生態(tài)也在迅速發(fā)生變化。其中面臨一些挑戰(zhàn):
- 使用人工智能從自然語言或非結(jié)構(gòu)化數(shù)據(jù)源中提取信息。
- 使用人工智能來轉(zhuǎn)換數(shù)據(jù)。
- 與基于云的系統(tǒng)連接以提取或加載數(shù)據(jù)。
- 在混合云環(huán)境中靈活部署ETL流。
- ETL流的可擴(kuò)展性。
- 像微服務(wù)那樣敏捷和快速地部署ETL流。
- 支持流式ETL操作。
- 針對(duì)小規(guī)模用例的低成本ETL部署。
我們?cè)谙挛膶⒂懻摌?gòu)建這種敏捷ETL流的體系結(jié)構(gòu)以及快速部署這些ETL流的方法。
用于構(gòu)建敏捷ETL流的體系結(jié)構(gòu)
ETL的每個(gè)提取、轉(zhuǎn)換和加載階段通常涉及多個(gè)任務(wù)。比如說,提取階段可能涉及從CSV文檔和電子郵件中提取數(shù)據(jù)的任務(wù)。與之相仿,轉(zhuǎn)換階段可能涉及刪除缺失字段的數(shù)據(jù)項(xiàng)、連接字段、分類以及將數(shù)據(jù)從一種格式映射到另一種格式等任務(wù)。最后,加載階段可能涉及加載到數(shù)據(jù)倉庫、更新數(shù)據(jù)庫中的數(shù)據(jù)項(xiàng)或?qū)?shù)據(jù)插入不同系統(tǒng)中等任務(wù)。這樣的ETL流如下圖所示:
圖1
一旦提供了原始數(shù)據(jù)或另一個(gè)任務(wù)的輸出,這些任務(wù)中的每一個(gè)都可以獨(dú)立執(zhí)行。因此,可以使用合適的技術(shù)實(shí)現(xiàn)這每一個(gè)任務(wù),并將它們作為可獨(dú)立部署和擴(kuò)展的集群加以執(zhí)行。這使得我們可以將每個(gè)任務(wù)作為微服務(wù)來開發(fā)。
此外,任務(wù)之間存在依賴關(guān)系。比如說,“連接字段”任務(wù)依賴“從文件系統(tǒng)中提取”任務(wù)??梢允褂枚喾N方法在這類相關(guān)任務(wù)之間傳遞數(shù)據(jù)。一種簡單的方法就是使用REST API調(diào)用在這些任務(wù)之間進(jìn)行聯(lián)系。然而,如果在任務(wù)之間使用消息傳遞系統(tǒng),就可以促進(jìn)解耦并提高可靠性。然后,每個(gè)任務(wù)使用來自消息傳遞系統(tǒng)中某個(gè)主題的數(shù)據(jù),并在處理完成后將輸出數(shù)據(jù)發(fā)布到另一個(gè)主題。這種方法有諸多優(yōu)點(diǎn):
- 每個(gè)任務(wù)可以以自己的速度工作,而不會(huì)被前一個(gè)任務(wù)的請(qǐng)求過載。
- 如果任務(wù)失敗,數(shù)據(jù)不會(huì)丟失。
- 可以將另外的任務(wù)添加到ETL流中,而不會(huì)影響當(dāng)前任務(wù)。
將ETL任務(wù)作為微服務(wù)來實(shí)現(xiàn)并通過消息傳遞層方便其通信的體系結(jié)構(gòu)如下所示:
圖2
將每個(gè)ETL任務(wù)分離為微服務(wù)可以視作邏輯體系結(jié)構(gòu)。在實(shí)際的實(shí)現(xiàn)中,可以根據(jù)可擴(kuò)展性、開發(fā)團(tuán)隊(duì)和預(yù)期的可擴(kuò)展性需求等因素,確定是將ETL任務(wù)作為單獨(dú)的微服務(wù)來實(shí)現(xiàn),還是將多個(gè)任務(wù)組合成單個(gè)微服務(wù)。
實(shí)現(xiàn)ETL任務(wù)
下一步是實(shí)現(xiàn)單獨(dú)的ETL任務(wù)。因?yàn)檫@每一個(gè)任務(wù)都是微服務(wù),任何技術(shù)都可以用于實(shí)現(xiàn)。ETL任務(wù)通常包括三個(gè)步驟:
- 與本地?cái)?shù)據(jù)中心和云端可用的數(shù)據(jù)存儲(chǔ)和外部端點(diǎn)集成。
- 處理龐大且復(fù)雜的數(shù)據(jù)結(jié)構(gòu)。
- 通過多種格式和協(xié)議傳輸數(shù)據(jù)。
許多支持微服務(wù)風(fēng)格部署的集成技術(shù)都可以用于實(shí)現(xiàn)ETL任務(wù)。適合此用途的一種技術(shù)是Ballerina編程語言,它是專門為集成而設(shè)計(jì)的。Ballerina直接支持服務(wù)開發(fā)、數(shù)據(jù)庫連接、通用協(xié)議、數(shù)據(jù)轉(zhuǎn)換以及多種數(shù)據(jù)類型(如JSON、XML、CSV和EDI)。此外,它還附帶大量連接件,以便與本地系統(tǒng)和SaaS系統(tǒng)集成。我們?cè)谙旅鎸⑻接懸恍┦褂肂allerina開發(fā)ETL任務(wù)的示例。
數(shù)據(jù)提取
業(yè)務(wù)數(shù)據(jù)有可能放在數(shù)據(jù)庫、CSV文件、EDI文檔、電子表格或ERP應(yīng)用軟件等各種企業(yè)系統(tǒng)中。因此,數(shù)據(jù)提取任務(wù)需要連接所有這些數(shù)據(jù)源,并使用它們支持的格式讀取數(shù)據(jù)。下面是使用Ballerina從數(shù)據(jù)庫、CSV文件和EDI文檔中提取數(shù)據(jù)的幾個(gè)示例。
- 讀取數(shù)據(jù)庫
stream orders = dbClient->/orderdata;
check from var orderData in orders
do {
io:println(orderData);
};
- 讀取CSV文件
stream productDataStream = check io:fileReadCsvAsStream("product_data.csv");
check productDataStream.forEach(
function(string[] productData) {
io:println(productData);
});
- 讀取EDI文檔
string ediText = check io:fileReadString("resources/purchase_order.edi");
PurchaseOrder simpleOrder = check fromEdiString(ediText);
io:println(string `Order Id: ${simpleOrder.header.orderId}`);
數(shù)據(jù)提取階段可能還需要從非結(jié)構(gòu)化數(shù)據(jù)源中提取數(shù)據(jù)。這方面的一個(gè)典例是從電子郵件、留言和評(píng)論中提取結(jié)構(gòu)化信息。下面的例子演示了使用Ballerina和OpenAI從評(píng)論中提取好評(píng)、差評(píng)和改進(jìn)建議等信息。
chat:CreateChatCompletionRequest request = {
model: "gpt-3.5-turbo",
messages: [
{
role: "user",
content: string `
Extract the following details in JSON from the reviews given.
{
good_points: string,
bad_points: string,
improvement_points: string
}
The fields should contain points extracted from all reviews
Here are the reviews:
${string:'join(",", ...summaryRequest.reviews)}
`
}
]
};
chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
if summary.choices.length() > 0 {
string content = check summary.choices[0].message?.content.ensureType();
io:println(content);
chat:CreateChatCompletionRequest request = {
model: "gpt-3.5-turbo",
messages: [
{
role: "user",
content: string `
Extract the following details in JSON from the reviews given.
{
good_points: string,
bad_points: string,
improvement_points: string
}
The fields should contain points extracted from all reviews
Here are the reviews:
${string:'join(",", ...summaryRequest.reviews)}
`
}
]
};
chat:CreateChatCompletionResponse summary = check openAiChat->/chat/completions.post(request);
if summary.choices.length() > 0 {
string content = check summary.choices[0].message?.content.ensureType();
io:println(content);
}
數(shù)據(jù)轉(zhuǎn)換
提取的數(shù)據(jù)可能來自員工填寫的電子表格、從手寫文檔掃描而來的文本或操作員輸入到系統(tǒng)的數(shù)據(jù)。因此,這類數(shù)據(jù)可能含有拼寫錯(cuò)誤、缺失字段、重復(fù)信息或無效數(shù)據(jù)。因此,轉(zhuǎn)換階段必須在將這些數(shù)據(jù)記錄加載到目標(biāo)系統(tǒng)之前加以清潔。此外,可能需要在轉(zhuǎn)換階段將來自多個(gè)數(shù)據(jù)源的相關(guān)細(xì)節(jié)組合起來,以便豐富數(shù)據(jù)。下面的例子展示了使用Ballerina來完成這些任務(wù)。
- 刪除重復(fù)信息
function removeDuplicates(SalesOrder[] orders) returns SalesOrder[] {
return from var {itemId, customerId, itemName, quantity, date} in orders
group by itemId, customerId, itemName
select {
itemId,
customerId,
itemName,
quantity: [quantity][0],
date: [date][0]
};
}
- 識(shí)別無效數(shù)據(jù)項(xiàng)
function isValidEmail(string inputString) returns boolean {
string:RegExp emailPattern = re `[A-Za-z0-9\._%+-]+@[A-Za-z0-9\.-]+\.[A-Za-z]{2,}`;
return emailPattern.isFullMatch(inputString);
}
- 數(shù)據(jù)豐富
CRMResponse response = check crmClient->/crm/api/customers/'json(customerId = customer.id);
if response.status == "OK" {
customer.billingAddress = response.billingAddress;
customer.primaryContact = response.telephone;
}
提取的數(shù)據(jù)常常需要在存儲(chǔ)到目標(biāo)系統(tǒng)之前轉(zhuǎn)換成不同的格式。然而,ETL任務(wù)通常不得不處理由數(shù)百個(gè)字段組成的非常龐大的數(shù)據(jù)結(jié)構(gòu),這可能使數(shù)據(jù)映射成為一項(xiàng)乏味枯燥的任務(wù)??梢允褂肂allerina的可視化數(shù)據(jù)映射功能簡化這項(xiàng)操作,如下所示:
圖3
數(shù)據(jù)加載
最后,數(shù)據(jù)加載階段的任務(wù)需要連接不同的目標(biāo)系統(tǒng),并通過所需的協(xié)議發(fā)送數(shù)據(jù)。使用TLS和OAuth2等技術(shù)實(shí)現(xiàn)安全連接到這些目標(biāo)系統(tǒng)也很重要。Ballerina有大量的連接件,并內(nèi)置支持所有常見的安全標(biāo)準(zhǔn),因而實(shí)現(xiàn)此類數(shù)據(jù)加載任務(wù)變得容易。下面的示例展示了如何將數(shù)據(jù)插入到Google BigQuery中。
SalesData[] salesDataset = check io:fileReadCsv("./resources/sales_data.csv");
bigquery:TabledatainsertallrequestRows[] rows = from var salesData in salesDataset
select {insertId: uuid:createType1AsString(), 'json: salesData};
bigquery:TableDataInsertAllRequest payload = {rows};
_ = check bigQueryClient->insertAllTableData(projectId, datasetId, tableId, payload);
有時(shí)候,業(yè)務(wù)用戶可能希望檢查某些數(shù)據(jù)記錄,比如缺失值或無效值的數(shù)據(jù)項(xiàng)。就微服務(wù)體系結(jié)構(gòu)而言,引入這樣一個(gè)額外的任務(wù)只需要添加一個(gè)微服務(wù)來讀取相關(guān)主題,并將數(shù)據(jù)加載到電子表格之類的最終用戶系統(tǒng)中。下面是一個(gè)從主題中讀取數(shù)據(jù)并將其插入到Google Sheets的示例。
sheets:Spreadsheet sheet = check spreadsheetClient->createSpreadsheet(sheetName);
_ = check spreadsheetClient->
appendValue(sheet.spreadsheetId, ["Product", "Sales", "Date"], {sheetName: workSheetName});
foreach var {product, sales, date} in salesSummary {
_ = check spreadsheetClient->
appendValue(sheet.spreadsheetId, [product, sales, date], {sheetName: workSheetName});
}
部署和測(cè)試ETL流
將單個(gè)ETL任務(wù)作為微服務(wù)來開發(fā)便于將整個(gè)ETL流部署到Kubernetes集群中。每個(gè)ETL任務(wù)都可以是Kubernetes部署環(huán)境中的一個(gè)pod,從而可以根據(jù)負(fù)載大小來增加或減少單個(gè)ETL任務(wù)的pod數(shù)量。然而,組織通常有多個(gè)ETL流,每個(gè)流又涉及許多任務(wù)。此外,這些ETL流可能歸不同的團(tuán)隊(duì)擁有。因此,擁有適當(dāng)?shù)腃I/CD管道、權(quán)限模型、監(jiān)測(cè)功能以及用于開發(fā)、測(cè)試、性能驗(yàn)證和生產(chǎn)的多個(gè)環(huán)境至關(guān)重要。
Ballerina可以與所有常見的CI/CD、監(jiān)測(cè)和部署技術(shù)一起工作,從而無縫地將基于Ballerina的ETL流與組織的現(xiàn)有基礎(chǔ)設(shè)施集成在一起。比如說,Ballerina ETL源代碼可以在GitHub中加以維護(hù),CI/CD操作可以使用Jenkins來實(shí)現(xiàn),ETL流可以部署在Amazon EKS上,執(zhí)行則可以使用Prometheus和Grafana加以監(jiān)測(cè)。
另一個(gè)部署選項(xiàng)是Choreo平臺(tái),該平臺(tái)默認(rèn)情況下提供了所有這些功能。由于Choreo讓用戶無需構(gòu)建平臺(tái),因此可以通過部署一組選定的ETL流、進(jìn)行測(cè)試并將它們轉(zhuǎn)移到生產(chǎn)環(huán)境中,立即開啟ETL之旅。然后可以對(duì)這些ETL流進(jìn)行改動(dòng),或者可以在相應(yīng)的數(shù)據(jù)源存儲(chǔ)庫中引入新的ETL流,新的ETL流由Choreo攝取后部署到開發(fā)環(huán)境中。
結(jié)語
本文討論了靈活的、類似微服務(wù)的ETL流的體系結(jié)構(gòu)和Ballerina語言實(shí)現(xiàn)。考慮到大多數(shù)業(yè)務(wù)部門生成數(shù)據(jù),并有著獨(dú)特的數(shù)據(jù)需求,Ballerina語言提供的數(shù)據(jù)處理能力、連接性和靈活的部署選項(xiàng)可能具有變革性。Ballerina團(tuán)隊(duì)目前正在竭力改進(jìn)工具支持,力求使構(gòu)建集成和ETL流變得更簡單。
原文標(biāo)題:Developing agile ETL flows with Ballerina,作者:Chathura Ekanayake