pytorch分布式训练(二):torch.nn.parallel.DistributedDataParallel
??之前介紹了Pytorch的DataParallel方法來構建分布式訓練模型,這種方法最簡單但是并行加速效果很有限,并且只適用于單節點多gpu的硬件拓撲結構。除此之外Pytorch還提供了DistributedDataParallel來構建分布式模型,并行加速程度更高,且支持多節點多gpu的硬件拓撲結構。
一、Overall Design
??整體的系統方案設計intuition也非常直觀:每個gpu上都有一個local的model和一個mini-batch的數據,進行數據分布式訓練時,每個gpu進行當前設備上的forward pass和backward pass。不同gpu上的模型在構造時通過broadcast機制統一初始化參數,每次訓練iteration結束后,不同gpu上的參數梯度匯總到一起取均值作為整個大batch的對應梯度,然后將該梯度分發給各個gpu上的模型進行更新。不同gpu上的模型初始值相同且每次更新值也相同,就能保證各個gpu上的model雖然使用不同數據進行訓練,但是模型始終是“參數相同”的。結合基于隨機梯度下降的優化算法原理,不難得到結論,多個gpu上的梯度匯總平均后再分發,相當于將同一個模型在單獨gpu上使用大batch進行訓練,即完成了數據并行的分布式訓練。
??基于上述設計理念,Pytorch實現了DistributedDataParallel這個API可以把一個普通的單gpu模型wrap成數據并行分布式模型,然后就可以類似單gpu模型那樣進行訓練了,通過下面的sample code來看,一行code就可以完成工作。
二、Gradient Average Design Details
??這部分需要完成的工作很簡單,就是把各個gpu上backward后每個參數的梯度拿到,然后匯總到一起取均值,再下發給各個設備。為了高效的完成這個過程,結合硬件的拓撲結構和計算特點,Pytorch設計了相應的策略。
??各個gpu進行本地的前向傳播后,需要進行反向傳播計算所有參數的梯度,這個過程不妨稱之為computation;不同gpu上相同參數的梯度需要通過gpu之間的通信匯總到一起計算均值,這個過程不妨稱之為communication。神經網絡是典型的有向無環圖,反向傳播是一個鏈式計算的過程,即求完一個再求另一個。那么如何安排computation和communication的相互順序,首先考慮兩種極端情況。
2.1. communication after every computation
此時每計算完一個參數的梯度,就進行一次各gpu之間的通信得到均值后再返回各gpu上覆蓋原參數梯度。但是這樣做會浪費gpu之間的帶寬,因為單次communication的是有開銷的。實驗表明,各gpu之間的通信搬運大批量數據時候效率更高。定性的來說,n個相同大小的數據,分n次搬運每次搬一個的耗時要遠大于一次搬運n個數據。例如,60M的torch.float32數據,通過不同的搬運次數和每次搬運數量來進行gpu之間的communication,搬運次數越多(每次搬運數據量越少)越耗時。
2.2 communication after all computation
等待各個gpu上所有參數的梯度computation全部結束后,再進行一次gpu之間的communication,計算所有參數的梯度均值后再寫會各個gpu覆蓋原始梯度。此時computation和communication成了串行結構,沒有做到充分并行。
2.3 gradient buckets
??根據上面的分析,為了實現高效的gradient average,Pytorch設計了基于buckets的gradient average策略。具體來說,將模型的所有參數分進若干個buckets,每一個bucket里裝一部分參數。在模型進行反向傳播時,所有參數的梯度是一個接一個計算的,當所有gpu上某個相同bucket里面所有的參數梯度都計算完了,則可以進行當前bucket梯度的communication過程。與此同時,其他參數的梯度繼續計算。理想情況下,上一bucket的communication結束后,剛好又有一個bucket的參數梯度都計算完了,則gpu通信無縫連接這一個bucket的通信工作,使得模型的反向傳播和gpu之間梯度通信實現幾乎百分百的并行,具體還有一些實現的細節。
2.3.1 參數順序
??在反向傳播時,參數分配進若干個buckets的順序是按照前向傳播時參數調用的倒序排列的,這樣可以幾乎滿足反向傳播時梯度計算的先后順序,使得先完成計算的參數梯度盡快進入某個bucket并進行gpu之間的通信。另外為了保證在個別情況下(兩個參數p2p_2p2?、p3p_3p3?在邏輯上同時進行前向和反向傳播,但是實際反向傳播執行時,某個gpu上p2p_2p2?的梯度g2g_2g2?先于p3p_3p3?的梯度g3g_3g3?被計算,另外一個gpu上p3p_3p3?的梯度g3g_3g3?先于p2p_2p2?的梯度g2g_2g2?被計算)各個gpu的參數能夠一一對應,在每個bucket中各個參數的先后順序也是保持固定順序的。
2.3.2 允許部分參數不計算梯度
??按照gradient buckets的設計思路,同一個bucket中只有所有參數都完成了梯度計算后,該bucket才能準備進行communication。當所有gpu的該對應bucket都準備好后才能進行各gpu針對該bucket的通信。但是在網絡中可能存在一些算子或子網絡,在某些iteration里是不進行前向傳播和反向傳播的,而在其他iteration里是進行前向傳播和反向傳播的,例如添加了dropout的全連接層。當某個參數在當前iteration前向傳播中沒有參與計算,那么在反向傳播時也就不會計算其梯度(因為當前iteration中該參數對應的算子沒有出現在鏈接圖中),那么該參數所在的bucket始終無法ready(下圖中的g3g_3g3?),也就卡住了后續的梯度更新環節,訓練中止。
??為了解決上述問題,設計了前向傳播過程中遍歷每一個參數,并且確認其對應的算子是否在前向傳播過程中被調用。如果某個參數對應的算子在前向傳播中被調用,則該算子對應的參數在反向傳播中一定會計算梯度并被傳入某個bucket中。否則就將該參數標記為unused_parameters并將他在其所在的bucket中的狀態置位ready,此時該參數雖然沒有梯度,但不會block整個bucket的ready狀態。當前iteration完成后,再將所有參數的unused狀態清空,等待下一次iteration。
2.3.3 允許梯度累積
??在某些清空下,也許不需要每一次iteration都進行各個gpu之間的通信來計算梯度的均值。比如希望加快數據并行分布式訓練的速度,可以累積幾個iteration的bucket,再進行一次communication;或者進行超大batch的訓練,即使切分為若干個子batch后將每個子batch分配到單個gpu上仍然超出了單個gpu的顯存,則可以把每個子batch再拆分為n個子子batch,這些子子batch在單個gpu上進行n次iteration后,再進行一次gpu通信。
??為實現上述目的,設計了允許梯度累積的策略。具體來說,Pytorch的DistributedDataParallel模塊提供了額外的接口(no_sync上下文)來實現允許若干個iteration內進行梯度累積后再進行gpu通信,如下sample code:
三、DistributedDataParallel執行過程
3.1 執行過程偽代碼
??在通過DistributedDataParallel構造一個分布式訓練模型時,初始化函數里主要完成兩步:向各個gpu上廣播模型的初始化參數和狀態;在每個算子中注冊反向傳播的hook函數。forward函數中除了模型的前向傳播以外,還會遍歷所有parameter,確定那些在本次iteration中沒有參與訓練的算子,并將其對應的parameter設置為unused parameter。backward時除了模型正常的反向傳播計算各個參數的梯度外,還通過hook函數得到當前參數的梯度,根據預先設定的index順序將其添加進對應的bucket中,等待所有bucket的ready狀態后觸發gpu之間的reduce通信。下面2圖分別為執行的偽代碼和示意圖:
3.2 sample code
根據一個toy example的sample code來解釋如何在一般情況下應用DistributedDataParallel構建多節點多gpu上的分布式模型。在每個gpu上開啟一個進程,負責當前gpu上的數據前向傳播和反向傳播。所有進程組成當前的進程組,DistributedDataParallel API自動獲取當前進程組,并在進程組之間通過reduce_mean方法來完成各gpu上的梯度通信,將梯度均值分發給各個模型進行更新。
import os from datetime import datetime import argparse import torch.multiprocessing as mp import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as distdef train(gpu, args):# 當前gpu上的進程在總進程組中的rankrank = args.nr * args.gpus + gpu# 通過pytorch的dist模塊將當前進程加入進程組dist.init_process_group(backend='nccl',init_method='env://',world_size=args.world_size,rank=rank)# 模型、損失函數和優化器等定義與普通模型相同model = SomeModel()# 如果模型中有原始的batchnorm,要記得通過下面一句將當前模型的所有普通batchnorm改成可在多gpu同步的syncbatchnorm,否則每個gpu單獨使用當前的子batch計算的mean和variance訓練,可能導致效果不好甚至不收斂。Syncbatchnorm也是在當前的進程組里找到其他的gpu進程,然后將各gpu的mean和variance做一個reduce_mean再分發。# model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)torch.cuda.set_device(gpu)model.cuda(gpu)batch_size = 100# define loss function (criterion) and optimizercriterion = nn.CrossEntropyLoss().cuda(gpu)optimizer = torch.optim.SGD(model.parameters(), 1e-4)# Wrap the modelmodel = nn.parallel.DistributedDataParallel(model,device_ids=[gpu])# Data loading codetrain_dataset = torchvision.datasets.MNIST(root='./data',train=True,transform=transforms.ToTensor(),download=True)# 使用DistributedSampler來進行數據集的采樣,指定總gpu數量=節點數*每節點gpu數,當前gpu的rank,train_loader可以根據總子batch數和當前batch的index,為當前gpu產生對應的子batchtrain_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=args.world_size,rank=rank)train_loader = torch.utils.data.DataLoader(dataset=train_dataset,# 這個batch_size是指單個gpu上的子batch_size,一次iteration的實際batch=batch_size * args.gpus * args.nodesbatch_size=batch_size,# 這里的shuffle必須設置為Falseshuffle=False, num_workers=0,pin_memory=True,# 傳入指定的sampler劃分訓練集sampler=train_sampler) start = datetime.now()total_step = len(train_loader)for epoch in range(args.epochs):for i, (images, labels) in enumerate(train_loader):images = images.cuda(non_blocking=True)labels = labels.cuda(non_blocking=True)# Forward passoutputs = model(images)loss = criterion(outputs, labels)# Backward and optimizeoptimizer.zero_grad()loss.backward()optimizer.step()if (i + 1) % 100 == 0 and gpu == 0:print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1,args.epochs,i + 1,total_step,loss.item()))if gpu == 0:print("Training complete in: " + str(datetime.now() - start))def main():parser = argparse.ArgumentParser()parser.add_argument('-n', '--nodes', default=1,type=int, metavar='N')parser.add_argument('-g', '--gpus', default=1, type=int,help='number of gpus per node')parser.add_argument('-nr', '--nr', default=0, type=int,help='ranking within the nodes')parser.add_argument('--epochs', default=2, type=int,metavar='N',help='number of total epochs to run')args = parser.parse_args()# 每個gpu上開一個進程,總進程數=節點數*每節點gpu數args.world_size = args.gpus * args.nodes# 多個節點之間的相互通信,指定一個監聽的主節點 os.environ['MASTER_ADDR'] = '10.57.23.164' os.environ['MASTER_PORT'] = '8888'# 通過multiprocessing模塊開啟多進程,train為各個進程上分配的執行函數,其被調用時,傳入的參數是(index,args),其中index為當前進程在總進程組中的rank mp.spawn(train, nprocs=args.gpus, args=(args,)) if __name__ == '__main__':main()不妨設總共用4個節點,每個節點有8個gpu,首先在主節點(即ip為10.57.23.164的節點)上執行:
python src/mnist-distributed.py -n 4 -g 8 -nr 0然后在其余三個節點執行:
python src/mnist-distributed.py -n 4 -g 8 -nr 1 python src/mnist-distributed.py -n 4 -g 8 -nr 2 python src/mnist-distributed.py -n 4 -g 8 -nr 3四、總結
??相比于DataParallel,DistributedDataParallel是進一步的數據并行分布式訓練,加速效果十分明顯。通過構建多進程,避免了單進程多線程由于GIL對python解釋器和相應資源的依賴;各gpu上單獨計算每個子batch的前向傳播和反向傳播,gpu之間通信僅涉及到每個gpu上計算得到的梯度求reduce_mean,且通過設計策略充分利用了通信帶寬。支持多節點多gpu的硬件架構,還可以結合apex擴展為混合精度訓練,速度提升效果更佳。后面準備再寫一篇關于混合精度訓練相關的流水賬。草草記錄,如有不妥請指正。
總結
以上是生活随笔為你收集整理的pytorch分布式训练(二):torch.nn.parallel.DistributedDataParallel的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vivado实现异步复位的D触发器
- 下一篇: 图片也要查重了?期刊用AI审论文防造假,