自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何做到“恰好一次”地傳遞數(shù)十億條消息

大數(shù)據(jù)
在分布式領(lǐng)域中存在著三種類型的消息投遞語義,分別是:最多一次(at-most-once)、至少一次(at-least-once)和恰好一次(exactly-once)。本文作者介紹了一個(gè)利用Kafka和RocksDB來構(gòu)建的“恰好一次”消息去重系統(tǒng)的實(shí)現(xiàn)原理。

在分布式領(lǐng)域中存在著三種類型的消息投遞語義,分別是:最多一次(at-most-once)、至少一次(at-least-once)和恰好一次(exactly-once)。本文作者介紹了一個(gè)利用Kafka和RocksDB來構(gòu)建的“恰好一次”消息去重系統(tǒng)的實(shí)現(xiàn)原理。

對(duì)任何一個(gè)數(shù)據(jù)流水線的唯一要求就是不能丟失數(shù)據(jù)。數(shù)據(jù)通??梢员谎舆t或重新排序,但不能丟失。

為了滿足這一要求,大多數(shù)的分布式系統(tǒng)都能夠保證“至少一次”的投遞消息技術(shù)。實(shí)現(xiàn)“至少一次”的投遞技術(shù)通常就是:“重試、重試、再重試”。在你收到消費(fèi)者的確認(rèn)消息之前,你永遠(yuǎn)不要認(rèn)為消息已經(jīng)投遞過去。

但“至少一次”的投遞并不是用戶想要的。用戶希望消息被投遞一次,并且僅有一次。

然而,實(shí)現(xiàn)“恰好一次”的投遞需要***的設(shè)計(jì)。每種投遞失敗的情況都必須認(rèn)真考慮,并設(shè)計(jì)到架構(gòu)中去,因此它不能在事后“掛到”現(xiàn)有的實(shí)現(xiàn)上去。即使這樣,“只有一次”的投遞消息幾乎是不可能的。

在過去的三個(gè)月里,我們構(gòu)建了一個(gè)全新的去重系統(tǒng),以便在面對(duì)各種故障時(shí)能讓系統(tǒng)盡可能實(shí)現(xiàn)“恰好一次”的投遞。

新系統(tǒng)能夠跟蹤舊系統(tǒng)100倍的消息數(shù)量,并且可靠性也得到了提高,而付出的代價(jià)卻只有一點(diǎn)點(diǎn)。下面我們就開始介紹這個(gè)新系統(tǒng)。

問題所在

Segment內(nèi)部的大部分系統(tǒng)都是通過重試、消息重新投遞、鎖定和兩階段提交來優(yōu)雅地處理故障。但是,有一個(gè)特例,那就是將數(shù)據(jù)直接發(fā)送到公共API的客戶端程序。

客戶端(特別是移動(dòng)客戶端)經(jīng)常會(huì)發(fā)生網(wǎng)絡(luò)問題,有時(shí)候發(fā)送了數(shù)據(jù),卻沒有收到API的響應(yīng)。

想象一下,某天你乘坐公共汽車,在iPhone上使用HotelTonight軟件預(yù)訂房間。該應(yīng)用程序?qū)?shù)據(jù)上傳到了Segment的服務(wù)器上,但汽車突然進(jìn)入了隧道并失去了網(wǎng)絡(luò)連接。你發(fā)送的某些數(shù)據(jù)在服務(wù)器上已經(jīng)被處理,但客戶端卻無法收到服務(wù)器的響應(yīng)消息。

在這種情況下,即使服務(wù)器在技術(shù)上已經(jīng)收到了這些確切的消息,但客戶端也會(huì)進(jìn)行重試并將相同的消息重新發(fā)送給Segment的API。

從我們服務(wù)器的統(tǒng)計(jì)數(shù)據(jù)來看,在四個(gè)星期的窗口時(shí)間內(nèi),大約有0.6%的消息似乎是我們已經(jīng)收到過的重復(fù)消息。

這個(gè)錯(cuò)誤率聽起來可能并不是很高。但是,對(duì)于一個(gè)能創(chuàng)造數(shù)十億美元效益的電子商務(wù)應(yīng)用程序來說,0.6%的出入可能意味著盈利和數(shù)百萬美元損失之間的差別。

對(duì)消息進(jìn)行去重

現(xiàn)在,我們認(rèn)識(shí)到問題的癥結(jié)了,我們必須刪除發(fā)送到API的重復(fù)消息。但是,該怎么做呢?

最簡單的思路就是使用針對(duì)任何類型的去重系統(tǒng)的高級(jí)API。在Python中,我們可以將其表示為:

  1. def dedupe(stream): 
  2.  
  3.   for message in stream: 
  4.  
  5.     if has_seen(message.id):  
  6.  
  7.       discard(message) 
  8.  
  9.     else
  10.  
  11.       publish_and_commit(message)  

對(duì)于數(shù)據(jù)流中的每個(gè)消息,首先要把他的id(假設(shè)是唯一的)作為主鍵,檢查是否曾經(jīng)見過這個(gè)特定的消息。如果以前見過這個(gè)消息,則丟棄它。如果沒有,則是新的,我們應(yīng)重新發(fā)布這個(gè)消息并以原子的方式提交消息。

為了避免存儲(chǔ)所有的消息,我們會(huì)設(shè)置“去重窗口”這個(gè)參數(shù),這個(gè)參數(shù)定義了在消息過期之前key存儲(chǔ)的時(shí)長。只要消息落在窗口時(shí)間之外,我們就認(rèn)為它已過期失效。我們要保證在窗口時(shí)間內(nèi)某個(gè)給定ID的消息只發(fā)送一次。

這個(gè)行為很容易描述,但有兩個(gè)方面需要特別注意:讀/寫性能和正確性。

我們希望系統(tǒng)能夠低延遲和低成本的對(duì)通過流水線的數(shù)十億個(gè)事件進(jìn)行去重。更重要的是,我們要確保所有的事件都能夠被持久化,以便可以從崩潰中恢復(fù)出來,并且不會(huì)輸出重復(fù)的消息。

架構(gòu)

為了實(shí)現(xiàn)這一點(diǎn),我們創(chuàng)建了一個(gè)“兩階段”架構(gòu),它讀入Kafka的數(shù)據(jù),并且在四個(gè)星期的時(shí)間窗口內(nèi)對(duì)接收到的所有事件進(jìn)行去重。

 

去重系統(tǒng)的高級(jí)架構(gòu)圖

Kafka的拓?fù)浣Y(jié)構(gòu)

要了解其工作原理,首先看一下Kafka的流拓?fù)浣Y(jié)構(gòu)。所有傳入消息的API調(diào)用都將作為單獨(dú)的消息進(jìn)行分離,并讀入到Kafka輸入主題(input topic)中。

首先,每個(gè)傳入的消息都有一個(gè)由客戶端生成的具有唯一性的messageId標(biāo)記。在大多數(shù)情況下,這是一個(gè)UUIDv4(我們考慮切換到ksuids)。 如果客戶端不提供messageId,我們會(huì)在API層自動(dòng)分配一個(gè)。

我們不使用矢量時(shí)鐘或序列號(hào),因?yàn)槲覀兿M芙档涂蛻舳说膹?fù)雜性。使用UUID可以讓任何人輕松地將數(shù)據(jù)發(fā)送到我們的API上來,因?yàn)閹缀跛械闹饕Z言都支持它。

  1.  
  2.   "messageId""ajs-65707fcf61352427e8f1666f0e7f6090"
  3.  
  4.   "anonymousId""e7bd0e18-57e9-4ef4-928a-4ccc0b189d18"
  5.  
  6.   "timestamp""2017-06-26T14:38:23.264Z"
  7.  
  8.   "type""page" 
  9.  
  10. }  

為了能夠?qū)⑾⒊志没⒛軌蛑匦掳l(fā)送,一個(gè)個(gè)的消息被保存到Kafka中。消息以messageId進(jìn)行分區(qū),這樣就可以保證具有相同messageId的消息能夠始終由同一個(gè)消費(fèi)者處理。

這對(duì)于數(shù)據(jù)處理來說是一件很重要的事情。我們可以通過路由到正確的分區(qū)來查找鍵值,而不是在整個(gè)中央數(shù)據(jù)庫的數(shù)百億條消息中查找,這種方法極大地縮小了查找范圍。

去重“worker”(worker:工人。譯者注,這里表示的是某個(gè)進(jìn)程。為防止引起歧義,下文將直接使用worker)是一個(gè)Go程序,它的功能是從Kafka輸入分區(qū)中讀入數(shù)據(jù),檢查消息是否有重復(fù),如果是新的消息,則發(fā)送到Kafka輸出主題中。

根據(jù)我們的經(jīng)驗(yàn),worker和Kafka拓?fù)浣Y(jié)構(gòu)都非常容易掌握。我們無需使用一組遇到故障時(shí)需要切換到副本的龐大的Memcached實(shí)例。相反,我們只需使用零協(xié)同的嵌入式RocksDB數(shù)據(jù)庫,并以非常低的成本來獲得持久化存儲(chǔ)。

RocksDB的worker進(jìn)程

每一個(gè)worker都會(huì)在本地EBS硬盤上存放了一個(gè)RocksDB數(shù)據(jù)庫。RocksDB是由Facebook開發(fā)的嵌入式鍵值存儲(chǔ)系統(tǒng),它的性能非常高。

每當(dāng)從輸入主題中過來的消息被消費(fèi)時(shí),消費(fèi)者通過查詢RocksDB來確定我們之前是否見過該事件的messageId。

如果RocksDB中不存在該消息,我們就將其添加到RocksDB中,然后將消息發(fā)布到Kafka輸出主題。

如果消息已存在于RocksDB,則worker不會(huì)將其發(fā)布到輸出主題,而是更新輸入分區(qū)的偏移,確認(rèn)已處理過該消息。

性能

為了讓我們的數(shù)據(jù)庫獲得高性能,我們必須對(duì)過來的每個(gè)事件滿足三種查詢模式:

  1. 檢測隨機(jī)key的存在性,這可能不存在于我們的數(shù)據(jù)庫中,但會(huì)在key空間中的任何地方找到。
  2. 高速寫入新的key
  3. 老化那些超出了“去重窗口”的舊的key

實(shí)際上,我們必須不斷地檢索整個(gè)數(shù)據(jù)庫,追加新的key,老化舊的key。在理想情況下,這些發(fā)生在同一數(shù)據(jù)模型中。 

 [[196104]] 

 

我們的數(shù)據(jù)庫必須滿足三種獨(dú)立的查詢模式

一般來說,這些性能大部分取決于我們數(shù)據(jù)庫的性能,所以應(yīng)該了解一下RocksDB的內(nèi)部機(jī)制來提高它的性能。

RocksDB是一個(gè)日志結(jié)構(gòu)合并樹(log-structured-merge-tree, 簡稱LSM)數(shù)據(jù)庫,這意味著它會(huì)不斷地將新的key附加到磁盤上的預(yù)寫日志(write-ahead-log)中,并把排序過的key存放在內(nèi)存中作為memtable的一部分。

  

key存放在內(nèi)存中作為memtable的一部分

寫入key是一個(gè)非??焖俚倪^程。新的消息以追加的方式直接保存到磁盤上,并且數(shù)據(jù)條目在內(nèi)存中進(jìn)行排序,以提供快速的搜索和批量寫入。

每當(dāng)寫入到memtable的條目達(dá)到一定數(shù)量時(shí),這些條目就會(huì)被作為SSTable(排序的字符串表)持久化到磁盤上。由于字符串已經(jīng)在內(nèi)存中排過序了,所以可以將它們直接寫入磁盤。

 

當(dāng)前的memtable零級(jí)寫入磁盤

以下是在我們的生產(chǎn)日志中寫入的示例:

  1. [JOB 40] Syncing log #655020 
  2.  
  3. [default] [JOB 40] Flushing memtable with next log file: 655022 
  4.  
  5. [default] [JOB 40] Level-0 flush table #655023: started 
  6.  
  7. [default] [JOB 40] Level-0 flush table #655023: 15153564 bytes OK 
  8.  
  9. [JOB 40] Try to delete WAL files size 12238598, prev total WAL file size 24346413, number of live WAL files 3.  

每個(gè)SSTable是不可變的,一旦創(chuàng)建,永遠(yuǎn)不會(huì)改變。這是什么寫入新的鍵這么快的原因。無需更新文件,無需寫入擴(kuò)展。相反,在帶外壓縮階段,同一級(jí)別的多個(gè)SSTable可以合并成一個(gè)新的文件。

 

 

當(dāng)在同一級(jí)別的SSTables壓縮時(shí),它們的key會(huì)合并在一起,然后將新的文件升級(jí)到下一個(gè)更高的級(jí)別。

看一下我們生產(chǎn)的日志,可以看到這些壓縮作業(yè)的示例。在這種情況下,作業(yè)41正在壓縮4個(gè)0級(jí)文件,并將它們合并為單個(gè)較大的1級(jí)文件。

  1. /data/dedupe.db$ head -1000 LOG | grep "JOB 41" 
  2.  
  3. [JOB 41] Compacting 4@0 + 4@1 files to L1, score 1.00 
  4.  
  5. [default] [JOB 41] Generated table #655024: 1550991 keys, 69310820 bytes 
  6.  
  7. [default] [JOB 41] Generated table #655025: 1556181 keys, 69315779 bytes 
  8.  
  9. [default] [JOB 41] Generated table #655026: 797409 keys, 35651472 bytes 
  10.  
  11. [default] [JOB 41] Generated table #655027: 1612608 keys, 69391908 bytes 
  12.  
  13. [default] [JOB 41] Generated table #655028: 462217 keys, 19957191 bytes 
  14.  
  15. [default] [JOB 41] Compacted 4@0 + 4@1 files to L1 => 263627170 bytes  

壓縮完成后,新合并的SSTables將成為最終的數(shù)據(jù)庫記錄集,舊的SSTables將被取消鏈接。

如果我們登錄到生產(chǎn)實(shí)例,我們可以看到正在更新的預(yù)寫日志以及正在寫入、讀取和合并的單個(gè)SSTable。

 

日志和最近占用I/O的SSTable

下面生產(chǎn)的SSTable統(tǒng)計(jì)數(shù)據(jù)中,可以看到一共有四個(gè)“級(jí)別”的文件,并且一個(gè)級(jí)別比一個(gè)級(jí)別的文件大。

  1. ** Compaction Stats [default] ** 
  2.  
  3. Level    Files   Size(MB} Score Read(GB}  Rn(GB} Rnp1(GB} Write(GB} Wnew(GB} Moved(GB} W-Amp Rd(MB/s} Wr(MB/s} Comp(sec} Comp(cnt} Avg(sec} KeyIn KeyDrop 
  4.  
  5. ---------------------------------------------------------------------------------------------------------------------------------------------------------- 
  6.  
  7.   L0      1/0      14.46   0.2      0.0     0.0      0.0       0.1      0.1       0.0   0.0      0.0     15.6         7         8    0.925       0      0 
  8.  
  9.   L1      4/0     194.95   0.8      0.5     0.1      0.4       0.5      0.1       0.0   4.7     20.9     20.8        26         2   12.764     12M     40 
  10.  
  11.   L2     48/0    2551.71   1.0      1.4     0.1      1.3       1.4      0.1       0.0  10.7     19.4     19.4        73         2   36.524     34M     14 
  12.  
  13.   L3    351/0   21735.77   0.8      2.0     0.1      1.9       1.9     -0.0       0.0  14.3     18.1     16.9       112         2   56.138     52M  3378K 
  14.  
  15.  Sum    404/0   24496.89   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2     18.2     18.1       218        14   15.589     98M  3378K 
  16.  
  17.  Int      0/0       0.00   0.0      3.9     0.4      3.5       3.9      0.3       0.0  34.2     18.2     18.1       218        14   15.589     98M  3378K  

RocksDB保存了索引和存儲(chǔ)在SSTable的特定SSTables的布隆過濾器,并將這些加載到內(nèi)存中。通過查詢這些過濾器和索引可以找到特定的key,然后將完整的SSTable作為LRU基礎(chǔ)的一部分加載到內(nèi)存中。

在絕大多數(shù)情況下,我們就可以看到新的消息了,這使得我們的去重系統(tǒng)成為教科書中的布隆過濾器案例。

布隆過濾器會(huì)告訴我們某個(gè)鍵“可能在集合中”,或者“絕對(duì)在集合中”。要做到這一點(diǎn),布隆過濾器保存了已經(jīng)見過的任何元素的多種哈希函數(shù)的設(shè)置位。如果設(shè)置了散列函數(shù)的所有位,則過濾器將返回消息“可能在集合中”。

 

我們的集合包含{x,y,z},在布隆過濾器中查詢w,則布隆過濾器會(huì)返回“不在集合中”,因?yàn)槠渲杏幸晃粵]有設(shè)置。

如果返回“可能在集合中”,則RocksDB可以從SSTables中查詢到原始數(shù)據(jù),以確定該項(xiàng)是否在該集合中實(shí)際存在。但在大多數(shù)情況下,我們不需查詢?nèi)魏蜸STables,因?yàn)檫^濾器將返回“絕對(duì)不在集合”的響應(yīng)。

在我們查詢RocksDB時(shí),我們會(huì)為所有要查詢的相關(guān)的messageId發(fā)出一個(gè)MultiGet?;谛阅芸紤],我們會(huì)批量地發(fā)布出去,以避免太多的并發(fā)鎖定操作。它還允許我們批量處理來自Kafka的數(shù)據(jù),這是為了實(shí)現(xiàn)順序?qū)懭?,而不是隨機(jī)寫入。

以上回答了為什么讀/寫工作負(fù)載性能這么好的問題,但仍然存在如何老化數(shù)據(jù)這個(gè)問題。

刪除:按大小來限制,而不是按時(shí)間來限制

在我們的去重過程中,我們必須要確定是否要將我們的系統(tǒng)限制在嚴(yán)格的“去重窗口”內(nèi),或者是通過磁盤上的總數(shù)據(jù)庫大小來限制。

為了避免系統(tǒng)突然崩潰導(dǎo)致去重系統(tǒng)接收到所有客戶端的消息,我們決定按照大小來限制接收到消息數(shù)量,而不是按照設(shè)定的時(shí)間窗口來限制。這允許我們?yōu)槊總€(gè)RocksDB實(shí)例設(shè)置***的大小,以能夠處理突然的負(fù)載增加。但是其副作用是可能會(huì)將去重窗口降低到24小時(shí)以下。

我們會(huì)定期在RocksDB中老化舊的key,使其不會(huì)增長到***大小。為此,我們根據(jù)序列號(hào)保留key的第二個(gè)索引,以便我們可以先刪除最早接收到的key。

我們使用每個(gè)插入的key的序列號(hào)來刪除對(duì)象,而不是使用RocksDB TTL(這需要在打開數(shù)據(jù)庫的時(shí)候設(shè)置一個(gè)固定的TTL值)來刪除。

因?yàn)樾蛄刑?hào)是第二索引,所以我們可以快速地查詢,并將其標(biāo)記為已刪除。下面是根據(jù)序列號(hào)進(jìn)行刪除的示例代碼:

  1. func (d *DB) delete(n int) error { 
  2.  
  3.         // open a connection to RocksDB 
  4.  
  5.         ro := rocksdb.NewDefaultReadOptions() 
  6.  
  7.         defer ro.Destroy() 
  8.  
  9.  
  10.         // find our offset to seek through for writing deletes 
  11.  
  12.         hint, err := d.GetBytes(ro, []byte("seek_hint")) 
  13.  
  14.         if err != nil { 
  15.  
  16.                 return err 
  17.  
  18.         } 
  19.  
  20.  
  21.         it := d.NewIteratorCF(ro, d.seq) 
  22.  
  23.         defer it.Close() 
  24.  
  25.  
  26.         // seek to the first key, this is a small 
  27.  
  28.         // optimization to ensure we don't use `.SeekToFirst()` 
  29.  
  30.         // since it has to skip through a lot of tombstones. 
  31.  
  32.         if len(hint) > 0 { 
  33.  
  34.                 it.Seek(hint) 
  35.  
  36.         } else { 
  37.  
  38.                 it.SeekToFirst() 
  39.  
  40.         } 
  41.  
  42.  
  43.         seqs := make([][]byte, 0, n) 
  44.  
  45.         keys := make([][]byte, 0, n) 
  46.  
  47.  
  48.         // look through our sequence numbers, counting up 
  49.  
  50.         // append any data keys that we find to our set to be 
  51.  
  52.         // deleted 
  53.  
  54.         for it.Valid() && len(seqs) < n { 
  55.  
  56.                 k, v := it.Key(), it.Value() 
  57.  
  58.                 key := make([]byte, len(k.Data())) 
  59.  
  60.                 val := make([]byte, len(v.Data())) 
  61.  
  62.  
  63.                 copy(key, k.Data()) 
  64.  
  65.                 copy(val, v.Data()) 
  66.  
  67.                 seqs = append(seqs, key
  68.  
  69.                 keys = append(keys, val) 
  70.  
  71.  
  72.                 it.Next() 
  73.  
  74.                 k.Free() 
  75.  
  76.                 v.Free() 
  77.  
  78.         } 
  79.  
  80.  
  81.         wb := rocksdb.NewWriteBatch() 
  82.  
  83.         wo := rocksdb.NewDefaultWriteOptions() 
  84.  
  85.         defer wb.Destroy() 
  86.  
  87.         defer wo.Destroy() 
  88.  
  89.  
  90.         // preserve next sequence to be deleted. 
  91.  
  92.         // this is an optimization so we can use `.Seek()` 
  93.  
  94.         // instead of letting `.SeekToFirst()` skip through lots of tombstones. 
  95.  
  96.         if len(seqs) > 0 { 
  97.  
  98.                 hint, err := strconv.ParseUint(string(seqs[len(seqs)-1]), 10, 64) 
  99.  
  100.                 if err != nil { 
  101.  
  102.                         return err 
  103.  
  104.                 } 
  105.  
  106.  
  107.                 buf := []byte(strconv.FormatUint(hint+1, 10)) 
  108.  
  109.                 wb.Put([]byte("seek_hint"), buf) 
  110.  
  111.         } 
  112.  
  113.  
  114.         // we not only purge the keys, but the sequence numbers as well 
  115.  
  116.         for i := range seqs { 
  117.  
  118.                 wb.DeleteCF(d.seq, seqs[i]) 
  119.  
  120.                 wb.Delete(keys[i]) 
  121.  
  122.         } 
  123.  
  124.  
  125.         // finally, we persist the deletions to our database 
  126.  
  127.         err = d.Write(wo, wb) 
  128.  
  129.         if err != nil { 
  130.  
  131.                 return err 
  132.  
  133.         } 
  134.  
  135.  
  136.         return it.Err() 
  137.  
  138. }  

為了保證寫入速度,RocksDB不會(huì)立即返回并刪除一個(gè)鍵(記住,這些SSTable是不可變的!)。相反,RocksDB將添加一個(gè)“墓碑”,等到壓縮時(shí)再進(jìn)行刪除。因此,我們可以通過順序?qū)懭雭砜焖俚乩匣?,避免因?yàn)閯h除舊項(xiàng)而破壞內(nèi)存數(shù)據(jù)。

確保正確性

我們已經(jīng)討論了如何確保數(shù)十億條消息投遞的速度、規(guī)模和低成本的搜索。***一個(gè)部分將講述各種故障情況下我們?nèi)绾未_保數(shù)據(jù)的正確性。

EBS快照和附件

為了確保RocksDB實(shí)例不會(huì)因?yàn)殄e(cuò)誤的代碼推送或潛在的EBS停機(jī)而損壞,我們會(huì)定期保存每個(gè)硬盤驅(qū)動(dòng)器的快照。雖然EBS已經(jīng)在底層進(jìn)行了復(fù)制,但是這一步可以防止數(shù)據(jù)庫受到某些底層機(jī)制的破壞。

如果我們想要啟用一個(gè)新實(shí)例,則可以先暫停消費(fèi)者,將相關(guān)聯(lián)的EBS驅(qū)動(dòng)器分開,然后重新附加到新的實(shí)例上去。只要我們保證分區(qū)ID相同,重新分配磁盤是一個(gè)輕松的過程,而且也能保證數(shù)據(jù)的正確性。

如果worker發(fā)生崩潰,我們依靠RocksDB內(nèi)置的預(yù)寫日志來確保不會(huì)丟失消息。消息不會(huì)從輸入主題提交,除非RocksDB已經(jīng)將消息持久化在日志中。

讀取輸出主題

你可能會(huì)注意到,本文直到這里都沒有提到“原子”步驟,以使我們能夠確保只投遞一次消息。我們的worker有可能在任何時(shí)候崩潰,不如:寫入RocksDB時(shí)、發(fā)布到輸出主題時(shí),或確認(rèn)輸入消息時(shí)。

我們需要一個(gè)原子的“提交”點(diǎn),并覆蓋所有這些獨(dú)立系統(tǒng)的事務(wù)。對(duì)于輸入的數(shù)據(jù),需要某個(gè)“事實(shí)來源”:輸出主題。

如果去重worker因?yàn)槟承┰虬l(fā)生崩潰,或者遇到Kafka的某個(gè)錯(cuò)誤,則系統(tǒng)在重新啟動(dòng)時(shí),會(huì)首先查閱這個(gè)“事實(shí)來源”,輸出主題,來判斷事件是否已經(jīng)發(fā)布出去。

如果在輸出主題中找到消息,而不是RocksDB(反之亦然),則去重worker將進(jìn)行必要的修復(fù)工作以保持?jǐn)?shù)據(jù)庫和RocksDB之間的同步。實(shí)際上,我們使用輸出主題作為我們的預(yù)寫入日志和最終的事實(shí)來源,讓RocksDB進(jìn)行檢查和校驗(yàn)。

在生產(chǎn)環(huán)境中

我們的去重系統(tǒng)已經(jīng)在生產(chǎn)運(yùn)行了3個(gè)月,對(duì)其運(yùn)行的結(jié)果我們感到非常滿意。我們有以下這些數(shù)據(jù):

  • 在RocksDB中,有1.5TB的key存儲(chǔ)在磁盤上
  • 在老化舊的key之前,有一個(gè)四個(gè)星期的去重窗口
  • RocksDB實(shí)例中存儲(chǔ)了大約600億個(gè)key
  • 通過去重系統(tǒng)的消息達(dá)到2000億條

該系統(tǒng)快速、高效、容錯(cuò)性強(qiáng),也非常容易理解。

特別是我們的v2版本系統(tǒng)相比舊的去重系統(tǒng)有很多優(yōu)點(diǎn)。

以前我們將所有的key存儲(chǔ)在Memcached中,并使用Memcached的原子CAS(check-and-set)操作來設(shè)置key。 Memcached起到了提交點(diǎn)和“原子”地發(fā)布key的作用。

雖然這個(gè)功能很好,但它需要有大量的內(nèi)存來支撐所有的key。此外,我們必須能夠接受偶爾的Memcached故障,或者將用于高速內(nèi)存故障切換的支出加倍。

Kafka/RocksDB的組合相比舊系統(tǒng)有如下幾個(gè)優(yōu)勢:

  • 數(shù)據(jù)存儲(chǔ)在磁盤上:在內(nèi)存中保存所有的key或完整的索引,其代價(jià)是非常昂貴的。通過將更多的數(shù)據(jù)轉(zhuǎn)移到磁盤,并利用多種不同級(jí)別的文件和索引,能夠大幅削減成本。對(duì)于故障切換,我們能夠使用冷備(EBS),而不用運(yùn)行其他的熱備實(shí)例。
  • 分區(qū):為了縮小key的搜索范圍,避免在內(nèi)存中加載太多的索引,我們需要保證某個(gè)消息能夠路由到正確的worker。在Kafka中對(duì)上游進(jìn)行分區(qū)可以對(duì)這些消息進(jìn)行路由,從而更有效地緩存和查詢。
  • 顯式地進(jìn)行老化處理:在使用Memcached的時(shí)候,我們在每個(gè)key上設(shè)置一個(gè)TTL來標(biāo)記是否超時(shí),然后依靠Memcached進(jìn)程來對(duì)超時(shí)的key進(jìn)行處理。這使得我們在面對(duì)大量數(shù)據(jù)時(shí),可能會(huì)耗盡內(nèi)存,并且在丟棄大量超時(shí)消息時(shí),Memcached的CPU使用率會(huì)飆升。而通過讓客戶端來處理key的刪除,使得我們可以通過縮短去重窗口來優(yōu)雅地處理。
  • 將Kafka作為事實(shí)來源:為了真正地避免對(duì)多個(gè)提交點(diǎn)進(jìn)行消息去重,我們必須使用所有下游消費(fèi)者都常見的事實(shí)來源。使用Kafka作為“事實(shí)來源”是最合適的。在大多數(shù)失敗的情況下(除了Kafka失敗之外),消息要么會(huì)被寫入Kafka,要么不會(huì)。使用Kafka可以確保按順序投遞消息,并在多臺(tái)計(jì)算機(jī)之間進(jìn)行磁盤復(fù)制,而不需要在內(nèi)存中保留大量的數(shù)據(jù)。
  • 批量讀寫:通過Kafka和RocksDB的批量I/O調(diào)用,我們可以通過利用順序讀寫來獲得更好的性能。與之前在Memcached中使用的隨機(jī)訪問不同,我們能夠依靠磁盤的性能來達(dá)到更高的吞吐量,并只在內(nèi)存中保留索引。

總的來說,我們對(duì)自己構(gòu)建的去重系統(tǒng)非常滿意。使用Kafka和RocksDB作為流媒體應(yīng)用的原語開始變得越來越普遍。我們很高興能繼續(xù)在這些原語之上構(gòu)建新的分布式應(yīng)用程序。 

責(zé)任編輯:龐桂玉 來源: CSDN大數(shù)據(jù)
相關(guān)推薦

2019-11-27 18:33:32

Docker架構(gòu)數(shù)據(jù)

2019-04-18 10:55:00

故障演練流量

2025-03-10 00:35:00

AndroidIPC管道

2018-10-11 09:33:51

Kafka消息處理

2013-01-22 17:33:30

2017-12-12 16:17:55

微服務(wù)系統(tǒng)運(yùn)維

2020-06-22 10:06:15

數(shù)據(jù)網(wǎng)絡(luò)泄露

2011-11-09 15:49:52

API

2017-03-29 14:38:05

高可用視頻調(diào)度秒拍

2009-11-20 11:37:11

Oracle完全卸載

2020-09-17 11:02:40

BLESA藍(lán)牙攻擊漏洞

2018-09-13 09:39:03

騰訊運(yùn)維IT

2022-08-29 18:14:55

MQ數(shù)據(jù)不丟失

2020-03-12 09:34:05

Redis數(shù)據(jù)技術(shù)

2016-01-08 10:03:07

硅谷通吃互聯(lián)網(wǎng)

2019-08-08 10:18:15

運(yùn)維架構(gòu)技術(shù)

2010-03-30 10:44:05

Nginx啟動(dòng)

2024-12-04 13:52:30

2022-09-09 08:41:43

Netty服務(wù)端驅(qū)動(dòng)

2021-05-24 10:55:05

Netty單機(jī)并發(fā)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)