pytorch默认初始化_“最全PyTorch分布式教程”来了!
前言
本文對使用pytorch進行分布式訓練(單機多卡)的過程進行了詳細的介紹,附加實際代碼,希望可以給正在看的你提供幫助。本文分三個部分展開,分別是:
若想學習分布式的部署,看完本文就足夠了,但為了讀者能了解更多細節,我在第一部分的每個模塊都加了對應的官方文檔的鏈接。
同時,我正在進行PyTorch官方文檔的翻譯工作,除了對其進行便于理解的翻譯,還添加了我的解釋。項目地址:https://github.com/liuzhaoo/Pytorch-API-and-Tutorials-CN,歡迎各位下載使用!
一、先驗知識
分布式訓練涉及到pytorch的很多API,這里對它們進行簡單的介紹,其中重點為第三節DataLoader。若想直接看到使用方法,請看第二部分。
1.DataParallel 和DistributedDataParallel(DDP)
此兩種方法都可以實現多GPU并行訓練,但是后者更快,同時需要寫更多代碼,而DataParallel只需一行代碼就可以搞定。盡管如此,還是建議使用DistributedDataParallel,建議參考官方介紹。
如下,只需在將model加載到device(model.to(device))之后,加上以下代碼即可
- net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
本文極力推薦DDP方法,下文也都是對DDP的說明:
DDP為基于torch.distributed的分布式數據并行結構,工作機制為:在batch維度上對數據進行分組,將輸入的數據分配到指定的設備(GPU)上,從而將程序的模型并行化。對應的,每個GPU上會復制一個模型的副本,負責處理分配到的數據,在后向傳播過程中再對每個設備上的梯度進行平均。
在這里貼上官方文檔,供讀者進行更詳細的了解:DDP
以下是使用方法:
在每個有N個GPU 的主機上,都應該創建N個進程。同時確保每個進程分別在從0到N-1的單獨的GPU上工作。因此,應該分別指定工作的GPU:
>>> torch.cuda.set_device(i) # i為0 - N-1在每個進程中,參考以下內容來構建模塊
>>> from torch.nn.parallel import DistributedDataParallel >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)為了在每個節點上產生多個進程,可以使用torch.distributed.launch或torch.multiprocessing.spawn
2. torch.distributed
torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')
torch.distributed包為在一臺或多臺機器上運行的多個計算節點上的多進程并行結構提供PyTorch支持和通信原語。 torch.nn.parallel.DistributedDataParallel()類就是基于此功能構建的,作為任何PyTorch模型的包裝來提供同步分布式訓練。這不同于 Multiprocessing package - torch.multiprocessing 和 torch.nn.DataParallel() 提供的并行結構,因為它支持多臺聯網的機器而且用戶必須顯式地為每個進程啟動主要訓練腳本的副本。以上敘述來自pytorch官方文檔,點擊鏈接可以查看詳細內容。此教程中只涉及到此包的初始化,因此不對其他內容再做介紹。
torch.distributed初始化
目前支持三種初始化方式:TCP初始化,共享文件初始化以及環境變量初始化。
一般使用TCP初始化,使用GPU時backend一般設置為'nccl':
import torch.distributed as dist # Use address of one of the machines dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)3. DataLoader
torch.utils.data.DataLoader類是PyTorch數據加載功能的核心,此類中的很多參數都是數據并行時所需要的,本節將對它進行詳細的介紹。
DataLoader(dataset, batch_size=1, shuffle=False, sampler=None,batch_sampler=None, num_workers=0, collate_fn=None,pin_memory=False, drop_last=False, timeout=0,worker_init_fn=None)- dataset,即獲取的原始數據集,pytorch支持兩種不同類型的數據集
比如有這樣一個數據集,當訪問 dataset[idx]時,可以從磁盤上的文件夾讀取到第idx個圖像以及與它相關的標簽。
比如調用 iter(dataset)時,可以返回從數據庫、遠程服務器讀取的數據流,甚至實時生成的日志。
我們使用的大部分數據集都是map-style類型的數據集
- sampler,batch_sampler及shuffle
這里主要為關于map-style的介紹。
介紹這幾個個參數之前,需要認識另一種類
CLASS torch.utils.data.Sampler(data_source)
同種類型的類還有torch.utils.data.SequentialSampler,torch.utils.data.RandomSampler, torch.utils.data.SubsetRandomSampler torch.utils.data.WeightedRandomSampler torch.utils.data.BatchSampler,torch.utils.data.distributed.DistributedSampler。
這些類的實例會作為參數傳到DataLoader中。它們用來指定數據加載中使用的indices/keys的順序,它們是數據集索引上的可迭代對象。
下面是正式的介紹
簡單來說,sampler是一個取樣器(容器),用來對原始的數據取樣,返回原始數據的多個子集,不同的類也對應不同的取樣方式。DataLoader會根據參數中的shuffle參數自動構建一個sampler類實例,再傳給DataLoader。若shuffle為True,即打亂數據,則參數sampler = torch.utils.data.RandomSampler;若為False,則sampler = torch.utils.data.SequentialSampler。
在分布式訓練時用到的是distributed.DistributedSampler。此種方法會根據當前分布式環境(具體說是worldsize)來將原始數據分為幾個子集。batch_sampler的作用是從sampler中進行批處理,即將sampler中的數據分批,它返回的數據為一個batch的數據。具體細節將在下一小節討論。distributed.DistributedSampler參數- dataset –要進行取樣的數據集
- num_replicas (int, optional) – 參與分布式訓練的進程數量. rank 默認為當前進程組的進程數。
- rank (int, optional) –當前進程在num_replicas的Rank,默認 rank從當前分布式組中檢索。
- shuffle (bool, optional) – If True (default), sampler 會打亂indices。
- seed (int, optional) – 在 shuffle=True時,用來打亂采樣器的隨機種子,這個數字在分布式組中的所有進程之間應該是相同的Default: 0。
注意:在分布式模式下,在每個epoch開始之前應該調用 sampler.set_eopch(i)方法。
- batch_size、drop_last以及collate_fn
本小節與上一小節聯系很大,建議聯系到一起理解。DataLoader通過參數batch_size、drop_last和batch_sampler自動將獲取的單個數據樣本排序成批。
如果batch_size(默認是1)的值不是None,數據加載器會生成成批的樣本,每一批(batch)的樣本數為batch_size的值。drop_last為True時,如果數據集size不能被batch size整除,會丟棄最后一個不完整的batch,此參數默認為False,也就是若不能整除,多出來的部分獨占一個batch。若指定了 batch_size, shuffle, sampler和 drop_last中的任何一個(布爾值為True或具體指定)則batch_sampler就不能再指定了,因為會自動根據參數使用相應的類。batch_size和drop_last參數本質上是用來從sampler中構造batch_sampler的。對于map-style的數據集,sampler可以由用戶提供,也可以基于shuffle參數構造,也就是上面說的,它們是互斥的。collate_fn在批處理和非處理是作用是不同的
若batch_size不是None,則為自動成批模式,此時使用collate_fn參數傳遞的函數來將一個列表中的樣本排列為一個batch。(實際上,batch_sampler和sample作為取樣器,返回的是根據規則排列的indices,并非真實的數據,還要使用collate_fn來排列真實數據)。 collate_fn每次調用一個列表里的數據樣本,它需要將輸入樣本整理為批,以便從data loader迭代器生成。
例如,如果每個數據樣本由一個3通道圖像和一個完整的類標簽組成,也就是說數據集的每個元素都返回一個元組(image,class_index),默認的collate_fn會將包含這樣的元組的列表整理成一個批處理過的圖像tensor的單獨的元組以及一個批處理過的類標簽Tensor。具體來說,collate_fn有以下特點:- 它總是添加一個新維度作為批處理維度。
- 它自動將NumPy數組和Python數值轉換為PyTorch張量。
- 它保留了數據結構,例如,如果每個樣本是一個字典,它輸出具有相同鍵集但批處理過的張量作為值的字典(如果值不能轉換成張量,則值為列表)
用戶可以使用自定義的collate_fn來實現自定義批處理,例如沿第一個維度以外的維度排序、各種長度的填充序列或添加對自定義數據類型的支持。
當batch_size和batch_sampler都為None (batch_sampler的默認值已經為None)時,為非自動成批模式。此時使用作為collate_fn參數傳遞的函數來處理從數據集獲得的每個示例。這時,這個函數只是將Numpy數組轉換維PyTorch的Tensor,其他保持不變。
- 其他參數num_workers 用來進行多進程加載數據,注意這里的多進程只是加載數據時的多進程,不同于多進程訓練。在此模式下,每當創建一個DataLoader的迭代器時(例如,當調用enumerate(dataLoader)時),會創建 num_workers個工作進程。此時,dataset,collate_fn和worker_init_fn被傳你遞給每個worker,它們被用于初始化和獲取數據。這意味著數據集訪問和它的內部IO,以及轉換(包括collate_fn)都在工作進程中運行。
也就是說只有對DataLoader迭代時才會得到真實的數據。pin_memory 為True 會自動將獲取的數據張量放到固定的內存中,從而使數據更快地傳輸到支持cuda的gpu。
以上就是在部署分布式訓練需要了解的知識,更多細節參見官方文檔。下面的配置流程為本教程的核心部分。
二 、使用過程框架
在DDP分布式訓練中,關鍵是要在不同的進程中使用GPU進行數據處理,因此首先應該分配進程。假設只有一個機器,兩塊GPU。總數據量(視頻或圖片數量)為8000。batchsize設置為16。
準備工作:使用pytorch的spawn生成兩個進程(對應GPU數量),分別使用1個GPU進行任務。在每個進程中都執行以下操作。
三、代碼解析
這部分將對應第二部分,給出每一步的代碼以及詳細的解釋或明,但是作為分布式教程,下文主要針對與分布式相關的代碼,而其他部分,如優化策略,學習率改變方法等不進行詳細介紹。
本實驗(圖像分類)是在雙顯卡環境下進行的,在四塊顯卡的服務器上指定了0,3號顯卡:os.environ['CUDA_VISIBLE_DEVICES'] = '0,3'
首先分配進程
import torch.multiprocessing as mp ? opt.world_size = opt.ngpus_per_node * opt.world_size mp.spawn(main_worker, nprocs=opt.ngpus_per_node, args=(opt,))代碼說明: opt為整個程序用到的參數,batch_size,num_classes等參數都已指定,在下文中,每個參數出現時都會進行說明。這里的opt.world_size為總節點數(機器),由于本教程針對單機多卡,因此設置為1。opt.ngpus_per_node 是每個節點的GPU數,設置為2,因此經過運算opt.world_size為2。mp.spawn產生了兩個進程,每個進程都運行 main_worker函數( main_worker是訓練的主函數,包括模型、數據的加載,以及訓練,以下所有內容都是在main_worker函數中的)
def main_worker(index, opt):random.seed(opt.manual_seed)np.random.seed(opt.manual_seed)torch.manual_seed(opt.manual_seed) ?if index >= 0 and opt.device.type == 'cuda':opt.device = torch.device(f'cuda:{index}') ?opt.dist_rank = opt.dist_rank * opt.ngpus_per_node + indexdist.init_process_group(backend='nccl',init_method=opt.dist_url,world_size=opt.world_size,rank=opt.dist_rank)opt.batch_size = int(opt.batch_size / opt.ngpus_per_node)opt.n_threads = int((opt.n_threads + opt.ngpus_per_node - 1) / opt.ngpus_per_node)opt.is_master_node = not opt.distributed or opt.dist_rank == 0 ?model = generate_model(opt)if opt.batchnorm_sync:assert opt.distributed, 'SyncBatchNorm only supports DistributedDataParallel.'model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)model = make_data_parallel(model, opt.distributed, opt.device)parameters = model.parameters()criterion = CrossEntropyLoss().to(opt.device) ?(train_loader, train_sampler, train_logger, train_batch_logger,optimizer, scheduler) = get_train_utils(opt, parameters) ?for i in range(opt.begin_epoch, opt.n_epochs + 1):if not opt.no_train:if opt.distributed:train_sampler.set_epoch(i)current_lr = get_lr(optimizer)train_epoch(i, train_loader, model, criterion, optimizer,opt.device, current_lr, train_logger,train_batch_logger, tb_writer, opt.distributed)if i % opt.checkpoint == 0 and opt.is_master_node:save_file_path = opt.result_path / 'save_{}.pth'.format(i)save_checkpoint(save_file_path, i, opt.arch, model, optimizer,scheduler) scheduler.step()?1. 初始化torch.distributed
def main_worker(index, opt):random.seed(opt.manual_seed)np.random.seed(opt.manual_seed)torch.manual_seed(opt.manual_seed) ?if index >= 0 and opt.device.type == 'cuda':opt.device = torch.device(f'cuda:{index}') ?opt.dist_rank = opt.dist_rank * opt.ngpus_per_node + indexdist.init_process_group(backend='nccl',init_method=opt.dist_url,world_size=opt.world_size,rank=opt.dist_rank)opt.batch_size = int(opt.batch_size / opt.ngpus_per_node)opt.n_threads = int((opt.n_threads + opt.ngpus_per_node - 1) / opt.ngpus_per_node)opt.is_master_node = opt.dist_rank == 0代碼說明: 在每個進程中,都會分配一個index,由于我們有兩個進程,所以在兩個進程中的index 分別為0,1。同樣的,opt為傳入的參數,前三行代碼為指定用到的隨機seed。然后根據index 分別指定每個進程的device:cuda:0 和cuda:1(對應實際的0號和3號GPU)。接著指定opt.dist_rank,它將作為初始化時的rank參數,opt.dist_rank原始值為0,因此經過運算,在兩個進程中的值分別為0,1。
下面就是本步的核心,初始化torch.distributed在它的參數里,在每個進程中init_method和world_size都是一樣的,rank用來標識各自的進程,同樣的,分別為0,1。
因為分了兩個進程,所以對原始指定的batch_size,n_threads(DataLoader中的num_workers)除以進程數2。
2. 加載模型
model = generate_model(opt)此部分沒什么好說的,從其他函數或類中獲取模型。
但是注意到在它之后還有一段代碼,是用來操作batch_norm的,這里不做過多解釋,感興趣可以查看原文檔。
3. 指定本進程對應的GPU
4. 將模型放到當前設備
5. 模型并行化
model = make_data_parallel(model, opt.device)def make_data_parallel(model, device):if device.type == 'cuda' and device.index is not None:local_rank = torch.distributed.get_rank()torch.cuda.set_device(local_rank)model.to(device) ?model = nn.parallel.DistributedDataParallel(model,device_ids=[device])代碼說明: 在兩個進程中分別對模型進行并行化,local_rank是獲得每個進程的rank,分別為0,1。device在第一步中已經定義過。
三行代碼分別對應三個步驟。
6. 數據處理,獲取原始數據
train_data = get_training_data(**kwargs)代碼說明:根據參數獲取原始數據
7. 根據分布式情況以及原始數據指定Sampler,作為DataLoader的參數輸入
train_sampler = torch.utils.data.distributed.DistributedSampler(train_data)8. 使用DataLoader包裝原始數據
train_loader = torch.utils.data.DataLoader(train_data,batch_size=opt.batch_size,shuffle=(train_sampler is None),num_workers=opt.n_threads,pin_memory=True,sampler=train_sampler,worker_init_fn=worker_init_fn)9. 在epoch中進行訓練
for i in range(opt.begin_epoch, opt.n_epochs + 1): ?train_sampler.set_epoch(i)current_lr = get_lr(optimizer)train_epoch(i, train_loader, model, criterion, optimizer,opt.device, current_lr, train_logger,train_batch_logger, tb_writer, opt.distributed)以上即為本教程的全部內容,雖然沒有涵蓋訓練的每個細節,但是你可以學會在你的代碼中適當的位置添加某些內容,從而實現分布式訓練。
本教程僅為本人觀點,如果有錯誤之處,歡迎評論!
總結
以上是生活随笔為你收集整理的pytorch默认初始化_“最全PyTorch分布式教程”来了!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vscode更改插件路径_用好这7个 V
- 下一篇: 知道焊缝长度如何确定节点板尺寸_管桁架加