自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

從數(shù)據(jù)庫獲取數(shù)據(jù),必須要了解Python生成器

開發(fā) 前端 數(shù)據(jù)庫
在本教程中,我們將通過旋轉(zhuǎn)運(yùn)行三個(gè)服務(wù)的Docker容器來模擬一個(gè)真實(shí)的端到端數(shù)據(jù)工作流程,探討在數(shù)據(jù)工程師中使用生成器的兩個(gè)實(shí)際用例。

介紹

作為數(shù)據(jù)工程師,我們經(jīng)常面臨這樣的情況:我們必須從運(yùn)營數(shù)據(jù)庫中獲取一個(gè)特別大的數(shù)據(jù)集,對其進(jìn)行一些轉(zhuǎn)換,然后將其寫回分析數(shù)據(jù)庫或云對象存儲(例如S3桶)。

如果數(shù)據(jù)集太大無法裝入內(nèi)存,但同時(shí)使用分布式計(jì)算不值得或不可行,該怎么辦呢?

在這種情況下,我們需要找到一種方法,在不影響數(shù)據(jù)團(tuán)隊(duì)其他同事(例如通過使用Airflow實(shí)例中可用內(nèi)存的大部分)的情況下完成工作。這就是Python生成器可能會派上用場的時(shí)候,通過避免內(nèi)存峰值來高效地從數(shù)據(jù)庫獲取數(shù)據(jù)。

事實(shí)上,在本教程中,我們將通過旋轉(zhuǎn)運(yùn)行三個(gè)服務(wù)(PostgresDB、Jupyter Notebook和MinIO)的Docker容器來模擬一個(gè)真實(shí)的端到端數(shù)據(jù)工作流程,探討在數(shù)據(jù)工程師中使用生成器的兩個(gè)實(shí)際用例。

Python中使用生成器的優(yōu)點(diǎn)

在Python中,標(biāo)準(zhǔn)函數(shù)計(jì)算并返回單個(gè)值然后終止,而生成器可以隨時(shí)間產(chǎn)生一系列值,根據(jù)需要暫停和恢復(fù)。生成器是一種特殊的函數(shù),它使用`yield`子句而不是`return`來產(chǎn)生一系列的值。值逐個(gè)創(chuàng)建,無需將整個(gè)序列存儲在內(nèi)存中。

當(dāng)調(diào)用生成器函數(shù)時(shí),它返回一個(gè)迭代器對象,可以用于迭代生成器產(chǎn)生的值的序列。例如,讓我們創(chuàng)建一個(gè)squares_generator(n)函數(shù),該函數(shù)生成介于零和輸入變量n之間的數(shù)字的平方:

def squares_generator(n):

    num = 0

    while num < n:

        yield num * num

        num += 1

當(dāng)調(diào)用該函數(shù)時(shí),它只返回一個(gè)迭代器:

squares_generator(n)

#Output:
# <generator object squares_generator at 0x10653bdd0>

為了觸發(fā)整個(gè)值序列,我們必須在循環(huán)中調(diào)用生成器函數(shù):

for num in squares_generator(5):
  print(num)

#Output:
0
1
4
9
16

另一種更優(yōu)雅的選擇是創(chuàng)建一個(gè)生成器表達(dá)式,它執(zhí)行與上述函數(shù)相同的操作,但作為一行代碼:

n = 5 
generator_exp = (num * num for num in range(n))

現(xiàn)在,可以直接使用`next()`方法訪問值:

print(next(generator_exp)) # 0
print(next(generator_exp)) # 1
print(next(generator_exp)) # 4
print(next(generator_exp)) # 9
print(next(generator_exp)) # 16

正如我們所看到的,生成器函數(shù)返回值的方式并不像常規(guī)Python函數(shù)那樣直觀,這可能是為什么許多數(shù)據(jù)工程師沒有像他們應(yīng)該的那樣經(jīng)常使用生成器的原因。

目標(biāo)與設(shè)置

本教程的目標(biāo)是:

  • 從Postgres數(shù)據(jù)庫中獲取數(shù)據(jù)并將其存儲為pandas數(shù)據(jù)框。
  • 將pandas數(shù)據(jù)框以parquet格式寫入S3桶。

每個(gè)目標(biāo)都將使用常規(guī)函數(shù)和生成器函數(shù)兩種方法實(shí)現(xiàn)。為了模擬這樣的工作流程,我們將使用三個(gè)服務(wù)旋轉(zhuǎn)一個(gè)Docker容器:

  • Postgres數(shù)據(jù)庫(這個(gè)服務(wù)將是我們的源操作數(shù)據(jù)庫,從中獲取數(shù)據(jù)。Docker-compose還涉及創(chuàng)建一個(gè)mainDB,以及在名為transactions的表中插入500萬個(gè)模擬記錄。請注意:可以插入任意數(shù)量的行來模擬一個(gè)更大的數(shù)據(jù)集(在準(zhǔn)備本教程的材料時(shí),嘗試過5000萬和1億行),但Docker服務(wù)的性能會受到嚴(yán)重影響。)
  • MinIO(這個(gè)服務(wù)將用于模擬AWS S3桶,然后使用awswrangler包幫助將pandas數(shù)據(jù)框以parquet格式寫入其中。)
  • Jupyter Notebook(這個(gè)服務(wù)將用于通過熟悉的編譯器以交互方式運(yùn)行Python片段。)

下面的圖表是對到目前為止所描述的內(nèi)容的可視化表示:

第一步,我們項(xiàng)目的GitHub存儲庫并切換到相關(guān)文件夾:

git clone git@github.com:anbento0490/projects.git &&
cd fetch_data_with_python_generators

然后,我們可以運(yùn)行docker-compose來啟動這三個(gè)服務(wù):

docker compose up -d

[+] Running 5/5
 ? Network shared-network                 Created                                                 0.0s
 ? Container jupyter-notebooks            Started                                                 1.0s
 ? Container minio                        Started                                                 0.7s
 ? Container postgres-db                  Started                                                 0.9s
 ? Container mc                           Started

最終,我們可以驗(yàn)證:

(1) 在Postgres數(shù)據(jù)庫中存在一個(gè)名為transactions的表,其中包含5百萬條記錄。

docker exec -it postgres-db /bin/bash

root@9632469c70e0:/# psql -U postgres

psql (13.13 (Debian 13.13-1.pgdg120+1))
Type "help" for help.

postgres=# \c mainDB
You are now connected to database "mainDB" as user "postgres".

mainDB=# select count(*) from transactions;
  count
---------
 5000000
(1 row)

(2) 可以通過端口localhost:9001訪問MinIO UI(在要求憑據(jù)時(shí)插入管理員和密碼),并且已經(jīng)創(chuàng)建了一個(gè)名為generators-test-bucket的空桶:

  MinIO UI端口9001處的用戶界面

(3) 可以通過localhost:8889訪問Jupyter Notebook用戶界面,并通過以下方法檢索令牌:

docker exec -it jupyter-notebooks /bin/bash

root@eae08d1f4bf6:~# jupyter server list

Currently running servers:
http://eae08d1f4bf6:8888/?token=8a45d846d03cf0c0e4584c3b73af86ba5dk9e83c8ac47ee7 :: /home/jovyan

很好!我們已經(jīng)準(zhǔn)備好在Jupyter上運(yùn)行一些代碼了。但在我們這樣做之前,我們需要創(chuàng)建一個(gè)新的access_key和secret_access_key,以便能夠與MinIO桶進(jìn)行交互:

如何在MinIO中生成新的密鑰對

注意:MinIO桶的最酷的功能之一是,我們可以與它們交互,就像它們是AWS S3桶一樣(例如使用boto3、awswrangler等),但它們是免費(fèi)的,而且無需擔(dān)心暴露密鑰,因?yàn)樗鼈儍H存在于我們的本地環(huán)境中,并且除非持久保存,否則將在容器停止時(shí)被刪除。

現(xiàn)在,在生成器筆記本中,讓我們運(yùn)行以下代碼(確保替換secrets):

import psycopg2
import pandas as pd
import boto3
import awswrangler as wr

#######################################################
######## CONNECTING TO PG DB + CREATING CURSORS #######
connection = psycopg2.connect(user="postgres",
                              password="postgres",
                              port="5432",
                              database="mainDB")
cursor = connection.cursor()

query = "select * from transactions;"

#######################################################
######## CONNECTING TO MINIO BUCKET ###################

boto3.setup_default_session(aws_access_key_id = 'your_access_key',
                            aws_secret_access_key = 'your_secret_key')

bucket = 'generators-test-bucket'
folder_gen = 'data_gen'
folder_batch = 'data_batch'
parquet_file_name = 'transactions'
batch_size = 1000000

wr.config.s3_endpoint_url = 'http://minio:9000'

這將創(chuàng)建一個(gè)連接到mainDB的連接以及用于執(zhí)行查詢的游標(biāo)。還將設(shè)置一個(gè)default_session,以與generators-test-bucket進(jìn)行交互。

用例 #1:從數(shù)據(jù)庫讀取數(shù)據(jù)

作為數(shù)據(jù)工程師,在將大型數(shù)據(jù)集從數(shù)據(jù)庫或外部服務(wù)抓取到Python管道中時(shí),我們經(jīng)常需要在以下方面找到合適的平衡:

  • 內(nèi)存:一次性拉取整個(gè)數(shù)據(jù)集可能導(dǎo)致OOM錯(cuò)誤或影響整個(gè)實(shí)例/集群的性能。
  • 速度:逐行獲取數(shù)據(jù)也會導(dǎo)致昂貴的I/O網(wǎng)絡(luò)操作。

方法 #1:使用批處理

一個(gè)合理的折衷方案(在實(shí)踐中經(jīng)常使用)是以批處理方式獲取數(shù)據(jù),其中批處理的大小取決于可用內(nèi)存以及數(shù)據(jù)管道的速度要求。

# 1.1. CREATE DF USING BATCHES
def create_df_batch(cursor, batch_size):

    print('Creating pandas DF using generator...')
    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    
    df = pd.DataFrame(columns=colnames)
    cursor.execute(query)

    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # some tramsformation
        batch_df = pd.DataFrame(data = rows, columns=colnames)        
        df = pd.concat([df, batch_df], ignore_index=True)

    print('DF successfully created!\n')

    return df

上面的代碼執(zhí)行以下操作:

  • 創(chuàng)建一個(gè)空的df(數(shù)據(jù)框);
  • 執(zhí)行查詢,將整個(gè)結(jié)果緩存到游標(biāo)對象中;
  • 初始化一個(gè)while循環(huán),每次迭代都獲取等于指定batch_size的行數(shù)(在此示例中為1百萬行),并使用這些數(shù)據(jù)創(chuàng)建一個(gè)batch_df(批數(shù)據(jù)框)。
  • 最終將batch_df附加到主df。該過程重復(fù)進(jìn)行,直到整個(gè)數(shù)據(jù)集被遍歷。

讓我們明確一下:這只是一個(gè)基本示例,我們可以在while循環(huán)的一部分執(zhí)行許多其他操作(過濾、排序、聚合、將數(shù)據(jù)寫入其他位置等),而不僅僅是一次一個(gè)批次地創(chuàng)建df。當(dāng)在筆記本中執(zhí)行該函數(shù)時(shí),我們得到:

%%time 
df_batch = create_df_batch(cursor, batch_size)
df_batch.head()

Output:

Creating pandas DF using generator...
DF successfully created!

CPU times: user 9.97 s, sys: 13.7 s, total: 23.7 s
Wall time: 25 s

df_batch數(shù)據(jù)框的前5行

方法 #2:使用生成器

一種不太常見但強(qiáng)大的數(shù)據(jù)工程師策略是使用生成器以流的形式獲取數(shù)據(jù):

# AUXILIARY FUNCTION
def generate_dataset(cursor):
    
    cursor.execute(query)
    
    for row in cursor.fetchall():
        # some tramsformation
        yield row 

# 2.1. CREATE DF USING GENERATORS
def create_df_gen(cursor):
    print('Creating pandas DF using generator...')

    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    
    df = pd.DataFrame(data = generate_dataset(cursor), columns=colnames)

    print('DF successfully created!\n')
    
    return df

在上面的代碼片段中,我們創(chuàng)建了`generate_dataset` 輔助函數(shù),該函數(shù)執(zhí)行查詢,然后將行作為序列生成。該函數(shù)直接傳遞給`pd.DataFrame()` 子句的`data`參數(shù),該子句在背后遍歷所有獲取的記錄,直到行被耗盡。

同樣,這個(gè)例子非常基礎(chǔ)(主要是為了演示目的),但我們可以在輔助函數(shù)中執(zhí)行任何類型的過濾或轉(zhuǎn)換。當(dāng)執(zhí)行該函數(shù)時(shí),我們得到df_gen數(shù)據(jù)框的前5行

%%time 
df_gen = create_df_gen(cursor)
df_gen.head()

Creating pandas DF using generator...
DF successfully created!

CPU times: user 9.04 s, sys: 2.1 s, total: 11.1 s
Wall time: 14.4 s

看起來似乎兩種方法最終都使用了同樣的內(nèi)存量(因?yàn)閐f都是以不同方式返回的),但事實(shí)并非如此,因?yàn)閿?shù)據(jù)在生成df本身時(shí)的處理方式是不同的:

  • 對于方法 #1,是急切地獲取數(shù)據(jù),通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)交換有點(diǎn)低效,導(dǎo)致內(nèi)存占用峰值較高;
  • 對于方法 #2,是懶惰地獲取數(shù)據(jù),只有在需要時(shí)才計(jì)算,并且逐個(gè)計(jì)算,從而降低內(nèi)存占用。

用例 #2:寫入云對象存儲

有時(shí),數(shù)據(jù)工程師需要獲取存儲在數(shù)據(jù)庫中的大量數(shù)據(jù),并將這些記錄外部共享(例如與監(jiān)管機(jī)構(gòu)、審計(jì)員、合作伙伴共享)。

一種常見的解決方案是創(chuàng)建一個(gè)云對象存儲,數(shù)據(jù)將被傳遞到該存儲中,以便第三方(具有適當(dāng)訪問權(quán)限的人)能夠讀取并將數(shù)據(jù)復(fù)制到其系統(tǒng)中。

實(shí)際上,我們創(chuàng)建了一個(gè)名為`generators-test-bucket`的桶,數(shù)據(jù)將以parquet格式寫入其中,利用了`awswrangler`包。

`awswrangler`的優(yōu)勢在于它與pandas數(shù)據(jù)框非常有效地配合,并允許以保留數(shù)據(jù)集結(jié)構(gòu)的方式將它們轉(zhuǎn)換為parquet格式。

方法 #1:使用批處理

與第一個(gè)用例一樣,一個(gè)常見的解決方案是以批處理方式獲取數(shù)據(jù),然后寫入數(shù)據(jù),直到整個(gè)數(shù)據(jù)集被遍歷:


# 1.2 WRITING DF TO MINIO BUCKET IN PARQUET FORMAT USING BATCHES
def write_df_to_s3_batch(cursor, bucket, folder, parquet_file_name, batch_size):
    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    cursor.execute(query)
    batch_num = 1
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        print(f"Writing DF batch #{batch_num} to S3 bucket...")
        wr.s3.to_parquet(df= pd.DataFrame(data = rows, columns=colnames),
                         path=f's3://{bucket}/{folder}/{parquet_file_name}',
                         compression='gzip',
                         mode = 'append',
                         dataset=True)
        print('Batch successfully written to S3 bucket!\n')
        batch_num += 1

執(zhí)行`write_df_to_s3_batch()` 函數(shù)會在桶中創(chuàng)建五個(gè)parquet文件,每個(gè)文件包含1百萬條記錄:

write_df_to_s3_batch(cursor, bucket, folder_batch, parquet_file_name, batch_size)

Writing DF batch #1 to S3 bucket...
Batch successfully written to S3 bucket!

Writing DF batch #2 to S3 bucket...
Batch successfully written to S3 bucket!

Writing DF batch #3 to S3 bucket...
Batch successfully written to S3 bucket!

Writing DF batch #4 to S3 bucket...
Batch successfully written to S3 bucket!

Writing DF batch #5 to S3 bucket...
Batch successfully written to S3 bucket!

在MinIO中以批處理方式寫入的數(shù)據(jù)

方法 #2:使用生成器

或者,可以通過利用生成器提取數(shù)據(jù)并將其寫入桶中。由于生成器在提取和移動數(shù)據(jù)時(shí)不會導(dǎo)致內(nèi)存效率問題,我們甚至可以決定一次性寫入整個(gè)df:

# 2.2 WRITING DF TO MINIO BUCKET IN PARQUET FORMAT USING GENERATORS
def write_df_to_s3_gen(cursor, bucket, folder, parquet_file_name):
    print('Writing DF to S3 bucket...')

    colnames = ['transaction_id', 
                'user_id', 
                'product_name', 
                'transaction_date', 
                'amount_gbp']
    
    wr.s3.to_parquet(df= pd.DataFrame(data = generate_dataset(cursor), columns=colnames),
             path=f's3://{bucket}/{folder}/{parquet_file_name}',
             compression='gzip',
             mode = 'append',
             dataset=True)
    print('Data successfully written to S3 bucket!\n')

當(dāng)執(zhí)行`write_df_to_s3_gen()` 函數(shù)時(shí),將一個(gè)包含所有5百萬行的唯一較大parquet文件保存到桶中:

write_df_to_s3_gen(cursor, bucket, folder_gen, parquet_file_name)

Writing DF to S3 bucket...
Data successfully written to S3 bucket!

利用生成器寫入MinIO的數(shù)據(jù)

責(zé)任編輯:趙寧寧 來源: 小白玩轉(zhuǎn)Python
相關(guān)推薦

2018-09-25 08:33:38

數(shù)據(jù)庫鎖JavaSQL

2018-11-08 12:07:38

備份手動磁盤

2018-09-21 11:11:34

備份離線自動

2018-11-19 10:10:51

Python數(shù)據(jù)庫隨機(jī)生成器

2022-01-26 23:16:25

開源NLP 庫GitHub

2011-06-23 17:13:07

SEO

2016-12-23 08:59:00

AB 測試CRO

2023-04-26 16:34:12

2021-06-07 11:33:24

服務(wù)器優(yōu)化TIME-WAIT

2017-10-29 06:50:30

前端開發(fā)CSSWeb

2015-10-23 15:22:16

AsyncTask基礎(chǔ)Android

2021-09-15 09:51:36

數(shù)據(jù)庫架構(gòu)技術(shù)

2021-03-11 10:49:27

數(shù)據(jù)管理

2018-07-12 11:11:46

人工智能AI術(shù)語

2020-08-21 13:15:29

開發(fā)技能代碼

2018-04-19 13:43:15

區(qū)塊鏈人工智能Go語言

2021-04-27 22:27:19

手機(jī)安卓蘋果

2017-09-06 09:26:03

Python生成器協(xié)程

2021-12-04 22:07:44

Python

2020-11-29 16:52:13

數(shù)據(jù)庫SQL數(shù)據(jù)分析
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號