自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

機器學習分布式框架Ray

人工智能 機器學習 分布式
Ray是UC Berkeley RISELab新推出的高性能分布式執(zhí)行框架,具有比Spark更優(yōu)異的計算性能,而且部署和改造更簡單,同時支持機器學習和深度學習的分布式訓練,支持主流的深度學習框架。

 [[422916]]

Python中文社區(qū) (ID:python-china)

1.什么是Ray

分布式計算框架大家一定都耳熟能詳,諸如離線計算的Hadoop(map-reduce),spark, 流式計算的strom,Flink等。相對而言,這些計算框架都依賴于其他大數(shù)據(jù)組件,安裝部署也相對復雜。

在python中,之前有分享過的Celery可以提供分布式的計算。今天和大家分享另外一個開源的分布式計算框架Ray。Ray是UC Berkeley RISELab新推出的高性能分布式執(zhí)行框架,具有比Spark更優(yōu)異的計算性能,而且部署和改造更簡單,同時支持機器學習和深度學習的分布式訓練,支持主流的深度學習框架(pytorch,tensorflow,keras等)。

2. Ray架構

Ray的架構參見最早發(fā)布的論文Ray: A Distributed Framework for Emerging AI Applications

由上圖可知Ray主要包括:

  •  Node: 節(jié)點,主要是head和worker, head可以認為是Master,worker是執(zhí)行任務的單元
    •   每個節(jié)點都有自己的本地調(diào)度器local scheduler
    •   object store:一個內(nèi)存對象存儲,允許Node之間進行通信
  •  scheduler:有兩個調(diào)度器,每個節(jié)點都有本地的調(diào)度器, 在提交任務時,Local Scheduler會判斷是否需要提交給Global Scheduler分發(fā)給其他worker來執(zhí)行。
  •  GCS:全局狀態(tài)控制記錄了Ray中各種對象的狀態(tài)信息,可以認為是meta數(shù)據(jù),是Ray容錯的保證

Ray適用于任何分布式計算的任務,包括分布式訓練。筆者最近是用在大量的時間序列預測模型訓練和在線預測上。

Ray目前庫支持超參數(shù)調(diào)優(yōu)Ray tune, 梯度下降Ray SGD,推理服務RaySERVE, 分布式數(shù)據(jù)Dataset以及分布式增強學習RLlib。還有其他第三方庫,如下所示:

3. 簡單使用

3.1 安裝部署 

  1. pip install --upgrade pip  
  2. # pip install ray  
  3. pip install ray == 1.6.0  
  4. # ImportError: cannot import name 'deep_mapping' from 'attr.validators'  
  5. # pip install attr == 19.1.0 

3.2 單機使用

  •  簡單例子 Ray 通過@ray.remote裝飾器使得函數(shù)變成可分布式調(diào)用的任務。通過函數(shù)名.remote方式進行提交任務,通過ray.get方式來獲取任務返回值。單擊情況下和多線程異步執(zhí)行的方式類似。 
  1. import time  
  2.   import ray  
  3.   ray.init(num_cpus = 4) # Specify this system has 4 CPUs.  
  4.   @ray.remote  
  5.   def do_some_work(x):  
  6.       time.sleep(1) # Replace this is with work you need to do.  
  7.       return x  
  8.   start = time.time()  
  9.   results = ray.get([do_some_work.remote(x) for x in range(4)])  
  10.   print("duration =", time.time() - start)  
  11.   print("results = ", results)   
  12.   # duration = 1.0107324123382568  
  13.   # results =  [0, 1, 2, 3] 

remote返回的對象的id 如ObjectRef(7f10737098927148ffffffff0100000001000000)。需要通過ray.get來獲取實際的值, 需要注意的是ray.get是阻塞式的調(diào)用,不能[ray.get(do_some_work.remote(x)) for x in range(4)]

  •  注意小任務使用情況 需要注意的是ray分布式計算在調(diào)度的時候需要發(fā)費額外的時間,如調(diào)度,進程間通信以及任務狀態(tài)的更新等等,所以避免過小的任務。可以把小任務進行合并   
  1. @ray.remote  
  2.    def tiny_work(x):  
  3.        time.sleep(0.0001) # Replace this is with work you need to do.  
  4.        return x  
  5.    start = time.time()  
  6.    result_ids = [tiny_work.remote(x) for x in range(100000)]  
  7.    results = ray.get(result_ids)  
  8.    print("duration =", time.time() - start) 
  •  ray.put ray.put() 把一個對象放到對象存儲上,返回一個object id, 這個id可以在分布式機器上都可以調(diào)用,該操作為異步的。通過ray.get()可以是獲取。 
  1. num = ray.put(10)  
  2. ray.get(num) 
  •  ray.wait 如果任務返回多個結果,ray.get()會等所有結果都完成之后才會執(zhí)行后續(xù)的操作。如果多個結果執(zhí)行的耗時不同,此時短板在于最長的那個任務。

          這個時候可以采用ray.wait()方法,ray.wait()返回執(zhí)行完畢的和未執(zhí)行完畢的任務結果,執(zhí)行完成的結果可以繼續(xù)后續(xù)的操作   

  1. import random  
  2.    @ray.remote  
  3.    def do_some_work(x):  
  4.        time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.  
  5.        return   
  6.    def process_incremental(sum, result):  
  7.        time.sleep(1) # Replace this with some processing code.  
  8.        return sum + result  
  9.    start = time.time()  
  10.    result_ids = [do_some_work.remote(x) for x in range(4)]  
  11.    sum = 0  
  12.    while len(result_ids):  
  13.        done_id, result_ids = ray.wait(result_ids)  
  14.        sum = process_incremental(sum, ray.get(done_id[0])) 
  15.     print("duration =", time.time() - start, "\nresult = ", sum)  
  16.    # duration = 5.270821809768677   
  17.    # result =  6    

2.3 集群部署

Ray的架構遵循master-slave的模式。Head Node 可以認為是Master,其他的Node為worker。在集群部署時,Head Node需要首先啟動ray start --head, 其他機器依次啟動worker,注意需要指定head Node的地址確定關系,ray start --address 10.8.xx.3:6379。

關閉服務,需要每一臺機器執(zhí)行 ray.stop

 

  1. # To start a head node.  
  2. #ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>  
  3. ray start --head --node-ip-address 10.8.xx.3 --port=6379  
  4. # To start a non-head node.  
  5. # ray start --address=<address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>  
  6. ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir={your temp path} 

  •  提交任務 任何一臺worker機器都可以提交任務, 先通過init連接Head Node就可以remote起來了。 
  1. import ray  
  2. ray.init(10.8.xx.3:6379) 

3. 不同任務的例子

  •  任務依賴 任務之間存在依賴關系,Ray和Spark一樣也是通過生成DAG圖的方式來確定依賴關系,確定可以并行跑的任務。如下圖所示zeros是可以并行跑的。 
  1. import numpy as np  
  2. # Define two remote functions. Invocations of these functions create tasks  
  3. # that are executed remotely.  
  4. @ray.remote  
  5. def multiply(x, y):  
  6.     return np.dot(x, y)  
  7. @ray.remote  
  8. def zeros(size):  
  9.     return np.zeros(size)  
  10. # Start two tasks in parallel. These immediately return futures and the  
  11. # tasks are executed in the background.  
  12. x_id = zeros.remote((100, 100))  
  13. y_id = zeros.remote((100, 100))  
  14. # Start a third task. This will not be scheduled until the first two  
  15. # tasks have completed.  
  16. z_id = multiply.remote(x_id, y_id)  
  17. # Get the result. This will block until the third task completes.  
  18. z = ray.get(z_id)  
  19. print(z)     

  •  有狀態(tài)任務 上面提到的任務都是無狀態(tài)的(除依賴外),即任務之間都是無關系的。Ray也是支持有狀態(tài)的任務成為Actor。常是在python class上加@ray.remote,ray會跟蹤每個class內(nèi)部狀態(tài)的不同狀態(tài)。 
  1. @ray.remote  
  2.  class Counter(object):  
  3.      def __init__(self):  
  4.          self.n = 0  
  5.      def increment(self):  
  6.          self.n += 1  
  7.      def read(self):  
  8.          return self.n  
  9.  counters = [Counter.remote() for i in range(4)]  
  10.  # 不斷的執(zhí)行可以每個counter計數(shù)不斷增加  
  11.  [c.increment.remote() for c in counters]  
  12.  futures = [c.read.remote() for c in counters]  
  13.  print(ray.get(futures))  
  14.  # [1, 1, 1, 1]  
  15.  # [11, 11, 11, 11] 
  • map-reduce 任務 map-reduce任務其實可以其他分布式任務是一樣的。主要是各種聚合操作。Map-Reduce常規(guī)操作如下  

  •  - word count例子見:https://github.com/ray-project/ray/blob/master/doc/examples/streaming/streaming.py

         這里舉一個簡單的例子:   

  1. @ray.remote  
  2.    def map(obj, f):  
  3.        return f(obj)  
  4.    @ray.remote  
  5.    def sum_results(*elements):  
  6.        return np.sum(elements)  
  7.    items = list(range(100))  
  8.    map_func = lambda i : i*2  
  9.    remote_elements = [map.remote(i, map_func) for i in items]  
  10.    # simple reduce  
  11.    remote_final_sum = sum_results.remote(*remote_elements)  
  12.    result = ray.get(remote_final_sum) 
  13.    # tree reduce  
  14.    intermediate_results = [sum_results.remote(  
  15.        *remote_elements[i * 20: (i + 1) * 20]) for i in range(5)]  
  16.    remote_final_sum = sum_results.remote(*intermediate_results)  
  17.    result = ray.get(remote_final_sum)     

  •  訓練模型如pytorch 官網(wǎng)提供了Best Practices: Ray with PyTorch, 主要是下載訓練/測試數(shù)據(jù)和訓練多個模型(感覺不是很實用)。訓練多個模型,可以進行參數(shù)融合。

           參見 https://docs.ray.io/en/latest/using-ray-with-pytorch.html

4. 總結

本文分享了高效的Python分布式計算框架Ray,希望對你有幫助??偨Y如下:

  •  Ray是UC Berkeley RISELab新推出的高性能分布式執(zhí)行框架, Spark也是伯克利出品的
  •  Ray架構關鍵:兩個調(diào)度器, Head和worker節(jié)點,GCS全局狀態(tài)控制保證計算容錯
  •  Ray應用簡單:@ray.remote把任務變成分布式任務, x.remote提交任務, get/wait獲取結果
  •  集群不是:ray start
  •  Ray支持多種任務:有依賴DAG,有狀態(tài)Actor以及深度學習支持
  •  不斷豐富的庫:RaySERVE, RaySGD, RayTune, Ray data,rllib 
責任編輯:龐桂玉 來源: Python中文社區(qū)
相關推薦

2025-01-13 08:05:04

2017-09-11 15:19:05

CoCoA機器學習分布式

2023-11-01 18:02:33

RayPython分布式

2015-06-10 09:47:18

微軟分布式云平臺

2016-08-31 07:02:51

2018-11-07 09:23:21

服務器分布式機器學習

2017-04-08 17:32:39

人工智能喬丹Ray

2022-08-03 20:18:58

機器學習算法分析數(shù)據(jù)

2017-12-05 14:55:56

2017-06-29 13:29:34

大數(shù)據(jù)PAI機器學習

2023-06-26 00:14:28

Openjob分布式任務

2023-08-24 08:49:27

2019-07-04 15:13:16

分布式緩存Redis

2021-12-13 11:07:10

鴻蒙HarmonyOS應用

2021-06-02 22:16:56

框架CAPBASE

2010-06-03 19:46:44

Hadoop

2024-01-05 07:28:50

分布式事務框架

2021-08-30 18:36:33

鴻蒙HarmonyOS應用

2021-08-30 20:19:55

應用程序

2019-10-10 09:16:34

Zookeeper架構分布式
點贊
收藏

51CTO技術棧公眾號