數(shù)據(jù)流時代,Teads如何做到每天賦予1000億事件價值?

在這篇文章中,我們描述了如何協(xié)調(diào)Kafka,Dataflow和BigQuery共同采集和轉(zhuǎn)換大數(shù)據(jù)流。當(dāng)增加了模式和延時的約束時,調(diào)優(yōu)和重新排序成了很大的挑戰(zhàn),下面展示了我們是如何解決它的。

發(fā)布者Tead是由Analytics提供支持的web應(yīng)用之一
在數(shù)字廣告中,日常運營產(chǎn)生了許多我們需要跟蹤的事件,以便透明地報道活動的效益。這些事件來自:
用戶與廣告互動,通過瀏覽器發(fā)送。這些事件被稱為可以標(biāo)準化的(開始、完成、暫停、回復(fù)等)跟蹤事件,或者使用Teads Studio構(gòu)建的具有互動創(chuàng)意的自定義事件。我們每天收到大約100億個跟蹤事件。
來自我們的后端這些事件都是關(guān)于廣告拍賣的大部分(實時出價流程)細節(jié)。在抽樣之前我們每天產(chǎn)生的這些事件超過600億,在2018年這個數(shù)字將翻一番。
在這篇文章中,我們聚焦于跟蹤事件,因為它們是我們業(yè)務(wù)上最關(guān)鍵部分的。

簡單概述了我們技術(shù)環(huán)境的兩個主要事件源
瀏覽器通過HTTP將跟蹤數(shù)據(jù)發(fā)送到一個專用組件,其他的事情都列進了Kafka的topic中。Analytics是這些事件的服務(wù)對象之一。
我們用一個Analytics小組,他們的任務(wù)是按照如下定義管理這些事件:
- 我們獲取了log的增長量,
- 我們將它們轉(zhuǎn)化成面向業(yè)務(wù)的數(shù)據(jù)
- 我們?yōu)槊恳晃活櫩吞峁└咝叶ㄖ频姆?wù)。
為了完成這個任務(wù),我們建立和維護了一系列處理工具和管道。由于公司的有機增長和新產(chǎn)品的需求,我們定期挑戰(zhàn)我們的結(jié)構(gòu)。
為什么我們移向了BigQuery
回顧2016年,我們的Analytics跟蹤基于lambda architecture系統(tǒng)架構(gòu)(Storm、 Spark和Cassandra項目),并且出現(xiàn)了一些問題:
- 數(shù)據(jù)的模式使它不可能存放在單一的Cassandra表中,這會妨礙高效的交叉查詢,
- 它是一個復(fù)雜的基礎(chǔ)框架,在批處理和速度層都會出現(xiàn)代碼復(fù)制,這阻礙了我們新功能的高效發(fā)布,
- 最終它將難以發(fā)展且不具有成本效益。
這時候,我們有了幾種可能的選擇。首先,我們可以建立一個增強的lambda,但它只能推遲我們要面臨的問題。
我們考慮了幾個有前景的替代品,像Druid何BigQuery。我們最終選擇遷移到BiQuery,因為他有很多強大的功能。
通過BigQuery我們能夠:
- 工作在原始事件,
- 使用SQL作為高效的數(shù)據(jù)處理語言,
- 使用BigQuery作為處理引擎,
- 使解釋性訪問數(shù)據(jù)更容易(相比Spark SQL或者Hive)
感謝flat-rate計劃,我們高強度的用法(查詢和存儲方式)是具有高成本效益的。
然而,我們的技術(shù)環(huán)境不適合BigQuery。我們想用它來存儲和轉(zhuǎn)換來自多個Kafka topic 的所有事件。我們無法讓我們的Kafka群組移出AWS,也無法使用與Kafka托管等效的Pub/Sub,因為這些群集也被我們托管在AWS上的一些廣告投放組件使用。因此,我們不得不處理來自運營的多云基礎(chǔ)框架的挑戰(zhàn)。
今天,BigQuery是我們的數(shù)據(jù)倉庫系統(tǒng),用于我們的跟蹤數(shù)據(jù)與其他的原始數(shù)據(jù)的協(xié)調(diào)核對。
獲取
當(dāng)處理追蹤事件的時候,你面對的首要問題就是,你必須在不知道延遲的情況下無序地處理他們。
事件實際發(fā)生的時間(事件觸發(fā)時間,event time)和系統(tǒng)注意到這個事件的時間(處理時間,processing time)之間的時間間隔的范圍涵蓋了從毫秒級到小時級。這些巨大的延遲并不罕見,而且當(dāng)用戶在瀏覽會話的時間中間連接斷開了或者開啟了飛行模式,就會出現(xiàn)這種情況。

事件觸發(fā)時間和處理時間的時間差
如果要獲取流數(shù)據(jù)處理遇到的問題相關(guān)更多信息,我們建議去看Google Cloud Next’17 中Tyler Akidau(Google數(shù)據(jù)處理技術(shù)主管)和 Loïc Jaures(Teads的共同創(chuàng)始人和技術(shù)部高級副總裁)討論《批處理和流處理之間的來回轉(zhuǎn)換》。本文就是受到這個討論的啟發(fā)。
流的嚴酷現(xiàn)實
Dataflow是一個管理流系統(tǒng),為了應(yīng)對我們面對的事件的混亂本質(zhì)的挑戰(zhàn)而生。Dataflow有一個統(tǒng)一的流和批處理編程模型,流是它的主推特性。
由于Dataflow的承諾和對流模式的大膽嘗試,我們購買了它。不幸的是,在面對真實生產(chǎn)環(huán)境的數(shù)據(jù)傳輸,我們感到了驚駭:BigQuery的流插入代價。
我們對壓縮數(shù)據(jù)大小(即,通過網(wǎng)絡(luò)的字節(jié)的實際數(shù)據(jù)卷)和非BigQuery的原始數(shù)據(jù)格式大小已經(jīng)有了基本估算。幸運的是現(xiàn)在已經(jīng)為每個數(shù)據(jù)類型提供了文檔,因此你也可以做計算。
那時候,我們低估了這個額外代價的100倍,這幾乎是我們整個獲取渠道(Dataflow + BigQuery)的兩倍代價。我們也遇到了其他的局限,例如100,000 events/s 速率限制,這已經(jīng)幾乎接近我們在做的事情了。
好消息是,有一種方法可以完全避免流插入限制:批量加載到BigQuery。
理想情況下,我們希望在流模式中使用Dataflow,在批處理模式下使用BigQuery。在那個時候,Dataflow SDK中沒有用于無限制數(shù)據(jù)流的BigQuery批處理接收器。
然后我們考慮開發(fā)自己的自定義接收器。不幸的是,當(dāng)時不可能在無限制的數(shù)據(jù)流中添加一個自定義的接收器(見Dataflow計劃為在將來的版本中增加對編寫無界數(shù)據(jù)的自定義接收器的支持——現(xiàn)在這是有可能的,Beam是官方的Dataflow SDK)。
我們別無選擇,只能把我們的數(shù)據(jù)轉(zhuǎn)換成批處理模式。由于Dataflow的統(tǒng)一模型,這僅僅是幾行代碼的問題。幸運的是,我們可以接收由切換到批處理模式所引入的額外數(shù)據(jù)處理延遲。
繼續(xù)向前推進,我們目前的接入架構(gòu)是基于Scio,這是一個由Spotify提供的Dataflow開源的Scala API。如前所述,Dataflow原生支持Pub/Sub,但集成Kafka還不太成熟。我們必須擴展Scio以支持檢查點持久性和有效的并行性。
微型的批處理管道
我們的結(jié)果處理架構(gòu)是一個30個節(jié)點的Dataflow批處理作業(yè)的鏈,按順序排列,讀取Kafka topic,并使用加載作業(yè)來寫入BigQuery。

數(shù)據(jù)流小批量處理的多個階段。
其中一個關(guān)鍵是找到理想的分批時間。我們發(fā)現(xiàn)在成本和讀取性能之間有一個最佳的平衡點(因此延遲)。調(diào)整的變量是Kafka讀取階段的持續(xù)時間。
要得到完整的批處理時間,您必須將寫入操作添加到BigQuery階段也算在里面(不是成比例增加的,而是與讀操作時間密切相關(guān)),再加上一個常量,也就是啟動和關(guān)閉消耗的時間。
值得一提:
- 讀取階段太短會降低讀取和非讀取階段之間的比例。在一個理想的情況下,1:1的比值意味著你必須能夠以同樣的速度進行讀取和寫入。在上面的例子中,我們有20分鐘的讀取階段,對一個30分鐘的批處理(比值為3:2)。這意味著我們必須能夠在讀取數(shù)據(jù)時比我們寫入數(shù)據(jù)的速度快1.5倍。小的比值意味著需要更大的實例。
- 過長的讀取階段將簡單地增加事件的發(fā)生時刻與BigQuery中其可用的時刻之間的延遲。
性能調(diào)優(yōu)
為簡便以及更易于失敗管理,數(shù)據(jù)流作業(yè)按順序啟動。這是我們愿意采取的延遲所做的折衷。如果某項作業(yè)失敗了,我們只需返回上次所提交的Kafka偏移即可。
我們必須修改我們的Kafka集群的拓撲結(jié)構(gòu),并增加分區(qū)的數(shù)量,以便能夠更快地unstack消息。根據(jù)你在Dataflow中所進行的轉(zhuǎn)換,受限的因素很可能是在處理能力或網(wǎng)絡(luò)吞吐量上。為了實現(xiàn)高效的并行,你應(yīng)該始終嘗試保留大量CPU線程,這個數(shù)字是你所擁有的分區(qū)數(shù)量的一個因子(推論:Kafka分區(qū)的數(shù)量是多因子合數(shù),這是很不錯的)。
在極少數(shù)的延遲情況下,我們可以用較長的讀取序列對作業(yè)進行微調(diào)。通過使用更大的批處理,我們也能夠以延遲為代價來趕上這類延遲。
為了處理大部分情況,我們調(diào)整Dataflow使其讀取速度以比實際速度快3倍。用單個n1-highcpu-16實例讀取20分鐘可以unstack 60分鐘的消息。

隨著時間變化的攝取延時(單位:分鐘)
在我們的用例中,我們最終得到的鋸齒式延遲,震蕩范圍在3分鐘(Write BQ階段的最小時長)和30分鐘(作業(yè)的總時長)之間。
轉(zhuǎn)換
原始數(shù)據(jù)是不可避免地體積龐大,我們有太多的事件,并照目前狀態(tài)無法查詢它們。我們需要匯總這些原始數(shù)據(jù)以保持較低的讀取時間和緊湊的體積大小。以下是我們在BigQuery中的做法:

跨AWS和GCP的架構(gòu)綜述
與傳統(tǒng)ETL過程中數(shù)據(jù)在加載之前進行轉(zhuǎn)換不同的是,我們選擇以原始格式首先存儲它(ELT)。
它有兩個主要的好處:
- 它讓我們可以訪問每一個原始事件以進行精確的分析和調(diào)試,
- 它通過讓BigQuery用簡單但強大的SQL方言完成轉(zhuǎn)換來簡化整個鏈。
我們希望直接寫入每天分區(qū)的原始事件表。我們不能因為Dataflow批處理就必須使用特定的目標(biāo)(表或分區(qū))來定義,并且可以包含針對不同分區(qū)的數(shù)據(jù)。我們通過將每個批裝載到一個臨時表中來解決這個問題,然后開始轉(zhuǎn)換它。
對于這些臨時批處理表,我們運行一組轉(zhuǎn)換,這些轉(zhuǎn)換被具體化成SQL查詢,輸出到其他表。其中一個轉(zhuǎn)換只是將所有數(shù)據(jù)附加到大型原始事件表,并在白天進行分區(qū)。
另一個轉(zhuǎn)換是rollup:給定一組維度數(shù)據(jù)的聚合。所有這些轉(zhuǎn)換都是冪等的,可以在錯誤或需要進行數(shù)據(jù)再處理的情況下安全地重新運行。
Rollups
直接查詢原始事件表是很好的調(diào)試,也有利于深入分析,但是直接查詢原始表不可能達到可接受的性能,更不用說這種操作的成本了。
為了給你一個想法,這個表格只保留了4個月,包含1萬億個事件,大小接近250TB。

rollup轉(zhuǎn)換的示例。
在上面的示例中,我們將事件計數(shù)設(shè)置為3個維度:小時、Ad ID、網(wǎng)站ID。事件也被旋轉(zhuǎn)并轉(zhuǎn)換為列。該示例顯示了2.5x的減少,而實際情況則接近70x。
在BigQuery大型并行上下文中,查詢運行時不會受到太大影響,改進是根據(jù)使用的槽數(shù)來衡量的。
Rollups還讓我們將數(shù)據(jù)劃分為小塊:事件被分組到小的表中,每一個小時(事件時間的小時,而不是處理時間)。因此,如果您需要查詢給定小時的數(shù)據(jù),您將查詢單個表(<10M行,<10GB)。
Rollups是一種通用的聚合,我們可以更有效地查詢所有事件,給定了大量的維度。還有一些其他的用例,我們希望對數(shù)據(jù)有專門的視圖。它們每個都可以實現(xiàn)一組特定的轉(zhuǎn)換,最終得到一個專門的和優(yōu)化的表。
管理服務(wù)的限制
BigQuery,雖然功能很強大但是也存在限制:
- BigQuery不允許查詢具有不同模式(即使查詢沒使用不同的字段)的多個表。當(dāng)我們需要添加一個字段,我們用一個腳本來做上百個表的批量更新。
- BigQuery不支持列刪除。沒什么大不了的,因為這對技術(shù)而言沒什么負擔(dān)。
- 查詢多個小時:BigQuery的表名支持通配符,但是性能非常差,我們生成查詢的時候,需要使用UNION ALL來明確要查詢的每張表。
- 我們總是需要連接帶有托管在其他數(shù)據(jù)庫(例如,給事件提供更多的廣告活動信息)上數(shù)據(jù)的這些事件,但是BigQuery也不支持這個。我們現(xiàn)在不得不定期把完整的表拷貝到BigQuery上,以便能在單個查詢中做數(shù)據(jù)連接。
云間數(shù)據(jù)傳輸?shù)臉啡?/strong>
通過在AWS中Teads的廣告投放基礎(chǔ)設(shè)施和Kafka群組來與其它組件共享,我們別無選擇,只能在AWS和GCP云之間移動大量數(shù)據(jù),當(dāng)然這不容易,無疑也不會便宜。我們將Dataflow實例(這主要的是GCP的切入點)盡可能靠近放置在我們的AWS基礎(chǔ)設(shè)施旁邊。幸運的是,AWS和GCP之間的連接足夠好,以至于我們可以簡便的使用托管的VPN。
雖然我們運行這些VPN遇到了一些不穩(wěn)定性,但我們想辦法整理出了一個簡單的腳本,用來再一次的打開和關(guān)閉VPN。我們從未面對過一個足夠巨大的問題來證明專用鏈路的成本。
又一次,費用成了你不得不密切關(guān)注的事情,出口是令人擔(dān)憂的,在你看到賬單之前費用是難以估計的。為了壓縮成本,你需要仔細選擇壓縮數(shù)據(jù)的方法。
只有一半

分析大局
在BigQuery中所擁有的這些事件是不夠的。為了給業(yè)務(wù)帶來價值,數(shù)據(jù)必須與不同的規(guī)則和度量相結(jié)合。此外,BigQuery不適合實時用例。
由于并發(fā)限制和不可壓縮的查詢延遲3到5秒(可接受和固有的設(shè)計),BigQuery必須與其他工具混合,以服務(wù)應(yīng)用程序(指示板、web ui等)。
這個任務(wù)由我們的分析服務(wù)來執(zhí)行,它是一個Scala組件,它利用BigQuery來生成按需報告(電子表格)和定制的數(shù)據(jù)集市(每日或每小時更新)。
我們選擇了AWS Redshift來存儲和服務(wù)我們的數(shù)據(jù)集市。盡管服務(wù)于面向用戶的應(yīng)用程序似乎不是一個清晰的選擇,但Redshift對我們很適用,因為我們的并發(fā)用戶數(shù)量是有限的。
另外,使用鍵/值存儲器需要更多的開發(fā)工作。通過保持中間的關(guān)系數(shù)據(jù)庫,數(shù)據(jù)集市的消費變得更容易了。
關(guān)于如何規(guī)劃化地構(gòu)建、維護和查詢這些數(shù)據(jù)集市,這會有很多話題,但他們將成為另一篇文章的主題。