從盤古開天辟地說起為什么 Flink CP 能實(shí)現(xiàn)精確一次之二
為什么我要把這段話放在最前面呢?因?yàn)椴┲饔辛舜蟀l(fā)現(xiàn),博主在總結(jié)學(xué)習(xí)的過程中,總結(jié)了除了 Flink CP、Chandy-Lamport 全局一致性快照算法之外的一種 通用全局一致性快照算法!!!。
這套 通用算法 包含 Chandy-Lamport 算法 ≈ Flink 非對(duì)齊 CP 算法 包含 Flink 對(duì)齊 CP 算法。
可能這一套 通用算法 之前已經(jīng)有人提過了,但是博主是自己在總結(jié) Flink CP、Chandy-Lamport 算法的過程中,逆推總結(jié)出來(lái)的,并沒有借助外力!!!
1.前言
對(duì)于很多做離線或者實(shí)時(shí)數(shù)倉(cāng)的小伙伴來(lái)說,我先問幾個(gè)問題,看看小伙伴萌能回答上來(lái)嗎?
- ? 你知道狀態(tài)是什么嗎?在離線數(shù)據(jù)開發(fā)的經(jīng)歷中,你碰到過狀態(tài)的概念嗎?
- ? 為什么離線數(shù)倉(cāng)不需要狀態(tài),實(shí)時(shí)數(shù)據(jù)開發(fā)中老是提到狀態(tài)的概念?
- ? Flink 中的狀態(tài)、狀態(tài)后端、全局一致性快照(Checkpoint\Savepoint) 的作用都是什么,這三個(gè)概念的關(guān)聯(lián)又是什么?
- ? Flink 是通過什么機(jī)制來(lái)做 Checkpoint 的?為什么這套機(jī)制能夠做到故障恢復(fù)呢?
- ? Flink Checkpoint 是基于 Chandy-Lamport 算法的,但是 Flink 的實(shí)現(xiàn)相比 Chandy-Lamport 算法之間又有哪些優(yōu)點(diǎn)、缺點(diǎn)?
- ? Flink Checkpoint 用到了 barrier,為什么用了 barrier 做的快照就能保證全局一致性快照的正確性?barrier 到底起到了什么作用?
- ? Flink 對(duì)齊 Checkpoint 和非對(duì)齊 Checkpoint 的區(qū)別是什么?非對(duì)齊 Checkpoint 也能保障精確一次嗎?
小伙伴們思考一下,都能回答上來(lái)么,如果對(duì)于某些問題你還有疑問,樓主會(huì)通過本篇文章幫你解答這些問題,理清這些概念!
由于本文內(nèi)容較多,所以博主將本文分為上,中,下三集,本文是中,三集內(nèi)容是有連接關(guān)系的,如果小伙伴在看本文的過程中對(duì)有些概念不清楚,可以跳轉(zhuǎn)到上文進(jìn)行查看:
本文最主要的內(nèi)容就是解釋了:
一個(gè)分布式應(yīng)用是怎么異步做一個(gè)全局一致性快照?
2.名詞解釋
- ? Single-Token conservation:一個(gè)最常見的分布式應(yīng)用,單 Token 流轉(zhuǎn)分布式應(yīng)用
- ? Process:指分布式應(yīng)用中的進(jìn)程,舉個(gè) Flink 中的例子就是 TaskManager
- ? Channel:指分布式應(yīng)用中進(jìn)程之間的傳輸通道,舉個(gè) Flink 中的例子就是 TaskManager 之間傳輸數(shù)據(jù)的網(wǎng)絡(luò)傳輸通道
3.分布式應(yīng)用全局一致性快照要記錄的狀態(tài)內(nèi)容
首先在分析一個(gè)復(fù)雜的大數(shù)據(jù)應(yīng)用的全局一致性快照之前,我們先以最簡(jiǎn)單的分布式應(yīng)用為例。
Single-Token conservation:其有 p 和 q 兩個(gè)進(jìn)程,p 可以通過 Channel pq(記為 Cpq) 向 q 發(fā)消息,q 可以通過 Channel qp(記為 Cqp) 向 p 發(fā)消息,其中有一個(gè)叫 token 的消息,在這個(gè)系統(tǒng)中一直不停的傳輸流轉(zhuǎn),從 p 到 q,再?gòu)?q 到 p。
- ? 首先我們來(lái)分析這個(gè)應(yīng)用中,全局一致性快照應(yīng)該包含哪些內(nèi)容?
- ? 結(jié)論:全局一致性快照 = Process 狀態(tài) + Channel 狀態(tài)。
- ? 原因:以上面的四幅圖為例,每一幅圖代表一個(gè)時(shí)刻,如果我們以拍照這種方式做全局一致性快照來(lái)理解時(shí),那么同一時(shí)刻,Process 和 Channel 同時(shí)都會(huì)存在數(shù)據(jù),這些數(shù)據(jù)都是作為全局一致性快照的一部分內(nèi)容。
使用上述的這個(gè)結(jié)論,我們可以得到上圖 Single-Token conservation 示例中的全局一致性快照 S = S(p) + S(Cpq) + S(q) + S(Cqp)
其中:
- S:全局一致性快照
- S(p):p 進(jìn)程的狀態(tài)
- S(Cpq):p 進(jìn)程到 q 進(jìn)程的 Channel 狀態(tài)
- S(q):q 進(jìn)程的狀態(tài)
- S(Cqp):q 進(jìn)程到 p 進(jìn)程的 Channel 狀態(tài)
這里就碰到了我們要分析的關(guān)鍵問題:做全局一致性快照時(shí),小伙伴萌都容易理解S(p),S(q)這兩個(gè),因?yàn)檫@兩份狀態(tài)數(shù)據(jù)就真實(shí)的存在我們的分布式應(yīng)用中,但是S(Cpq),S(Cqp)這兩個(gè)怎么理解呢?這些數(shù)據(jù)都是在網(wǎng)絡(luò)中傳輸啊,我們做全局一致性快照時(shí)用啥方法才能把這些數(shù)據(jù)也記錄下來(lái)呢?接下來(lái)詳細(xì)講講博主的理解
4.Process 狀態(tài)記錄的內(nèi)容
記錄和實(shí)際業(yè)務(wù)相關(guān)的狀態(tài)內(nèi)容。舉例:id 去重就存儲(chǔ)歷史所有的 id 就可以了。
5.Channel 狀態(tài)記錄的內(nèi)容
還是以前文的 Single-Token conservation 為例:
token 在 p 時(shí)(對(duì)應(yīng)第一張圖),這時(shí)的全局一致性快照為:
S(token-in-p) = S(p-token-in-p) + S(Cpq-token-in-p) + S(q-token-in-p) + S(Cqp-token-in-p)
其中:
- S(token-in-p):token 在 p 時(shí),做的全局一致性快照
- S(p-token-in-p):token 在 p 時(shí),p 進(jìn)程的狀態(tài)
- S(Cpq-token-in-p):token 在 p 時(shí),p 進(jìn)程到 q 進(jìn)程的 Channel 狀態(tài)
- S(q-token-in-p):token 在 p 時(shí),q 進(jìn)程的狀態(tài)
- S(Cqp-token-in-p):token 在 p 時(shí),q 進(jìn)程到 p 進(jìn)程的 Channel 狀態(tài)
其中 S(p-token-in-p) 好理解,做快照時(shí),token 還沒有從 p 發(fā)出去,p 肯定知道 token 還在 p;但是站在 Cpq 做狀態(tài)時(shí)來(lái)說:Cpq 做狀態(tài)時(shí),怎么保障 Cpq 知道 token in p?
在分析上面這個(gè)問題前,博主先使用 數(shù)學(xué)的方式 分析一下 S(Cpq) 到底記錄了哪些內(nèi)容。
? 第一步:定義變量
- n:在 p 的狀態(tài)記錄前,p 記錄的 p 發(fā)往 Cpq 的 msg 數(shù);
- n′:在 Cpq 的狀態(tài)記錄前,Cpq 記錄的 p 發(fā)往 Cpq 的 msg 數(shù);
- m:在 q 的狀態(tài)記錄前,q 記錄的 q 從 Cpq 中接收到的 msg 數(shù);
- m′:在 Cpq 的狀態(tài)記錄前,Cpq 記錄的 q 從 Cpq 中接收到的 msg 數(shù);
? 第二步:提出假設(shè)
- 假設(shè) Channel 和 Process 一樣,也可以自主的去將做快照時(shí) Channel 中進(jìn)行網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)作為狀態(tài)保存下來(lái);對(duì)應(yīng)到上述案例中就是 Cpq 可以主動(dòng)的將做好的 S(Cpq) 狀態(tài)保存下來(lái);
? 第三步:先說結(jié)論
- Cpq 記錄S(Cpq)時(shí),必然會(huì)有 n = n' ≥ m = m';
- 一個(gè) Channel 要記錄的狀態(tài)是,它 sender 記錄自己狀態(tài)之前 channel 所接收到 sender 發(fā)的的 msg 列表,再減去 receiver 記錄自己狀態(tài)之前 channel 已經(jīng)發(fā)給 receiver 的 msg 列表,減去的之后的 msg 就是還在 Channel 中的數(shù)據(jù),這些數(shù)據(jù)是需要 Channel 作為狀態(tài)記錄下來(lái)的。
- 而如果 n′ = m′,那么 Channel c 中要記錄的 msg 列表就是 empty 列表。如果 n′ > m′,那么要記錄的列表是 (m′+1),…n′ 號(hào)消息對(duì)應(yīng)的 msg 列表。
? 第四步:給出證明
首先:n = n',利用反證法:如果 n != n',則會(huì)有:
1.n > n' 時(shí),假設(shè):
- n = 10(p 記錄狀態(tài)前,p 記錄 p 發(fā)往 Cpq msg 數(shù)為 10(msg 編號(hào) 1 - 10));
- n' = 7(Cpq 記錄狀態(tài)前,Cpq 記錄 p 發(fā)往 Cpq 的 msg 數(shù)為 7(msg 編號(hào) 1 - 7));
- 那么假設(shè) token 這條 msg 的編號(hào)為 9,就會(huì)出現(xiàn) p 記錄的狀態(tài)為S(p-token-in-Cpq),Cpq 記錄的狀態(tài)為S(p-token-in-p),實(shí)際這是不符合全局一致性快照的要求的;
2.n < n' 時(shí),假設(shè):
- n = 7(p 記錄狀態(tài)前,p 記錄 p 發(fā)往 Cpq msg 數(shù)為 7(編號(hào) 1 - 7));
- n' = 10(Cpq 記錄狀態(tài)前,Cpq 記錄 p 發(fā)往 Cpq 的 msg 數(shù)為 10(編號(hào) 1 - 10));
- 那么假設(shè) token 這條 msg 的編號(hào)為 9,就會(huì)出現(xiàn) p 記錄的狀態(tài)為S(p-token-in-p),Cpq 記錄的狀態(tài)為S(p-token-in-Cpq),實(shí)際這是不符合全局一致性快照的要求的;
3.n = n' 時(shí),假設(shè):
- p 做出S(p-token-in-p)的狀態(tài)時(shí),因?yàn)?n = n',這就代表 p 沒有把 token 發(fā)出去,Cpq 也沒有接受到 token,Cpq 就知道 token 沒有發(fā)過來(lái),則只有這種情況可以滿足S(Cpq-token-in-p);
其次:m = m',同樣利用反證法可以得到,下文只舉 m > m' 的案例:
1.m > m' 時(shí):
- n = n' = m = 10(q 記錄狀態(tài)前,Cpq 記錄 q 從 Cpq 接收到的 msg 數(shù)為 10(編號(hào) 1 - 10));
- m' = 7(Cpq記錄狀態(tài)前,Cpq 記錄的 q 從 Cpq 接收到的 msg 數(shù)為 7(編號(hào) 1 - 7));
- 那么假設(shè) token 這條 msg 的編號(hào)為 9,就會(huì)出現(xiàn) Cpq 記錄的狀態(tài)為S(Cpq-token-in-Cpq),q 記錄的狀態(tài)為S(q-token-in-p),實(shí)際這是不符合全局一致性快照的要求的;
最后:n' ≥ m' and n ≥ m:同樣可以利用反證法得到,此處不再舉例。
4.? 第五步:解答 2.4 節(jié)提出的 Cpq 怎么知道 token in p 的問題
通過 n = n' ≥ m = m' 其實(shí)就可以推論出 Cpq 一定會(huì)知道 token in p。
為了幫大家更容易的理解一個(gè)分布式應(yīng)用包含的全局一致性快照包含的數(shù)據(jù)內(nèi)容,接下來(lái)我用偽代碼描述一下,會(huì)比文字更好理解~
6.偽代碼描述一個(gè)分布式應(yīng)用全局一致快照包含的數(shù)據(jù)內(nèi)容
偽代碼如下:
// S_all 即一個(gè)分布式應(yīng)用的全局一致性快照
S_all = null;
// 假設(shè)總共有 x 個(gè) Process,S_all 先把所有 Process 的狀態(tài)記錄下來(lái)
for (int i = 1; i <= x; i++) {
// 第 i 個(gè) Process 的狀態(tài)為 S_P_i,直接按照 += 寫,勿噴
S_all += S_P_i;
}
// 假設(shè)總共有 y 個(gè) Channel,S_all 把所有 Channel 的狀態(tài)記錄下來(lái)
for (int i = 1; i <= y; i++) {
// 1. S_C_i:第 i 個(gè) Channel 的狀態(tài)
// 2. m_i:第 i 個(gè) Channel 做快照前,發(fā)往下游 Process 的消息(數(shù)據(jù))編號(hào),m_i 其實(shí)就是上文變量中的 m
// 3. n_i:第 i 個(gè) Channel 做快照前,接受上游 Process 的消息(數(shù)據(jù))編號(hào),n_i 其實(shí)就是上文變量中的 n
// 4. 需要注意,每一個(gè) Channel 的 m_i 和 n_i 的數(shù)值都可能是不一樣的
// 5. Message[m_i + 1] :代表編號(hào)為 m_i + 1 的那條消息(數(shù)據(jù))。舉例 Message[0] 代表編號(hào)為 0 的那條消息(數(shù)據(jù))
S_C_i = Message[m_i + 1] + ... + Message[n_i];
S_all += S_C_i;
}
// 狀態(tài)做完了
7.怎樣去記錄 Channel 的狀態(tài)?
通過上面的分析,我們已經(jīng)討論得到了S(Cpq)都包含了什么內(nèi)容,并且其之間要滿足什么樣的數(shù)學(xué)關(guān)系。但是在現(xiàn)實(shí)實(shí)際生活中,消息在 Channel 上傳輸(光纖上傳輸)時(shí),我們是無(wú)法記錄這些消息作為 Channel 的狀態(tài)的。
那么有沒有什么思路可以讓我們也能夠去記錄 Channel 的消息呢?
當(dāng)然有。
因?yàn)橹灰覀兎植际綉?yīng)用的傳輸這些消息的光纖沒有被挖斷,消息終究會(huì)通過 Channel 到達(dá) Process 的,因此我們就可以自然的想到其實(shí)可以在消息傳輸 終點(diǎn)的 Process 去記錄這些消息作為 Channel 的狀態(tài)。對(duì)應(yīng)到上述的 Single-Token conservation 案例來(lái)說,我們可以在 q 中記錄 Channel pq 的S(Cpq),在 p 中記錄 Channel pq 的S(Cqp)。
如果是按照這個(gè)思路去分析的話,上面那段偽代碼就可以簡(jiǎn)化為下面這樣:
// S_all 即全局一致性快照
S_all = null;
// 假設(shè)總共有 x 個(gè) Process
for (int i = 1; i <= x; i++) {
// S_i_all:第 i 個(gè) Process 要記錄的所有狀態(tài)
S_i_all = null;
// S_P_i:第 i 個(gè) process 的狀態(tài)
S_i_all += S_P_i; // 【直接按照 += 寫,勿噴】
// 第 i 個(gè) Process 總共有 y 個(gè)輸入 channel,下文中 j 即指代第 i 個(gè) Process 的第 j 個(gè)上游 Channel
for (int j = 1; j <= y; j++) {
// 1.S_C_j:第 j 個(gè) Channel 的狀態(tài)
// 2.m_j:第 j 個(gè) Channel 做快照前,發(fā)往下游 Process 的消息(數(shù)據(jù))編號(hào)
// 3.n_j:第 j 個(gè) Channel 做快照前,接受上游 Process 的消息(數(shù)據(jù))編號(hào)
S_C_j = Message[m_j + 1] + ... + Message[n_j];
S_i_all += S_C_j;
}
S_all += S_i_all;
}
// 狀態(tài)做完了
解釋一下上面的偽代碼:
1.S_all:所有的進(jìn)程記錄的狀態(tài)之和,即所有S_i_all之和
2.S_i_all:每一個(gè)進(jìn)程要記錄的所有狀態(tài)之和
3.Process i 在做S_i_all其實(shí)只有一個(gè)變量在做快照時(shí)是不知道到的,那就是 n_j(即第 i 個(gè) channel 做快照前,接受到 j(上游) 的消息個(gè)數(shù))
- S_P_i是 Process 自己的狀態(tài),所以是明確已知的
- S_C_j是 Channel 狀態(tài),由Message[m_j + 1] + ... + Message[n_j]組成,其中 m_j 的含義是這個(gè) Channel j 在做快照時(shí)發(fā)給當(dāng)前 Process i 的消息編號(hào),前文已經(jīng)介紹到 m = m',即 m_j 值就等于是是當(dāng)前 Process i 在做S_P_i時(shí),接收到上游 Channel 的消息編號(hào),所以 m_j 是明確已知的。但是 Process i 在做上游 Channel j 快照時(shí) n_j 是無(wú)法獲取到的。
那么 Process i 在做S_C_j如何獲取到 n_j 呢?
這里我們就可以想到 n = n':
- 因?yàn)?n = n',所以 n_j 的值就等于 Channel j 的上游 Process 在做快照時(shí),Process 發(fā)往 Channel j 的消息個(gè)數(shù);
- 所以我們可以認(rèn)為 n_j 是 Channel j 的輸入 Process 在做快照時(shí)已知的一個(gè)值,則只需要這個(gè) Process 把這個(gè)值發(fā)給 Process i 就行;
- Channel j 的輸入 Process 告知 Process i 的方式其做完快照之后,直接發(fā)一個(gè) marker 下去,這個(gè) marker 不會(huì)對(duì)計(jì)算有任何影響(即不會(huì)對(duì)狀態(tài)產(chǎn)生任何影響),marker 只是一個(gè)標(biāo)識(shí),當(dāng) Process i 接受到這個(gè) marker 時(shí),就知道 n_j 的具體值了。
大家注意到了沒,這個(gè) marker 其實(shí)就對(duì)應(yīng)到了 Flink 中 Checkpoint 的 barrier。
8.總結(jié)
本文主要講了一個(gè)分布式應(yīng)用異步做一個(gè)全局一致性快照的機(jī)制。
好,今天主要就講這么多,下集我們?cè)僬f說 Chandy-Lambort,F(xiàn)link CP 和今天介紹的全局一致性快照原理的關(guān)系。