自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink執(zhí)行流程與源碼分析

大數(shù)據(jù)
整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象。現(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

[[422512]]

本文轉(zhuǎn)載自微信公眾號「大數(shù)據(jù)左右手」,作者王了個博。轉(zhuǎn)載本文請聯(lián)系大數(shù)據(jù)左右手公眾號。

Flink主要組件

作業(yè)管理器(JobManager)

(1) 控制一個應(yīng)用程序執(zhí)行的主進程,也就是說,每個應(yīng)用程序 都會被一個不同的Jobmanager所控制執(zhí)行

(2) Jobmanager會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:作業(yè)圖( Job Graph)、邏輯數(shù)據(jù)流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。

(3) Jobmanager會把 Jobgraph轉(zhuǎn)換成一個物理層面的 數(shù)據(jù)流圖,這個圖被叫做 “執(zhí)行圖”(Executiongraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。Job Manager會向資源管理器( Resourcemanager)請求執(zhí)行任務(wù)必要的資源,也就是 任務(wù)管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運行它們的 Taskmanager上。而在運行過程中Jobmanagera會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)。

任務(wù)管理器(Taskmanager)

(1) Flink中的工作進程。通常在 Flink中會有多個 Taskmanageria運行, 每個 Taskmanageri都包含了一定數(shù)量的插槽( slots)。插槽的數(shù)量限制了Taskmanageri能夠執(zhí)行的任務(wù)數(shù)量。

(2) 啟動之后, Taskmanager會向資源管理器注冊它的插槽;收到資源管理器的指令后, Taskmanageri就會將一個或者多個插槽提供給Jobmanageri調(diào)用。Jobmanager就可以向插槽分配任務(wù)( tasks)來執(zhí)行了。

(3) 在執(zhí)行過程中, 一個 Taskmanagera可以跟其它運行同一應(yīng)用程序的Taskmanager交換數(shù)據(jù)。

資源管理器(Resource Manager)

(1) 主要負責(zé)管理任務(wù)管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定義的處理資源單元。

(2) Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 當(dāng) Jobmanager申請插槽資源時, Resourcemanager會將有空閑插槽的Taskmanager?分配給Jobmanager。如果 Resourcemanagery沒有足夠的插槽來滿足 Jobmanager的請求, 它還可以向資源提供平臺發(fā)起會話,以提供啟動 Taskmanager進程的容器。

分發(fā)器(Dispatcher)

(1) 可以跨作業(yè)運行,它為應(yīng)用提交提供了REST接口。

(2)當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給Jobmanage

(3) Dispatcher他會啟動一個 WebUi,用來方便地 展示和監(jiān)控作業(yè)執(zhí)行的信息。

任務(wù)提交流程

  1. 提交應(yīng)用
  2. 啟動并提交應(yīng)用
  3. 請求slots
  4. 任務(wù)啟動
  5. 注冊slots
  6. 發(fā)出提供slot的指令
  7. 提供slots
  8. 提交要在slots中執(zhí)行的任務(wù)
  9. 交換數(shù)據(jù)

任務(wù)提交流程(YARN)

a. Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置

b. 隨后向 Yarn ResourceManager提交任務(wù)ResourceManager分配 Container資源并通知對應(yīng)的NodeManager啟動

c. ApplicationMaster,ApplicationMaster 啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境

d. 然后啟動JobManager , 之后ApplicationMaster 向ResourceManager 申請資源啟動TaskManager

e. ResourceManager 分配 Container 資源后 , 由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager

f. NodeManager 加載 Flink 的 Jar 包和配置構(gòu)建環(huán)境并啟動 TaskManager

g. TaskManager 啟動后向 JobManager 發(fā)送心跳包,并等待 JobManager 向其分配任務(wù)。

源碼分析--集群啟動 JobManager 啟動分析

JobManager 的內(nèi)部包含非常重要的三大組件

  • WebMonitorEndpoint
  • ResourceManager
  • Dispatcher

入口,啟動主類:StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各種服務(wù) 
  7.  /** 
  8.   * 初始化了 主節(jié)點對外提供服務(wù)的時候所需要的 三大核心組件啟動時所需要的基礎(chǔ)服務(wù) 
  9.   *  初始化服務(wù),如 JobManager 的 Akka RPC 服務(wù),HA 服務(wù),心跳檢查服務(wù),metric service 
  10.   *  這些服務(wù)都是 Master 節(jié)點要使用到的一些服務(wù) 
  11.   *  1、commonRpcService:  基于 Akka 的 RpcService 實現(xiàn)。RPC 服務(wù)啟動 Akka 參與者來接收從 RpcGateway 調(diào)用 RPC 
  12.   *  2、haServices:    提供對高可用性所需的所有服務(wù)的訪問注冊,分布式計數(shù)器和領(lǐng)導(dǎo)人選舉 
  13.   *  3、blobServer:    負責(zé)偵聽傳入的請求生成線程來處理這些請求。它還負責(zé)創(chuàng)建要存儲的目錄結(jié)構(gòu) blob 或臨時緩存它們 
  14.   *  4、heartbeatServices:  提供心跳所需的所有服務(wù)。這包括創(chuàng)建心跳接收器和心跳發(fā)送者。 
  15.   *  5、metricRegistry:   跟蹤所有已注冊的 Metric,它作為連接 MetricGroup 和 MetricReporter 
  16.   *  6、archivedExecutionGraphStore:   存儲執(zhí)行圖ExecutionGraph的可序列化形式。 
  17. */ 
  18. initializeServices(configuration, pluginManager); 
  19.  
  20. // 創(chuàng)建 DispatcherResourceManagerComponentFactory, 初始化各種組件的 
  21. 工廠實例 
  22. // 其實內(nèi)部包含了三個重要的成員變量: 
  23. // 創(chuàng)建 ResourceManager 的工廠實例 
  24. // 創(chuàng)建 Dispatcher 的工廠實例 
  25. // 創(chuàng)建 WebMonitorEndpoint 的工廠實例 
  26. createDispatcherResourceManagerComponentFactory(configuration); 
  27.  
  28. // 創(chuàng)建 集群運行需要的一些組件:Dispatcher, ResourceManager 等 
  29. // 創(chuàng) 建 ResourceManager 
  30. // 創(chuàng) 建 Dispatcher 
  31. // 創(chuàng) 建 WebMonitorEndpoint 
  32. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各種服務(wù)

  1. // 初 始 化 和 啟 動 AkkaRpcService, 內(nèi) 部 其 實 包 裝 了 一 個 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一個負責(zé) IO 的線程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服務(wù)組件,負責(zé) HA 服務(wù)的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服務(wù)端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服務(wù)組件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一個用來存儲 ExecutionGraph 的 Store, 實現(xiàn)是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多組件的工廠實例

  1. 1、DispatcherRunnerFactory,默認實現(xiàn):DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默認實現(xiàn):StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默認實現(xiàn):SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 創(chuàng)建 WebMonitorEndpoint

  1. /************************************************* 
  2.   *  創(chuàng)建 WebMonitorEndpoint 實例, 在 Standalone 模式下:DispatcherRestEndpoint 
  3.   *  1、restEndpointFactory = SessionRestEndpointFactory 
  4.   *  2、webMonitorEndpoint = DispatcherRestEndpoint 
  5.   *  3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService 
  6.   *  當(dāng)前這個 DispatcherRestEndpoint 的作用是: 
  7.   *  1、初始化的過程中,會一大堆的 Handler 
  8.   *  2、啟動一個 Netty 的服務(wù)端,綁定了這些 Handler 
  9.   *  3、當(dāng) client 通過 flink 命令執(zhí)行了某些操作(發(fā)起 restful 請求), 服務(wù)端由 webMonitorEndpoint 來執(zhí)行處理 
  10.   *  4、舉個例子: 如果通過 flink run 提交一個 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 來執(zhí)行處理 
  11.   *  5、補充一個:job 由 JobSubmitHandler 執(zhí)行完畢之后,轉(zhuǎn)交給 Dispatcher 去調(diào)度執(zhí)行 
  12.   */ 
  13.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  14.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  15.   blobServer, executor, metricFetcher, 
  16.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  17.   fatalErrorHandler 
  18.  ); 

4. 創(chuàng)建 resourceManager

  1. /************************************************* 
  2.  *  創(chuàng)建 StandaloneResourceManager 實例對象 
  3.  *  1、resourceManager = StandaloneResourceManager 
  4.  *  2、resourceManagerFactory = StandaloneResourceManagerFactory 
  5. */ 
  6. resourceManager = resourceManagerFactory.createResourceManager( 
  7.  configuration, ResourceID.generate(), 
  8.  rpcService, highAvailabilityServices, heartbeatServices, 
  9.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  10.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  11. ); 
  1. protected ResourceManager<ResourceID> createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.  /************************************************* 
  16.   *  注釋: 得到一個 StandaloneResourceManager 實例對象 
  17.   */ 
  18.  return new StandaloneResourceManager( 
  19.   rpcService, 
  20.   resourceId, 
  21.   highAvailabilityServices, 
  22.   heartbeatServices, 
  23.   resourceManagerRuntimeServices.getSlotManager(), 
  24.   ResourceManagerPartitionTrackerImpl::new, 
  25.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  26.   clusterInformation, 
  27.   fatalErrorHandler, 
  28.   resourceManagerMetricGroup, 
  29.   standaloneClusterStartupPeriodTime, 
  30.   AkkaUtils.getTimeoutAsTime(configuration) 
  31.  ); 
  32.  
  33.  } 
  34.   
  1. /** 
  2. requestSlot():接受 solt請求 
  3. sendSlotReport(..): 將solt請求發(fā)送TaskManager 
  4. registerJobManager(...): 注冊job管理者。 該job指的是 提交給flink的應(yīng)用程序 
  5. registerTaskExecutor(...): 注冊task執(zhí)行者。 
  6. **/ 
  7. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  8.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  9.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  11.  
  12.  /************************************************* 
  13.   *  注釋: 當(dāng)執(zhí)行完畢這個構(gòu)造方法的時候,會觸發(fā)調(diào)用 onStart() 方法執(zhí)行 
  14.   */ 
  15.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.  /************************************************* 
  6.   *  注釋:ResourceManager 或者 TaskExecutor 中的 RpcServer 實現(xiàn) 
  7.   *  以 ResourceManager 為例說明: 
  8.   *  啟動 ResourceManager 的 RPCServer 服務(wù) 
  9.   *  這里啟動的是 ResourceManager 的 Rpc 服務(wù)端。 
  10.   *  接收 TaskManager 啟動好了而之后, 進行注冊和心跳,來匯報 Taskmanagaer 的資源情況 
  11.   *  通過動態(tài)代理的形式構(gòu)建了一個Server 
  12.   */ 
  13.  this.rpcServer = rpcService.startServer(this); 

5. 在創(chuàng)建resourceManager同級:啟動任務(wù)接收器Starting Dispatcher

  1. /************************************************* 
  2.  
  3.  *  創(chuàng)建 并啟動 Dispatcher 
  4.  *  1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager 
  5.  *  2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory 
  6.  *  第一個參數(shù):ZooKeeperLeaderElectionService 
  7.  *  - 
  8.  *  老版本: 這個地方是直接創(chuàng)建一個 Dispatcher 對象然后調(diào)用 dispatcher.start() 來啟動 
  9.  *  新版本: 直接創(chuàng)建一個 DispatcherRunner, 內(nèi)部就是要創(chuàng)建和啟動 Dispatcher 
  10.  *  - 
  11.  *  DispatcherRunner 是對 Dispatcher 的封裝。 
  12.  *  DispatcherRunner被創(chuàng)建的代碼的內(nèi)部,會創(chuàng)建 Dispatcher并啟動 
  13.  */ 
  14. log.debug("Starting Dispatcher."); 
  15. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  16.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  17.  // TODO_ZYM 注釋: 注意第三個參數(shù) 
  18.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  19.  ioExecutor, rpcService, partialDispatcherServices 
  20. ); 

Dispatcher 啟動后,將會等待任務(wù)提交,如果有任務(wù)提交,則會經(jīng)過submitJob(...)函數(shù)進入后續(xù)處理。

提交(一個Flink應(yīng)用的提交必須經(jīng)過三個graph的轉(zhuǎn)換)

首先看下一些名詞

StreamGraph

是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結(jié)構(gòu)??梢杂靡粋€ DAG 來表示),DAG 的頂點是 StreamNode,邊是 StreamEdge,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。

  • StreamNode:用來代表 operator 的類,并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
  • StreamEdge:表示連接兩個StreamNode的邊。

DataStream 上常見的 transformation 有 map、flatmap、filter等(見DataStream Transformation了解更多)。這些transformation會構(gòu)造出一棵 StreamTransformation 樹,通過這棵樹轉(zhuǎn)換成 StreamGraph

以map方法為例,看看源碼

  1. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { 
  2.   // 通過java reflection抽出mapper的返回值類型 
  3.   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一個新的DataStream,SteramMap 為 StreamOperator 的實現(xiàn)類 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation會連接上當(dāng)前DataStream中的transformation,從而構(gòu)建成一棵樹 
  14.   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都會存到 env 中,調(diào)用execute時遍歷該list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map轉(zhuǎn)換將用戶自定義的函數(shù)MapFunction包裝到StreamMap這個Operator中,再將StreamMap包裝到OneInputTransformation,最后該transformation存到env中,當(dāng)調(diào)用env.execute時,遍歷其中的transformation集合構(gòu)造出StreamGraph

JobGraph

(1) StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點 chain 在一起作為一個節(jié)點。

  • 將并不涉及到 shuffle 的算子進行合并。
  • 對于同一個 operator chain 里面的多個算子,會在同一個 task 中執(zhí)行。
  • 對于不在同一個 operator chain 里的算子,會在不同的 task 中執(zhí)行。

(2) JobGraph 用來由 JobClient 提交給 JobManager,是由頂點(JobVertex)、中間結(jié)果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖。

(3) JobGraph 定義作業(yè)級別的配置,而每個頂點和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設(shè)置。

JobVertex

JobVertex 相當(dāng)于是 JobGraph 的頂點。經(jīng)過優(yōu)化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。

IntermediateDataSet

JobVertex的輸出,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。

JobEdge

job graph中的一條數(shù)據(jù)傳輸通道。source 是IntermediateDataSet,sink 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。

(1) 首先是通過API會生成transformations,通過transformations會生成StreamGraph。

(2)將StreamGraph的某些StreamNode Chain在一起生成JobGraph,前兩步轉(zhuǎn)換都是在客戶端完成。

(3)最后會將JobGraph轉(zhuǎn)換為ExecutionGraph,相比JobGraph會增加并行度的概念,這一步是在Jobmanager里完成。

ExecutionJobVertex

ExecutionJobVertex一一對應(yīng)JobGraph中的JobVertex

ExecutionVertex

一個ExecutionJobVertex對應(yīng)n個ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任務(wù)的一個子任務(wù)

Execution

Execution 是對 ExecutionVertex 的一次執(zhí)行,通過 ExecutionAttemptId 來唯一標識。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的對外輸出,一個 JobGraph 可能有 n(n >=0) 個輸出。在 ExecutionGraph 中,與此對應(yīng)的就是 IntermediateResult。每一個 IntermediateResult 就有 numParallelProducers(并行度) 個生產(chǎn)者,每個生產(chǎn)者的在相應(yīng)的 IntermediateResult 上的輸出對應(yīng)一個 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一個輸出分區(qū)

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的輸入,通過 ExecutionEdge 將 ExecutionVertex 和 IntermediateResultPartition 連接起來,進而在不同的 ExecutionVertex 之間建立聯(lián)系。

ExecutionGraph的構(gòu)建

  1. 構(gòu)建JobInformation
  2. 構(gòu)建ExecutionGraph
  3. 將JobGraph進行拓撲排序,獲取sortedTopology頂點集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 構(gòu)建JobInformation 
  7.    
  8.   // 構(gòu)建ExecutionGraph 
  9.    
  10.   // 將JobGraph進行拓撲排序,獲取sortedTopology頂點集合 
  11.   List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

構(gòu)建ExecutionJobVertex,連接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 構(gòu)建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 連接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { 
  20.   List<JobEdge> inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分計劃(可執(zhí)行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的輸入 partition 算法,如果是 forward 或 rescale 的話為 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一個并行的ExecutionVertex節(jié)點都會鏈接到源節(jié)點產(chǎn)生的所有中間結(jié)果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并發(fā)數(shù)等于partition數(shù),則一對一進行連接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并發(fā)數(shù)大于partition數(shù),則一對多進行連接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并發(fā)數(shù)小于partition數(shù),則多對一進行連接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

返回ExecutionGraph

TaskManager

TaskManager啟動

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一個org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //調(diào)用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接著的重要函數(shù)是shumitTask()函數(shù),該函數(shù)會通過AKKA機制,向TaskManager發(fā)出一個submitTask的消息請求,TaskManager收到消息請求后,會執(zhí)行submitTask()方法。(省略了部分代碼)。

  1. public CompletableFuture<Acknowledge> submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一個Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //啟動任務(wù) 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后創(chuàng)建執(zhí)行Task的線程,然后調(diào)用startTaskThread()來啟動具體的執(zhí)行線程,Task線程內(nèi)部的run()方法承載了被執(zhí)行的核心邏輯。

Task是執(zhí)行在TaskExecutor進程里的一個線程,下面來看看其run方法

(1) 檢測當(dāng)前狀態(tài),正常情況為CREATED,如果是FAILED或CANCELING直接返回,其余狀態(tài)將拋異常。

(2) 讀取DistributedCache文件。

(3) 啟動ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注冊partitionWriter。

(5) 根據(jù)nameOfInvokableClass加載對應(yīng)的類并實例化。

(6) 將狀態(tài)置為RUNNING并執(zhí)行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

總結(jié)

整體的流程與架構(gòu)可能三兩張圖或者三言兩語就可以勾勒出畫面,但是背后源碼的實現(xiàn)是艱辛的。源碼的復(fù)雜度和當(dāng)初設(shè)計框架的抓狂感,我們只有想象?,F(xiàn)在我們只是站在巨人的肩膀上去學(xué)習(xí)。

本篇的主題是"Flink架構(gòu)與執(zhí)行流程",做下小結(jié),F(xiàn)link on Yarn的提交執(zhí)行流程:

1 Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置。

2 向Yarn ResourceManager提交任務(wù)。

3 ResourceManager分配Container資源并通知對應(yīng)的NodeManager啟動ApplicationMaster。

4 ApplicationMaster啟動后加載Flink的Jar包和配置構(gòu)建環(huán)境。

5 啟動JobManager之后ApplicationMaster向ResourceManager申請資源啟動TaskManager。

6 ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點的NodeManager啟動TaskManager。

7 NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動TaskManager。

8 TaskManager啟動后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)左右手
相關(guān)推薦

2022-04-05 12:59:07

源碼線程onEvent

2022-08-27 08:02:09

SQL函數(shù)語法

2012-08-30 09:48:02

Struts2Java

2016-10-21 13:03:18

androidhandlerlooper

2016-11-29 09:38:06

Flume架構(gòu)核心組件

2016-11-25 13:26:50

Flume架構(gòu)源碼

2024-07-15 09:58:03

OpenRestyNginx日志

2016-11-25 13:14:50

Flume架構(gòu)源碼

2022-06-07 10:33:29

Camera組件鴻蒙

2020-07-13 09:09:23

Sentinel源碼Bucket

2009-12-22 13:36:39

Linux Sysfs

2015-01-14 13:22:36

OpenStack創(chuàng)建快照glance api

2017-08-22 13:45:27

2009-07-08 10:30:57

WebWork

2022-07-15 08:52:03

Linux優(yōu)化

2024-10-21 10:45:52

2016-11-29 16:59:46

Flume架構(gòu)源碼

2017-04-19 15:32:46

ReactRouter構(gòu)建源碼

2011-03-15 11:33:18

iptables

2014-08-26 11:11:57

AsyncHttpCl源碼分析
點贊
收藏

51CTO技術(shù)棧公眾號