從數(shù)據(jù)庫獲取數(shù)據(jù),必須要了解Python生成器
介紹
作為數(shù)據(jù)工程師,我們經(jīng)常面臨這樣的情況:我們必須從運(yùn)營數(shù)據(jù)庫中獲取一個(gè)特別大的數(shù)據(jù)集,對其進(jìn)行一些轉(zhuǎn)換,然后將其寫回分析數(shù)據(jù)庫或云對象存儲(例如S3桶)。
如果數(shù)據(jù)集太大無法裝入內(nèi)存,但同時(shí)使用分布式計(jì)算不值得或不可行,該怎么辦呢?
在這種情況下,我們需要找到一種方法,在不影響數(shù)據(jù)團(tuán)隊(duì)其他同事(例如通過使用Airflow實(shí)例中可用內(nèi)存的大部分)的情況下完成工作。這就是Python生成器可能會派上用場的時(shí)候,通過避免內(nèi)存峰值來高效地從數(shù)據(jù)庫獲取數(shù)據(jù)。
事實(shí)上,在本教程中,我們將通過旋轉(zhuǎn)運(yùn)行三個(gè)服務(wù)(PostgresDB、Jupyter Notebook和MinIO)的Docker容器來模擬一個(gè)真實(shí)的端到端數(shù)據(jù)工作流程,探討在數(shù)據(jù)工程師中使用生成器的兩個(gè)實(shí)際用例。
Python中使用生成器的優(yōu)點(diǎn)
在Python中,標(biāo)準(zhǔn)函數(shù)計(jì)算并返回單個(gè)值然后終止,而生成器可以隨時(shí)間產(chǎn)生一系列值,根據(jù)需要暫停和恢復(fù)。生成器是一種特殊的函數(shù),它使用`yield`子句而不是`return`來產(chǎn)生一系列的值。值逐個(gè)創(chuàng)建,無需將整個(gè)序列存儲在內(nèi)存中。
當(dāng)調(diào)用生成器函數(shù)時(shí),它返回一個(gè)迭代器對象,可以用于迭代生成器產(chǎn)生的值的序列。例如,讓我們創(chuàng)建一個(gè)squares_generator(n)函數(shù),該函數(shù)生成介于零和輸入變量n之間的數(shù)字的平方:
def squares_generator(n):
num = 0
while num < n:
yield num * num
num += 1
當(dāng)調(diào)用該函數(shù)時(shí),它只返回一個(gè)迭代器:
squares_generator(n)
#Output:
# <generator object squares_generator at 0x10653bdd0>
為了觸發(fā)整個(gè)值序列,我們必須在循環(huán)中調(diào)用生成器函數(shù):
for num in squares_generator(5):
print(num)
#Output:
0
1
4
9
16
另一種更優(yōu)雅的選擇是創(chuàng)建一個(gè)生成器表達(dá)式,它執(zhí)行與上述函數(shù)相同的操作,但作為一行代碼:
n = 5
generator_exp = (num * num for num in range(n))
現(xiàn)在,可以直接使用`next()`方法訪問值:
print(next(generator_exp)) # 0
print(next(generator_exp)) # 1
print(next(generator_exp)) # 4
print(next(generator_exp)) # 9
print(next(generator_exp)) # 16
正如我們所看到的,生成器函數(shù)返回值的方式并不像常規(guī)Python函數(shù)那樣直觀,這可能是為什么許多數(shù)據(jù)工程師沒有像他們應(yīng)該的那樣經(jīng)常使用生成器的原因。
目標(biāo)與設(shè)置
本教程的目標(biāo)是:
- 從Postgres數(shù)據(jù)庫中獲取數(shù)據(jù)并將其存儲為pandas數(shù)據(jù)框。
- 將pandas數(shù)據(jù)框以parquet格式寫入S3桶。
每個(gè)目標(biāo)都將使用常規(guī)函數(shù)和生成器函數(shù)兩種方法實(shí)現(xiàn)。為了模擬這樣的工作流程,我們將使用三個(gè)服務(wù)旋轉(zhuǎn)一個(gè)Docker容器:
- Postgres數(shù)據(jù)庫(這個(gè)服務(wù)將是我們的源操作數(shù)據(jù)庫,從中獲取數(shù)據(jù)。Docker-compose還涉及創(chuàng)建一個(gè)mainDB,以及在名為transactions的表中插入500萬個(gè)模擬記錄。請注意:可以插入任意數(shù)量的行來模擬一個(gè)更大的數(shù)據(jù)集(在準(zhǔn)備本教程的材料時(shí),嘗試過5000萬和1億行),但Docker服務(wù)的性能會受到嚴(yán)重影響。)
- MinIO(這個(gè)服務(wù)將用于模擬AWS S3桶,然后使用awswrangler包幫助將pandas數(shù)據(jù)框以parquet格式寫入其中。)
- Jupyter Notebook(這個(gè)服務(wù)將用于通過熟悉的編譯器以交互方式運(yùn)行Python片段。)
下面的圖表是對到目前為止所描述的內(nèi)容的可視化表示:
第一步,我們項(xiàng)目的GitHub存儲庫并切換到相關(guān)文件夾:
git clone git@github.com:anbento0490/projects.git &&
cd fetch_data_with_python_generators
然后,我們可以運(yùn)行docker-compose來啟動這三個(gè)服務(wù):
docker compose up -d
[+] Running 5/5
? Network shared-network Created 0.0s
? Container jupyter-notebooks Started 1.0s
? Container minio Started 0.7s
? Container postgres-db Started 0.9s
? Container mc Started
最終,我們可以驗(yàn)證:
(1) 在Postgres數(shù)據(jù)庫中存在一個(gè)名為transactions的表,其中包含5百萬條記錄。
docker exec -it postgres-db /bin/bash
root@9632469c70e0:/# psql -U postgres
psql (13.13 (Debian 13.13-1.pgdg120+1))
Type "help" for help.
postgres=# \c mainDB
You are now connected to database "mainDB" as user "postgres".
mainDB=# select count(*) from transactions;
count
---------
5000000
(1 row)
(2) 可以通過端口localhost:9001訪問MinIO UI(在要求憑據(jù)時(shí)插入管理員和密碼),并且已經(jīng)創(chuàng)建了一個(gè)名為generators-test-bucket的空桶:
MinIO UI端口9001處的用戶界面
(3) 可以通過localhost:8889訪問Jupyter Notebook用戶界面,并通過以下方法檢索令牌:
docker exec -it jupyter-notebooks /bin/bash
root@eae08d1f4bf6:~# jupyter server list
Currently running servers:
http://eae08d1f4bf6:8888/?token=8a45d846d03cf0c0e4584c3b73af86ba5dk9e83c8ac47ee7 :: /home/jovyan
很好!我們已經(jīng)準(zhǔn)備好在Jupyter上運(yùn)行一些代碼了。但在我們這樣做之前,我們需要創(chuàng)建一個(gè)新的access_key和secret_access_key,以便能夠與MinIO桶進(jìn)行交互:
如何在MinIO中生成新的密鑰對
注意:MinIO桶的最酷的功能之一是,我們可以與它們交互,就像它們是AWS S3桶一樣(例如使用boto3、awswrangler等),但它們是免費(fèi)的,而且無需擔(dān)心暴露密鑰,因?yàn)樗鼈儍H存在于我們的本地環(huán)境中,并且除非持久保存,否則將在容器停止時(shí)被刪除。
現(xiàn)在,在生成器筆記本中,讓我們運(yùn)行以下代碼(確保替換secrets):
import psycopg2
import pandas as pd
import boto3
import awswrangler as wr
#######################################################
######## CONNECTING TO PG DB + CREATING CURSORS #######
connection = psycopg2.connect(user="postgres",
password="postgres",
port="5432",
database="mainDB")
cursor = connection.cursor()
query = "select * from transactions;"
#######################################################
######## CONNECTING TO MINIO BUCKET ###################
boto3.setup_default_session(aws_access_key_id = 'your_access_key',
aws_secret_access_key = 'your_secret_key')
bucket = 'generators-test-bucket'
folder_gen = 'data_gen'
folder_batch = 'data_batch'
parquet_file_name = 'transactions'
batch_size = 1000000
wr.config.s3_endpoint_url = 'http://minio:9000'
這將創(chuàng)建一個(gè)連接到mainDB的連接以及用于執(zhí)行查詢的游標(biāo)。還將設(shè)置一個(gè)default_session,以與generators-test-bucket進(jìn)行交互。
用例 #1:從數(shù)據(jù)庫讀取數(shù)據(jù)
作為數(shù)據(jù)工程師,在將大型數(shù)據(jù)集從數(shù)據(jù)庫或外部服務(wù)抓取到Python管道中時(shí),我們經(jīng)常需要在以下方面找到合適的平衡:
- 內(nèi)存:一次性拉取整個(gè)數(shù)據(jù)集可能導(dǎo)致OOM錯(cuò)誤或影響整個(gè)實(shí)例/集群的性能。
- 速度:逐行獲取數(shù)據(jù)也會導(dǎo)致昂貴的I/O網(wǎng)絡(luò)操作。
方法 #1:使用批處理
一個(gè)合理的折衷方案(在實(shí)踐中經(jīng)常使用)是以批處理方式獲取數(shù)據(jù),其中批處理的大小取決于可用內(nèi)存以及數(shù)據(jù)管道的速度要求。
# 1.1. CREATE DF USING BATCHES
def create_df_batch(cursor, batch_size):
print('Creating pandas DF using generator...')
colnames = ['transaction_id',
'user_id',
'product_name',
'transaction_date',
'amount_gbp']
df = pd.DataFrame(columns=colnames)
cursor.execute(query)
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
# some tramsformation
batch_df = pd.DataFrame(data = rows, columns=colnames)
df = pd.concat([df, batch_df], ignore_index=True)
print('DF successfully created!\n')
return df
上面的代碼執(zhí)行以下操作:
- 創(chuàng)建一個(gè)空的df(數(shù)據(jù)框);
- 執(zhí)行查詢,將整個(gè)結(jié)果緩存到游標(biāo)對象中;
- 初始化一個(gè)while循環(huán),每次迭代都獲取等于指定batch_size的行數(shù)(在此示例中為1百萬行),并使用這些數(shù)據(jù)創(chuàng)建一個(gè)batch_df(批數(shù)據(jù)框)。
- 最終將batch_df附加到主df。該過程重復(fù)進(jìn)行,直到整個(gè)數(shù)據(jù)集被遍歷。
讓我們明確一下:這只是一個(gè)基本示例,我們可以在while循環(huán)的一部分執(zhí)行許多其他操作(過濾、排序、聚合、將數(shù)據(jù)寫入其他位置等),而不僅僅是一次一個(gè)批次地創(chuàng)建df。當(dāng)在筆記本中執(zhí)行該函數(shù)時(shí),我們得到:
%%time
df_batch = create_df_batch(cursor, batch_size)
df_batch.head()
Output:
Creating pandas DF using generator...
DF successfully created!
CPU times: user 9.97 s, sys: 13.7 s, total: 23.7 s
Wall time: 25 s
df_batch數(shù)據(jù)框的前5行
方法 #2:使用生成器
一種不太常見但強(qiáng)大的數(shù)據(jù)工程師策略是使用生成器以流的形式獲取數(shù)據(jù):
# AUXILIARY FUNCTION
def generate_dataset(cursor):
cursor.execute(query)
for row in cursor.fetchall():
# some tramsformation
yield row
# 2.1. CREATE DF USING GENERATORS
def create_df_gen(cursor):
print('Creating pandas DF using generator...')
colnames = ['transaction_id',
'user_id',
'product_name',
'transaction_date',
'amount_gbp']
df = pd.DataFrame(data = generate_dataset(cursor), columns=colnames)
print('DF successfully created!\n')
return df
在上面的代碼片段中,我們創(chuàng)建了`generate_dataset` 輔助函數(shù),該函數(shù)執(zhí)行查詢,然后將行作為序列生成。該函數(shù)直接傳遞給`pd.DataFrame()` 子句的`data`參數(shù),該子句在背后遍歷所有獲取的記錄,直到行被耗盡。
同樣,這個(gè)例子非常基礎(chǔ)(主要是為了演示目的),但我們可以在輔助函數(shù)中執(zhí)行任何類型的過濾或轉(zhuǎn)換。當(dāng)執(zhí)行該函數(shù)時(shí),我們得到df_gen數(shù)據(jù)框的前5行
%%time
df_gen = create_df_gen(cursor)
df_gen.head()
Creating pandas DF using generator...
DF successfully created!
CPU times: user 9.04 s, sys: 2.1 s, total: 11.1 s
Wall time: 14.4 s
看起來似乎兩種方法最終都使用了同樣的內(nèi)存量(因?yàn)閐f都是以不同方式返回的),但事實(shí)并非如此,因?yàn)閿?shù)據(jù)在生成df本身時(shí)的處理方式是不同的:
- 對于方法 #1,是急切地獲取數(shù)據(jù),通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)交換有點(diǎn)低效,導(dǎo)致內(nèi)存占用峰值較高;
- 對于方法 #2,是懶惰地獲取數(shù)據(jù),只有在需要時(shí)才計(jì)算,并且逐個(gè)計(jì)算,從而降低內(nèi)存占用。
用例 #2:寫入云對象存儲
有時(shí),數(shù)據(jù)工程師需要獲取存儲在數(shù)據(jù)庫中的大量數(shù)據(jù),并將這些記錄外部共享(例如與監(jiān)管機(jī)構(gòu)、審計(jì)員、合作伙伴共享)。
一種常見的解決方案是創(chuàng)建一個(gè)云對象存儲,數(shù)據(jù)將被傳遞到該存儲中,以便第三方(具有適當(dāng)訪問權(quán)限的人)能夠讀取并將數(shù)據(jù)復(fù)制到其系統(tǒng)中。
實(shí)際上,我們創(chuàng)建了一個(gè)名為`generators-test-bucket`的桶,數(shù)據(jù)將以parquet格式寫入其中,利用了`awswrangler`包。
`awswrangler`的優(yōu)勢在于它與pandas數(shù)據(jù)框非常有效地配合,并允許以保留數(shù)據(jù)集結(jié)構(gòu)的方式將它們轉(zhuǎn)換為parquet格式。
方法 #1:使用批處理
與第一個(gè)用例一樣,一個(gè)常見的解決方案是以批處理方式獲取數(shù)據(jù),然后寫入數(shù)據(jù),直到整個(gè)數(shù)據(jù)集被遍歷:
# 1.2 WRITING DF TO MINIO BUCKET IN PARQUET FORMAT USING BATCHES
def write_df_to_s3_batch(cursor, bucket, folder, parquet_file_name, batch_size):
colnames = ['transaction_id',
'user_id',
'product_name',
'transaction_date',
'amount_gbp']
cursor.execute(query)
batch_num = 1
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
print(f"Writing DF batch #{batch_num} to S3 bucket...")
wr.s3.to_parquet(df= pd.DataFrame(data = rows, columns=colnames),
path=f's3://{bucket}/{folder}/{parquet_file_name}',
compression='gzip',
mode = 'append',
dataset=True)
print('Batch successfully written to S3 bucket!\n')
batch_num += 1
執(zhí)行`write_df_to_s3_batch()` 函數(shù)會在桶中創(chuàng)建五個(gè)parquet文件,每個(gè)文件包含1百萬條記錄:
write_df_to_s3_batch(cursor, bucket, folder_batch, parquet_file_name, batch_size)
Writing DF batch #1 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #2 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #3 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #4 to S3 bucket...
Batch successfully written to S3 bucket!
Writing DF batch #5 to S3 bucket...
Batch successfully written to S3 bucket!
在MinIO中以批處理方式寫入的數(shù)據(jù)
方法 #2:使用生成器
或者,可以通過利用生成器提取數(shù)據(jù)并將其寫入桶中。由于生成器在提取和移動數(shù)據(jù)時(shí)不會導(dǎo)致內(nèi)存效率問題,我們甚至可以決定一次性寫入整個(gè)df:
# 2.2 WRITING DF TO MINIO BUCKET IN PARQUET FORMAT USING GENERATORS
def write_df_to_s3_gen(cursor, bucket, folder, parquet_file_name):
print('Writing DF to S3 bucket...')
colnames = ['transaction_id',
'user_id',
'product_name',
'transaction_date',
'amount_gbp']
wr.s3.to_parquet(df= pd.DataFrame(data = generate_dataset(cursor), columns=colnames),
path=f's3://{bucket}/{folder}/{parquet_file_name}',
compression='gzip',
mode = 'append',
dataset=True)
print('Data successfully written to S3 bucket!\n')
當(dāng)執(zhí)行`write_df_to_s3_gen()` 函數(shù)時(shí),將一個(gè)包含所有5百萬行的唯一較大parquet文件保存到桶中:
write_df_to_s3_gen(cursor, bucket, folder_gen, parquet_file_name)
Writing DF to S3 bucket...
Data successfully written to S3 bucket!
利用生成器寫入MinIO的數(shù)據(jù)