自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

大數(shù)據(jù)實(shí)戰(zhàn):基于Flink+ODPS歷史累計(jì)計(jì)算項(xiàng)目分析與優(yōu)化

開發(fā) 前端
我們將其拆分到“實(shí)時(shí)+離線”兩條鏈路分別計(jì)算,離線鏈路計(jì)算用戶歷史至昨日的累計(jì)數(shù)據(jù)data1,實(shí)時(shí)鏈路計(jì)算當(dāng)日實(shí)時(shí)累計(jì)數(shù)據(jù)data2。然后在對(duì)兩條鏈路的數(shù)據(jù)進(jìn)行匯總,data1+data2即為用戶歷史至今日此時(shí)刻的累計(jì)數(shù)據(jù)。

1.前置知識(shí)

ODPS(Open Data Platform and Service)是阿里云自研的一體化大數(shù)據(jù)計(jì)算平臺(tái)和數(shù)據(jù)倉庫產(chǎn)品,在集團(tuán)內(nèi)部離線作為離線數(shù)據(jù)處理和存儲(chǔ)的產(chǎn)品。離線計(jì)算任務(wù)節(jié)點(diǎn)叫做Odps節(jié)點(diǎn),存儲(chǔ)的離線表叫做Odps表;

Flink: 實(shí)時(shí)計(jì)算引擎,本文代碼開發(fā)和測試均基于集團(tuán)內(nèi)部實(shí)時(shí)計(jì)算平臺(tái),代碼細(xì)節(jié)可能會(huì)和Flink 官方社區(qū)文檔有些許不同,假如用于生產(chǎn)環(huán)境測試,參考Apache Flink 官方文檔為準(zhǔn),但是技術(shù)方案是通用的哈;

https://flink.apache.org/posts/

2.項(xiàng)目背景

現(xiàn)有業(yè)務(wù)需求是 “根據(jù)用戶注冊以來的累計(jì)跑步里程,給用戶發(fā)放勛章”,需要實(shí)時(shí)的計(jì)算出用戶【歷史~此時(shí)刻】的累計(jì)跑步數(shù)據(jù)。

比如說,某個(gè)用戶20210101首次上傳跑步記錄,之后又多次上傳跑步記錄,我們需要實(shí)時(shí)的計(jì)算出,在20210101~當(dāng)前時(shí)刻 期間,該用戶累計(jì)跑了多少公里,累計(jì)跑了多少次等指標(biāo)。上述指標(biāo)的計(jì)算涉及用戶歷史至今的所有數(shù)據(jù)(2018~至今該用戶所有數(shù)據(jù)),考慮使用批流結(jié)合的方式進(jìn)行統(tǒng)計(jì)。參考批流結(jié)合的常用 lambda 方案:

圖片圖片

我們將其拆分到“實(shí)時(shí)+離線”兩條鏈路分別計(jì)算,離線鏈路計(jì)算用戶歷史至昨日的累計(jì)數(shù)據(jù)data1,實(shí)時(shí)鏈路計(jì)算當(dāng)日實(shí)時(shí)累計(jì)數(shù)據(jù)data2。然后在對(duì)兩條鏈路的數(shù)據(jù)進(jìn)行匯總,data1+data2即為用戶歷史至今日此時(shí)刻的累計(jì)數(shù)據(jù)。

圖片圖片

這里,離線鏈路使用odps來做,實(shí)時(shí)計(jì)算使用Flink來做,數(shù)據(jù)存儲(chǔ)涉及 hbase、odps,所用消息中間件是MQ。

3.解決方案

3.1 方案描述 

離線鏈路設(shè)計(jì)

離線鏈路計(jì)算目的:為了計(jì)算出全量用戶【歷史至昨日】的累計(jì)數(shù)據(jù)。

任務(wù)初始化時(shí),先將歷史的存量數(shù)據(jù)全量計(jì)算一次,得到存量累計(jì)值;以后每日計(jì)算用戶昨日的新增數(shù)據(jù),即新增累計(jì)值 ;兩者相加即為用戶歷史至昨日的累計(jì)數(shù)據(jù);循環(huán)往復(fù),即可每日更新歷史累計(jì)數(shù)據(jù)。

對(duì)應(yīng)的數(shù)據(jù)鏈路應(yīng)該長這樣:

圖片圖片

離線鏈路計(jì)算流程如下:

step1:用戶歷史數(shù)據(jù)初始化。假設(shè)該計(jì)算任務(wù)發(fā)布的時(shí)間為20231010,首先要對(duì)用戶 歷史~20231009 期間的歷史數(shù)據(jù)進(jìn)行匯總,得到一個(gè) 歷史存量累計(jì)數(shù)據(jù) history_data;

step2:從20231010起,對(duì)用戶每日的增量跑步數(shù)據(jù)進(jìn)行匯總,得到該日的增量累計(jì)數(shù)據(jù) day_data;

step3:將每日的增量累計(jì)數(shù)據(jù)day_data 與 歷史存量累計(jì)數(shù)據(jù)history_data 進(jìn)行求和,作為新的歷史存量累計(jì)數(shù)據(jù) history_data(T-1) = day_data(T-1) + history_data(T-2) ;

step4:重復(fù) step2 和step3 ,每日更新歷史存量累計(jì)數(shù)據(jù) history_data 。

該方案的優(yōu)點(diǎn)是,歷史全量數(shù)據(jù)只用計(jì)算一次,每日只需計(jì)算增量部分后再與存量合并即可,節(jié)省計(jì)算資源。

實(shí)時(shí)鏈路設(shè)計(jì)

實(shí)時(shí)鏈路計(jì)算目的:實(shí)時(shí)計(jì)算出用戶【當(dāng)日零點(diǎn)至此刻】的累計(jì)數(shù)據(jù)

實(shí)時(shí)鏈路的計(jì)算邏輯比較簡單,對(duì)應(yīng)的計(jì)算鏈路示意圖如下:

圖片圖片

實(shí)時(shí)鏈路計(jì)算流程如下:

step1:用戶新增的跑步記錄通過MQ發(fā)送給Flink任務(wù);

step2:Flink節(jié)點(diǎn)1對(duì)數(shù)據(jù)去重;

step3:Flink節(jié)點(diǎn)2對(duì)實(shí)時(shí)匯總統(tǒng)計(jì) 當(dāng)日零點(diǎn)至此刻 用戶的跑步累計(jì)數(shù)據(jù);step4:將計(jì)算結(jié)果輸出給下游。

實(shí)時(shí)離線鏈路融合

實(shí)時(shí)離線鏈路融合目的:實(shí)時(shí)得到用戶歷史至此時(shí)刻的匯總數(shù)據(jù)

從上述的離線、實(shí)時(shí)鏈路中,我們分別得到了用戶【歷史~昨日】累計(jì)數(shù)據(jù),和【當(dāng)日凌晨~此刻】累計(jì)數(shù)據(jù),只需將兩者相加即可實(shí)時(shí)得到用戶【歷史~此刻】的累計(jì)數(shù)據(jù):

  1. ODPS 計(jì)算出用戶 [非當(dāng)日的歷史累計(jì)數(shù)據(jù)],為使用方便,會(huì)每天更新全量用戶歷史累計(jì)數(shù)據(jù);
  2. 使用Flink節(jié)點(diǎn)1 實(shí)時(shí)計(jì)算用戶當(dāng)日上傳的跑步累計(jì)數(shù)據(jù);
  3. 使用 Flink節(jié)點(diǎn)2 實(shí)時(shí)的將離線數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)匯總起來;
  4. 將匯總結(jié)果寫入Hbase結(jié)果表,同時(shí)發(fā)送個(gè)MQ消息給下游業(yè)務(wù)方。

圖片圖片

這里需要有兩點(diǎn)需要注意:

1、根據(jù)業(yè)務(wù)特點(diǎn),這里將離線計(jì)算結(jié)果作為維表使用:

Flink任務(wù)的下游業(yè)務(wù)方更關(guān)注當(dāng)日上傳過跑步記錄的用戶的數(shù)據(jù)更新情況,ODPS結(jié)果表作為維表用,F(xiàn)link任務(wù)只對(duì)當(dāng)日上傳跑步記錄的用戶進(jìn)行查詢,得到“非當(dāng)日歷史統(tǒng)計(jì)數(shù)據(jù)”,在與“當(dāng)日新增跑步數(shù)據(jù)”相加,即可得到該歷史至今的最終的統(tǒng)計(jì)數(shù)據(jù)(更新hbase結(jié)果表),符合需求;

我們的跑步用戶中大部分的用戶不會(huì)每天都上傳跑步記錄,這些人的結(jié)果數(shù)據(jù)不會(huì)發(fā)生改變。若將ODPS表作為源表,則依舊會(huì)為這些用戶更新數(shù)據(jù),浪費(fèi)計(jì)算資源。

【優(yōu)化】odps表作為維表,不適合大數(shù)據(jù)量的情況,大數(shù)據(jù)量使用hbase表作為維表比較合適。這里將odps表數(shù)據(jù)同步到hbase表中,再拿該hbase表作為維表。

2、初始化下游結(jié)果表:在整個(gè)任務(wù)跑起來前,需要先使用ODPS表的bizdate分區(qū)數(shù)據(jù)初始化hbase結(jié)果表,然后再由實(shí)時(shí)任務(wù)對(duì)結(jié)果表進(jìn)行更新;

最終的方案示意圖如下:

圖片圖片

3.2 存在的問題

上面的lambda方案有個(gè)問題,每日凌晨零點(diǎn)過后,實(shí)時(shí)任務(wù)已開始計(jì)算新的一天數(shù)據(jù),而離線任務(wù)計(jì)算尚未結(jié)束,這時(shí)會(huì)出現(xiàn)一個(gè)離線數(shù)據(jù)缺失的窗口期。重點(diǎn)分析一下框圖中“實(shí)時(shí)數(shù)據(jù)+離線數(shù)據(jù)”的部分:

圖片圖片

正常情況

當(dāng)一個(gè)用戶在T日實(shí)時(shí)上傳了自己的跑步記錄,F(xiàn)link節(jié)點(diǎn)1會(huì)計(jì)算出其 [當(dāng)日0點(diǎn)起至此刻] 的跑步累計(jì)數(shù)據(jù)data1,F(xiàn)link節(jié)點(diǎn)2會(huì)根據(jù)該用戶id取hbase維表里查詢其 [歷史~T-1日] 的累計(jì)數(shù)據(jù) data2 (hbase表里數(shù)據(jù)由odps每日更新,即T-1日的存量累計(jì)匯總數(shù)據(jù)),將data1和data2二者匯總,就可得到 用戶歷史至此時(shí)刻的匯總數(shù)據(jù);

異常情況 

在凌晨(比如說,在00:00~00:30),ODPS正在計(jì)算最新分區(qū)數(shù)據(jù)(T-1日的數(shù)據(jù))的期間,新的分區(qū)還沒生成完,或者ODPS計(jì)算已經(jīng)完成,但odps表同步base表同步任務(wù)還未完成,此時(shí)若發(fā)生了查詢,會(huì)發(fā)生什么?

會(huì)使用老分區(qū)的數(shù)據(jù)(T-2日的數(shù)據(jù),而不是期望的T-1日數(shù)據(jù)),導(dǎo)致數(shù)據(jù)不準(zhǔn)。

【問題描述】

在凌晨時(shí)分,ODPS計(jì)算T-1日數(shù)據(jù)期間,如果發(fā)生了對(duì)T-1日的數(shù)據(jù)查詢,則無法獲取到期望的T-1日數(shù)據(jù),會(huì)繼續(xù)使用T-2日的數(shù)據(jù)

這里“無法獲取正確數(shù)據(jù)”的時(shí)間長度 = ODPS計(jì)算時(shí)間 + ODPS同步數(shù)據(jù)到Hbase的時(shí)間

【原因】

Flink查詢維表時(shí) 使用維表當(dāng)前的數(shù)據(jù)快照,本次查詢完成后再發(fā)生的維表更新不會(huì)對(duì)已有查詢造成影響。

【舉例】

case1(ODPS計(jì)算未完成):

27號(hào),F(xiàn)link任務(wù)計(jì)算27號(hào)當(dāng)天的用戶累計(jì)數(shù)據(jù),同時(shí)查詢odps維表的 26號(hào)分區(qū) 中該用戶的歷史累計(jì)數(shù)據(jù),兩者相加,得到27號(hào)的實(shí)時(shí)累計(jì)結(jié)果;

28號(hào)凌晨,ODPS正在計(jì)算27號(hào)分區(qū)的數(shù)據(jù),任務(wù)還未結(jié)束,27號(hào)分區(qū)數(shù)據(jù)尚不可用;而Flink任務(wù)已經(jīng)開始計(jì)算28號(hào)當(dāng)天的用戶累計(jì)數(shù)據(jù),此刻發(fā)生了一次維表查詢,期望從維表中查到該用戶27號(hào)統(tǒng)計(jì)的歷史累計(jì)數(shù)據(jù),然而由于27號(hào)數(shù)據(jù)未準(zhǔn)備好,則維表會(huì)返回26號(hào)的歷史累計(jì)數(shù)據(jù),這會(huì)導(dǎo)致數(shù)據(jù)計(jì)算錯(cuò)誤,相當(dāng)于丟失了該用戶27號(hào)的數(shù)據(jù)。

case2(ODPS計(jì)算完成,但odps表同步habse表任務(wù)未完成):

28號(hào)凌晨,ODPS的計(jì)算已完成,odps表正在同步數(shù)據(jù)到hbase表期間,如果Flink發(fā)生了查詢,期望獲取用戶27號(hào)的最新數(shù)據(jù),但由于還沒有更新完成,還是會(huì)用26號(hào)的數(shù)據(jù),會(huì)造成類似的錯(cuò)誤結(jié)果。

本文轉(zhuǎn)載自微信公眾號(hào)「滌生大數(shù)據(jù)」,作者「滌生-莫哥」,可以通過以下二維碼關(guān)注。

轉(zhuǎn)載本文請(qǐng)聯(lián)系「滌生大數(shù)據(jù)」公眾號(hào)。

責(zé)任編輯:武曉燕 來源: 滌生大數(shù)據(jù)
相關(guān)推薦

2024-06-03 08:26:35

2024-06-06 08:58:08

大數(shù)據(jù)SQLAPI

2009-11-05 08:46:10

WCF與ExtJs

2024-06-05 09:16:54

開源工具Airflow

2017-09-18 17:59:23

Hadoop數(shù)據(jù)分析

2013-01-21 09:31:22

大數(shù)據(jù)分析大數(shù)據(jù)實(shí)時(shí)分析云計(jì)算

2017-01-15 13:45:20

Docker大數(shù)據(jù)京東

2021-06-04 07:24:14

Flink CDC數(shù)據(jù)

2014-08-20 09:40:56

大數(shù)據(jù)實(shí)踐項(xiàng)目

2016-05-09 10:16:14

MapReduce數(shù)據(jù)分析明星微博

2024-09-11 14:47:00

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理

2010-05-11 09:29:44

數(shù)據(jù)中心節(jié)能項(xiàng)目

2021-07-05 10:48:42

大數(shù)據(jù)實(shí)時(shí)計(jì)算

2024-06-04 14:10:00

FlinkSQL窗口大數(shù)據(jù)

2016-11-02 09:02:56

交通大數(shù)據(jù)計(jì)算

2021-03-10 14:04:10

大數(shù)據(jù)計(jì)算技術(shù)

2017-03-13 09:48:26

pysparkhive數(shù)據(jù)

2023-02-26 00:12:10

Hadoop數(shù)據(jù)湖存儲(chǔ)

2012-09-29 09:23:32

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)