譯者 | 布加迪
審校 | 重樓
高質(zhì)量的數(shù)據(jù)在數(shù)據(jù)科學(xué)中非常重要,但這類數(shù)據(jù)常常來自許多地方,格式混亂。一些數(shù)據(jù)來自數(shù)據(jù)庫,另一些數(shù)據(jù)來自文件或網(wǎng)站。這些原始數(shù)據(jù)很難立即使用,因此我們需要先對其進(jìn)行清理和組織。
ETL是幫助完成這項(xiàng)任務(wù)的過程。ETL代表提取、轉(zhuǎn)換和加載。提取意味著從不同的來源收集數(shù)據(jù)。轉(zhuǎn)換意味著清理和格式化數(shù)據(jù)。加載意味著將數(shù)據(jù)存儲在數(shù)據(jù)庫中以便訪問。構(gòu)建ETL管道使這個(gè)過程實(shí)現(xiàn)了自動化。穩(wěn)健的ETL管道可以節(jié)省時(shí)間,并確保數(shù)據(jù)可靠。
我們在本文中將了解如何為數(shù)據(jù)科學(xué)項(xiàng)目構(gòu)建ETL管道。
什么是ETL管道?
ETL管道將數(shù)據(jù)從源端移動到目標(biāo)端。它分為三個(gè)階段:
1. 提?。簭亩鄠€(gè)來源(比如數(shù)據(jù)庫或文件)收集數(shù)據(jù)。
2. 轉(zhuǎn)換:清理和轉(zhuǎn)換數(shù)據(jù)以供分析。
3. 加載:將清理后的數(shù)據(jù)存儲在數(shù)據(jù)庫或其他系統(tǒng)中。
為什么ETL管道很重要?
ETL管道很重要,有這幾個(gè)原因:
- 數(shù)據(jù)質(zhì)量:轉(zhuǎn)換通過處理缺失值和修復(fù)錯(cuò)誤來幫助清理數(shù)據(jù)。
- 數(shù)據(jù)可訪問性:ETL管道將來自許多來源的數(shù)據(jù)放到一個(gè)地方,以便訪問。
- 自動化:管道自動執(zhí)行重復(fù)性任務(wù),讓數(shù)據(jù)科學(xué)家可以專注于分析。
現(xiàn)在,讓我們用Python構(gòu)建一個(gè)簡單的ETL管道。
數(shù)據(jù)攝取
首先,我們需要得到數(shù)據(jù)。我們將從一個(gè)CSV文件中提取數(shù)據(jù)。
import pandas as pd
# Function to extract data from a CSV file
def extract_data(file_path):
try:
data = pd.read_csv(file_path)
print(f"Data extracted from {file_path}")
return data
except Exception as e:
print(f"Error in extraction: {e}")
return None
# Extract employee data
employee_data = extract_data('/content/employees_data.csv')
# Print the first few rows of the data
if employee_data is not None:
print(employee_data.head())
數(shù)據(jù)轉(zhuǎn)換
收集數(shù)據(jù)后,我們需要對其進(jìn)行轉(zhuǎn)換。這意味著要清理數(shù)據(jù),并確保其正確。我們還將數(shù)據(jù)更改為可用于分析的格式。下面是一些常見的轉(zhuǎn)換:
- 處理缺失的數(shù)據(jù):刪除或填寫缺失的值。
- 創(chuàng)建衍生的特征:創(chuàng)建新的列,比如工資區(qū)間或年齡組。
- 編碼類別:將部門名稱等數(shù)據(jù)更改為計(jì)算機(jī)可以使用的格式。
# Function to transform employee data
def transform_data(data):
try:
# Ensure salary and age are numeric and handle any errors
data['Salary'] = pd.to_numeric(data['Salary'], errors='coerce')
data['Age'] = pd.to_numeric(data['Age'], errors='coerce')
# Remove rows with missing values
data = data.dropna(subset=['Salary', 'Age', 'Department'])
# Create salary bands
data['Salary_band'] = pd.cut(data['Salary'], bins=[0, 60000, 90000, 120000, 1500000], labels=['Low', 'Medium', 'High', 'Very High'])
# Create age groups
data['Age_group'] = pd.cut(data['Age'], bins=[0, 30, 40, 50, 60], labels=['Young', 'Middle-aged', 'Senior', 'Older'])
# Convert department to categorical
data['Department'] = data['Department'].astype('category')
print("Data transformation complete")
return data
except Exception as e:
print(f"Error in transformation: {e}")
return None
employee_data = extract_employee_data('/content/employees_data.csv')
# Transform the employee data
if employee_data is not None:
transformed_employee_data = transform_data(employee_data)
# Print the first few rows of the transformed data
print(transformed_employee_data.head())
數(shù)據(jù)存儲
最后一步是將數(shù)據(jù)加載到數(shù)據(jù)庫中,使得用戶易于搜索和分析。
在本文中,我們使用SQLite。它是一種存儲數(shù)據(jù)的輕量級數(shù)據(jù)庫。我們將在SQLite數(shù)據(jù)庫中創(chuàng)建一個(gè)名為employees(員工)的表。然后,我們將轉(zhuǎn)換后的數(shù)據(jù)插入到該表中。
import sqlite3
# Function to load transformed data into SQLite database
def load_data_to_db(data, db_name='employee_data.db'):
try:
# Connect to SQLite database (or create it if it doesn't exist)
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Create table if it doesn't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
employee_id INTEGER PRIMARY KEY,
first_name TEXT,
last_name TEXT,
salary REAL,
age INTEGER,
department TEXT,
salary_band TEXT,
age_group TEXT
)
''')
# Insert data into the employees table
data.to_sql('employees', conn, if_exists='replace', index=False)
# Commit and close the connection
conn.commit()
print(f"Data loaded into {db_name} successfully")
# Query the data to verify it was loaded
query = "SELECT * FROM employees"
result = pd.read_sql(query, conn)
print("\nData loaded into the database:")
print(result.head()) # Print the first few rows of the data from the database
conn.close()
except Exception as e:
print(f"Error in loading data: {e}")
load_data_to_db(transformed_employee_data)
運(yùn)行完整的ETL管道
現(xiàn)在我們已完成了提取、轉(zhuǎn)換和加載等步驟,就可以將它們組合起來了。這將創(chuàng)建一個(gè)完整的ETL管道。該管道將獲取員工數(shù)據(jù),并清理和更改數(shù)據(jù)。最后,它將把數(shù)據(jù)保存在數(shù)據(jù)庫中。
def run_etl_pipeline(file_path, db_name='employee_data.db'):
# Extract
data = extract_employee_data(file_path)
if data is not None:
# Transform
transformed_data = transform_employee_data(data)
if transformed_data is not None:
# Load
load_data_to_db(transformed_data, db_name)
# Run the ETL pipeline
run_etl_pipeline('/content/employees_data.csv', 'employee_data.db')
現(xiàn)在大功告成了:我們的ETL管道已實(shí)現(xiàn)完畢,現(xiàn)在就可以執(zhí)行了。
ETL管道的幾個(gè)最佳實(shí)踐
下面是需要遵循的幾個(gè)最佳實(shí)踐,以便構(gòu)建高效可靠的ETL管道:
1. 利用模塊化:將管道分解為更小的、可重用的函數(shù)。
2. 錯(cuò)誤處理:在提取、轉(zhuǎn)換或加載期間為日志問題添加錯(cuò)誤處理機(jī)制。
3. 優(yōu)化性能:為大型數(shù)據(jù)集優(yōu)化查詢和管理內(nèi)存。
4. 自動化測試:自動測試轉(zhuǎn)換和數(shù)據(jù)格式,以確保準(zhǔn)確性。
結(jié)語
ETL管道是任何數(shù)據(jù)科學(xué)項(xiàng)目的關(guān)鍵。它們有助于處理和存儲數(shù)據(jù),以供進(jìn)行準(zhǔn)確的分析。我們演示了如何從CSV文件中獲取數(shù)據(jù),然后我們清理和更改了數(shù)據(jù),最后我們將其保存在SQLite數(shù)據(jù)庫中。
一個(gè)好的ETL管道可以確保數(shù)據(jù)井然有序。這種管道可加以改進(jìn),以處理更復(fù)雜的數(shù)據(jù)和存儲需求。它有助于創(chuàng)建可擴(kuò)展且可靠的數(shù)據(jù)解決方案。
原文標(biāo)題:Developing Robust ETL Pipelines for Data Science Projects,作者:Jayita Gulati