譯者 | 李睿
審校 | 重樓
梅西百貨公司首席數(shù)據(jù)工程師Naresh Erukulla是一位勇于迎接挑戰(zhàn)的數(shù)據(jù)工程師,他擅長用簡潔明了的概念驗證(POC)解決各種問題。最近,Naresh關(guān)注到了數(shù)據(jù)工程師日常工作中普遍遭遇的一個難題,并為此采取行動,為所有批處理和流數(shù)據(jù)管道設(shè)置了警報系統(tǒng)。當(dāng)錯誤超過閾值或數(shù)據(jù)管道出現(xiàn)故障時,可以迅速通過電子郵件向數(shù)據(jù)工程師發(fā)送故障通知,確保問題能夠得到及時處理。
一切似乎都在順利進(jìn)行中,直到他注意到一個關(guān)鍵數(shù)據(jù)集無法加載到BigQuery中。在調(diào)查了錯誤日志之后,發(fā)現(xiàn)一些“缺少所需數(shù)據(jù)”提示的消息。當(dāng)看到用戶輸入文件中頻繁出現(xiàn)的原始數(shù)據(jù)問題時,他為此感到困惑。
處理數(shù)據(jù)不一致問題,特別是數(shù)據(jù)缺失或格式錯誤,會在分析和運營工作流程的后續(xù)環(huán)節(jié)引發(fā)嚴(yán)重的后果。有一個關(guān)鍵的下游報告正是建立在這些輸入數(shù)據(jù)的基礎(chǔ)之上。該報告在日常業(yè)務(wù)中發(fā)揮著至關(guān)重要的作用,它能夠反映出公司在多個領(lǐng)域內(nèi)的關(guān)鍵指標(biāo)表現(xiàn),并且為決策制定提供了不可或缺的數(shù)據(jù)支持。在這份至關(guān)重要的報告中,所有高管級別的利益相關(guān)者都依賴這些數(shù)據(jù)來展示業(yè)績指標(biāo)、討論面臨的挑戰(zhàn)以及規(guī)劃未來的發(fā)展路徑。
Erukulla耗費了數(shù)小時檢查源CSV文件,該文件承載了來自另一個上游應(yīng)用程序的大量事務(wù)數(shù)據(jù)。準(zhǔn)確識別并修正問題行顯得至關(guān)重要。然而,當(dāng)他著手處理這些問題時,發(fā)現(xiàn)已經(jīng)錯過截止日期,這無疑令利益相關(guān)者深感失望。Erukulla也意識到傳統(tǒng)數(shù)據(jù)管道的脆弱性。它們很容易出錯,而且往往需要多次人工干預(yù)來進(jìn)行修復(fù),這個過程既耗時又容易出錯。
人們是否也遇到過類似的情況?是否花費了大量時間調(diào)試數(shù)據(jù)管道,結(jié)果卻發(fā)現(xiàn)根本原因只是一個簡單的格式錯誤或缺少必填字段?事實上,世界各地的數(shù)據(jù)工程師每天都在努力應(yīng)對這些挑戰(zhàn)。那么是否有可以構(gòu)建能夠“自我修復(fù)”數(shù)據(jù)管道的方法?這正是Erukulla追求的目標(biāo)。
自我修復(fù)數(shù)據(jù)管道的工作原理
自我修復(fù)數(shù)據(jù)管道的想法很簡單:當(dāng)數(shù)據(jù)處理過程中出現(xiàn)錯誤時,數(shù)據(jù)管道應(yīng)該自動檢測、分析和糾正錯誤,而無需人工干預(yù)。傳統(tǒng)上,解決這些問題需要人工干預(yù),這既耗時又容易出錯。
雖然有多種方法可以實現(xiàn)這一點,,但使用人工智能代理是最好的方法,也是數(shù)據(jù)工程師在未來自我修復(fù)故障數(shù)據(jù)管道并動態(tài)自動糾正它們的方法。本文將展示如何使用像GPT-4/DeepSeek R1模型這樣的LLM來自修復(fù)數(shù)據(jù)管道的基本實現(xiàn),其方法是使用LLM對失敗記錄進(jìn)行分析并提出建議,并在數(shù)據(jù)管道運行的過程中應(yīng)用這些修復(fù)措施。所提供的解決方案可以擴(kuò)展到大型數(shù)據(jù)管道,并將擴(kuò)展更多的功能。
以下介紹如何利用OpenAI API在云計算環(huán)境中使用GPT-4模型構(gòu)建一個實用的管道。遵循的基本步驟如下:
- 將源文件上傳到谷歌云存儲桶(Google Cloud Storage Bucket)。如果沒有谷歌云平臺的訪問權(quán)限,則可以使用任何本地或其他云存儲。
- 創(chuàng)建數(shù)據(jù)模型,用于將原始數(shù)據(jù)提取到BigQuery表中,將錯誤記錄提取到錯誤表中。
- 從CSV中讀取源文件,并從輸入數(shù)據(jù)中識別干凈(Clean)數(shù)據(jù)集和無效記錄錯誤行(Error Rows)數(shù)據(jù)集。
- 將Clean數(shù)據(jù)集導(dǎo)入BigQuery,并使用提示將Error Rows數(shù)據(jù)集傳遞給LLM。
- 對于每個錯誤行(Error Rows),OpenAI的GPT API進(jìn)行分析并提供智能產(chǎn)品ID分配。
- 使用Google BigQuery動態(tài)存儲和檢索產(chǎn)品信息。
- 基于Python的自動化無縫集成。
可以參閱Erukulla在GitHub上的完整代碼庫。
1.從云存儲讀取輸入數(shù)據(jù)
數(shù)據(jù)管道首先讀取存儲在Cloud Storage中的客戶端上傳的CSV文件,可以利用云函數(shù)(無服務(wù)器執(zhí)行管道步驟)在新文件上傳到存儲桶時觸發(fā)。該函數(shù)使用谷歌云存儲庫(google-cloud-storage)讀取文件,并將其解析為Pandas DataFrame以供進(jìn)一步處理。
在將數(shù)據(jù)傳遞到下一步之前,可以實施一些數(shù)據(jù)質(zhì)量檢查。然而,現(xiàn)實世界中的數(shù)據(jù)問題是動態(tài)的,無法預(yù)測和編寫所有測試用例,這會使代碼變得復(fù)雜且難以閱讀。
在這個用例中,CSV文件包含字段ProductID、Price、name、saleAmount。以下是包含數(shù)據(jù)的示例文件(ProductID和Price字段中也缺少數(shù)據(jù))。
1 # Read CSV from GCS
2 client = storage.Client()
3 bucket = client.bucket(bucket_name)
4 blob = bucket.blob(file_name)
5 data = blob.download_as_text()
6 df = pd.read_csv(io.StringIO(data))
7
2.將數(shù)據(jù)導(dǎo)入BigQuery
接下來,數(shù)據(jù)管道嘗試將數(shù)據(jù)導(dǎo)入到BigQuery中。如果由于模式不匹配、數(shù)據(jù)類型錯誤或缺少字段而導(dǎo)致任何行失敗,則捕獲并記錄它們以供進(jìn)一步分析。這一步驟對于檢測底層錯誤信息至關(guān)重要,這些錯誤信息將用于識別OpenAI API的可能解決方案。
1 # Function to clean and preprocess data
2 def clean_data(df):
3 avg_price = get_average_price()
4
5 df["Price"] = df["Price"].fillna(avg_price)
6
7 # Log and remove rows with issues
8 error_rows = df[df["ProductID"].isna()]
9 clean_df = df.dropna(subset=["ProductID"])
10
11 return clean_df, error_rows
12
13 # Function to query BigQuery for an average price
14 def get_average_price():
15 client = bigquery.Client()
16 query = f"SELECT AVG(Price) AS avg_price FROM `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.Product_Info`"
17
18 try:
19 df = client.query(query).to_dataframe()
20 avg_price = df["avg_price"][0]
21 print(f"Fetched Average Price: {avg_price}")
22 return avg_price
23 except Exception as e:
24 print(f"Error fetching average price: {e}")
25 return None
26
注意,avg_price = get_average_price()是從BigQuery查詢中獲取的。
在插入干凈的數(shù)據(jù)集之后如下圖所示:
3.使用LLM分析錯誤
分析錯誤是整個流程中的關(guān)鍵步驟,這就是采用LLM的神奇之處。失敗的記錄被發(fā)送到GPT-4或DeepSeek R1等LLM進(jìn)行分析。LLM檢查錯誤并提出更正建議和修正后的記錄。
例如,假設(shè)日期字段的格式不正確。在這種情況下,LLM可能會建議從字符串到整數(shù)轉(zhuǎn)換或從字符串到日期/時間戳轉(zhuǎn)換的正確格式記錄,反之亦然。在數(shù)據(jù)是預(yù)期的但發(fā)現(xiàn)為空的情況下,根據(jù)代碼強(qiáng)制執(zhí)行的規(guī)則,帶有“平均”(Average)或“默認(rèn)”(Default)值的缺失值將被修復(fù),以確保數(shù)據(jù)攝取成功。
通過重試機(jī)制實現(xiàn)ChatCompletion請求。
為了確保彈性,利用tenacity實現(xiàn)了重試機(jī)制。該函數(shù)將錯誤細(xì)節(jié)發(fā)送給GPT并檢索建議的修復(fù)程序。在本文的示例中,創(chuàng)建了‘functions’(函數(shù))列表,并使用ChatCompletion Request將其傳遞給JSON有效負(fù)載。
需要注意,‘functions’列表是使用在管道代碼中創(chuàng)建的Python函數(shù)來修復(fù)已知或可能問題的所有函數(shù)的列表。GPT分析輸入提示符和錯誤消息,以確定是否調(diào)用‘functions’列表中列出的特定函數(shù)來修復(fù)問題。
因此,GPT的響應(yīng)提供了指示應(yīng)該調(diào)用哪個函數(shù)的結(jié)構(gòu)化數(shù)據(jù)。GPT不會直接執(zhí)行函數(shù),而是由數(shù)據(jù)管道來執(zhí)行。
1 @retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
2 def chat_completion_request(messages, functinotallow=None, model=GPT_MODEL):
3 headers = {
4 "Content-Type": "application/json",
5 "Authorization": "Bearer " + openai.api_key,
6 }
7 json_data = {"model": model, "messages": messages}
8 if functions is not None:
9 json_data.update({"functions": functions})
10 try:
11 response = requests.post(
12 "https://api.openai.com/v1/chat/completions",
13 headers=headers,
14 jsnotallow=json_data,
15 )
16 return response.json()
17 except Exception as e:
18 print("Unable to generate ChatCompletion response")
19 print(f"Exception: {e}")
20 return e
21 # Function to send ChatCompletion request to OpenAI API
22 functions = [
23 {
24 "name": "assign_product_id",
25 "description": "assigning a unique ProductID",
26 "parameters": {
27 "type": "object",
28 "properties": {
29 "ProductID": {
30 "type": "integer",
31 "description": "The product ID to assign."
32 },
33 }
34 },
35 }
36 ]
37
assign_product_id是‘functions’列表中列出的函數(shù),GPT可以在需要時調(diào)用它。在這個示例中,CSV文件的最后兩行缺少ProductID。因此,GPT調(diào)用特定的assign_product_id函數(shù)來確定ProductID值。
assign_product_id函數(shù)從BigQuery中獲取最高分配的ProductID,并將其遞增以供后續(xù)使用。如果它是首次運行,或者BigQuery表中沒有可用的數(shù)據(jù),它將分配默認(rèn)的99999作為ProductID。
1 def assign_product_id():
2 client = bigquery.Client()
3 # table_ref = client.dataset(BQ_DATASET_ID).table(BQ_TABLE_ID)
4
5 query = f"""
6 Select max(ProductID) as max_id from `{BQ_PROJECT_ID}.{BQ_DATASET_ID}.{BQ_TABLE_ID}` WHERE ProductID < 99999
7 """
8 df = client
9 try:
10 df = client.query(query).to_dataframe()
11 except Exception as e:
12 print(f"Error fetching max ProductID: {e}")
13 return None
14 return df["max_id"][0] + 1 if not df.empty else 99999
15
4.應(yīng)用自動更正
數(shù)據(jù)管道將GPT的建議應(yīng)用于失敗的記錄,并重新嘗試將它們導(dǎo)入到BigQuery中。如果更正成功,數(shù)據(jù)將存儲在主表中。如果沒有,不可修復(fù)的記錄將被記錄到一個單獨的錯誤表中,以供人工檢查。
在字段是必需且唯一的情況下,GPT可以從BigQuery獲得唯一的ProductID值,并在此值的基礎(chǔ)上加1,以確保其唯一性。考慮管道中有多個錯誤行的情況;每個記錄都按照GPT響應(yīng)提供的修復(fù)程序順序處理,并用建議值更新錯誤記錄。
在以下的代碼中,ProductID被從assign_product_id()BigQuery表中獲取的值替換。當(dāng)有多個錯誤行時,每個錯誤行都會通過遞增ProductID獲得一個唯一的數(shù)字。
1 # Function to send error data to GPT-4 for analysis
2 def analyze_errors_with_gpt(error_rows):
3 if error_rows.empty:
4 return error_rows
5
6 new_product_id = assign_product_id()
7
8 for index, row in error_rows.iterrows():
9 prompt = f"""
10 Fix the following ProductID by assigning a unique ProductID from the bigquery table Product_Info:
11 {row.to_json()}
12 """
13 chat_response = chat_completion_request(
14 model=GPT_MODEL,
15 messages=[{"role": "user", "content": prompt}],
16 functions=functions
17 )
18
19 if chat_response is not None:
20 try:
21 if chat_response["choices"][0]["message"]:
22 response_content = chat_response["choices"][0]["message"]
23 else:
24 print("Chat response content is None")
25 continue
26 except json.JSONDecodeError as e:
27 print(f"Error decoding JSON response: {e}")
28 continue
29
30 if 'function_call' in response_content:
31 if response_content['function_call']['name'] == 'assign_product_id':
32 res = json.loads(response_content['function_call']['arguments'])
33 res['product_id'] = new_product_id
34 error_rows.at[index, "ProductID"] = res['product_id']
35 new_product_id += 1 # Increment the ProductID for the next row
36
37 print(f"Assigned ProductID: {res['product_id']}")
38 else:
39 print("Function not supported")
40 else:
41 chat.add_prompt('assistant', response_content['content'])
42 else:
43 print("ChatCompletion request failed. Retrying...")
44
45 return error_rows
46
5.將已修改的行導(dǎo)入到BigQuery表中
main函數(shù)從谷歌云存儲(Google Cloud Storage)讀取數(shù)據(jù)并進(jìn)行清理,并將有效數(shù)據(jù)導(dǎo)入到BigQuery中,同時動態(tài)修復(fù)錯誤。
1 # Main function to execute the pipeline
2 def main():
3 bucket_name = "self-healing-91"
4 file_name = "query_results.csv"
5
6 # Read CSV from GCS
7 client = storage.Client()
8 bucket = client.bucket(bucket_name)
9 blob = bucket.blob(file_name)
10 data = blob.download_as_text()
11 df = pd.read_csv(io.StringIO(data))
12
13 # Clean data and identify errors
14 clean_df, error_rows = clean_data(df)
15
16 # Load valid data into BigQuery
17 load_to_bigquery(clean_df, BQ_TABLE_ID)
18
19 # Process errors if any
20 if not error_rows.empty:
21
22 # Analyze errors with GPT-4
23 error_rows = analyze_errors_with_gpt(error_rows)
24
25 load_to_bigquery(error_rows, BQ_TABLE_ID)
26
27 print("Fixed Errors loaded successfully into BigQuery original table.")
28
在修復(fù)數(shù)據(jù)錯誤之后,需要特別檢查第66至68行。從BigQuery表中獲取最大值10000 ProductID后,對這些ID逐一進(jìn)行遞增處理。此外,錯誤行中缺少信息的Price字段被BigQuery表中的Avg(Price)替換。
6.日志記錄和監(jiān)控
在整個過程中,使用云日志(Cloud Logging)記錄錯誤和數(shù)據(jù)管道的活動。這確保工程師可以監(jiān)控數(shù)據(jù)管道的運行狀況并排查問題。
采用的工具和技術(shù)
以下是用來構(gòu)建和測試數(shù)據(jù)管道的關(guān)鍵工具和技術(shù):
- 云存儲:用于存儲輸入的CSV文件。
- 云函數(shù):用于無服務(wù)器執(zhí)行管道步驟。
- BigQuery:用于存儲清理過的數(shù)據(jù)和錯誤日志。
- GPT-4/DeepSeek R1:用于分析失敗記錄并提出更正建議。
- 云日志:用于監(jiān)視和故障排除。
- 云編排器:它用于使用Apache氣流編排管道。
面臨的挑戰(zhàn)
1. LLM集成
將LLM集成到數(shù)據(jù)管道中頗具挑戰(zhàn)性。必須確保API調(diào)用是有效的,LLM的響應(yīng)是準(zhǔn)確的。此外,還有成本方面的考慮,由于為LLM配置API對于大型數(shù)據(jù)集來說可能成本高昂。因此,只需知道必須為該服務(wù)設(shè)置一個API密鑰。
例如,對于OpenAI,必須訪問https://platform.openai.com/來注冊和生成新的API密鑰,并在發(fā)送帶有提示的API調(diào)用時在數(shù)據(jù)管道中使用它。
2.錯誤處理
設(shè)計一個穩(wěn)健的錯誤處理機(jī)制具有挑戰(zhàn)性。必須考慮各種錯誤,從模式不匹配到網(wǎng)絡(luò)問題,并確保數(shù)據(jù)管道能夠優(yōu)雅地處理它們。數(shù)據(jù)管道可能會面臨許多問題,而且所有問題都不能動態(tài)解決,例如文件為空或BigQuery表不存在等問題。
3.可擴(kuò)展性
隨著數(shù)據(jù)量的增長,必須優(yōu)化數(shù)據(jù)管道以實現(xiàn)可擴(kuò)展性。這涉及到在BigQuery中對數(shù)據(jù)進(jìn)行分區(qū),并使用Dataflow進(jìn)行大規(guī)模處理。
4.成本管理
雖然谷歌云平臺提供了強(qiáng)大的工具,但使用這些工具需要支付費用。因此必須仔細(xì)監(jiān)控使用情況并優(yōu)化數(shù)據(jù)管道,以避免額外的成本。OpenAI API成本是需要仔細(xì)監(jiān)控的另一個因素。
結(jié)論和要點
對于數(shù)據(jù)工程師來說,構(gòu)建自我修復(fù)的數(shù)據(jù)管道是一個改變游戲規(guī)則的方法。它可以減少人工干預(yù),提高效率,保證數(shù)據(jù)質(zhì)量。然而,這并不是靈丹妙藥。雖然自我修復(fù)數(shù)據(jù)管道可以節(jié)省時間,但它們會帶來額外的成本,例如LLM API費用和增加的云函數(shù)的使用量。因此,權(quán)衡這些成本與收益至關(guān)重要。
對于自我修復(fù)數(shù)據(jù)管道領(lǐng)域的新手來說,建議從小型項目著手,首先嘗試集成大型語言模型(LLM)和處理基本錯誤,然后再逐步擴(kuò)展。在這一過程中,定期監(jiān)控數(shù)據(jù)管道的性能和成本。使用云監(jiān)控和云日志之類的工具來識別瓶頸并進(jìn)行相應(yīng)的優(yōu)化。最后,要與數(shù)據(jù)科學(xué)家、分析師和業(yè)務(wù)利益相關(guān)者緊密合作,了解他們的實際需求,并確保當(dāng)業(yè)務(wù)需求發(fā)生變化時,其數(shù)據(jù)管道能夠持續(xù)創(chuàng)造價值。
總之,自我修復(fù)的數(shù)據(jù)管道代表著數(shù)據(jù)工程的未來。通過利用歌云平臺和LLM等工具,可以構(gòu)建健壯、高效、智能的管道,從而最大限度地減少停機(jī)時間并提升生產(chǎn)效率。如果曾經(jīng)受到脆弱的數(shù)據(jù)管道的困擾,可以探索和采用這一方法,而前期的努力將帶來長期的收益。
原文標(biāo)題:Self-Healing Data Pipelines: The Next Big Thing in Data Engineering?,作者:Naresh Erukulla