基于PySpark SQL的媒體瀏覽日志ETL作業(yè)
pyspark除了官方的文檔,網(wǎng)上的教程資料一直很少,但基于調(diào)度平臺下,使用pyspark編寫代碼非常高效,程序本身是提交到spark集群中,性能上也是毫無問題的,在本文中,我們將深入探討基于Spark的媒體瀏覽日志ETL(提取、轉(zhuǎn)換、加載)流水線的詳細實現(xiàn),在展示如何使用PySpark SQL處理大規(guī)模的媒體瀏覽日志數(shù)據(jù),包括IP地址轉(zhuǎn)換、數(shù)據(jù)清洗、時間維度補充、碼表關(guān)聯(lián)等關(guān)鍵步驟。
一、環(huán)境配置
首先,我們需要創(chuàng)建一個SparkSession并導(dǎo)入必要的庫和設(shè)置默認參數(shù),包括與IP-to-Location數(shù)據(jù)庫的交互以及其他相關(guān)的配置。
如果pyspark僅僅是本地運行而不是提交集群時,可以使用findspark庫,它能夠幫助我們快速初始化Spark環(huán)境。在開始之前,確保您已經(jīng)成功安裝了findspark庫,并已經(jīng)下載并解壓了Spark二進制文件。將Spark的安裝路徑和Python解釋器路徑指定為變量。
import findspark
# 指定Spark的安裝路徑
spark_home = "/usr/local/spark"
# 指定用于Spark的Python解釋器路徑
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"
# 使用findspark.init方法初始化Spark環(huán)境
findspark.init(spark_home, python_path)
findspark.init方法將幫助設(shè)置PYSPARK_PYTHON和SPARK_HOME環(huán)境變量,確保正確的Spark庫和配置文件被加載。其簡化了Spark環(huán)境的初始化過程,避免手動配置環(huán)境變量。
二、數(shù)據(jù)處理
接下來,我們定義了一個NewsEtl類,用于執(zhí)行數(shù)據(jù)處理和轉(zhuǎn)換的各個步驟。這包括從HDFS中讀取媒體瀏覽日志數(shù)據(jù),進行IP地址轉(zhuǎn)、換,清洗數(shù)據(jù),添加時間維度,補充碼表信息等。
在spark_function中,我們詳細說明了數(shù)據(jù)處理的邏輯。這包括讀取媒體瀏覽日志數(shù)據(jù)、進行IP地址轉(zhuǎn)換、添加時間維度、補充碼表信息、數(shù)據(jù)清洗和最終寫入HDFS等步驟。
1.數(shù)據(jù)讀取
首先,我們使用PySpark的read方法從HDFS中讀取媒體瀏覽日志數(shù)據(jù)。我們指定了數(shù)據(jù)的schema,以確保正確地解析每一列。
df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))
2.IP地址轉(zhuǎn)換
接下來,我們通過iptranslate函數(shù)將IP地址轉(zhuǎn)換為地理位置信息。這使用了XdbSearcher類,該類負責讀取xdb文件并執(zhí)行IP地址的二分查找。
# 根據(jù)IP地址獲取地點信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')
3.時間維度添加
我們生成當前時間的時間戳,并添加各種時間格式的列,包括年、季度、月、日、小時等。
# 生成當前時間的時間戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各種時間格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')
4.數(shù)據(jù)清洗
最后,我們對數(shù)據(jù)進行清洗,包括將空值替換為默認值、字符串去除空格、數(shù)據(jù)類型轉(zhuǎn)換等。
# 數(shù)據(jù)清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...
5.最終寫入HDFS
最終,我們將處理后的數(shù)據(jù)寫入HDFS,采用分區(qū)方式存儲,以便更高效地管理和查詢。
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")
通過這一系列步驟,我們完成了對媒體瀏覽日志數(shù)據(jù)的全面處理,包括數(shù)據(jù)轉(zhuǎn)換、地理位置信息的添加、時間維度的補充和數(shù)據(jù)清洗等關(guān)鍵步驟。
三、結(jié)論
通過詳細的實現(xiàn)步驟,深入解析了基于Spark的媒體瀏覽日志ETL任務(wù)的構(gòu)建過程。這個任務(wù)可以根據(jù)具體需求進行調(diào)整和擴展,為大規(guī)模數(shù)據(jù)處理任務(wù)提供了一種高效而靈活的解決方案。