自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于Spark的數據分析實踐

大數據 Spark
本文主要分析了 Spark RDD 以及 RDD 作為開發(fā)的不足之處,介紹了 SparkSQL 對已有的常見數據系統的操作方法,以及重點介紹了普元在眾多數據開發(fā)項目中總結的基于 SparkSQL Flow 開發(fā)框架。

引言:

Spark是在借鑒了MapReduce之上發(fā)展而來的,繼承了其分布式并行計算的優(yōu)點并改進了MapReduce明顯的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件。

本文主要分析了 Spark RDD 以及 RDD 作為開發(fā)的不足之處,介紹了 SparkSQL 對已有的常見數據系統的操作方法,以及重點介紹了普元在眾多數據開發(fā)項目中總結的基于 SparkSQL Flow 開發(fā)框架。

目錄:

  1. Spark RDD
  2. 基于Spark RDD數據開發(fā)的不足
  3. SparkSQL
  4. SparkSQL Flow

一、Spark RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區(qū)、元素可并行計算的集合。

RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。

//Scala 在內存中使用列表創(chuàng)建

  1. val lines = List(“A”, “B”, “C”, “D” …) 
  2. val rdd:RDD = sc.parallelize(lines); 

//以文本文件創(chuàng)建

  1. val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”) 

Spark RDD Partition 分區(qū)劃分 

基于 Spark 的數據分析實踐

新版本的 Hadoop 已經把 BlockSize 改為 128M,也就是說每個分區(qū)處理的數據量更大。

Spark 讀取文件分區(qū)的核心原理

本質上,Spark 是利用了 Hadoop 的底層對數據進行分區(qū)的 API(InputFormat):

  1. public abstract class InputFormat<K,V>{ 
  2.  public abstract List<InputSplit> getSplits(JobContextcontext 
  3.  ) throwsIOException,InterruptedException; 
  4.   
  5.  public abstract RecordReader<K,V> createRecordReader(InputSplitsplit, 
  6.  TaskAttemptContextcontext 
  7.  )throwsIOException,InterruptedException; 

Spark 任務提交后通過對輸入進行 Split,在 RDD 構造階段,只是判斷是否可 Split(如果參數異常一定在此階段報出異常),并且 Split 后每個 InputSplit 都是一個分區(qū)。只有在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通過 createRecordReader 獲得每個 Partition 的連接。

然后通過 RecordReader 的 next() 遍歷分區(qū)內的數據。

Spark RDD 轉換函數和提交函數 

基于 Spark 的數據分析實踐

Spark RDD 的眾多函數可分為兩大類Transformation 與 Action。Transformation 與 Action 的區(qū)別在于,對 RDD 進行 Transformation 并不會觸發(fā)計算:Transformation 方法所產生的 RDD 對象只會記錄住該 RDD 所依賴的 RDD 以及計算產生該 RDD 的數據的方式;只有在用戶進行 Action 操作時,Spark 才會調度 RDD 計算任務,依次為各個 RDD 計算數據。這就是 Spark RDD 內函數的“懶加載”特性。

二、基于Spark RDD數據開發(fā)的不足

由于MapReduce的shuffle過程需寫磁盤,比較影響性能;而Spark利用RDD技術,計算在內存中流式進行。另外 MapReduce計算框架(API)比較局限, 使用需要關注的參數眾多,而Spark則是中間結果自動推斷,通過對數據集上鏈式執(zhí)行函數具備一定的靈活性。

即使 SparkRDD 相對于 MapReduce 提高很大的便利性,但在使用上仍然有許多問題。體現在一下幾個方面:

  1. RDD 函數眾多,開發(fā)者不容易掌握,部分函數使用不當 shuffle時造成數據傾斜影響性能;
  2. RDD 關注點仍然是Spark太底層的 API,基于 Spark RDD的開發(fā)是基于特定語言(Scala,Python,Java)的函數開發(fā),無法以數據的視界來開發(fā)數據;
  3. 對 RDD 轉換算子函數內部分常量、變量、廣播變量使用不當,會造成不可控的異常;
  4. 對多種數據開發(fā),需各自開發(fā)RDD的轉換,樣板代碼較多,無法有效重利用;
  5. 其它在運行期可能發(fā)生的異常。如:對象無法序列化等運行期才能發(fā)現的異常。

三、SparkSQL

Spark 從 1.3 版本開始原有 SchemaRDD 的基礎上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發(fā)者的學習門檻,同時還支持Scala、Java與Python三種語言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數據場景。 

基于 Spark 的數據分析實踐

一般的數據處理步驟:讀入數據 -> 對數據進行處理 -> 分析結果 -> 寫入結果

SparkSQL 結構化數據

  • 處理結構化數據(如 CSV,JSON,Parquet 等);
  • 把已經結構化數據抽象成 DataFrame (HiveTable);
  • 非結構化數據通過 RDD.map.filter 轉換成結構化進行處理;
  • 按照列式數據庫,只加載非結構化中可結構化的部分列(Hbase,MongoDB);

處理非結構化數據,不能簡單的用 DataFrame 裝載。而是要用 SparkRDD 把數據讀入,在通過一系列的 Transformer Method 把非結構化的數據加工為結構化,或者過濾到不合法的數據。

SparkSQL DataFrame 

基于 Spark 的數據分析實踐

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎的分布式數據集,類似于傳統數據庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫中的 DataFrame 結構,則會對 SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

  1. import.org.apache.spark.sql._ 
  2. //定義數據的列名稱和類型 
  3. valdt=StructType(List(id:String,name:String,gender:String,age:Int)) 
  4. ​ 
  5. //導入user_info.csv文件并指定分隔符 
  6. vallines = sc.textFile("/path/user_info.csv").map(_.split(",")) 
  7. ​ 
  8. //將表結構和數據關聯起來,把讀入的數據user.csv映射成行,構成數據集 
  9. valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt)) 
  10. ​ 
  11. //通過SparkSession.createDataFrame()創(chuàng)建表,并且數據表表頭 
  12. val df= spark.createDataFrame(rowRDD, dt) 

讀取規(guī)則數據文件作為DataFrame

  1. SparkSession.Builder builder = SparkSession.builder() 
  2. Builder.setMaster("local").setAppName("TestSparkSQLApp"
  3. SparkSession spark = builder.getOrCreate(); 
  4. SQLContext sqlContext = spark.sqlContext(); 
  5. ​ 
  6. # 讀取 JSON 數據,path 可為文件或者目錄 
  7. valdf=sqlContext.read().json(path); 
  8. ​ 
  9. # 讀取 HadoopParquet 文件 
  10. vardf=sqlContext.read().parquet(path); 
  11. ​ 
  12. # 讀取 HadoopORC 文件 
  13. vardf=sqlContext.read().orc(path); 

JSON 文件為每行一個 JSON 對象的文件類型,行尾無須逗號。文件頭也無須[]指定為數組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;

Parquet文件

  1. Configurationconfig = new Configuration(); 
  2. ParquetFileReaderreader = ParquetFileReader.open
  3.  HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf)); 
  4. Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData(); 
  5. String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata"); 

allFiedls 的值就是各字段的名稱和具體的類型,整體是一個json格式進行展示。

讀取 Hive 表作為 DataFrame

Spark2 API 推薦通過 SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。

在Spark1.6中有兩個核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動態(tài)注冊的表,HiveContext 用于處理 Hive 中的表。

從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內部注冊的表;

在需要執(zhí)行 Hive 表時,只需要在 SparkSession.Builder 中開啟 Hive 支持即可(enableHiveSupport())。

  1. SparkSession.Builder builder = SparkSession.builder().enableHiveSupport(); 
  2. SparkSession spark = builder.getOrCreate(); 
  3. SQLContext sqlContext = spark.sqlContext(); 

// db 指 Hive 庫中的數據庫名,如果不寫默認為 default

// tableName 指 hive 庫的數據表名

  1. sqlContext.sql(“select * from db.tableName”) 

SparkSQL ThriftServer

//首先打開 Hive 的 Metastore服務

  1. hive$bin/hive –-service metastore –p 8093 

//把 Spark 的相關 jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴 jar

  1. spark$hadoop fs –put jars/*.jar /lib/spark2 

// 啟動 spark thriftserver 服務

  1. spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf  
  2. spark.yarn.jars=hdfs:///lib/spark2/*.jar 

當hdfs 上傳了spark 依賴 jar 時,通過spark.yarn.jars 可看到日志 spark 無須每個job 都上傳jar,可節(jié)省啟動時間

  1. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar 
  2. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar 

//通過 spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

  1. bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop 
  • -u 是指定 beeline 的執(zhí)行驅動地址;
  • -n 是指定登陸到 spark Session 上的用戶名稱;

Beeline 還支持傳入-e 可傳入一行 SQL,

  • -e query that should be executed

也可通過 –f 指定一個 SQL File,內部可用逗號分隔的多個 SQL(存儲過程)

  • -f script file that should be executed

SparkSQL Beeline 的執(zhí)行效果展示 

基于 Spark 的數據分析實踐

SparkSQL ThriftServer 

基于 Spark 的數據分析實踐

對于 SparkSQL ThriftServer 服務,每個登陸的用戶都有創(chuàng)建的 SparkSession,并且執(zhí)行的對個 SQL 會通過時間順序列表展示。

SparkSQL ThriftServer 服務可用于其他支持的數據庫工具創(chuàng)建查詢,也用于第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 為基礎,開發(fā)的統一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個 Flow。下文開始 SparkSQL Flow 的介紹:

SparkSQL Flow 是基于 SparkSQL 開發(fā)的一種基于 XML 配置化的 SQL 數據流轉處理模型。該模型簡化了 SparkSQL 、Spark RDD的開發(fā),并且降低開發(fā)了難度,適合了解數據業(yè)務但無法駕馭大數據以及 Spark 技術的開發(fā)者。

  • 一個由普元技術部提供的基于 SparkSQL 的開發(fā)模型;
  • 一個可二次定制開發(fā)的大數據開發(fā)框架,提供了靈活的可擴展 API;
  • 一個提供了 對文件,數據庫,NoSQL 等統一的數據開發(fā)視界語義;
  • 基于 SQL 的開發(fā)語言和 XML 的模板配置,支持 Spark UDF 的擴展管理;
  • 支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺;
  • 支持開源、華為、星環(huán)等平臺統一認證。

SparkSQL Flow 適合的場景:

  1. 批量 ETL;
  2. 非實時分析服務;

SparkSQL Flow XML 概覽 

基于 Spark 的數據分析實踐
  1. Properties 內定義一組變量,可用于宏替換;
  2. Methods 內可注冊 udf 和 udaf 兩種函數;
  3. Prepare 內可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;
  4. Sources 內定義一個到多個數據表視圖;
  5. Transformer 內可定義 0 到多個基于 SQL 的數據轉換操作(支持 join);
  6. Targets 用于定義 1 到多個數據輸出;
  7. After 可定義 0到多個任務日志;

如你所見,source 的 type 參數用于區(qū)分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數據源加載廣度;并且,根據 type 不同,source 也需要配置不同的參數,如數據庫還需要 driver,url,user和 password 參數。

Transformer 是基于 source 定的數據視圖可執(zhí)行的一組轉換 SQL,該 SQL 符合 SparkSQL 的語法(SQL99)。Transform 的 SQL 的執(zhí)行結果被作為中間表命名為 table_name 指定的值。

Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

SparkSQL Flow 支持的Sourse 

基于 Spark 的數據分析實踐
  • 支持從 Hive 獲得數據;
  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile
  • 支持RDBMS數據庫:PostgreSQL, MySQL,Oracle
  • 支持 NOSQL 數據庫:Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 為讀取文本文件,把文本文件每行按照 delimiter 指定的字符進行切分,切分不夠的列使用 null 填充。

  1. <source type="textfile" table_name="et_rel_pty_cong" 
  2.  fields="cust_id,name1,gender1,age1:int"  
  3.  delimiter="," 
  4.  path="file:///Users/zhenqin/software/hive/user.txt"/> 
  1. Tablename 為該文件映射的數據表名,可理解為數據的視圖;
  2. Fields 為切分后的字段,使用逗號分隔,字段后可緊跟該字段的類型,使用冒號分隔;
  3. Delimiter 為每行的分隔符;
  4. Path 用于指定文件地址,可以是文件,也可是文件夾;
  5. Path 指定地址需要使用協議,如:file:// 、 hdfs://,否則跟 core-site.xml 配置密切相關;

SparkSQL Flow DB Source

  1. <source type="mysql" table_name="et_rel_pty_cong" 
  2.  table="user" 
  3.  url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8" 
  4.  driver="com.mysql.jdbc.Driver" 
  5.  user="root" password="123456"/> 

RDBMS 是從數據庫使用 JDBC讀取 數據集。支持 type 為:db、mysql、oracle、postgres、mssql;

  1. tablename 為該數據表的抽象 table 名稱(視圖);
  2. url、driver、user,password 為數據庫 JDBC 驅動信息,為必須字段;
  3. SparkSQL 會加載該表的全表數據,無法使用 where 條件。

SparkSQL Flow Transformer

  1. <transform type="sql" table_name="cust_id_agmt_id_t" cached="true"
  2.  SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids 
  3.  FROM user_concat_testx 
  4.  group by c_phone,c_type,c_num 
  5. </transform> 

Transform 支持 cached 屬性,默認為 false;如果設置為 true,相當于把該結果緩存到內存中,緩存到內存中的數據在后續(xù)其它 Transform 中使用能提高計算效率。但是需使用大量內存,開發(fā)者需要評估該數據集能否放到內存中,防止出現 OutofMemory 的異常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持輸出數據到一個或者多個目標。這些目標,基本覆蓋了 Source 包含的外部系統。下面以 Hive 舉例說明:

  1. <target type="hive" 
  2.  table_name="cust_id_agmt_id_t"  
  3.  savemode=”append” 
  4. target_table_name="cust_id_agmt_id_h"/> 
  1. table_name 為 source 或者 Transform 定義的表名稱;
  2. target_table_name 為 hive 中的表結果,Hive 表可不存在也可存在,sparksql 會根據 DataFrame 的數據類型自動創(chuàng)建表;
  3. savemode 默認為 overwrite 覆蓋寫入,當寫入目標已存在時刪除源表再寫入;支持 append 模式, 可增量寫入。

Target 有一個特殊的 show 類型的 target。用于直接在控制臺輸出一個 DataFrame 的結果到控制臺(print),該 target 用于開發(fā)和測試。

  1. <target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/> 

Rows 用于控制輸出多少行數據。

SparkSQL Around

After 用于 Flow 在運行結束后執(zhí)行的一個環(huán)繞,用于記錄日志和寫入狀態(tài)。類似 Java 的 try {} finally{ round.execute() }

多個 round 一定會執(zhí)行,round 異常不會導致任務失敗。

  1. <prepare
  2.  <round type="mysql" 
  3.  sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at) 
  4.  values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())" 
  5.  url="${jdbc.url}" .../> 
  6. </prepare
  7. <after
  8.  <round type="mysql" 
  9.  sql="update cpic_task_history set 
  10.  end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}" 
  11.  url="${jdbc.url}”…/> 
  12. </after

Prepare round 和 after round 配合使用可用于記錄 SparkSQL Flow 任務的運行日志。

SparkSQL Around的執(zhí)行效果 

基于 Spark 的數據分析實踐

Prepare round 可做插入(insert)動作,after round 可做更新 (update)動作,相當于在數據庫表中從執(zhí)行開始到結束有了完整的日志記錄。SparkSQL Flow 會保證round 一定能被執(zhí)行,而且 round 的執(zhí)行不影響任務的狀態(tài)。

SparkSQL Flow 提交

  1. bin/spark-submit --master yarn-client --driver-memory 1G  
  2. --num-executors 10 --executor-memory 2G  
  3. --jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar  
  4. --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar  
  5. --queue default --name FlowTest  
  6. etl-flow-0.2.0.jar -f hive-flow-test.xml 

 基于 Spark 的數據分析實踐

接收必須的參數 –f,可選的參數為支持 Kerberos 認證的租戶名稱principal,和其認證需要的密鑰文件。

  1. usage: spark-submit --jars etl-flow.jar --class 
  2.  com.yiidata.etl.flow.source.FlowRunner 
  3.  -f,--xml-file <arg> Flow XML File Path 
  4.  --keytabFile <arg> keytab File Path(Huawei) 
  5.  --krb5File <arg> krb5 File Path(Huawei) 
  6.  --principal <arg> principal for hadoop(Huawei) 

SparkSQL Execution Plan 

基于 Spark 的數據分析實踐

每個Spark Flow 任務本質上是一連串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的數據表操作。

regiserDataFrameAsTable 是每個 source 和 Transform 的數據在 SparkSQL 中的數據視圖,每個視圖都會在 SparkContex 中注冊一次。

對RegisterDataFrameAsTable的分析 

基于 Spark 的數據分析實踐

通過單個 regiserDataFrameAsTable 項進行分析,SparkSQL 并不是把source 的數據立即計算把數據放到內存,而是每次執(zhí)行 source 時只是生成了一個 Logical Plan,只有遇到需要提交的算子(Action),SparkSQL 才會觸發(fā)前面所依賴的的 plan 執(zhí)行。

總結

這是一個開發(fā)框架,不是一個成熟的產品,也不是一種架構。他只是基于 SparkSQL 整合了大多數的外部系統,能通過 XML 的模板配置完成數據開發(fā)。面向的是理解數據業(yè)務但不了解 Spark 的數據開發(fā)人員。整個框架完成了大多數的外部系統對接,開發(fā)者只需要使用 type 獲得數據,完成數據開發(fā)后通過 target 回寫到目標系統中。整個過程基本無須程序開發(fā),除非當前的 SQL 函數無法滿足使用的情況下,需要自行開發(fā)一下特定的 UDF。因此本框架在對 SparkSQL 做了二次開發(fā)基礎上,大大簡化了 Spark 的開發(fā),可降低了開發(fā)者使用難度。

關于作者:震秦,普元資深開發(fā)工程師,專注于大數據開發(fā) 8 年,擅長 Hadoop 生態(tài)內各工具的使用和優(yōu)化。參與某公關廣告(上市)公司DMP 建設,負責數據分層設計和批處理,調度實現,完成交付使用;參與國內多省市公安社交網絡項目部署,負責產品開發(fā)(Spark 分析應用);參與數據清洗加工為我方主題庫并部署上層應用。

關于EAWorld:微服務,DevOps,數據治理,移動架構原創(chuàng)技術分享。

責任編輯:未麗燕 來源: 今日頭條
相關推薦

2015-10-12 17:40:12

數據分析實踐

2024-11-01 08:16:54

2015-09-23 09:24:56

spark數據分析

2018-06-15 20:44:40

Hadoop數據分析數據

2020-10-21 10:51:43

數據分析

2021-01-25 20:20:35

數據分析SparkHadoop

2014-06-30 10:59:21

2016-12-01 19:07:46

大數據數據分析

2018-02-26 08:44:35

Python微信數據分析

2015-10-16 09:21:13

SparkMySQL數據分析

2023-03-01 18:32:16

系統監(jiān)控數據

2017-10-11 11:10:02

Spark Strea大數據流式處理

2021-06-06 19:03:25

SQL大數據Spark

2016-10-19 18:31:11

2012-03-21 09:31:51

ibmdw

2015-07-01 13:51:12

HadoopMapReduce數據分析

2021-08-25 08:23:51

AI數據機器學習

2018-05-23 08:39:18

AlluxioCeph對象存儲

2024-04-22 10:33:18

物聯網數據科學智慧城市

2016-12-07 15:40:42

谷歌數據分析Airbnb
點贊
收藏

51CTO技術棧公眾號