分布式程序員必讀:你與分布式和事務(wù)真的很熟嗎?
微吐槽
hello,world.
不想了,我等碼農(nóng),還是看看怎么來處理分布式系統(tǒng)中的事務(wù)這個老大難吧!
- 本文略長,讀者需要有一定耐心,如果你是高級碼農(nóng)或者架構(gòu)師級別,你可以跳過。
- 本文注重實戰(zhàn)或者實現(xiàn),不涉及CAP,略提ACID。
- 本文適合基礎(chǔ)分布式程序員:
- 本文會涉及集群中節(jié)點的failover和recover問題.
- 本文會涉及事務(wù)及不透明事務(wù)的問題.
- 本文會提到微博和tweeter,并引出一個大數(shù)據(jù)問題.
由于分布式這個話題太大,事務(wù)這個話題也太大,我們從一個集群的一個小小節(jié)點開始談起。
集群中存活的節(jié)點與同步
分布式系統(tǒng)中,如何判斷一個節(jié)點(node)是否存活?
kafka這樣認(rèn)為:
- 此節(jié)點和zookeeper能喊話.(Keep sessions with zookeeper through heartbeats.)
- 此節(jié)點如果是個從節(jié)點,必須能夠盡可能忠實地反映主節(jié)點的數(shù)據(jù)變化。
也就是說,必須能夠在主節(jié)點寫了新數(shù)據(jù)后,及時復(fù)制這些變化的數(shù)據(jù),所謂及時,不能拉下太多哦.
那么,符合上面兩個條件的節(jié)點就可以認(rèn)為是存活的,也可以認(rèn)為是同步的(in-sync).
關(guān)于第1點,大家對心跳都很熟悉,那么我們可以這樣認(rèn)為某個節(jié)點不能和zookeeper喊話了:
- zookeeper-node:
- var timer =
- new timer()
- .setInterval(10sec)
- .onTime(slave-nodes,function(slave-nodes){
- slave-nodes.forEach( node -> {
- boolean isAlive = node.heartbeatACK(15sec);
- if(!isAlive) {
- node.numNotAlive += 1;
- if(node.numNotAlive >= 3) {
- node.declareDeadOrFailed();
- slave-nodes.remove(node);
- //回調(diào)也可 leader-node-app.notifyNodeDeadOrFailed(node)
- }
- }else
- node.numNotAlive = 0;
- });
- });
- timer.run();
- //你可以回調(diào)也可以像下面這樣簡單的計時判斷
- leader-node-app:
- var timer =
- new timer()
- .setInterval(10sec)
- .onTime(slave-nodes,function(slave-nodes){
- slave-nodes.forEach(node -> {
- if(node.isDeadOrFailed) {
- //node不能和zookeeper喊話了
- }
- });
- });
- timer.run();
關(guān)于第二點,要稍微復(fù)雜點了,怎么搞呢?
來這么分析:
- 數(shù)據(jù) messages.
- 操作 op-log.
- 偏移 position/offset.
- // 1. 先考慮messages
- // 2. 再考慮log的postion或者offset
- // 3. 考慮msg和off都記錄在同源數(shù)據(jù)庫或者存儲設(shè)備上.(database or storage-device.)
- var timer =
- new timer()
- .setInterval(10sec)
- .onTime(slave-nodes,function(nodes){
- var core-of-cpu = 8;
- //嫌慢就并發(fā)唄 mod hash go!
- nodes.groupParallel(core-of-cpu)
- .forEach(node -> {
- boolean nodeSucked = false;
- if(node.ackTimeDiff > 30sec) {
- //30秒內(nèi)沒有回復(fù),node卡住了
- nodeSucked = true;
- }
- if(node.logOffsetDiff > 100) {
- //node復(fù)制跟不上了,差距超過100條數(shù)據(jù)
- nodeSucked = true;
- }
- if(nodeSucked) {
- //總之node“死”掉了,其實到底死沒死,誰知道呢?network-error在分布式系統(tǒng)中或者節(jié)點失敗這個事情是正?,F(xiàn)象.
- node.declareDeadOrFailed();
- //不和你玩啦,集群不要你了
- nodes.remove(node);
- //該怎么處理呢,拋個事件吧.
- fire-event-NodeDeadOrFailed(node);
- }
- });
- });
- timer.run();
上面的節(jié)點的狀態(tài)管理一般由zookeeper來做,leader或者master節(jié)點也會維護(hù)那么點狀態(tài)。
那么應(yīng)用中的leader或者master節(jié)點,只需要從zookeeper拉狀態(tài)就可以,同時,上面的實現(xiàn)是不是一定最佳呢?不是的,而且多數(shù)操作可以合起來,但為了描述節(jié)點是否存活這個事兒,咱們這么寫沒啥問題。
#p#
節(jié)點死掉、失敗、不同步了,咋處理呢?
好嘛,終于說到failover和recover了,那failover比較簡單,因為還有其它的slave節(jié)點在,不影響數(shù)據(jù)讀取。
- 同時多個slave節(jié)點失敗了?沒有100%的可用性.數(shù)據(jù)中心和機房癱瘓、網(wǎng)絡(luò)電纜切斷、hacker入侵刪了你的根,總之你rp爆表了.
- 如果主節(jié)點失敗了,那master-master不行嘛?keep-alived或者LVS或者你自己寫failover吧.高可用架構(gòu)(HA)又是個大件兒了,此文不展開了。
我們來關(guān)注下recover方面的東西,這里把視野打開點,不僅關(guān)注slave節(jié)點重啟后追log來同步數(shù)據(jù),我們看下在實際應(yīng)用中,數(shù)據(jù)請求(包括讀、寫、更新)失敗怎么辦?
大家可能都會說,重試(retry)唄、重放(replay)唄或者干脆不管了唄!
行,都行,這些都是策略,但具體怎么個搞法,你真的清楚了?
一個bigdata問題
我們先擺個探討的背景:
問題:消息流,比如微博的微博(真繞),源源不斷地流進(jìn)我們的應(yīng)用中,要處理這些消息,有個需求是這樣的:
Reach is the number of unique people exposed to a URL on Twitter. 那么,統(tǒng)計一下3小時內(nèi)的本條微博(url)的reach總數(shù)。 |
怎么解決呢?
把某時間段內(nèi)轉(zhuǎn)發(fā)過某條微博(url)的人拉出來,把這些人的粉絲拉出來,去掉重復(fù)的人,然后求總數(shù),就是要求的reach. |
為了簡單,我們忽略掉日期,先看看這個方法行不行:
- /** ---------------------------------
- * 1. 求出轉(zhuǎn)發(fā)微博(url)的大V.
- * __________________________________*/
- 方法 :getUrlToTweetersMap(String url_id)
- SQL : /* 數(shù)據(jù)庫A,表url_user存儲了轉(zhuǎn)發(fā)某url的user */
- SELECT url_user.user_id as tweeter_id
- FROM url_user
- WHERE url_user.url_id = ${url_id}
- 返回 :[user_1,...,user_m]
- /** ---------------------------------
- * 2. 求出大V的粉絲
- * __________________________________*/
- 方法 : getFollowers(String tweeter_id);
- SQL : /* 數(shù)據(jù)庫B */
- SELECT users.id as user_id
- FROM users
- WHERE users.followee_id = ${tweeter_id}
- 返回:tweeter的粉絲
- /** ---------------------------------
- * 3. 求出Reach
- * __________________________________*/
- var url = queryArgs.getUrl();
- var tweeters = getUrlToTweetersMap();
- var result = new HashMap<String,Integer>();
- tweeters.forEach(t -> {
- // 你可以批量in + 并發(fā)讀來優(yōu)化下面方法的性能
- var followers = getFollowers(t.tweeter_id);
- followers.forEach(f -> {
- //hash去重
- result.put(f.user_id,1);
- });
- });
- //Reach
- return result.size();
頂呱呱,無論如何,求出了Reach啊!
#p#
其實這又引出了一個很重要的問題,也是很多大談框架、設(shè)計、模式卻往往忽視的問題:性能和數(shù)據(jù)庫建模的關(guān)系。
1.數(shù)據(jù)量有多大?
不知道讀者有木有對這個問題的數(shù)據(jù)庫I/O有點想法,或者虎軀一震呢?
Computing reach is too intense for a single machine – it can require thousands of database calls and tens of millions of tuples.
在上面的數(shù)據(jù)庫設(shè)計中避免了JOIN,為了提高求大V粉絲的性能,可以將一批大V作為batch/bulk,然后多個batch并發(fā)讀,誓死搞死數(shù)據(jù)庫。
這里將微博到轉(zhuǎn)發(fā)者表所在的庫,與粉絲庫分離,如果數(shù)據(jù)更大怎么辦?
庫再分表...
OK,假設(shè)你已經(jīng)非常熟悉傳統(tǒng)關(guān)系型數(shù)據(jù)庫的分庫分表及數(shù)據(jù)路由(讀路徑的聚合、寫路徑的分發(fā))、或者你對于sharding技術(shù)也很熟悉、或者你良好的結(jié)合了HBase的橫向擴展能力并有一致性策略來解決其二級索引問題.
總之,存儲和讀取的問題假設(shè)你已經(jīng)解決了,那么分布式計算呢?
2.微博這種應(yīng)用,人與人之間的關(guān)系成圖狀(網(wǎng)),你怎么建模存儲?而不僅僅對應(yīng)這個問題,比如:
某人的好友的好友可能和某人有幾分相熟?
看看用storm怎么來解決分布式計算,并提供流式計算的能力:
- // url到大V -> 數(shù)據(jù)庫1
- TridentState urlToTweeters =
- topology.newStaticState(getUrlToTweetersState());
- // 大V到粉絲 -> 數(shù)據(jù)庫2
- TridentState tweetersToFollowers =
- topology.newStaticState(getTweeterToFollowersState());
- topology.newDRPCStream("reach")
- .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
- .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
- .shuffle() /* 大V的粉絲很多,所以需要分布式處理*/
- .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
- .parallelismHint(200) /* 粉絲很多,所以需要高并發(fā) */
- .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
- .groupBy(new Fields("follower"))
- .aggregate(new One(), new Fields("one")) /* 去重 */
- .parallelismHint(20)
- .aggregate(new Count(), new Fields("reach")); /* 計算reach數(shù) */
最多處理一次(At most once)
回到主題,引出上面的例子,一是為了引出一個有關(guān)分布式(存儲+計算)的問題,二是透漏這么點意思:
碼農(nóng),就應(yīng)該關(guān)注設(shè)計和實現(xiàn)的東西,比如Jay Kreps是如何發(fā)明Kafka這個輪子的 : ]
如果你還是碼農(nóng)級別,咱來務(wù)點實吧,前面我們說到recover,節(jié)點恢復(fù)的問題,那么我們恢復(fù)幾個東西?
基本的:
- 節(jié)點狀態(tài)
- 節(jié)點數(shù)據(jù)
本篇從數(shù)據(jù)上來討論下這個問題,為使問題再簡單點,我們考慮寫數(shù)據(jù)的場景,如果我們用write-ahead-log的方式來保證數(shù)據(jù)復(fù)制和一致性,那么我們會怎么處理一致性問題呢?
主節(jié)點有新數(shù)據(jù)寫入.
從節(jié)點追log,準(zhǔn)備復(fù)制這批新數(shù)據(jù)。從節(jié)點做兩件事:
(1). 把數(shù)據(jù)的id偏移寫入log;
(2). 正要處理數(shù)據(jù)本身,從節(jié)點掛了。
那么根據(jù)上文的節(jié)點存活條件,這個從節(jié)點掛了這件事被探測到了,從節(jié)點由維護(hù)人員手動或者其自己恢復(fù)了,那么在加入集群和小伙伴們繼續(xù)玩耍之前,它要同步自己的狀態(tài)和數(shù)據(jù)。
問題來了:
如果根據(jù)log內(nèi)的數(shù)據(jù)偏移來同步數(shù)據(jù),那么,因為這個節(jié)點在處理數(shù)據(jù)之前就把偏移寫好了,可是那批數(shù)據(jù)lost-datas沒有得到處理,如果追log之后的數(shù)據(jù)來同步,那么那批數(shù)據(jù)lost-datas就丟了。
在這種情況下,就叫作數(shù)據(jù)最多處理一次,也就是說數(shù)據(jù)會丟失。
最少處理一次(At least once)
好吧,丟失數(shù)據(jù)不能容忍,那么我們換種方式來處理:
1.主節(jié)點有新數(shù)據(jù)寫入.
2.從節(jié)點追log,準(zhǔn)備復(fù)制這批新數(shù)據(jù)。從節(jié)點做兩件事:
(1). 先處理數(shù)據(jù);
(2). 正要把數(shù)據(jù)的id偏移寫入log,從節(jié)點掛了。
問題又來了:
如果從節(jié)點追log來同步數(shù)據(jù),那么因為那批數(shù)據(jù)duplicated-datas被處理過了,而數(shù)據(jù)偏移沒有反映到log中,如果這樣追,會導(dǎo)致這批數(shù)據(jù)重復(fù)。 這種場景,從語義上來講,就是數(shù)據(jù)最少處理一次,意味著數(shù)據(jù)處理會重復(fù)。 |
#p#
僅處理一次(Exactly once)
Transaction
好吧,數(shù)據(jù)重復(fù)也不能容忍?要求挺高啊。
大家都追求的強一致性保證(這里是最終一致性),怎么來搞呢?
換句話說,在更新數(shù)據(jù)的時候,事務(wù)能力如何保障呢?
假設(shè)一批數(shù)據(jù)如下:
// 新到數(shù)據(jù) { transactionId:4 urlId:99 reach:5 }
現(xiàn)在要更新這批數(shù)據(jù)到庫里或者log里,那么原來的情況是:
// 老數(shù)據(jù) { transactionId:3 urlId:99 reach:3 }
如果說可以保證如下三點:
事務(wù)ID的生成是強有序的.(隔離性,串行)
同一個事務(wù)ID對應(yīng)的一批數(shù)據(jù)相同.(冪等性,多次操作一個結(jié)果)
單條數(shù)據(jù)會且僅會出現(xiàn)在某批數(shù)據(jù)中.(一致性,無遺漏無重復(fù))
那么,放心大膽的更新好了:
// 更新后數(shù)據(jù) { transactionId:4 urlId:99 //3 + 5 = 8 reach:8 }
注意到這個更新是ID偏移和數(shù)據(jù)一起更新的,那么這個操作靠什么來保證:原子性。 你的數(shù)據(jù)庫不提供原子性?后文略有提及。 |
這里是更新成功了。如果更新的時候,節(jié)點掛了,那么庫里或者log里的id偏移不寫,數(shù)據(jù)也不處理,等節(jié)點恢復(fù),就可以放心去同步,然后加入集群玩耍了。
所以說,要保證數(shù)據(jù)僅處理一次,還是挺困難的吧?
上面的保障“僅處理一次”這個語義的實現(xiàn)有什么問題呢?
性能問題。
這里已經(jīng)使用了batch策略來減少到庫或磁盤的Round-Trip Time,那么這里的性能問題是什么呢?
考慮一下,采用master-master架構(gòu)來保證主節(jié)點的可用性,但是一個主節(jié)點失敗了,到另一個主節(jié)點主持工作,是需要時間的。 假設(shè)從節(jié)點正在同步,啪!主節(jié)點掛了!因為要保證僅處理一次的語義,所以原子性發(fā)揮作用,失敗,回滾,然后從主節(jié)點拉失敗的數(shù)據(jù)(你不能就近更新,因為這批數(shù)據(jù)可能已經(jīng)變化了,或者你根本沒緩存本批數(shù)據(jù)),結(jié)果是什么呢? 老主節(jié)點掛了, 新的主節(jié)點還沒啟動,所以這次事務(wù)就卡在這里,直到數(shù)據(jù)同步的源——主節(jié)點可以響應(yīng)請求。 |
如果不考慮性能,就此作罷,這也不是什么大事。
你似乎意猶未盡?來吧,看看“銀彈”是什么?
Opaque-Transaction
現(xiàn)在,我們來追求這樣一種效果:
某條數(shù)據(jù)在一批數(shù)據(jù)中(這批數(shù)據(jù)對應(yīng)著一個事務(wù)),很可能會失敗,但是它會在另一批數(shù)據(jù)中成功。
換句話說,一批數(shù)據(jù)的事務(wù)ID一定相同。 |
來看看例子吧,老數(shù)據(jù)不變,只是多了個字段:prevReach。
// 老數(shù)據(jù) { transactionId:3 urlId:99 //注意這里多了個字段,表示之前的reach的值 prevReach:2 reach:3 } // 新到數(shù)據(jù) { transactionId:4 urlId:99 reach:5 }
這種情況,新事務(wù)的ID更大、更靠后,表明新事務(wù)可以執(zhí)行,還等什么,直接更新,更新后數(shù)據(jù)如下:
// 新到數(shù)據(jù) { transactionId:4 urlId:99 //注意這里更新為之前的值 prevReach:3 //3 + 5 = 8 reach:8 }
現(xiàn)在來看下另外的情況:
// 老數(shù)據(jù) { transactionId:3 urlId:99 prevReach:2 reach:3 } // 新到數(shù)據(jù) { //注意事務(wù)ID為3,和老數(shù)據(jù)中的事務(wù)ID相同 transactionId:3 urlId:99 reach:5 }
這種情況怎么處理?是跳過嗎?因為新數(shù)據(jù)的事務(wù)ID和庫里或者log里的事務(wù)ID相同,按事務(wù)要求這次數(shù)據(jù)應(yīng)該已經(jīng)處理過了,跳過?
#p#
不,這種事不能靠猜的,想想我們有的幾個性質(zhì),其中關(guān)鍵一點就是:
給定一批數(shù)據(jù),它們所屬的事務(wù)ID相同。
仔細(xì)體會下,上面那句話和下面這句話的差別: 給定一個事務(wù)ID,任何時候,其所關(guān)聯(lián)的那批數(shù)據(jù)相同。 |
我們應(yīng)該這么做,考慮到新到數(shù)據(jù)的事務(wù)ID和存儲中的事務(wù)ID一致,所以這批數(shù)據(jù)可能被分別或者異步處理了,但是,這批數(shù)據(jù)對應(yīng)的事務(wù)ID永遠(yuǎn)是同一個,那么,即使這批數(shù)據(jù)中的A部分先處理了,由于大家都是一個事務(wù)ID,那么A部分的前值是可靠的。
所以,我們將依靠prevReach而不是Reach的值來更新:
// 更新后數(shù)據(jù) { transactionId:3 urlId:99 //這個值不變 prevReach:2 //2 + 5 = 7 reach:7 }
你發(fā)現(xiàn)了什么呢?
不同的事務(wù)ID,導(dǎo)致了不同的值:
當(dāng)事務(wù)ID為4,大于存儲中的事務(wù)ID3,Reach更新為3+5 = 8.
當(dāng)事務(wù)ID為3,等于存儲中的事務(wù)ID3,Reach更新為2+5 = 7.
這就是Opaque Transaction.
這種事務(wù)能力是最強的了,可以保證事務(wù)異步提交。所以不用擔(dān)心被卡住了,如果說集群中:
Transaction:
- 數(shù)據(jù)是分批處理的,每個事務(wù)ID對應(yīng)一批確定、相同的數(shù)據(jù).
- 保證事務(wù)ID的產(chǎn)生是強有序的.
- 保證分批的數(shù)據(jù)不重復(fù)、不遺漏.
- 如果事務(wù)失敗,數(shù)據(jù)源丟失,那么后續(xù)事務(wù)就卡住直到數(shù)據(jù)源恢復(fù).
Opaque-Transaction:
- 數(shù)據(jù)是分批處理的,每批數(shù)據(jù)有確定而唯一的事務(wù)ID.
- 保證事務(wù)ID的產(chǎn)生是強有序的.
- 保證分批的數(shù)據(jù)不重復(fù)、不遺漏.
- 如果事務(wù)失敗,數(shù)據(jù)源丟失,不影響后續(xù)事務(wù),除非后續(xù)事務(wù)的數(shù)據(jù)源也丟了.
其實這個全局ID的設(shè)計也是門藝術(shù):
- 冗余關(guān)聯(lián)表的ID,以減少join,做到O(1)取ID.
- 冗余日期(long型)字段,以避免order by.
- 冗余過濾字段,以避免無二級索引(HBase)的尷尬.
- 存儲mod-hash的值,以方便分庫、分表后,應(yīng)用層的數(shù)據(jù)路由書寫.
這個內(nèi)容也太多,話題也太大,就不在此展開了。
你現(xiàn)在知道twitter的snowflake生成全局唯一且有序的ID的重要性了。
兩階段提交
現(xiàn)在用zookeeper來做兩階段提交已經(jīng)是入門級技術(shù),所以也不展開了。
如果你的數(shù)據(jù)庫不支持原子操作,那么考慮兩階段提交吧。
To be continued.
博文出處:http://www.cnblogs.com/foreach-break/p/distributed_system_and_transaction.html