Spark 在供應鏈核算中的應用總結
一、業(yè)務背景
(會計)核算是使用會計語言與方法,對產品業(yè)務的結果進行登記與反映,從而為利益相關者提供直觀、準確、有價值的信息,主要服務對象是財務、審計、外部監(jiān)管、合規(guī)以及管理層,同時核算也是資金管理風險防范的其中一個手段。整體流程可以概括為基于核算規(guī)則從業(yè)務事件(采購入庫、退供、TOC確認收貨、開票等)關聯(lián)單據(jù)中提取業(yè)務要素(采購/銷售主體、業(yè)務時間、客商、金額等)轉換為會計語言表達的數(shù)據(jù)(會計分錄,會計要素主要包括OU/收益部門/預算部門/往來段/明細段/行業(yè)段/成本中心等),供應鏈核算主要鏈路如下圖所示:
從上圖可以看到供應鏈核算一腳在業(yè)務(計費/結算可以理解為財務視角的業(yè)務),一腳在財務,職責上既要滿足核算團隊月結出賬的訴求,又要提供業(yè)財對賬的能力,基于此我們將數(shù)據(jù)處理統(tǒng)一為如下流程:
二、離線 SQL 模式存在的問題
從第1章節(jié)圖2可以看到,核算的流程就是ETL的過程,在早期的方案中通過離線+在線的實現(xiàn)方式,其中離線完成原始憑證的加工,業(yè)務接入的邏輯通過SQL實現(xiàn),在線系統(tǒng)完成記賬+拋賬,同時由于在線系統(tǒng)處理能力有限,在原始憑證加工中進行了業(yè)務單據(jù)的聚合,此種實現(xiàn)方式主要存在以下問題。
1.對賬問題定位困難,核算小二主要通過下載分錄及對應的業(yè)務單據(jù)匯總數(shù)據(jù)進行對賬,如果某一分錄和業(yè)務數(shù)據(jù)有出入,只能逐一業(yè)務要素分析,由于缺乏通過分錄精確追溯到關聯(lián)業(yè)務單據(jù)的下鉆能力,問題定位耗時較長,造成這一問題的主要原因在于通過離線SQL實現(xiàn)的原始加工邏輯無法精確的建立業(yè)務單據(jù)和原始憑證的關聯(lián)關系。
2.日常運維困難,隨著業(yè)務的不斷發(fā)展,業(yè)務接入離線任務在不斷的膨脹,最終成為一個橫跨4個項目空間,150+離線任務、100+離線表的工程,任一節(jié)點的錯誤都會造成月結數(shù)據(jù)出錯。
3.行業(yè)實施效率較低,每次新接入行業(yè)都需要開發(fā)小二新建一套離線表+離線任務,相應的也造成運維問題的持續(xù)惡化。
三、為什么選擇Spark
1.核心訴求
在核算主版本的建設中,我們希望能夠通過打造穩(wěn)定可復用的產品能力最大程度的解決上述問題,核心訴求如下:
1)核算規(guī)則(業(yè)務接入/記賬/拋賬)可配、可視,不存在黑盒的加工邏輯,加工流程對核算小二全透明(提升實施+對賬效率)
2)建立整個核算鏈路單據(jù)維度的關聯(lián)關系(業(yè)務單據(jù)<->原始憑證<->記賬憑證<->拋賬憑證),具備雙向的單據(jù)追溯能力(提升對賬效率)
基于以上訴求,我們抽象了標準的規(guī)則模型,滿足用戶多場景下各個鏈路(業(yè)務接入、記賬、拋賬)的加工邏輯配置(規(guī)則相關設計方案不再此文展開),與之配套的會計引擎完成基于核算規(guī)則的數(shù)據(jù)處理,另外在主版本的設計中,原始憑證需要1V1還原業(yè)務單據(jù),每月原始憑證數(shù)據(jù)量達到了10億級別,為了滿足月結時效性的要求,我們需要采用高性能、支持大數(shù)據(jù)量、且編程友好(便于建立單據(jù)關系)的計算引擎。
2.Spark VS MapReduce
基于上述訴求,我們重點調研了Spark和MapReduce兩款計算引擎,差異如下所示:
引擎 | MapReduce | Spark |
編程友好 | 一般,支持Map/Reduce兩種算子 | 較好,支持的算子豐富(map/filter/reduce/aggregate等) |
性能 | 一般,中間態(tài)數(shù)據(jù)需要落盤,計算邏輯相對復雜時,MapReduce會涉及到多MapReduce任務執(zhí)行(多次shuffle),每次shuffle也會涉及到大量的磁盤IO | 較好,基于內存計算,基于DAG可以構建RDD的血緣關系,在調度過程中可以避免大量無效的磁盤IO,另外rdd共享機制可以降低網絡IO的開銷 |
集團生態(tài) | 較好,odps提供MapReduce計算框架支持,可以通過LogView查看日志 | 較好,odps提供Spark計算引擎支持,可以通過LogView查看日志,目前提供了stand-alone、集群及client三種模式的支持 |
比較形象的對比(并不是說spark不會落盤,在基于DAG圖拆分stage時,也會涉及到shuffle,但整體的磁盤IO消耗比MapReduce要低)。
3.編程模式優(yōu)勢: RDD + DataFrame 的編程模式
如上面和MapReduce的比較中看到 Spark 在編程友好性上比MapReduce好一些,比較適合后端開發(fā)人員。
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
上面是一個官方的例子,在schema控制,可編程性和 sql 操作等能較好的結合,邏輯比較類同后端開發(fā)。
基于上述spark特點及優(yōu)勢,我們最終選擇spark實現(xiàn)會計引擎邏輯。
四、spark基礎介紹
1.基礎概念
- Rdd(Resilient distributed dataset):不可變的彈性分布式數(shù)據(jù)集(不可變性似于docker中的只讀鏡像層),只能通過其他的transformation算子創(chuàng)建新的RDD。
- Operations:算子,spark包括兩類算子,transformation(轉換算子,通過對前置rdd的處理生成新的rdd)/action(觸發(fā)spark job的拆分及執(zhí)行,負責將rdd輸出)。
- Task:執(zhí)行器執(zhí)行的任務單元,一般基于當前rdd的分區(qū)數(shù)量拆分。
- Job:包含多個task的集合,基于Action算子拆分。
- Stage:基于當前rdd處理邏輯的寬窄依賴拆分,spark中非常重要的概念,stage的切換會涉及到IO。
- Narrow/Wide dependencies:參考下圖,區(qū)分的重要依據(jù)在于父節(jié)點是否會被多個子節(jié)點使用。
2.Spark on MaxCompute(ODPS)
我們在實踐中,主要基于spark on odps提供的client模式實現(xiàn),client模式的詳細介紹可以參考相關文檔。
- Spark 有很多的后端的 Runtime,例如其商業(yè)化公司的Databricks Runtime, 彈內我們使用的是 AliSpark,是集團的適配MaxComputer,同時在離線交互是使用了 Cupid-SDK 的 Client模式,這個模式不是獨立集群的模式,類Serveless模式,整體的成本上比獨立集群要低,當然資源保障上沒有獨立集群好。
Client模式原理參考相關文檔,比調度模式有更好的應用交互性。
- 集團client模式將spark session作為服務提供,可以方便地與在線系統(tǒng)交互,包括任務的提交、關閉、實例的關閉等;
- 在使用集團提供的spark能力時,比較麻煩的在于如何方便的查看日志,從我們的實踐看主要有以下2個路徑。
申請odps對應項目空間的logview權限,可以直接在https://logview.alibaba-inc.com/中基于sparkInstanceId定位到具體的日志;
借助odps client+提交spark任務時返回的實例ID獲取log地址,代碼參考如下:
//instanceIdd對應odps client中的lookupName
Account account = new AliyunAccount(sparkSessionConfig.getAccessId(), sparkSessionConfig.getAccessKey());
Odps odps = new Odps(account);
odps.setEndpoint(sparkSessionConfig.getEndPoint());
odps.setDefaultProject(sparkSessionConfig.getNamespace());
//日志地址目前設定有效期為7*24小時
try {
return odps.logview().generateLogView(odps.instances().get(sparkInstanceId), 7 * 24L);
} catch (OdpsException e) {
LOGGER.error("生成logView地址失敗,config:{},instanceId:{},e:{}", sparkSessionConfig, sparkInstanceId, e);
}
五、技術方案
1.整體方案
spark作為大數(shù)據(jù)處理引擎,在實例數(shù)量較少的情況下采用odps任務目前的運維方式來管理的話成本并不高,但是在供應鏈核算的場景下,需要支持每天將近600+(行業(yè)*核算場景)數(shù)量的實例運行,且需滿足核算完整性、準確性、及時性的要求,另外由于目前我們的spark任務(cupid)與odps任務共享項目空間資源,意味著我們需要在有限的資源下支持核算的業(yè)務,基于以上背景及訴求,供應鏈核算整體的應用架構設計如下:
其中ascp-finance-accounting負責任務調度,組件交互如下:
- spark任務管理:負責spark任務相關生命周期的管理,承接核算任務和spark session之間的交互;
- spark session管理:負責spark實例的創(chuàng)建、銷毀、job提交等,另外針對不同類型的session,支持自定義所需資源,包括實例worker數(shù)量、分區(qū)大小等,主要與spark on odps交互;
- 核算任務管理:負責業(yè)務接入、記賬、拋賬等核算任務的生命周期管理;
- spark job版本管理:spark任務所需jar包會不斷的迭代,針對不同的核算場景可以定制所需的job版本。
ascp-finance-accounting-spark負責spark job的開發(fā)維護,spark on odps client模式下需要基于服務上傳jar包,若jar包較大,性能較差,所以基于client模式下提供的resource管理能力,我們將項目module拆分如下:
包名 | 作用 |
accounting-spark-client | 對外提供spark任務的啟動、查詢及終止服務 |
accounting-spark-common | 公共包,包括常量、工具類等 |
accounting-spark-job | spark任務包,封裝了任務接入和記賬兩個任務的實現(xiàn) |
accounting-spark-dependency | spark任務包依賴的二方包,client模式下若job包過大,會造成上傳失敗的問題,所以部分job依賴的二方包可以放在dependency中,單獨打包,手工在datawork中上傳,通過resources傳遞參數(shù) |
2.數(shù)據(jù)處理流程
核算接入、記賬、拋賬等主流程的spark處理邏輯如下所示:
六、運維及調優(yōu)
基于spark的特性,完成數(shù)據(jù)處理邏輯的編寫對我們來說并不困難,問題主要集中在如何用盡可能低的成本滿足業(yè)務需求,特別是在目前控制成本的背景下,在供應鏈核算的落地過程中,我們主要采用了以下優(yōu)化方式。
1.數(shù)據(jù)量評估
spark任務的運行效率很大程度上受到分區(qū)數(shù)量的影響,spark提供了如下手段來進行分區(qū)數(shù)量的調整(部分為spark on odps能力),供應鏈核算在實現(xiàn)過程中主要用到了odps離線表和lindorm兩種數(shù)據(jù)源。
1)spark.hadoop.odps.input.split.size:用于設置spark讀取odps離線表的分區(qū)大小,默認為256M,在實踐過程中需要結合當前分區(qū)的大小進行調整,比如當前分區(qū)大小為1GB,那么默認情況下會拆分為4個分區(qū);
2)spark讀寫lindorm(類hbase)的分區(qū)數(shù)主要受到region數(shù)量的影響,在供應鏈核算系統(tǒng)的實踐中,由于初始region數(shù)量較少,導致分區(qū)數(shù)量很少,spark執(zhí)行效率很差,針對此問題我們實踐了兩種處理策略;
- 進行重分區(qū)(repartition算子):針對數(shù)據(jù)傾斜進行重新分區(qū),但是會拆分stage,觸發(fā)shuffle,增加額外的IO成本。
- lindorm進行預分區(qū),比如預分區(qū)為128個region,但此種實現(xiàn)方案需要結合rowkey的設計一起使用,會影響到scan的效率。
2.代碼邏輯相關job/stage/task評估
除了六中所述數(shù)據(jù)量以外,數(shù)據(jù)處理邏輯的實現(xiàn)方法也會影響到任務的執(zhí)行效率,spark比mapreduce執(zhí)行效率高的一個原因就在于spark會先基于處理流程構建DAG,這樣可以有效評估每個stage是否需要落盤(IO成本),在邏輯實現(xiàn)過程中我們在保證數(shù)據(jù)處理無誤的情況下需要盡可能得降低IO(減少shuffle),比如可以執(zhí)行以下策略。
- 慎用效率角度的算子,比如groupBy。
- 盡量減少stage數(shù)量。
3.計算存儲資源評估
計算存儲資源同樣是spark執(zhí)行效率優(yōu)化的關鍵,spark也提供了多種手段來調整資源的使用情況;
- spark.executor.instances executor:設置當前實例的worker數(shù)量;
- spark.executor.cores:核數(shù),每個Executor中的可同時運行的task數(shù)目;
- spark.executor.memory:executor內存。
4.其他參數(shù)
odps.cupid.clientmode.heartbeat.timeout 此配置用來調節(jié)cupid(spark on odps) client模式下的心跳超時時間,默認為30分鐘,若任務執(zhí)行較長,需要進行調整。
hbase.client.write.buffer:用來調節(jié)lindorm的flush磁盤的buffer大小,lindorm mput數(shù)量限制為100(經咨詢?yōu)槿窒拗?,無法調整),所以在spark寫lindorm時我們主要采用此配置項調節(jié)批量寫入的數(shù)量,這點比較坑。
spark.hadoop.odps.cupid.job.priority:用于調節(jié)任務資源獲取的優(yōu)先級。
5.Spark UI
spark 本身的 UI 中有整體的job/stage/task的可視化分析數(shù)據(jù),比較方便的查詢到對應的執(zhí)行過程,如下圖:
通過SparkUI 可以看到任務的驅動步驟和對應的執(zhí)行的日志。通過分析可以針對性的優(yōu)化提升。
6.交互式開發(fā)測試
ODPS 有一個非常好的所見所得的 dataworks 平臺,大大提升了開發(fā)的效率,spark 當前在dataworks沒有直接的交互的IDE,需要通過 zeppelin 來實現(xiàn)。zeppelin在數(shù)據(jù)技術棧中的定位如下:
Web-based notebook that enables data-driven,interactive data analytics and collaborative documents with SQL, Scala, Python, R and more.
可以在交互中實現(xiàn)結果的快速反饋。
支持 scala 的 UDF 驗證等,提升了測試驗證效率。
7.效果
經過以上優(yōu)化,在2500萬數(shù)據(jù)量60worker數(shù)的場景,接入+記賬+拋賬流程由之前的2小時提效至10分鐘,同時在編程模式上更加匹配服務端技術的研發(fā)模式,提升了研發(fā)效率。
七、總結
核算業(yè)務的特征比較偏向數(shù)據(jù)和規(guī)則的處理,大數(shù)據(jù)引擎的引入有助于整體業(yè)務的交付效率提升和成本降低。目前我們對Spark的認知主要在完成數(shù)據(jù)處理邏輯開發(fā)及日常的調優(yōu)上,隨著運行實例的增多以及業(yè)務的不斷發(fā)展,當前的技術方案也會不斷的迭代演進。
參考文檔
通過spark訪問lindorm:https://help.aliyun.com/document_detail/174657.html