Distributed TensorFlow
注意:?可以在這里找到示例的完整源代碼?。
2017年6月8日,分布式深度學習的時代開始了。?在那一天,?Facebook發布了一篇文章,展示了他們用于將卷積神經網絡(ImageNet上的RESNET-50)的訓練時間從兩周到一小時減少到32個服務器的256個GPU的方法。?在軟件中,他們引入了一種技術來訓練具有非常大的小批量大小的卷積神經網絡(ConvNets):使學習速率與小批量大小成比例。?這意味著任何人現在都可以使用TensorFlow將分布式訓練擴展到數百個GPU。?但這不是分布式TensorFlow的唯一優勢:通過在許多GPU上并行運行許多實驗,您還可以大幅縮短實驗時間。?這減少了為神經網絡找到優質超參數所需的時間。
隨著計算而擴展的方法是AI的未來。?-Rich Sutton,強化學習之父
在本教程中,我們將探索使用TensorFlow的兩種不同的分布式方法:
我們將在這篇文章中提供方法(1)和(2)的代碼示例,但首先,我們需要闡明我們將要討論的分布式深度學習的類型。
模型并行性與數據并行性
一些神經網絡模型非常龐大,無法適應單個設備(GPU)的內存。?Google的神經機器翻譯系統就是這種網絡的一個例子。?這些模型需要在許多設備上分開(TensorFlow文檔中的工作人員),并行地在設備上進行培訓。?例如,網絡中的不同層可以在不同的GPU上并行訓練。?這種訓練過程通常被稱為“模型并行”(或TensorFlow文檔中的“圖中復制”)。?獲得良好性能是一項挑戰,我們不會進一步涵蓋這種方法。
在“數據并行性”(或TensorFlow文檔中的“圖間復制”)中,每個設備都使用相同的模型,但使用不同的訓練樣本在每個設備中訓練模型。?這與模型并行性形成了對比,模型并行性為每個設備使用相同的數據,但在設備之間劃分模型。?每個設備將獨立地計算其訓練樣本的預測值與標記的輸出(這些訓練樣本的正確值)之間的誤差。?由于每個設備都訓練不同的樣本,因此它會計算模型的不同更改(“梯度”)。?然而,該算法依賴于對每次新迭代使用所有處理的組合結果,就像該算法在單個處理器上運行一樣。?因此,每臺設備都必須將所有更改發送到所有其他設備上的所有型號。
在本文中,我們將重點放在數據并行性上。?圖1顯示了典型的數據并行性,將32個不同的圖像分配給運行單個模型的256個GPU中的每一個。?總共,一個迭代的最小批量大小為8,092個圖像(32 x 256)。
圖1.在數據并行中,設備使用不同的訓練數據子集進行訓練。? 圖片由Jim Dowling提供。同步與異步分布式培訓
隨機梯度下降(SGD)是用于尋找最優值的迭代算法,是AI中最受歡迎的訓練算法之一。?它涉及多輪訓練,每輪的結果都納入模型中,以備下一輪訓練。?輪可以在多個設備上同步或異步運行。
每次SGD迭代運行一小批培訓樣本(Facebook擁有8,092張圖像的大批量小批量)。?在同步培訓中,所有設備都使用單個(大)小批量數據的不同部分來訓練其本地模型。?然后他們將他們本地計算的梯度(直接或間接)傳達給所有設備。?只有在所有設備成功計算并發送了梯度后,模型才會更新。?然后將更新后的模型與下一個最小批次的拆分一起發送到所有節點。?也就是說,設備在小批次的非重疊分割(子集)上進行訓練。
雖然并行有很大的加速培訓的潛力,但它自然會引入開銷。?大型模型和/或慢速網絡會增加訓練時間。?如果有失速(慢速設備或網絡連接),訓練可能會失速。?我們還希望減少訓練模型所需的迭代總次數,因為每次迭代都需要將更新的模型廣播到所有節點。?實際上,這意味著盡可能增加小批量的尺寸,以免降低訓練模型的準確性。
在他們的論文中,Facebook引入了針對學習率的線性縮放規則?,可以用大批量小批量進行培訓。?該規則規定“當小批量大小乘以k時,將學習速率乘以k”,但條件是在達到目標學習速率之前,學習速率應該在幾個時期內緩慢增加。
在異步培訓中,沒有設備等待來自任何其他設備的模型更新。?這些設備可以獨立運行并與對等設備共享結果,或通過一個或多個稱為“參數”服務器的中央服務器進行通信。?在對等體系結構中,每個設備運行一個循環,讀取數據,計算梯度,將它們(直接或間接)發送到所有設備,并將模型更新為最新版本。?在更集中的體系結構中,設備以梯度的形式將其輸出發送到參數服務器。?這些服務器收集和聚合漸變。?在同步培訓中,參數服務器計算模型的最新最新版本,并將其發送回設備。?在異步培訓中,參數服務器將漸變發送到本地計算新模型的設備。?在這兩種體系結構中,循環都會重復直到培訓結束。?圖2說明了異步和同步訓練之間的區別。
圖2.隨機梯度下降(SGD)的異步和同步訓練。? 圖片由Jim Dowling提供。參數服務器架構
當并行SGD使用參數服務器時,算法首先將模型廣播給工作人員(設備)。在每次訓練迭代中,每個工作人員從小批次中讀取自己的分割,計算其自己的漸變,并將這些漸變發送到一個或多個參數服務器。?參數服務器匯總來自工人的所有梯度,并等到所有工人完成之后,才計算下一次迭代的新模型,然后將其廣播給所有工人。?數據流如圖3所示。
圖3.同步隨機梯度下降的參數服務器體系結構? 圖片由Jim Dowling提供。環 - allreduce體系結構
在ring-allreduce體系結構中,沒有集中來自工作者的梯度的中央服務器。?相反,在訓練迭代中,每個工作人員讀取它自己的最小批次拆分,計算其梯度,將梯度發送到環上的后繼鄰居,并從環上的前一個鄰居接收梯度。?對于具有N個工人的環,所有工人將在每個工人發送和接收N-1個梯度消息之后收到計算更新模型所需的梯度。
Ring-allreduce是帶寬最優化的,因為它可以確保每個主機上可用的上傳和下載網絡帶寬得到充分利用(與參數服務器型號不同)。?Ring-allreduce還可以將深層神經網絡中較低層的梯度計算與高層梯度的傳輸重疊,從而進一步縮短訓練時間。?數據流如圖4所示。
圖4.同步隨機梯度下降的ring-allreduce體系結構? 圖片由Jim Dowling提供。平行實驗
到目前為止,我們已經涵蓋分布式培訓。?但是,許多GPU也可用于并行化超參數優化。?也就是說,當我們想要建立適當的學習率或最小批量時,我們可以使用不同的超參數組合并行運行多個實驗。?在所有實驗完成后,我們可以使用結果來確定是否需要更多實驗或者當前超參數值是否足夠好。?如果超參數是可接受的,則可以在許多GPU上訓練模型時使用它們。
TensorFlow中分布式GPU的兩種用途
以下部分說明如何使用TensorFlow進行并行實驗和分布式培訓。
平行實驗
在許多GPU上并行掃描參數很容易,因為我們只需要一個中心點來安排實驗。?TensorFlow不提供啟動和停止TensorFlow服務器的內置支持,因此我們將使用Apache Spark在PySpark映射器函數中運行每個TensorFlow Python程序。?在下面,我們定義了一個啟動函數,該函數需要參數(1)Spark會話對象,(2)一個map_fun命名將在每個Spark執行器上執行的TensorFlow函數,以及(3)包含超參數的args_dict字典。?Spark可以通過在Spark執行程序中運行它們來并行運行許多Tensorflow服務器。?Spark執行程序是執行任務的分布式服務。?在這個例子中,每個執行程序都會使用它的executor_num來計算它應該從args_dict使用的超參數,?args_dict將其索引到正確的param_val?,然后使用這些超參數運行提供的訓練函數。
def launch ( spark_session , map_fun , args_dict ):""" Execute a 'map_fun' for each hyperparameter combination from the dictionary 'args_dict' Args: :spark_session: SparkSession object :map_fun: The TensorFlow function to run (wrapped inside a Spark mapper function) :args_dict: hyperparameters to insert as arguments for each TensorFlow function """ sc = spark_session . sparkContext# Length of the list of the first list of arguments represents the number of Spark tasks num_tasks = len ( args_dict ()[ 0 ])# Create a number of partitions (tasks) nodeRDD = sc . parallelize ( range ( num_tasks ), num_tasks )# Execute each of the hyperparameter arguments as a task nodeRDD . foreachPartition ( _do_search ( map_fun , args_dict )) def _do_search ( map_fun , args_dict ): def _wrapper_fun ( iter ): for i in iter : executor_num = i arg_count = map_fun . func_code . co_argcount names = map_fun . func_code . co_varnames args = [] arg_index = 0 while arg_count > 0 :# Get arguments for hyperparameter combination param_name = names [ arg_index ] param_val = args_dict [ param_val args_dict ][ executor_num ] args . append ( param_val ) arg_count -= 1 arg_index += 1 map_fun ( * args ) return _wrapper_fun現在可以在Spark中調用mnist?TensorFlow培訓函數。?請注意,我們只調用一次啟動,但對于每個超參數組合,任務將在不同的執行程序(共四個)上執行:
args_dict = { 'learning_rate' : [ 0.001 ], 'dropout' args_dict 'learning_rate' : [ args_dict ]} def mnist ( learning_rate , mnist ):""" An implementation of FashionMNIST should go here """ launch ( spark , mnist , args_dict ):分布式培訓
我們將簡要介紹三種TensorFlow分布式培訓框架:本地分布式TensorFlow,TensorFlowOnSpark和Horovod。
分布式TensorFlow
分布式TensorFlow應用程序由包含一個或多個參數服務器和工作人員的集群組成。?由于工作人員在訓練期間計算梯度,因此通常將其放置在GPU上。?參數服務器只需要聚合漸變和廣播更新,因此它們通常放置在CPU上,而不是GPU上。?其中一名工作人員,首席工作人員協調模型培訓,初始化模型,統計完成的培訓步驟數,監控會話,保存TensorBoard的日志,保存和恢復模型檢查點以從故障中恢復。?首席工作人員還管理故障,確保工作人員或參數服務器出現故障時的容錯。?如果主要工作人員自己死亡,則需要從最近的檢查點重新開始培訓。
分布式TensorFlow作為TensorFlow核心的一部分的一個缺點是您必須明確地管理服務器的啟動和停止。?這意味著要跟蹤程序中所有TensorFlow服務器的IP地址和端口,并手動啟動和停止這些服務器。?通常,這會導致代碼中有很多開關語句來確定哪些語句應該在當前服務器上執行。?因此,通過使用集群管理器和Spark,我們將使生活更輕松。?希望你永遠不必像這樣編寫代碼,手動定義ClusterSpec:
tf . train . ClusterSpec ({ "local" : [ "localhost:2222" , "localhost:2223" ]}) tf . train . ClusterSpec ({ "worker" : [ "worker0.example.com:2222" , "worker1.example.com:2222" ,"worker2.example.com:2222"], "ps" : [ "ps0.example.com:2222" ,"ps1.example.com:2222"]}) … if FLAGS . job_name == "ps" : server . join () elif FLAGS . job_name == "worker" : …使用主機端點(IP地址和端口號)創建ClusterSpec是很容易出錯和不切實際的。?相反,您應該使用諸如YARN,Kubernetes或Mesos之類的集群管理器來降低配置和啟動TensorFlow應用程序的復雜性。?主要選項是云管理解決方案(如Google Cloud ML或Databrick的Deep Learning Pipelines)或通用資源管理器(如Mesos或YARN)。
TensorFlowOnSpark
TensorFlowOnSpark是一個允許從Spark程序啟動分布式TensorFlow應用程序的框架。?它可以在獨立的Spark群集或YARN群集上運行。?下面的TensorFlowOnSpark程序使用ImageNet數據集執行Inception的分布式培訓。
它引入的新概念是用于啟動集群的TFCluster對象,以及執行培訓和推理。?集群可以以SPARK模式或TENSORFLOW模式啟動。?SPARK模式使用RDD向TensorFlow工作人員提供數據。?這對于構建從Spark到TensorFlow的集成管道非常有用,但是這是一個性能瓶頸,因為只有一個Python線程可以將RDD序列化為TensorFlow工作者的feed_dict?。?TENSORFLOW輸入模式通常是首選,因為數據可以使用來自分布式文件系統(如HDFS)的更高效的多線程輸入隊列讀取。?當一個集群啟動時,它啟動TensorFlow工作者和參數服務器(可能在不同的主機上)。?參數服務器只執行server.join()命令,而工作人員讀取ImageNet數據并執行分布式培訓。?主要工作人員有task_id '0'。
以下程序收集使用Spark啟動和管理Spark上的參數服務器和工作人員所需的信息。
from __future__ import absolute_import from __future__ import division from __future__ import print_function from pyspark.context import SparkContext from pyspark.conf import SparkConf from tensorflowonspark import TFCluster , TFNode from datetime import datetime import os import sys import tensorflow import as tf import time def main_fun ( argv , ctx ):# extract node metadata from ctx worker_num = ctx . worker_num job_name = ctx . job_name task_index = ctx . task_index in [ 'ps' , 'worker' ], assert job_name ], 'job_name must be ps or worker' from inception import inception_distributed_train from inception.imagenet_data import ImagenetData import tensorflow import as tf# instantiate FLAGS on workers using argv from driver and add job_name and task_id print ( "argv:" , argv ) sys . argv = argv FLAGS = tf . app flags . FLAGS FLAGS . job_name = job_name FLAGS . task_id = task_index print ( "FLAGS:" , FLAGS '__flags' [ '__flags' ])# Get TF cluster and server instances cluster_spec , server = TFNode . start_cluster_server ( ctx , 4 , start_cluster_server ) if FLAGS . job_name == 'ps' :# `ps` jobs wait for incoming connections from the workers. server . join () else :# `worker` jobs will actually do the work. dataset = ImagenetData ( subset = ImagenetData ) assert dataset . data_files ()# Only the chief checks for or creates train_dir. if FLAGS . task_id == 0 : if not tf . gfile . Exists ( train_dir ): tf . gfile . MakeDirs ( train_dir ) inception_distributed_train . train ( server target , dataset , cluster_spec , ctx )# parse arguments needed by the Spark driver import argparse parser = argparse . ArgumentParser () parser . add_argument ( "--epochs" , help = "number of epochs" , type = int , default = 5 ) parser . add_argument ( "--steps" , help = "number of steps" , type = int , default = 500000 ) parser . add_argument ( "--input_mode" , help = "method to ingest data: (spark|tf)" , choices = [ "spark" , "tf" ], default = "tf" ) parser . add_argument ( "--tensorboard" , help = "launch tensorboard process" , action = "store_true" ) ( args , rem ) = parser . parse_known_args () input_mode = TFCluster . InputMode . SPARK if args . input_mode == 'spark' TFCluster . InputMode . TENSORFLOW print ( "{0} ===== Start" ( datetime () . isoformat ())) sc = spark . sparkContext num_executors = int ( _conf ( "spark.executor.instances" )) num_ps = int ( _conf ( "spark.tensorflow.num.ps" )) cluster = TFCluster . run ( sc , main_fun , num_executors , num_ps , tensorboard , input_mode , input_mode ) if input_mode == TFCluster . InputMode . SPARK : dataRDD = sc . newAPIHadoopFile ( newAPIHadoopFile , "org.tensorflow.hadoop.io.TFRecordFileInputFormat" , keyClass = "org.apache.hadoop.io.BytesWritable" , valueClass = "org.apache.hadoop.io.NullWritable" ) cluster . train ( dataRDD , dataRDD ) cluster . shutdown ()請注意,Apache YARN尚不支持GPU作為資源,TensorFlowOnSpark使用YARN節點標簽來調度具有GPU的主機上的TensorFlow工作人員。?前面的例子也可以在確實支持GPU作為資源的Hops YARN上運行,從而實現CPU和GPU資源的更精細共享。
容錯
可以創建MonitoredTrainingSession對象,以便在發生故障時從最新檢查點自動恢復會話的訓練狀態。
saver = tf . train . Saver ( sharded = True ) is_chief = if FLAGS is_chief True . task_id == 0 else False with tf . Session ( server . target ) as sess :# sess.run(init_op)# re-initialze from checkpoint, if there is one. saver . restore ( sess , ... ) while True : if is_chief and step % 1000 == 0 : saver . save ( sess , "hdfs://...." ) with tf . train . MonitoredTrainingSession ( is_chief , is_chief ) as sess : while not sess . should_stop (): sess . run ( train_op )Spark將重啟失敗的執行器。?如果執行者不是主要工作人員,它將聯系參數服務器,并繼續像以前一樣,因為工人實際上是無狀態的。?如果參數服務器死亡,則在新參數服務器加入系統后,首席員工可以從最后一個檢查點恢復。?首席工作人員還每1000步就保存一份模型副本作為檢查點。?如果主要工作人員本身出故障,培訓失敗,并且必須開始新的培訓工作,但它可以從最新的完整檢查點恢復培訓。
Horovod
TensorFlow提供了兩個ring-allreduce框架:?tensorflow.contrib.mpi_collectives?(由百度貢獻)和Uber的Horovod,建立在Nvidia的NCCL 2庫上。?我們將研究Horovod,因為它在Nvidia GPU上具有更簡單的API和良好的性能,如圖5所示。Horovod使用pip進行安裝,并且需要事先安裝Open MPI和NCCL-2庫。?Horovod比TensorFlow或TensorFlowOnSpark需要對TensorFlow程序的更改更少。?它引入了必須初始化的hvd對象,并且必須包裝優化器(hvd使用allreduce或allgather平均漸變)。?GPU使用其本地等級綁定到此進程,并且在初始化期間將等級0的變量廣播到所有其他進程。
使用mpirun命令啟動Horovod Python程序。?它將每臺服務器的主機名稱以及每臺服務器上要使用的GPU數量作為參數。?mpirun的另一種選擇是使用Hops Hadoop平臺在Spark應用程序中運行Horovod,該平臺使用HopsYARN自動管理GPU分配給Horovod進程。?目前,Horovod不支持容錯操作,并且應該定期檢查模型,以便在失敗后,培訓可以從最新的檢查點恢復。
import horovod.tensorflow as hvd ; import tensorflow import as tf def main ( _ ): hvd . init () loss = ... tf . ConfigProto () . gpu_options . visible_device_list = str ( local_rank ()) opt = tf . train . AdagradOptimizer ( 0.01 ) opt = hvd . DistributedOptimizer ( opt ) hooks = [ hvd . BroadcastGlobalVariablesHook ( 0 )] train_op = opt . minimize ( loss ) 圖5.在ImageNet數據集上使用ResNet-101進行培訓時,在DeepLearning11服務器上,Horovod / TensorFlow在DeepLearning11服務器上線性擴展至多10個GPU(成本:15,000美元)。? 圖片由Jim Dowling提供。規模的深度學習層次
在看過許多TensorFlow和大型小批量隨機梯度下降(SGD)的分布式訓練架構之后,我們現在可以定義下面的比例層次結構。?金字塔的頂端是當前TensorFlow算法(包括ring-allreduce)的allreduce系列中最可伸縮的方法,最底層是可擴展性最低(因此也是訓練網絡最慢的方法)。?盡管平行實驗與分布式訓練是互補的,但正如我們已經表明的那樣,它們是平凡并行的(具有較弱的縮放比例),因此在金字塔中被發現較低。
圖6.同步SGD的深度學習層次結構。? 圖片由Jim Dowling提供。結論
做得好!?您現在知道分布式TensorFlow能夠做什么,以及如何修改您的TensorFlow程序以進行分布式培訓或運行并行實驗。?這些例子的完整源代碼可以在這里找到?。
https://www.oreilly.com/ideas/distributed-tensorflow
與50位技術專家面對面20年技術見證,附贈技術全景圖
總結
以上是生活随笔為你收集整理的Distributed TensorFlow的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【译】PGP Web of Trust:
- 下一篇: Orchid兰花协议简介——分布式匿名代