Apache Spark Delta Lake寫數(shù)據(jù)使用及實(shí)現(xiàn)原理代碼解析
Delta Lake 寫數(shù)據(jù)是其最基本的功能,而且其使用和現(xiàn)有的 Spark 寫 Parquet 文件基本一致,在介紹 Delta Lake 實(shí)現(xiàn)原理之前先來看看如何使用它,具體使用如下:
- df.write.format("delta").save("/data/yangping.wyp/delta/test/")
- //數(shù)據(jù)按照 dt 分區(qū)
- df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")
- // 覆蓋之前的數(shù)據(jù)
- df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")
大家可以看出,使用寫 Delta 數(shù)據(jù)是非常簡單的,這也是 Delte Lake 介紹的 100% 兼容 Spark。
Delta Lake 寫數(shù)據(jù)原理
前面簡單了解了如何使用 Delta Lake 來寫數(shù)據(jù),本小結(jié)我們將深入介紹 Delta Lake 是如何保證寫數(shù)據(jù)的基本原理以及如何保證事務(wù)性。
得益于 Apache Spark 強(qiáng)大的數(shù)據(jù)源 API,我們可以很方便的給 Spark 添加任何數(shù)據(jù)源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 實(shí)現(xiàn)的一種新的數(shù)據(jù)源,我們調(diào)用 df.write.format("delta") 其實(shí)底層調(diào)用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 類。為了簡單起見,本文介紹的是 Delta Lake 批量寫的實(shí)現(xiàn),實(shí)時(shí)流寫 Delta Lake 本文不涉及,后面有機(jī)會再介紹。 Delta Lake 批量寫擴(kuò)展了 org.apache.spark.sql.sources.CreatableRelationProvider 特質(zhì),并實(shí)現(xiàn)了其中的方法。我們調(diào)用上面的寫數(shù)據(jù)方法首先會調(diào)用 DeltaDataSource 類的 createRelation 方法,它的具體實(shí)現(xiàn)如下:
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- // 寫數(shù)據(jù)的路徑
- val path = parameters.getOrElse("path", {
- throw DeltaErrors.pathNotSpecifiedException
- })
- // 分區(qū)字段
- val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
- .map(DeltaDataSource.decodePartitioningColumns)
- .getOrElse(Nil)
- // 事務(wù)日志對象
- val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
- // 真正的寫操作過程
- WriteIntoDelta(
- deltaLog = deltaLog,
- mode = mode,
- new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
- partitionColumns = partitionColumns,
- configuration = Map.empty,
- data = data).run(sqlContext.sparkSession)
- deltaLog.createRelation()
- }
其中 mode 就是保持?jǐn)?shù)據(jù)的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 這個(gè)傳遞的參數(shù),比如分區(qū)字段、數(shù)據(jù)保存路徑以及 Delta 支持的一些參數(shù)(replaceWhere、mergeSchema、overwriteSchema 等,具體參見 org.apache.spark.sql.delta.DeltaOptions);data 就是我們需要保存的數(shù)據(jù)。
createRelation 方法緊接著就是獲取數(shù)據(jù)保存的路徑,分區(qū)字段等信息。然后初始化 deltaLog,deltaLog 的初始化會做很多事情,比如會讀取磁盤所有的事務(wù)日志(_delta_log 目錄下),并構(gòu)建最新事務(wù)日志的最新快照,里面可以拿到最新數(shù)據(jù)的版本。由于 deltaLog 的初始化成本比較高,所以 deltaLog 初始化完之后會緩存到 deltaLogCache 中,這是一個(gè)使用 Guava 的 CacheBuilder 類實(shí)現(xiàn)的一個(gè)緩存,緩存的數(shù)據(jù)保持一小時(shí),緩存大小可以通過 delta.log.cacheSize 參數(shù)進(jìn)行設(shè)置。只要寫數(shù)據(jù)的路徑是一樣的,就只需要初始化一次 deltaLog,后面直接從緩存中拿即可。除非之前緩存的 deltaLog 被清理了,或者無效才會再次初始化。DeltaLog 類是 Delta Lake 中最重要的類之一,涉及的內(nèi)容非常多,所以我們會單獨(dú)使用一篇文章進(jìn)行介紹。
緊接著初始化 WriteIntoDelta,WriteIntoDelta 擴(kuò)展自 RunnableCommand,Delta Lake 中的更新、刪除、合并都是擴(kuò)展這個(gè)類的。初始化完 WriteIntoDelta 之后,就會調(diào)用 run 方法執(zhí)行真正的寫數(shù)據(jù)操作。WriteIntoDelta 的 run 方法實(shí)現(xiàn)如下:
- override def run(sparkSession: SparkSession): Seq[Row] = {
- deltaLog.withNewTransaction { txn =>
- val actions = write(txn, sparkSession)
- val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere)
- txn.commit(actions, operation)
- }
- Seq.empty
- }
Delta Lake 所有的更新操作都是在事務(wù)中進(jìn)行的,deltaLog.withNewTransaction 就是一個(gè)事務(wù),withNewTransaction 的實(shí)現(xiàn)如下:
- def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
- try {
- // 更新當(dāng)前表事務(wù)日志的快照
- update()
- // 初始化樂觀事務(wù)鎖對象
- val txn = new OptimisticTransaction(this)
- // 開啟事務(wù)
- OptimisticTransaction.setActive(txn)
- // 執(zhí)行寫數(shù)據(jù)操作
- thunk(txn)
- } finally {
- // 關(guān)閉事務(wù)
- OptimisticTransaction.clearActive()
- }
- }
在開啟事務(wù)之前,需要更新當(dāng)前表事務(wù)的快照,因?yàn)樵趫?zhí)行寫數(shù)據(jù)之前,這張表可能已經(jīng)被修改了,執(zhí)行 update 操作之后,就可以拿到當(dāng)前表的最新版本,緊接著開啟樂觀事務(wù)鎖。thunk(txn) 就是需要執(zhí)行的事務(wù)操作,對應(yīng) deltaLog.withNewTransaction 里面的所有代碼。
我們回到上面的 run 方法。val actions = write(txn, sparkSession) 就是執(zhí)行寫數(shù)據(jù)的操作,它的實(shí)現(xiàn)如下:
- def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
- import sparkSession.implicits._
- // 如果不是第一次往表里面寫數(shù)據(jù),需要判斷寫數(shù)據(jù)的模式是否符合條件
- if (txn.readVersion > -1) {
- // This table already exists, check if the insert is valid.
- if (mode == SaveMode.ErrorIfExists) {
- throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
- } else if (mode == SaveMode.Ignore) {
- return Nil
- } else if (mode == SaveMode.Overwrite) {
- deltaLog.assertRemovable()
- }
- }
- // 更新表的模式,比如是否覆蓋現(xiàn)有的模式,是否和現(xiàn)有的模式進(jìn)行 merge
- updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
- // 是否定義分區(qū)過濾條件
- val replaceWhere = options.replaceWhere
- val partitionFilters = if (replaceWhere.isDefined) {
- val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
- if (mode == SaveMode.Overwrite) {
- verifyPartitionPredicates(
- sparkSession, txn.metadata.partitionColumns, predicates)
- }
- Some(predicates)
- } else {
- None
- }
- // 第一次寫數(shù)據(jù)初始化事務(wù)日志的目錄
- if (txn.readVersion < 0) {
- // Initialize the log path
- deltaLog.fs.mkdirs(deltaLog.logPath)
- }
- // 寫數(shù)據(jù)到文件系統(tǒng)中
- val newFiles = txn.writeFiles(data, Some(options))
- val deletedFiles = (mode, partitionFilters) match {
- // 全量覆蓋,直接拿出緩存在內(nèi)存中最新事務(wù)日志快照里面的所有 AddFile 文件
- case (SaveMode.Overwrite, None) =>
- txn.filterFiles().map(_.remove)
- // 從事務(wù)日志快照中獲取對應(yīng)分區(qū)里面的所有 AddFile 文件
- case (SaveMode.Overwrite, Some(predicates)) =>
- // Check to make sure the files we wrote out were actually valid.
- val matchingFiles = DeltaLog.filterFileList(
- txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
- val invalidFiles = newFiles.toSet -- matchingFiles
- if (invalidFiles.nonEmpty) {
- val badPartitions = invalidFiles
- .map(_.partitionValues)
- .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
- .mkString(", ")
- throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
- }
- txn.filterFiles(predicates).map(_.remove)
- case _ => Nil
- }
- newFiles ++ deletedFiles
- }
- }
如果 txn.readVersion == -1,說明是第一次寫數(shù)據(jù)到 Delta Lake 表,所以當(dāng)這個(gè)值大于 -1 的時(shí)候,需要判斷一下寫數(shù)據(jù)的操作是否合法。
由于 Delta Lake 底層使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,這就是 updateMetadata 函數(shù)對應(yīng)的操作。
因?yàn)?Delta Lake 表支持分區(qū),所以我們可能在寫數(shù)據(jù)的時(shí)候指定某個(gè)分區(qū)進(jìn)行覆蓋。
真正寫數(shù)據(jù)的操作是 txn.writeFiles 函數(shù)執(zhí)行的,具體實(shí)現(xiàn)如下:
- def writeFiles(
- data: Dataset[_],
- writeOptions: Option[DeltaOptions],
- isOptimize: Boolean): Seq[AddFile] = {
- hasWritten = true
- val spark = data.sparkSession
- val partitionSchema = metadata.partitionSchema
- val outputPath = deltaLog.dataPath
- val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
- val partitioningColumns =
- getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)
- // 獲取 DelayedCommitProtocol,里面可以設(shè)置寫文件的名字,
- // commitTask 和 commitJob 等做一些事情
- val committer = getCommitter(outputPath)
- val invariants = Invariants.getFromSchema(metadata.schema, spark)
- SQLExecution.withNewExecutionId(spark, queryExecution) {
- val outputSpec = FileFormatWriter.OutputSpec(
- outputPath.toString,
- Map.empty,
- output)
- val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)
- FileFormatWriter.write(
- sparkSession = spark,
- plan = physicalPlan,
- fileFormat = snapshot.fileFormat,
- committer = committer,
- outputSpec = outputSpec,
- hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
- partitionColumns = partitioningColumns,
- bucketSpec = None,
- statsTrackers = Nil,
- options = Map.empty)
- }
- // 返回新增的文件
- committer.addedStatuses
- }
Delta Lake 寫操作最終調(diào)用 Spark 的 FileFormatWriter.write 方法進(jìn)行的,通過這個(gè)方法的復(fù)用將我們真正的數(shù)據(jù)寫入到 Delta Lake 表里面去了。
在 Delta Lake 中,如果是新增文件則會在事務(wù)日志中使用 AddFile 類記錄相關(guān)的信息,AddFile 持久化到事務(wù)日志里面的內(nèi)容如下:
- {"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}}
可以看出 AddFile 里面記錄了新增文件的保存路徑,分區(qū)信息,新增的文件大小,修改時(shí)間等信息。如果是刪除文件,也會在事務(wù)日志里面記錄這個(gè)刪除操作,對應(yīng)的就是使用 RemoveFile 類存儲,RemoveFile 持久化到事務(wù)日志里面的內(nèi)容如下:
- {"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}}
RemoveFile 里面保存了刪除文件的路徑,刪除時(shí)間等信息。如果新增一個(gè)文件,再刪除一個(gè)文件,那么最新的事務(wù)日志快照里面只會保存刪除這個(gè)文件的記錄。從這里面也可以看出, Delta Lake 刪除、新增 ACID 是針對文件級別的。
上面的寫操作肯定會產(chǎn)生新的文件,所以寫操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要?jiǎng)h除的文件(RemoveFile)。針對那些文件需要?jiǎng)h除需要做一些判斷,主要分兩種情況(具體參見 write 方法里面的):
- 如果是全表覆蓋,則直接從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出所有 AddFile 文件,然后將其標(biāo)記為 RemoveFile;
- 如果是分區(qū)內(nèi)的覆蓋,則從緩存在內(nèi)存中最新的事務(wù)日志快照中拿出對應(yīng)分區(qū)下的 AddFile 文件,然后將其標(biāo)記為 RemoveFile。
最后 write 方法返回新增的文件和需要?jiǎng)h除的文件(newFiles ++ deletedFiles),這些文件最終需要記錄到事務(wù)日志里面去。關(guān)于事務(wù)日志是如何寫進(jìn)去的請參見這篇文章的詳細(xì)分析。