自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于PySpark SQL的媒體瀏覽日志ETL作業(yè)

數(shù)據(jù)庫 其他數(shù)據(jù)庫
pyspark除了官方的文檔,網(wǎng)上的教程資料一直很少,但基于調(diào)度平臺下,使用pyspark編寫代碼非常高效,程序本身是提交到spark集群中,性能上也是毫無問題的

pyspark除了官方的文檔,網(wǎng)上的教程資料一直很少,但基于調(diào)度平臺下,使用pyspark編寫代碼非常高效,程序本身是提交到spark集群中,性能上也是毫無問題的,在本文中,我們將深入探討基于Spark的媒體瀏覽日志ETL(提取、轉(zhuǎn)換、加載)流水線的詳細實現(xiàn),在展示如何使用PySpark SQL處理大規(guī)模的媒體瀏覽日志數(shù)據(jù),包括IP地址轉(zhuǎn)換、數(shù)據(jù)清洗、時間維度補充、碼表關(guān)聯(lián)等關(guān)鍵步驟。

一、環(huán)境配置

首先,我們需要創(chuàng)建一個SparkSession并導(dǎo)入必要的庫和設(shè)置默認參數(shù),包括與IP-to-Location數(shù)據(jù)庫的交互以及其他相關(guān)的配置。

如果pyspark僅僅是本地運行而不是提交集群時,可以使用findspark庫,它能夠幫助我們快速初始化Spark環(huán)境。在開始之前,確保您已經(jīng)成功安裝了findspark庫,并已經(jīng)下載并解壓了Spark二進制文件。將Spark的安裝路徑和Python解釋器路徑指定為變量。

import findspark

# 指定Spark的安裝路徑
spark_home = "/usr/local/spark"

# 指定用于Spark的Python解釋器路徑
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"

# 使用findspark.init方法初始化Spark環(huán)境
findspark.init(spark_home, python_path)

findspark.init方法將幫助設(shè)置PYSPARK_PYTHON和SPARK_HOME環(huán)境變量,確保正確的Spark庫和配置文件被加載。其簡化了Spark環(huán)境的初始化過程,避免手動配置環(huán)境變量。

二、數(shù)據(jù)處理

接下來,我們定義了一個NewsEtl類,用于執(zhí)行數(shù)據(jù)處理和轉(zhuǎn)換的各個步驟。這包括從HDFS中讀取媒體瀏覽日志數(shù)據(jù),進行IP地址轉(zhuǎn)、換,清洗數(shù)據(jù),添加時間維度,補充碼表信息等。

在spark_function中,我們詳細說明了數(shù)據(jù)處理的邏輯。這包括讀取媒體瀏覽日志數(shù)據(jù)、進行IP地址轉(zhuǎn)換、添加時間維度、補充碼表信息、數(shù)據(jù)清洗和最終寫入HDFS等步驟。

1.數(shù)據(jù)讀取

首先,我們使用PySpark的read方法從HDFS中讀取媒體瀏覽日志數(shù)據(jù)。我們指定了數(shù)據(jù)的schema,以確保正確地解析每一列。

df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))

2.IP地址轉(zhuǎn)換

接下來,我們通過iptranslate函數(shù)將IP地址轉(zhuǎn)換為地理位置信息。這使用了XdbSearcher類,該類負責讀取xdb文件并執(zhí)行IP地址的二分查找。

# 根據(jù)IP地址獲取地點信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')

3.時間維度添加

我們生成當前時間的時間戳,并添加各種時間格式的列,包括年、季度、月、日、小時等。

# 生成當前時間的時間戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各種時間格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')

4.數(shù)據(jù)清洗

最后,我們對數(shù)據(jù)進行清洗,包括將空值替換為默認值、字符串去除空格、數(shù)據(jù)類型轉(zhuǎn)換等。

# 數(shù)據(jù)清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...

5.最終寫入HDFS

最終,我們將處理后的數(shù)據(jù)寫入HDFS,采用分區(qū)方式存儲,以便更高效地管理和查詢。

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")

通過這一系列步驟,我們完成了對媒體瀏覽日志數(shù)據(jù)的全面處理,包括數(shù)據(jù)轉(zhuǎn)換、地理位置信息的添加、時間維度的補充和數(shù)據(jù)清洗等關(guān)鍵步驟。

三、結(jié)論

通過詳細的實現(xiàn)步驟,深入解析了基于Spark的媒體瀏覽日志ETL任務(wù)的構(gòu)建過程。這個任務(wù)可以根據(jù)具體需求進行調(diào)整和擴展,為大規(guī)模數(shù)據(jù)處理任務(wù)提供了一種高效而靈活的解決方案。

責任編輯:華軒 來源: 口袋大數(shù)據(jù)
相關(guān)推薦

2010-10-19 12:11:15

SQL Server定

2010-10-20 15:11:53

SQL Server作

2023-04-04 12:38:50

GPT機器人LLM

2009-04-16 08:32:43

Cooliris瀏覽器媒體中心

2010-10-20 17:00:51

SQL Server代

2019-07-08 09:10:48

TigGitLinux

2022-06-28 08:40:16

LokiPromtail日志報警

2011-04-06 14:16:49

SQL Server自動備份

2010-09-13 14:12:21

SQL Server日

2010-09-06 09:36:51

SQL語句

2009-02-04 16:11:45

2009-01-15 10:43:21

SCDMA通信產(chǎn)業(yè)McWiLL

2010-07-07 16:46:52

SQL Server日

2010-07-02 10:42:11

SQL Server

2010-11-10 11:54:32

SQL SERVER刪

2010-10-27 11:12:39

2009-04-13 08:40:30

AMD瀏覽器Fusion Medi

2012-04-11 10:16:02

EclipseIDE

2012-04-16 10:04:08

Eclipse瀏覽器IDE

2009-04-03 09:09:21

瀏覽器網(wǎng)絡(luò)辦公室
點贊
收藏

51CTO技術(shù)棧公眾號