使用Ray創(chuàng)建高效的深度學(xué)習(xí)數(shù)據(jù)管道
用于訓(xùn)練深度學(xué)習(xí)模型的GPU功能強(qiáng)大但價(jià)格昂貴。為了有效利用GPU,開(kāi)發(fā)者需要一個(gè)高效的數(shù)據(jù)管道,以便在GPU準(zhǔn)備好計(jì)算下一個(gè)訓(xùn)練步驟時(shí)盡快將數(shù)據(jù)傳輸?shù)紾PU。使用Ray可以大大提高數(shù)據(jù)管道的效率。
1、訓(xùn)練數(shù)據(jù)管道的結(jié)構(gòu)
首先考慮下面的模型訓(xùn)練偽代碼。
for step in range(num_steps):
sample, target = next(dataset) # 步驟1
train_step(sample, target) # 步驟2
在步驟1中,獲取下一個(gè)小批量的樣本和標(biāo)簽。在步驟2中,它們被傳遞給train_step函數(shù),該函數(shù)會(huì)將它們復(fù)制到GPU上,執(zhí)行前向傳遞和反向傳遞以計(jì)算損失和梯度,并更新優(yōu)化器的權(quán)重。
接下來(lái)詳細(xì)了解一下步驟1。當(dāng)數(shù)據(jù)集太大無(wú)法放入內(nèi)存時(shí),步驟1將從磁盤(pán)或網(wǎng)絡(luò)中獲取下一個(gè)小批量數(shù)據(jù)。此外,步驟1還涉及一定量的預(yù)處理——輸入數(shù)據(jù)必須轉(zhuǎn)換為數(shù)字張量或張量集合,然后再饋送給模型。在某些情況下,在將它們傳遞給模型之前,張量上還會(huì)應(yīng)用其他轉(zhuǎn)換(例如歸一化、繞軸旋轉(zhuǎn)、隨機(jī)打亂等)。
如果工作流程是嚴(yán)格按順序執(zhí)行的,即先執(zhí)行步驟1,然后再執(zhí)行步驟2,那么模型將始終需要等待下一批數(shù)據(jù)的輸入、輸出和預(yù)處理操作。GPU將無(wú)法得到有效利用,它將在加載下一個(gè)小批量數(shù)據(jù)時(shí)處于空閑狀態(tài)。
為了解決這個(gè)問(wèn)題,可以將數(shù)據(jù)管道視為生產(chǎn)者——消費(fèi)者的問(wèn)題。數(shù)據(jù)管道生成小批量數(shù)據(jù)并寫(xiě)入有界緩沖區(qū)。模型/GPU從緩沖區(qū)中消費(fèi)小批量數(shù)據(jù),執(zhí)行前向/反向計(jì)算并更新模型權(quán)重。如果數(shù)據(jù)管道能夠以模型/GPU消費(fèi)的速度快速生成小批量數(shù)據(jù),那么訓(xùn)練過(guò)程將會(huì)非常高效。
圖片
2、Tensorflow tf.data API
Tensorflow tf.data API提供了一組豐富的功能,可用于高效創(chuàng)建數(shù)據(jù)管道,使用后臺(tái)線(xiàn)程獲取小批量數(shù)據(jù),使模型無(wú)需等待。僅僅預(yù)先獲取數(shù)據(jù)還不夠,如果生成小批量數(shù)據(jù)的速度比GPU消費(fèi)數(shù)據(jù)的速度慢,那么就需要使用并行化來(lái)加快數(shù)據(jù)的讀取和轉(zhuǎn)換。為此,Tensorflow提供了交錯(cuò)功能以利用多個(gè)線(xiàn)程并行讀取數(shù)據(jù),以及并行映射功能使用多個(gè)線(xiàn)程對(duì)小批量數(shù)據(jù)進(jìn)行轉(zhuǎn)換。
由于這些API基于多線(xiàn)程,因此可能會(huì)受到Python全局解釋器鎖(GIL)的限制。Python GIL限制了Python解釋器一次只能運(yùn)行單個(gè)線(xiàn)程的字節(jié)碼。如果在管道中使用純TensorFlow代碼,通常不會(huì)受到這種限制,因?yàn)門(mén)ensorFlow核心執(zhí)行引擎在GIL的范圍之外工作。但是,如果使用的第三方庫(kù)沒(méi)有發(fā)布GIL或者使用Python進(jìn)行大量計(jì)算,那么依賴(lài)多線(xiàn)程來(lái)并行化管道就不可行。
3、使用多進(jìn)程并行化數(shù)據(jù)管道
考慮以下生成器函數(shù),該函數(shù)模擬加載和執(zhí)行一些計(jì)算以生成小批量數(shù)據(jù)樣本和標(biāo)簽。
def data_generator():
for _ in range(10):
# 模擬獲取
# 從磁盤(pán)/網(wǎng)絡(luò)
time.sleep(0.5)
# 模擬計(jì)算
for _ in range(10000):
pass
yield (
np.random.random((4, 1000000, 3)).astype(np.float32),
np.random.random((4, 1)).astype(np.float32)
)
接下來(lái),在虛擬的訓(xùn)練管道中使用該生成器,并測(cè)量生成小批量數(shù)據(jù)所花費(fèi)的平均時(shí)間。
generator_dataset = tf.data.Dataset.from_generator(
data_generator,
output_types=(tf.float64, tf.float64),
output_shapes=((4, 1000000, 3), (4, 1))
).prefetch(tf.data.experimental.AUTOTUNE)
st = time.perf_counter()
times = []
for _ in generator_dataset:
en = time.perf_counter()
times.append(en - st)
# 模擬訓(xùn)練步驟
time.sleep(0.1)
st = time.perf_counter()
print(np.mean(times))
據(jù)觀察,平均耗時(shí)約為0.57秒(在配備Intel Core i7處理器的Mac筆記本電腦上測(cè)量)。如果這是一個(gè)真實(shí)的訓(xùn)練循環(huán),GPU的利用率將相當(dāng)?shù)停恍杌ㄙM(fèi)0.1秒進(jìn)行計(jì)算,然后閑置0.57秒等待下一個(gè)批次數(shù)據(jù)。
為了加快數(shù)據(jù)加載速度,可以使用多進(jìn)程生成器。
from multiprocessing import Queue, cpu_count, Process
def mp_data_generator():
def producer(q):
for _ in range(10):
# 模擬獲取
# 從磁盤(pán)/網(wǎng)絡(luò)
time.sleep(0.5)
# 模擬計(jì)算
for _ in range(10000000):
pass
q.put((
np.random.random((4, 1000000, 3)).astype(np.float32),
np.random.random((4, 1)).astype(np.float32)
))
q.put("DONE")
queue = Queue(cpu_count()*2)
num_parallel_processes = cpu_count()
producers = []
for _ in range(num_parallel_processes):
p = Process(target=producer, args=(queue,))
p.start()
producers.append(p)
done_counts = 0
while done_counts < num_parallel_processes:
msg = queue.get()
if msg == "DONE":
done_counts += 1
else:
yield msg
queue.join()
現(xiàn)在,如果測(cè)量等待下一個(gè)小批次數(shù)據(jù)所花費(fèi)的時(shí)間,得到的平均時(shí)間為0.08秒。速度提高了近7倍,但理想情況下,希望這個(gè)時(shí)間接近0。
如果進(jìn)行分析,可以發(fā)現(xiàn)相當(dāng)多的時(shí)間都花在了準(zhǔn)備數(shù)據(jù)的反序列化上。在多進(jìn)程生成器中,生產(chǎn)者進(jìn)程會(huì)返回大型NumPy數(shù)組,這些數(shù)組需要進(jìn)行準(zhǔn)備,然后在主進(jìn)程中進(jìn)行反序列化。能否在進(jìn)程之間傳遞大型數(shù)組時(shí)提高效率?
4、使用Ray并行化數(shù)據(jù)管道
這就是Ray發(fā)揮作用的地方。Ray是一個(gè)用于在Python中運(yùn)行分布式計(jì)算的框架。它帶有一個(gè)共享內(nèi)存對(duì)象存儲(chǔ)區(qū),可在不同進(jìn)程間高效地傳輸對(duì)象。特別的是,在不進(jìn)行任何序列化和反序列化的情況下,對(duì)象存儲(chǔ)區(qū)中的Numpy數(shù)組可在同一節(jié)點(diǎn)上的worker之間共享。Ray還可以輕松實(shí)現(xiàn)數(shù)據(jù)加載在多臺(tái)機(jī)器上的擴(kuò)展,并使用Apache Arrow高效地序列化和反序列化大型數(shù)組。
Ray帶有一個(gè)實(shí)用函數(shù)from_iterators,可以創(chuàng)建并行迭代器,開(kāi)發(fā)者可以用它包裝data_generator生成器函數(shù)。
import ray
def ray_generator():
num_parallel_processes = cpu_count()
return ray.util.iter.from_iterators(
[data_generator]*num_parallel_processes
).gather_async()
使用ray_generator,測(cè)量等待下一個(gè)小批量數(shù)據(jù)所花費(fèi)的時(shí)間為0.02秒,比使用多進(jìn)程處理的速度提高了4倍。