自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Facebook Velox 運(yùn)行機(jī)制全面解析

開發(fā) 前端
小節(jié)一下,Task 負(fù)責(zé)將由 PlanNode 組成的 PlanTree 翻譯成由 Operator 組成的 Pipeline,并且對 Pipeline 進(jìn)行并發(fā)運(yùn)行。在此期間,Task 會維護(hù) Operator 間的共享狀態(tài)、協(xié)調(diào) Operator 間的運(yùn)行依賴。

概述

Facebook Velox 是一個針對 SQL 運(yùn)行時的 C++ 庫,旨在統(tǒng)一 Facebook 各種計(jì)算流,包括 Spark 和 Presto,使用推的模式、支持向量計(jì)算。

Velox 接受一棵優(yōu)化過的 PlanNode Tree,然后將其切成一個個的線性的 Pipeline,Task 負(fù)責(zé)這個轉(zhuǎn)變過程,每個 Task 針對一個 PlanTree Segment。大多數(shù)算子是一對一翻譯的,但是有一些特殊的算子,通常出現(xiàn)在多個 Pipeline 的切口處,通常來說,這些切口對應(yīng)計(jì)劃樹的分叉處,如 HashJoinNode,CrossJoinNode, MergeJoinNode ,通常會翻譯成 XXProbe 和 XXBuild。但也有一些例外,比如 LocalPartitionNode 和 LocalMergeNode 。

圖片

邏輯計(jì)劃翻譯成物理計(jì)劃,可調(diào)整 Pipeline 并發(fā)度

為了提高執(zhí)行的并行度,Velox 引入了 LocalPartitionNode 節(jié)點(diǎn),可以將一個 Pipeline 進(jìn)行多線程(每個線程一個實(shí)例)并行運(yùn)行,并且互斥的消費(fèi)數(shù)據(jù)。其中每個實(shí)例稱為 Driver。該算子在輸入計(jì)劃樹里并沒有分叉(即沒有多個 source),但在翻譯成物理算子時,會在此節(jié)點(diǎn)處進(jìn)行切開,并在切口前后改變執(zhí)行的并行度,對應(yīng)的物理算子是LocalPartition  和  LocalExchange。

圖片

 調(diào)整并發(fā)度算子,一個邏輯算子翻譯成兩個物理算子

還有一個特殊節(jié)點(diǎn),稱為 LocalMergeNode,該對輸入有要求:必須有序,然后會進(jìn)行單線程的歸并排序,從而使輸出全局有序。也因此,由其而切開的消費(fèi) Pipeline 一定是單 Driver 的。翻譯成算子,對應(yīng)兩個 CallbackSink 和 LocalMerge。

圖片

Merge 算子,也是一種邏輯翻譯成兩種物理算子

總結(jié)一下,上述五個 PlanNode,HashJoinNode,CrossJoinNode, MergeJoinNode ,LocalPartitionNode ,LocalMergeNode 在翻譯時會造成切口,即將邏輯 PlanTree 切成多個物理 Pipeline,因此在切口處會將一個邏輯算子翻譯成多個物理算子,分到不同 Pipeline 上。每個 Pipeline 會有一個從 0 開始的編號:Pipeline ID,是全局粒度的。

并且,可以由 LocalPartitionNode 來按需改變每個 Pipeline 并行度,其中 Pipeline 的每個線程由一個 Driver 來執(zhí)行。每個 Driver 也有一個從 0 開始的編號:Driver ID,是 Pipeline 粒度的。

其他 PlanNode 到算子的翻譯基本都是一對一的,感興趣的可以看官方文檔的這個頁面:Plan Nodes and Operators。

下面展開一些細(xì)節(jié)。

Splits

Velox 允許應(yīng)用層(即 Velox 的使用方)以 Splits (每個算子的輸入片段稱為 Split)的方式給 Pipeline 喂數(shù)據(jù),可以流式的喂,因此有兩個 API:

  1. Task::addSplit(planNodeId, split) :喂一份數(shù)據(jù)給 Velox
  2. Task::noMoreSplits() :通知 Velox 我喂完了。

Velox 會使用一個隊(duì)列在緩存這些 Splits 數(shù)據(jù)。在數(shù)據(jù)喂完之前的任意一個時刻,Pipeline 的葉子算子(對的,外部喂數(shù)據(jù)只能發(fā)生在葉子節(jié)點(diǎn),如 TableScan,Exchange 和 MergeExchange)都可以從隊(duì)列中取數(shù)據(jù),對應(yīng) API 是 Task::getSplitOrFuture(planNodeId) ,返回值有兩種:

  1. 如果隊(duì)列中有數(shù)據(jù),則返回一個 Split
  2. 如果隊(duì)列中無數(shù)據(jù),但還沒有收到喂完的信號,則返回一個 Future (類似于一個欠條,之后有數(shù)據(jù)之后,會憑該欠條兌付)。

圖片

Task 是 PlanTree Segment 執(zhí)行單位,可以通過 Splits 方式流式喂數(shù)據(jù)

Join Bridges and Barriers

Join (HashJoinNode 和 CrossJoinNode)會翻譯成 XXProbe 和 XXBuild 兩個算子,并且通過一個共享的 Bridge 來溝通數(shù)據(jù),兩側(cè) Pipeline 都可以通過 Task::getHashJoinBridge() 函數(shù)來根據(jù) PlanNodeId 獲取該共享的 Bridge。

為了提高 build 速度,build 側(cè) Pipeline 通常使用多個 Driver 并發(fā)執(zhí)行。但由于只有一個 Bridge,每個 Driver 在結(jié)束時可以調(diào)用 Task::allPeersFinished() (內(nèi)部是使用一個 BarrierState 的結(jié)構(gòu)來實(shí)現(xiàn)的)來判斷自己是否為最后一個 Driver,如果是,則將所有 Driver 的輸出進(jìn)行合并后送到 Bridge。

當(dāng)然,在 RIGHT and FULL OUTER join 情況下,Probe 側(cè)也需要將沒有 match 上的數(shù)據(jù)喂給 Bridge,此時也需要由最后一個 Driver 來負(fù)責(zé)這件事,于是同樣需要調(diào)用  Task::allPeersFinished() 函數(shù)。

圖片

使用 Bridge 對 Join 兩側(cè) Pipeline 進(jìn)行數(shù)據(jù)橋接(Build->Probe)

下面來詳細(xì)看下 Join 類算子的切分細(xì)節(jié)。以 HashJoin 為例,Task 在切分 PlanTree 時,會將邏輯上的一個 HashJoin 算子,轉(zhuǎn)化成物理上的一對算子:HashProbe 和 HashJoin,并且使用異步機(jī)制進(jìn)行通知:在 HashJoin 完成后,通知 HashProbe 所在 Pipeline 繼續(xù)執(zhí)行,在此之前,后者是阻塞等待的。

圖片

 Join 兩側(cè) Pipeline 是可以調(diào)整并發(fā)度的

如上圖,每個 Pipeline 在實(shí)例化(邏輯 PlanNode 轉(zhuǎn)物理 Operator)的時候,可以生成多份,進(jìn)行并發(fā)執(zhí)行,互斥的消費(fèi)數(shù)據(jù)。并且,每個 Pipeline 的并行粒度可以不一樣,如上圖 Probe Pipeline 實(shí)例化了兩份,而 Build Pipeline 實(shí)例化了三份。并且,Build Pipeline 組中最后一個運(yùn)行完的 Pipeline 負(fù)責(zé)將數(shù)據(jù)通過 Bridge 發(fā)送給 Probe Pipeline。

Exchange Clients

Velox 使用 Exchange Clients 來獲取遠(yuǎn)程 worker 的數(shù)據(jù)。分兩個步驟:

第一步,Pipeline 中第一個 Driver (driverId == 0) 的 Exchange 算子從 Task 中獲取一個 Split,并且初始化一個共享 Exchange Client。

第二步,Exchange Client 會為上游每個 Task 構(gòu)造一個 Exchange Source,并行的拉取每個上游 Task 同一個 Partition (圖中是 Partition-15)數(shù)據(jù),然后將其放在 Client 的隊(duì)列 Queue 中。Exchange 的每個 Driver 都會去隊(duì)列中拉取這些數(shù)據(jù)。

如何從上游 Task 拉取數(shù)據(jù)的邏輯,需要由用戶自定義實(shí)現(xiàn)  ExchangeSource 和 ExchangeSource::Factory  。每個 ExchangeSource 接受一個上游 Task 的字符串 ID、Partition 編號和一個隊(duì)列作為參數(shù)。然后會從上游 Task 中拉取該 Partition 的數(shù)據(jù),并且放到隊(duì)列中。

圖片

 向上游 Task 遠(yuǎn)程(跨進(jìn)程)拉取數(shù)據(jù),也叫 MaterializePage

Local Exchange Queues

Local exchange 用于在一個 Task 內(nèi)部調(diào)整數(shù)據(jù)并發(fā)度,會被翻譯成兩個物理算子:LocalPartition 和 LocalExchange。其中,LocalPartition 在生產(chǎn)側(cè) Pipeline,LocalExchange 在消費(fèi)側(cè) Pipeline。

中間通過 LocalExchangeQueues 來溝通生產(chǎn)者和消費(fèi)者,這些隊(duì)列在 Task 類中。對于每個消費(fèi)者(也即 LocalExchange 側(cè) Driver)Task 都會構(gòu)建一個 LocalExchangeQueue 隊(duì)列;每個生產(chǎn)者 (LocalPartition)可以訪問所有隊(duì)列。在產(chǎn)生一條數(shù)據(jù)是,會對其按照某種方式進(jìn)行 Partition,然后寫到對應(yīng)隊(duì)列中。這個過程類似于 MapReduce 中的 Shuffle 階段。

圖片

 本地改變并發(fā)度時,使用一個隊(duì)列進(jìn)行數(shù)據(jù)溝通

具體來說,Local Exchange 可以有幾種方式改變并行度。如一改多、多改一。多改一,典型的例子如,并行 sort:先切成多個分片每個分片分別 sort,后通過 Local Exchange 進(jìn)行 merge sort。不僅單個 Pipeline 的多個 Driver 在進(jìn)行數(shù)據(jù)合并時可以用 Local Exchange,多個 Pipeline 的合并也可以用 Local Exchange,不妨稱之為多并一。典型例子有,Union All,將多個數(shù)據(jù)集合并起來。

圖片

 多改一 

圖片

 多并一

一改多通常用在,在經(jīng)歷了某些必須使用單線程的算子后(比如一些 Shuffle 算子),重新對數(shù)據(jù)分片提高并發(fā)度,使用多線程運(yùn)行。

圖片

 一改多

Local Merge Sources

LocalMerge 算子和 LocalExchange 算子類似,但對并發(fā)數(shù)和輸入都有限定。其所在 Pipeline 只會單線程運(yùn)行,但會接受多線程運(yùn)行的 Pipeline 的輸入。并且要求所有輸入有序,然后將輸入進(jìn)行歸并,保證輸出是有序的。

LocalMerge 算子通過 Task::getLocalMergeSources() 來獲取所有待 Merge 的 sources。因此,每個 LocalMergeNode 會初始化給定并發(fā)數(shù)個 LocalMergeSource。

Merge Join Sources

MergeJoin 算子提供了某種接受右側(cè)輸入的方法。Task 會在右側(cè) Pipeline 增加一個 CallbackSink 算子,來匯集數(shù)據(jù)。左側(cè)算子可以通過 Task::getMergeJoinSource() 接口來獲取該 CallbackSink 的輸出。

擴(kuò)展性

Velox 允許用戶自定義 PlanNode 和 Operator,以及 Join 相關(guān)的 Operator 和 Bridge。自定義 Operator 可以訪問 task 中的 splits 并使用 barriers。

但 Exchange clients, local exchange queues 和 local merge sources、 merge join sources 等狀態(tài)由于不是通用的,因此訪問不了。

總結(jié)

小節(jié)一下,Task 負(fù)責(zé)將由 PlanNode 組成的 PlanTree 翻譯成由 Operator 組成的 Pipeline,并且對 Pipeline 進(jìn)行并發(fā)運(yùn)行。在此期間,Task 會維護(hù) Operator 間的共享狀態(tài)、協(xié)調(diào) Operator 間的運(yùn)行依賴。這些共享狀態(tài)包括:

  • Splits
  • Join bridges and barriers
  • Exchange clients
  • Local exchange queues
  • Local merge sources
  • Merge join sources

上述的每個狀態(tài)都是和特定 PlanNode 關(guān)聯(lián)的(即不是全局范圍的,而是和 PlanNode 綁定的),因此 Opeator 需要使用 PlanNodeID 來訪問相關(guān)狀態(tài)。前兩個狀態(tài)是所有算子都有的,因此自定義算子可以訪問到,后幾個狀態(tài)是某些算子特有的,因此自定義算子訪問不到。

責(zé)任編輯:武曉燕 來源: 木鳥雜記
相關(guān)推薦

2010-02-23 10:15:22

WCF運(yùn)行機(jī)制

2015-11-20 11:20:54

js開發(fā)

2017-05-31 13:16:35

PHP運(yùn)行機(jī)制原理解析

2019-05-10 14:00:21

小程序運(yùn)行機(jī)制前端

2009-02-03 14:00:20

PHP運(yùn)行PHP調(diào)用PHP原理

2009-12-11 10:52:37

PHP運(yùn)行機(jī)制

2010-02-01 17:19:30

C++運(yùn)行機(jī)制

2019-08-15 10:17:16

Webpack運(yùn)行瀏覽器

2019-10-11 09:00:00

JavaScriptEvent Loop前端

2010-01-05 16:10:21

.NET Framew

2018-12-26 16:30:09

SQL Server內(nèi)部運(yùn)行機(jī)制數(shù)據(jù)庫

2012-03-06 10:22:00

程序

2009-10-22 17:10:04

CLR和JRE運(yùn)行機(jī)制

2010-09-28 11:05:49

jQuery

2015-11-16 11:17:30

PHP底層運(yùn)行機(jī)制原理

2016-12-13 14:12:25

程序機(jī)制

2016-12-14 14:41:20

Hello World程序運(yùn)行機(jī)制

2017-07-12 14:58:21

AndroidInstant Run

2010-05-06 17:54:54

Oracle鎖

2022-02-11 23:11:09

Kubernetes集群容器化
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號