从零实现Transformer的简易版与强大版:从300多行到3000多行
前言?
最近一直在做類ChatGPT項目的部署 微調,關注比較多的是兩個:一個LLaMA,一個ChatGLM,會發現有不少模型是基于這兩個模型去做微調的,說到微調,那具體怎么微調呢,因此又詳細了解了一下微調代碼,發現微調LLM時一般都會用到Hugging face實現的Transformers庫的Trainer類
從而發現,如果大家想從零復現ChatGPT,便得從實現Transformer開始,因此便開啟了本文:從零實現Transformer的簡易版與強大版:從300多行到3000多行,主要分為兩個大部分
- 參考harvard對transformer的實現,按照Transformer每一步的原理逐步逐行從零實現,先編碼器后解碼器,特別是注意力機制(縮放點積、多頭注意力)
- 從頭到尾解讀Hugging face實現的Transformers庫的整體代碼架構,及逐行解讀每一行代碼,而網上沒有關于這個Transformers庫的代碼解讀
且本文的代碼解讀與其他代碼解讀最大的不同是:會對出現在本文的每一行代碼都加以注釋、解釋、說明,甚至對每行代碼中的變量都會做解釋/說明
總之,一如既往的保持對初學者的足夠友好,讓即便沒有太多背景知識的也能順暢理解本文
第一部分 從零實現Transformer編碼器模塊
transformer強大到什么程度呢,基本是17年之后絕大部分有影響力模型的基礎架構都基于的transformer(比如,這里有200來個,包括且不限于基于decode的GPT、基于encode的BERT、基于encode-decode的T5等等)
通過博客內的這篇文章《Transformer通俗筆記:從Word2Vec、Seq2Seq逐步理解到GPT、BERT》,我們已經詳細了解了transformer的原理(如果忘了,建議必復習下再看本文,當然,如果你實在不想跳轉,就只想呆在本文,也行,我努力..)
如果把上圖中的各種細節也顯示出來,則如下大圖所示(此大圖來源于七月在線NLP11里倪老師講的Transformer模型源碼解讀,positional encoding、多頭等沒畫)
具體說來,是一個典型的編碼器-解碼器架構
# 定義一個基于 nn.Module 的編碼器-解碼器類 class EncoderDecoder(nn.Module):# 初始化方法,接收編碼器、解碼器、源嵌入、目標嵌入和生成器作為參數def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):# 調用 nn.Module 的初始化方法super(EncoderDecoder, self).__init__() self.encoder = encoder # 將傳入的編碼器實例保存為類屬性self.decoder = decoder # 將傳入的解碼器實例保存為類屬性self.src_embed = src_embed # 將傳入的源嵌入實例保存為類屬性self.tgt_embed = tgt_embed # 將傳入的目標嵌入實例保存為類屬性self.generator = generator # 將傳入的生成器實例保存為類屬性# 前向傳播方法,接收源序列、目標序列和它們的掩碼作為參數def forward(self, src, tgt, src_mask, tgt_mask):# 對源序列進行編碼,并將編碼結果與掩碼傳遞給解碼器進行解碼return self.decode(self.encode(src, src_mask), src_mask,tgt, tgt_mask)# 編碼方法,接收源序列和掩碼作為參數def encode(self, src, src_mask):# 將源序列進行嵌入,然后將嵌入后的序列和源序列掩碼傳給編碼器return self.encoder(self.src_embed(src), src_mask)# 解碼方法,接收編碼器輸出(memory)、源序列掩碼、目標序列和目標序列掩碼作為參數def decode(self, memory, src_mask, tgt, tgt_mask):# 將目標序列進行嵌入,然后將嵌入后的序列、編碼器輸出、源序列掩碼和目標序列掩碼傳給解碼器return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)# 定義一個基于 nn.Module 的生成器類 class Generator(nn.Module):# 初始化方法,接收模型維度(d_model)和詞匯表大小(vocab)作為參數def __init__(self, d_model, vocab):# 調用 nn.Module 的初始化方法super(Generator, self).__init__() # 定義一個線性層,將模型的輸出維度映射到詞匯表大小self.proj = nn.Linear(d_model, vocab) # 前向傳播方法,接收輸入 xdef forward(self, x):# 將輸入 x 傳入線性層,然后對輸出應用 log-softmax 激活函數(在最后一個維度上)return F.log_softmax(self.proj(x), dim=-1)考慮到Hugging face實現的Transformers庫雖然功能強大,但3000多行,對于初次實現的初學者來說,理解難度比較大,因此,咱們一步步結合對應的原理來逐行編碼實現一個簡易版的transformer
1.1 關于輸入的處理:針對輸入做embedding,然后加上位置編碼
?為了方便后面代碼的編寫,先引入一些庫
import numpy as np # 導入NumPy庫,用于進行矩陣運算和數據處理 import torch # 導入PyTorch庫,用于構建神經網絡及相關操作 import torch.nn as nn # 導入PyTorch神經網絡模塊,用于構建神經網絡層 import torch.nn.functional as F # 導入PyTorch神經網絡函數庫,用于激活函數、損失函數等 import math, copy, time # 導入數學庫、復制庫和時間庫,用于各種數學計算、復制操作和計時 from torch.autograd import Variable # 從PyTorch自動微分庫中導入Variable類,用于構建自動微分計算圖 import matplotlib.pyplot as plt # 導入Matplotlib的pyplot模塊,用于繪制圖表和可視化 import seaborn # 導入Seaborn庫,用于繪制統計圖形和美化圖表 seaborn.set_context(context="talk") # 設置Seaborn的上下文環境,設置圖表的尺寸和標簽字體大小等 %matplotlib inline # IPython魔術命令,使Matplotlib繪制的圖形直接顯示在Notebook內1.1.1 針對輸入做embedding
對于模型來說,每一句話比如“七月的服務真好,答疑的速度很快”,在模型中都是一個詞向量,但如果每句話都臨時抱佛腳去生成對應的詞向量,則處理起來無疑會費時費力,所以在實際應用中,我們會事先預訓練好各種embedding矩陣,這些embedding矩陣包含常用領域常用單詞的向量化表示,且提前做好分詞
| 維度1 | 維度2 | 維度3 | 維度4 | ... | 維度512 | |
| 教育 | ||||||
| 機構 | ||||||
| 在線 | ||||||
| 課程 | ||||||
| .. | ||||||
| 服務 | ||||||
| 答疑 | ||||||
| 老師 |
從而當模型接收到“七月的服務真好,答疑的速度很快”這句輸入時,便可以從對應的embedding矩陣里查找對應的詞向量,最終把整句輸入轉換成對應的向量表示
這部分的代碼 可以如下表示
# 定義一個名為Embeddings的類,繼承自PyTorch的nn.Module類 class Embeddings(nn.Module):# 初始化Embeddings類def __init__(self, d_model, vocab):# 調用父類nn.Module的初始化方法super(Embeddings, self).__init__()# 創建一個詞嵌入層,參數為詞匯表大小和詞嵌入維度self.lut = nn.Embedding(vocab, d_model)# 將詞嵌入維度保存為類屬性self.d_model = d_model# 定義前向傳播方法def forward(self, x):# 通過詞嵌入層將輸入的單詞編碼為向量,并乘以詞嵌入維度的平方根進行縮放return self.lut(x) * math.sqrt(self.d_model)1.1.2 位置編碼的深意:如何編碼更好
然,如此篇文章所述,RNN的結構包含了序列的時序信息,而Transformer卻完全把時序信息給丟掉了,比如“他欠我100萬”,和“我欠他100萬”,兩者的意思千差萬別,故為了解決時序的問題,Transformer的作者用了一個絕妙的辦法:位置編碼(Positional Encoding)。
即將每個位置編號,從而每個編號對應一個向量,最終通過結合位置向量和詞向量,作為輸入embedding,就給每個詞都引入了一定的位置信息,這樣Attention就可以分辨出不同位置的詞了,具體怎么做呢?
至于是embedding向量的位置下標對2求商并取整(可用雙斜杠表示整數除法,即求商并取整),它的取值范圍是,比如
,
,
,
,
,
,
...
,
,
是指向量維度中的偶數維,即第0維、第2維、第4維...,第510維,用sin函數計算
是向量維度中的奇數維,即第1維、第3維、第5維..,第511維,用cos函數計算
不要小看transformer的這個位置編碼,不少做NLP多年的人也不一定對其中的細節有多深入,而網上大部分文章談到這個位置編碼時基本都是千篇一律、泛泛而談,很少有深入,故本文還是細致探討下
考慮到一圖勝千言 一例勝萬語,舉個例子,當我們要編碼「我 愛 你」的位置向量,假定每個token都具備512維,如果位置下標從0開始時,則根據位置編碼的計算公式可得『且為讓每個讀者閱讀本文時一目了然,我計算了每個單詞對應的位置編碼示例(在此之前,這些示例在其他地方基本沒有)』
- 當對上的單詞「我」進行位置編碼時,它本身的維度有512維
- 當對上的單詞「愛」進行位置編碼時,它本身的維度有512維
?然后再疊加上embedding向量,可得
- 當對上的單詞「你」進行位置編碼時,它本身的維度有512維
- ....
最終得到的可視化效果如下圖所示
代碼實現如下
“”“位置編碼的實現,調用父類nn.Module的構造函數”“” class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__() self.dropout = nn.Dropout(p=dropout) # 初始化dropout層# 計算位置編碼并將其存儲在pe張量中pe = torch.zeros(max_len, d_model) # 創建一個max_len x d_model的全零張量position = torch.arange(0, max_len).unsqueeze(1) # 生成0到max_len-1的整數序列,并添加一個維度# 計算div_term,用于縮放不同位置的正弦和余弦函數div_term = torch.exp(torch.arange(0, d_model, 2) *-(math.log(10000.0) / d_model))# 使用正弦和余弦函數生成位置編碼,對于d_model的偶數索引,使用正弦函數;對于奇數索引,使用余弦函數。pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0) # 在第一個維度添加一個維度,以便進行批處理self.register_buffer('pe', pe) # 將位置編碼張量注冊為緩沖區,以便在不同設備之間傳輸模型時保持其狀態# 定義前向傳播函數def forward(self, x):# 將輸入x與對應的位置編碼相加x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)# 應用dropout層并返回結果return self.dropout(x)本文發布之后,有同學留言問,上面中的第11行、12行代碼
div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))為什么先轉換為了等價的指數+對數運算,而不是直接冪運算?是效率、精度方面有差異嗎?
這里使用指數和對數運算的原因是為了確保數值穩定性和計算效率。
- 一方面,直接使用冪運算可能會導致數值上溢或下溢。當d_model較大時,10000.0 ** (-i / d_model)中的冪可能會變得非常小,以至于在數值計算中產生下溢。通過將其轉換為指數和對數運算,可以避免這種情況,因為這樣可以在計算過程中保持更好的數值范圍
- 二方面,在許多計算設備和庫中,指數和對數運算的實現通常比冪運算更快。這主要是因為指數和對數運算在底層硬件和軟件中有特定的優化實現,而冪運算通常需要計算更多的中間值
所以,使用指數和對數運算可以在保持數值穩定性的同時提高計算效率。
既然提到了這行代碼,我們干脆就再講更細致些,上面那行代碼對應的公式為
其中的中括號對應的是一個從 0 到 的等差數列(步長為 2),設為
且上述公式與這個公式是等價的
為何,原因在于,從而有
最終,再通過下面這兩行代碼完美實現位置編碼
# 使用正弦和余弦函數生成位置編碼,對于d_model的偶數索引,使用正弦函數;對于奇數索引,使用余弦函數。pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)1.2?經過「embedding + 位置編碼」后乘以三個權重矩陣得到三個向量Q K V
從下圖可知,經過「embedding + 位置編碼」得到的輸入,會乘以「三個權重矩陣:??」得到查詢向量Q、鍵向量K、值向量V(你可以簡單粗暴的理解為弄出來了三個分身)
舉個例子,針對「我想吃酸菜魚」這句話,經過embedding + 位置編碼后,可得(注:可以512維,也可以是768維,但由于transformer論文中作者設置的512維,所以除了這個酸菜魚的例子暫為768維外,其他地方均統一為512維)
然后乘以三個權重矩陣得
?為此,我們可以先創建4個相同的線性層,每個線性層都具有 d_model 的輸入維度和 d_model 的輸出維度
self.linears = clones(nn.Linear(d_model, d_model), 4)前三個線性層分別用于對 Q向量、K向量、V向量進行線性變換(至于這第4個線性層在隨后的第3點)
1.3 對輸入和Multi-Head Attention做Add&Norm,再對上步輸出和Feed Forward做Add&Norm
我們聚焦下transformer論文中原圖的這部分,可知,輸入通過embedding+位置編碼后,先后做以下兩個步驟
最終這個編碼器層代碼可以完整的寫為
"""編碼器(Encoder)由自注意力(self-attention)層和前饋神經網絡(feed forward)層組成""" class EncoderLayer(nn.Module):# 初始化函數,接收size(層的維度大小)、self_attn(自注意力層實例)# feed_forward(前饋神經網絡實例)和dropout(dropout率)作為輸入參數def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__() # 調用父類nn.Module的構造函數self.self_attn = self_attn # 將自注意力層實例保存為類實例的屬性self.feed_forward = feed_forward # 將前饋神經網絡實例保存為類實例的屬性# 創建兩個具有相同參數的SublayerConnection實例(用于殘差連接和層歸一化)self.sublayer = clones(SublayerConnection(size, dropout), 2) self.size = size # 將層的維度大小保存為類實例的屬性def forward(self, x, mask):# 先對輸入x進行自注意力操作# 然后將結果傳遞給第一個SublayerConnection實例(包括殘差連接和層歸一化)x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))# 將上一步的輸出傳遞給前饋神經網絡# 然后將結果傳遞給第二個SublayerConnection實例(包括殘差連接和層歸一化),最后返回結果return self.sublayer[1](x, self.feed_forward)1.3.1 縮放點積注意力(Scaled Dot-Product Attention)
接下來,先看下縮放點積注意力(Scaled Dot-Product Attention)的整體實現步驟
還是拿上面那個例子:「我想吃酸菜魚」,則Q乘以K的轉置如下圖所示 最終得到的矩陣有6行6列,從上往下逐行來看的話,每一個格子里都會有一個數值,每一個數值依次代表:
? 單詞我與「我 想 吃 酸 菜 魚」各自的點積結果或相似度,比如可能是0.3?0.2 0.2?0.1 0.1 0.1,代表編碼1時放在「我 想 吃 酸 菜 魚」上面的注意力大小
同時,可以看到模型在對當前位置的信息進行編碼時,會過度的將注意力集中于自身的位置(當然 這無可厚非,畢竟自己與自己最相似嘛),而可能忽略了其它位置。很快你會看到,作者采取的一種解決方案就是采用多頭注意力機制(Multi-Head Attention)
? 想與「我 想 吃 酸 菜 魚」各自的點積結果或相似度
? 吃與「我 想 吃 酸 菜 魚」各自的點積結果或相似度
? 酸與「我 想 吃 酸 菜 魚」各自的點積結果或相似度
? 菜與「我 想 吃 酸 菜 魚」各自的點積結果或相似度
? 魚與「我 想 吃 酸 菜 魚」各自的點積結果或相似度?
上面兩步的代碼可以如下編寫 # torch.matmul是PyTorch庫提供的矩陣乘法函數# 具體操作即是將第一個矩陣的每一行與第二個矩陣的每一列進行點積(對應元素相乘并求和),得到新矩陣的每個元素scores = torch.matmul(query, key.transpose(-2, -1)) \/ math.sqrt(d_k)
同樣的方法,也可以計算出,如下圖8所示, b2就是拿q2去對其他的key做attention,最后再與其他的value值相乘取weighted sum得到,最終每個單詞都包含了上下文相關單詞的語義信息,不再只是attention計算之前,每個單詞只有它自己的信息,和上下文沒有關聯
另外,這里面還有一點值得注意的是,可能有同學疑問:當我們計算x1與x2、x3、x4的相似度之后,x2會再與x1、x3、x4再依次計算一遍相似度,這兩個過程中,前者算過了x1和x2的相似度,后者則再算一遍x2與x1的相似度,這不是重復計算么?其實不然,這是兩碼事,原因很簡單,正如你喜歡一個人 你會覺得她對你很重要,但那個人不一定喜歡你 她不會覺得你對她有多重要..
最終,Scaled Dot-Product Attention這部分對應的完整代碼可以寫為
'''計算“縮放點積注意力''' # query, key, value 是輸入的向量組 # mask 用于遮掩某些位置,防止計算注意力 # dropout 用于添加隨機性,有助于防止過擬合 def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1) # 獲取 query 向量的最后一個維度的大小,即詞嵌入的維度# 計算 query 和 key 的點積,并對結果進行縮放,以減少梯度消失或爆炸的可能性scores = torch.matmul(query, key.transpose(-2, -1)) \/ math.sqrt(d_k)# 如果提供了 mask,根據 mask 對 scores 進行遮掩# 遮掩的具體方法就是設為一個很大的負數比如-1e9,從而softmax后 對應概率基本為0if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)# 對 scores 進行 softmax 操作,得到注意力權重 p_attnp_attn = F.softmax(scores, dim = -1)# 如果提供了 dropout,對注意力權重 p_attn 進行 dropout 操作if dropout is not None:p_attn = dropout(p_attn)# 用注意力權重 p_attn 對 value 向量進行加權求和,得到最終的輸出return torch.matmul(p_attn, value), p_attn1.3.2 多頭注意力(Multi-Head Attention)
先看2個頭的例子,依然還是通過生成對應的三個矩陣、、,然后這三個矩陣再各自乘以兩個轉移矩陣得到對應的分矩陣,如
- 矩陣對應的兩個分矩陣、?
- 矩陣對應的兩個分矩陣為、
- 矩陣對應的兩個分矩陣為、
至于同理,也生成對應的6個分矩陣、、、、、
接下來編碼時,分兩步
如果是8個頭呢,計算步驟上也是一樣的,只是從2個頭變化到8個頭而已,最終把每個頭得到的結果直接concat,最后經過一個linear變換,得到最終的輸出,整體如下所示
這部分Multi-Head Attention的代碼可以寫為
'''代碼來自nlp.seas.harvard.edu,我針對每一行代碼、甚至每行代碼中的部分變量都做了詳細的注釋/解讀''' class MultiHeadedAttention(nn.Module):# 輸入模型的大小(d_model)和注意力頭的數量(h)def __init__(self, h, d_model, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert d_model % h == 0 # 確保 d_model 可以被 h 整除# 我們假設 d_v(值向量的維度)總是等于 d_k(鍵向量的維度)self.d_k = d_model // h # 計算每個注意力頭的維度self.h = h # 保存注意力頭的數量self.linears = clones(nn.Linear(d_model, d_model), 4) # 上文解釋過的四個線性層self.attn = None # 初始化注意力權重為 Noneself.dropout = nn.Dropout(p=dropout) # 定義 dropout 層# 實現多頭注意力的前向傳播def forward(self, query, key, value, mask=None):if mask is not None:# 對所有 h 個頭應用相同的 maskmask = mask.unsqueeze(1)nbatches = query.size(0) # 獲取 batch 的大小# 1) 批量執行從 d_model 到 h x d_k 的線性投影query, key, value = \[l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)for l, x in zip(self.linears, (query, key, value))]# 2) 在批量投影的向量上應用注意力# 具體方法是調用上面實現Scaled Dot-Product Attention的attention函數x, self.attn = attention(query, key, value, mask=mask,dropout=self.dropout)# 3) 使用 view 函數進行“拼接concat”,然后做下Linear變換x = x.transpose(1, 2).contiguous() \.view(nbatches, -1, self.h * self.d_k)return self.linears[-1](x) # 返回多頭注意力的輸出1.3.3?Position-wise前饋網絡的實現
在上文,咱們逐一編碼實現了embedding、位置編碼、縮放點積/多頭注意力,以及Add和Norm,整個編碼器部分還剩最后一個模塊,即下圖框里的Feed Forward Network(簡稱FFN)
其中包括兩個線性變換:維度上先擴大后縮小,最終輸入和輸出的維數為,內層的維度為,過程中使用ReLU作為激活函數
雖然線性變換在不同位置上是相同的,但它們在層與層之間使用不同的參數,相當于使用了兩個內核大小為1的卷積
這部分的代碼可以如下編寫
‘’‘定義一個名為PositionwiseFeedForward的類,繼承自nn.Module’‘’ class PositionwiseFeedForward(nn.Module):# 文檔字符串:實現FFN方程# 初始化方法,接受三個參數:d_model,d_ff和dropout(默認值為0.1)def __init__(self, d_model, d_ff, dropout=0.1):# 調用父類nn.Module的初始化方法super(PositionwiseFeedForward, self).__init__() self.w_1 = nn.Linear(d_model, d_ff) # 定義一個全連接層,輸入維度為d_model,輸出維度為d_ffself.w_2 = nn.Linear(d_ff, d_model) # 定義一個全連接層,輸入維度為d_ff,輸出維度為d_modelself.dropout = nn.Dropout(dropout) # 定義一個dropout層,dropout概率為傳入的dropout參數# 定義前向傳播方法,接受一個輸入參數xdef forward(self, x):# 將輸入x通過第一個全連接層w_1后,經過ReLU激活函數,再通過dropout層,最后通過第二個全連接層w_2,返回最終結果return self.w_2(self.dropout(F.relu(self.w_1(x))))1.4 對整個transformer? block復制N份最終成整個encode模塊
N可以等于6或其他數值
class Encoder(nn.Module): # 定義一個名為Encoder的類,它繼承了nn.Module類# 一個具有N層堆疊的核心編碼器# 初始化方法,接受兩個參數:layer(編碼器層的類型)和N(編碼器層的數量)def __init__(self, layer, N): super(Encoder, self).__init__() # 調用父類nn.Module的初始化方法self.layers = clones(layer, N) # 創建N個編碼器層的副本,并將其賦值給實例變量self.layersself.norm = LayerNorm(layer.size) # 創建一個LayerNorm層,并將其賦值給實例變量self.norm# 定義前向傳播方法,接受兩個參數:x(輸入數據)和mask(掩碼)def forward(self, x, mask): # 文檔字符串:解釋本方法的功能是將輸入(及其掩碼)依次傳遞給每一層for layer in self.layers: # 遍歷self.layers中的每一個編碼器層x = layer(x, mask) # 將輸入x和mask傳遞給當前編碼器層,并將輸出結果賦值給xreturn self.norm(x) # 對最終的輸出x應用LayerNorm層,并將結果返回其中的clone函數的代碼為
def clones(module, N):"Produce N identical layers."return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])第二部分 從零實現Transformer解碼器模塊
咱們再回顧下transformer的整個模型架構,特別是解碼器的部分,畢竟BERT外,GPT等很有影響力的模型都用的transformer decode結構
從底至上,
- 輸入包括2部分,下方是前一個time step的輸出的embedding
再加上一個表示位置的Positional Encoding - 接著是Masked Multi-Head Self-attention,masked字面意思是屏蔽 然后做一下Add&Norm
- 再往上是一個不帶mask的Multi-Head Attention層,它的Key、Value矩陣使用 Encoder 的編碼信息矩陣,而Query使用上一個 Decoder block 的輸出計算
然后再做一下Add&Norm - 繼續往上,經過一個FFN層,也做一下Add&Norm
- 最后做下linear變換后,通過Softmax 層計算下一個翻譯單詞的概率
由于在第一部分介紹過了embedding、positional encoding、FFN、Add&Norm、linear、softmax、multi-head attention,故本部分只重點介紹下Masked Multi-Head Self-attention
2.1?Masked Multi-Head Self-attention
本過程和第一部分介紹的Multi-Head self-attention基本一致,區別在于加了個mask機制
2.2 transformer解碼器架構與整體編碼-解碼架構的實現
整個解碼器架構的代碼可以如下編寫『有一點值得注意的是,如下文代碼中所述
- 在對輸入x執行自注意力計算并進行第一個子層的處理(帶mask),最后一個參數是tgt_mask,即x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
- 但對輸入x執行源注意力計算并進行第二個子層的處理時(不帶mask),最后一個參數是src_mask,即x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))?』
且Decoder也是由N=6個相同層組成
class Decoder(nn.Module):"Generic N layer decoder with masking."def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, src_mask, tgt_mask):for layer in self.layers:x = layer(x, memory, src_mask, tgt_mask)return self.norm(x)最終,整個transformer完整模型的整體封裝代碼為
def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):"Helper: Construct a model from hyperparameters."c = copy.deepcopyattn = MultiHeadedAttention(h, d_model)ff = PositionwiseFeedForward(d_model, d_ff, dropout)position = PositionalEncoding(d_model, dropout)model = EncoderDecoder(Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),nn.Sequential(Embeddings(d_model, src_vocab), c(position)),nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),Generator(d_model, tgt_vocab))# This was important from their code. # Initialize parameters with Glorot / fan_avg.for p in model.parameters():if p.dim() > 1:nn.init.xavier_uniform(p)return model# Small example model. tmp_model = make_model(10, 10, 2) None2.3 編碼器與解碼器的協同
當我們把編碼器和解碼器組合到一起后,看下它兩是如何一塊協作的
需要注意的是
而Decoder只有Q來自于上一個Decoder單元的輸出,K與V都來自于Encoder最后一層的輸出。也就是說,Decoder是要通過當前狀態與Encoder的輸出算出權重后(計算query與各個key的相似度),最后將Encoder的編碼加權得到下一層的狀態
比如當我們要把“Hello Word”翻譯為“你好,世界”時
Decoder會計算“你好”這個query分別與“Hello”、“Word”這兩個key的相似度
很明顯,“你好”與“Hello”更相似,從而給“Hello”更大的權重,從而把“你好”對應到“Hello”,達到的效果就是“Hello”翻譯為“你好”
第三部分?Transformer的整個訓練過程:預處理與迭代
3.1 預處理階段:創建詞匯表
具體實現時,先創建批次和掩碼
class Batch: def __init__(self, src, trg=None, pad=0):self.src = src # 輸入數據源(通常為源語言)self.src_mask = (src != pad).unsqueeze(-2) # 創建源語言的掩碼,用于忽略填充部分if trg is not None: # 如果目標語言數據存在self.trg = trg[:, :-1] # 目標語言數據,去掉最后一個詞self.trg_y = trg[:, 1:] # 目標語言數據,去掉第一個詞self.trg_mask = \self.make_std_mask(self.trg, pad) # 創建目標語言的掩碼,用于忽略填充部分和未來詞匯self.ntokens = (self.trg_y != pad).data.sum() # 計算目標語言中非填充詞的數量@staticmethod def make_std_mask(tgt, pad):"Create a mask to hide padding and future words."tgt_mask = (tgt != pad).unsqueeze(-2) # 創建目標語言的掩碼,用于忽略填充部分tgt_mask = tgt_mask & Variable(subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)) # 使用子掩碼屏蔽未來詞匯return tgt_mask # 返回完整的目標語言掩碼3.2 訓練三部曲:隨機初始化、損失函數、反向傳播
接下來,我們創建一個通用的訓練和得分函數來跟蹤損失。我們傳入一個通用的損失計算函數,它也處理參數更新
def run_epoch(data_iter, model, loss_compute): start = time.time() # 記錄當前時間 total_tokens = 0 # 初始化總tokens計數 total_loss = 0 # 初始化總損失 tokens = 0 # 初始化tokens計數# 遍歷數據集中的每個批次 for i, batch in enumerate(data_iter): # 對每個批次進行前向傳播 out = model.forward(batch.src, batch.trg, batch.src_mask, batch.trg_mask) # 計算每個批次的損失loss = loss_compute(out, batch.trg_y, batch.ntokens) # 累加損失total_loss += loss total_tokens += batch.ntokens # 累加tokenstokens += batch.ntokens # 累加tokens# 每50個批次進行一次日志記錄if i % 50 == 1: elapsed = time.time() - start # 計算已用時間# 輸出當前批次,損失和每秒處理的tokensprint("Epoch Step: %d Loss: %f Tokens per Sec: %f" %(i, loss / batch.ntokens, tokens / elapsed)) start = time.time() # 重置開始時間tokens = 0 # 重置tokens計數return total_loss / total_tokens # 返回平均損失下面這段代碼定義了一個名為 SimpleLossCompute 的類,實現了簡單的損失計算和訓練函數
- 在調用該類的實例時,輸入預測輸出、目標輸出和規范化因子,計算損失值并進行梯度更新
- 如果提供了優化器,還會更新模型參數和清空梯度緩存
3.2.1 Adam優化器:自動調整學習率并具有動量效應
優化器(optimizer)經常用于在訓練過程中更新模型參數以最小化損失函數,而Adam(Adaptive Moment Estimation)是一種常用的優化器,它結合了兩種傳統優化算法的優點:Momentum和RMSprop
為了通俗易懂地理解Adam,可以將其比作一個賽車手。訓練模型就像是找到一輛賽車在賽道上的最佳行駛速度和路徑,以達到最快的速度并取得優異的成績。在這個過程中,速度的調整(即學習率)非常重要
首先,Adam像Momentum一樣,具有動量效應。這意味著賽車手(模型)會積累動量,使其在下坡時更快,而在上坡時減速。這有助于模型更快地穿越平坦區域,并避免在最低點附近擺動
其次,Adam像RMSprop一樣,會自適應地調整每個參數的學習率。在我們的賽車比喻中,這就像賽車手會針對每個輪胎的摩擦系數(賽道狀況)做出相應的速度調整。這有助于模型更快地收斂到最優解
總之,Adam可以自動調整學習率,并具有動量效應。總的來說,它能幫助我們的“賽車手”在不同的賽道狀況下更快地找到最佳行駛速度和路徑,從而更快地訓練出高效的模型
transformer原始論文便選擇的Adam作為優化器,其參數為,和,根據以下公式,我們在訓練過程中改變了學習率:
在預熱中隨步數線性地增加學習速率,并且此后與步數的反平方根成比例地減小它,設置預熱步數為4000
我們來看下具體的編碼實現。下面這段代碼定義了一個名為 NoamOpt 的類,實現了一種自適應學習率調整策略,該策略在訓練 Transformer 模型時常用。在訓練的前幾個步驟(預熱期)中,學習率會線性增長,之后學習率會隨著步數的增加而逐漸降低。這種策略有助于模型在訓練初期更快地收斂,同時在訓練后期保持較低的學習率,有利于模型的穩定訓練。
# 定義 NoamOpt 類,實現自適應學習率調整策略 class NoamOpt:# 初始化 NoamOpt 類的實例def __init__(self, model_size, factor, warmup, optimizer):self.optimizer = optimizer # 優化器對象(如 Adam)self._step = 0 # 記錄優化步數self.warmup = warmup # 預熱步數self.factor = factor # 縮放因子self.model_size = model_size # 模型維度大小self._rate = 0 # 初始學習率# 更新模型參數和學習率def step(self):self._step += 1 # 優化步數加 1rate = self.rate() # 計算當前學習率for p in self.optimizer.param_groups: # 更新優化器中的學習率p['lr'] = rateself._rate = rate # 存儲當前學習率self.optimizer.step() # 更新模型參數# 計算當前步數的學習率def rate(self, step=None):if step is None: # 如果未提供步數,使用當前步數step = self._stepreturn self.factor * \(self.model_size ** (-0.5) * # 計算學習率公式中的模型維度項min(step ** (-0.5), step * self.warmup ** (-1.5))) # 計算學習率公式中的最小值項# 定義用于獲取 NoamOpt 類實例的函數 def get_std_opt(model):return NoamOpt(model.src_embed[0].d_model, 2, 4000,torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))最后總結一下Transformer的影響力
- OpenAI基于它發展出了GPT,并不斷迭代出GPT2、GPT3、GPT3.5及火爆全球的 ChatGPT
- Google則基于它發展出了在ChatGPT出現之前統治NLP各大任務的BERT,多好的青春年華!
第四部分?Hugging face社區實現的Transformers庫的整體解讀
目前絕大部分有影響力的大模型基本都基于transformer的架構 (這個頁面底部可以看到基于transformer的200多個有影響力的模型),既然基于transformer便得實現transformer
- 而上文更多只是為了方便理解原理而做的簡易版的實現
- 實際運用時基本都用的Hugging face社區實現的Transformers庫 「比如此文的2.2節:Stanford Alpaca的微調拆解——見證LLM微調的一般模式?」:?https://github.com/huggingface/transformers/tree/main/src/transformers,功能強大且便捷
然要分析這么一個大庫是不容易的,如下圖所示,包括分詞等等各種功能
且光trainer.py(https://github.com/huggingface/transformers/blob/main/src/transformers/trainer.py)這一個項目文件的實現就有3858行
4.1 逐行解讀:3858行的transformers/src/transformers/trainer.py
4.1.1 導入一系列Python/numpy/torch里面的各種庫
- # coding=utf-8:這行定義了此腳本文件的編碼格式為utf-8
- 2-12. 這些行是關于版權和Apache許可證的聲明。代碼可以在遵守這些許可證條款的情況下被使用
這是一個模塊級的docstring,解釋了這個模塊的主要功能,即創建一個可以輕松訓練或微調HuggingFace Transformers模型的Trainer類 - 15-30. 導入了一些常用的Python標準庫,包括對文件、操作系統、時間、警告等的操作,以及一些集合和類型檢查的工具
- 33-36. 這里首先導入了和模型訓練相關的集成工具。這些工具包括了報告集成回調、超參數、判斷fairscale(一個優化PyTorch模型訓練的庫)是否可用等
- 39-45. 導入了numpy和torch以及其分布式模塊,這些是進行深度學習計算的基礎庫。同時也導入了huggingface_hub的Repository和create_repo,它們是用于與HuggingFace模型Hub進行交互的工具
- 46-51. 導入了torch內的nn模塊,以及torch.utils.data模塊中的DataLoader, Dataset, RandomSampler, SequentialSampler,這些是用于處理神經網絡和數據的基本工具
這行導入了當前模塊的版本信息 - 54-87. 這些行導入了許多與模型訓練相關的工具和函數,包括預訓練模型和配置,數據整理,調試工具,優化器,層標準化,分布式訓練工具(比如deepspeed),回調函數等
- 89-132. 這些行導入了一些與訓練有關的工具和函數,包括分布式策略,內存跟蹤,優化器名稱,訓練參數等
- 134-173. 這些行導入了一些工具和函數,主要用于處理適配器,配置,權重,日志,數據集,設備檢測等
- 175-177. 定義了默認的回調函數列表和默認的進度回調函數
- 179-189. 根據環境的可用性,可能會導入和Notebook, Apex, 數據集, Torch TPU, Fairscale相關的模塊
- 191-200. 如果SageMaker模型并行可用,那么導入與其相關的模塊,并檢查其版本
如果安全張量庫可用,就導入它
如果性能分析工具PEFT可用,就導入它 - 206-217. 如果Accelerate可用,那么導入與其相關的模塊,并檢查其版本
使用TYPE_CHECKING做類型檢查,如果是,就導入optuna模塊
設置了logger用于日志記錄 - 224-230. 定義了一些常量,它們是用于保存訓練時的參數,狀態,優化器,調度器,梯度縮放器等信息的文件名
4.1.2?定義class Trainer,先做一些初始化設置
然后定義class Trainer,逐一實現了如下函數
- func __init__
硬件配置:代碼首先判斷是否需要將模型放置在特定的設備(如 GPU 或 CPU)上。一些特殊情況,如使用了模型并行、深度學習庫DeepSpeed、完全bf16或fp16評估、數據并行處理和完全分片的數據并行處理,都會對這個決定產生影響。
數據預處理:然后,代碼會創建一個用于數據處理的 data_collator,這個 data_collator 會根據是否有分詞器(tokenizer)來選擇默認的數據整理器。這個整理器將在訓練和驗證過程中用于整理數據。
優化器與學習率調度器:然后,代碼檢查了優化器和學習率調度器是否已經設置,并在必要時進行了一些配置。在這里,還進行了一些錯誤檢查,以防模型和優化器參數不在同一個設備上,或者優化器與使用的并行處理庫(如Fairscale、Deepspeed或PyTorch FSDP)不兼容。
回調函數:最后,代碼初始化了一些默認的回調函數,并在需要時創建了一個遠程倉庫的克隆和輸出目錄。這些回調函數將在訓練過程中的不同時間點被調用,可以用來做一些自定義的操作,比如在每個 epoch 結束后保存模型。
混合精度設置:代碼首先檢查是否需要使用混合精度訓練(即使用 fp16 或 bf16)。如果需要,根據后端類型(例如 "cuda_amp" 或 "cpu_amp"),選擇正確的混合精度訓練策略。在這里,也進行了一些錯誤檢查,以防混合精度訓練與使用的并行處理庫(如SageMaker Model Parallelism)不兼容。
標簽平滑:然后,代碼檢查是否需要使用標簽平滑(一種常見的防止過擬合的技巧),并在需要時設置相應的對象。
訓練器狀態和控制:接下來,代碼初始化了訓練器的狀態和控制對象,這兩個對象將在訓練過程中用于跟蹤訓練的進展和控制訓練的流程。
其他設置:最后,代碼還進行了一些其他的設置,比如初始化內存跟蹤器,設置訓練批次的大小,以及處理一些特定的訓練參數(如 "torch_compile")
- func add_callback
- func pop_callback
- func remove_callback
- func _move_model_to_device
- func _set_signature_columns_if_needed
- func _remove_unused_columns
- func _get_collator_with_removed_columns
4.1.3 訓練數據集、驗證數據集相關
- func _get_train_sampler # 獲取訓練采樣器def _get_train_sampler(self) -> Optional[torch.utils.data.Sampler]: if self.train_dataset is None or not has_length(self.train_dataset): # 如果沒有訓練數據集或訓練數據集沒有長度,返回Nonereturn None# 創建采樣器if self.args.group_by_length: # 如果參數設定了按長度分組if is_datasets_available() and isinstance(self.train_dataset, datasets.Dataset): # 如果有datasets庫并且訓練數據集是datasets.Dataset的實例lengths = (self.train_dataset[self.args.length_column_name]if self.args.length_column_name in self.train_dataset.column_nameselse None) # 如果訓練數據集中有長度列名,獲取長度,否則長度為Noneelse:lengths = None # 否則,長度為Nonemodel_input_name = self.tokenizer.model_input_names[0] if self.tokenizer is not None else None # 獲取模型輸入名稱return LengthGroupedSampler( # 返回長度分組采樣器self.args.train_batch_size * self.args.gradient_accumulation_steps,dataset=self.train_dataset,lengths=lengths,model_input_name=model_input_name,)else:return RandomSampler(self.train_dataset) # 否則,返回隨機采樣器
- func get_train_dataloader # 獲取訓練數據的 DataLoaderdef get_train_dataloader(self) -> DataLoader:"""返回訓練[`~torch.utils.data.DataLoader`]。如果`train_dataset`未實現`__len__`,將不使用采樣器,否則,使用適應于分布式訓練的隨機采樣器。如果想注入一些自定義行為,可以在子類中重寫此方法。"""# 如果訓練集為空,則拋出 ValueErrorif self.train_dataset is None:raise ValueError("Trainer: training requires a train_dataset.")# 創建訓練數據集和數據整理器train_dataset = self.train_datasetdata_collator = self.data_collator# 如果訓練集是數據集的實例,移除未使用的列if is_datasets_available() and isinstance(train_dataset, datasets.Dataset):train_dataset = self._remove_unused_columns(train_dataset, description="training")# 否則,使用數據整理器移除未使用的列else:data_collator = self._get_collator_with_removed_columns(data_collator, description="training")# 定義 DataLoader 參數dataloader_params = {"batch_size": self._train_batch_size,"collate_fn": data_collator,"num_workers": self.args.dataloader_num_workers,"pin_memory": self.args.dataloader_pin_memory,}# 如果訓練集不是迭代的數據集,設定采樣器和其他參數if not isinstance(train_dataset, torch.utils.data.IterableDataset):dataloader_params["sampler"] = self._get_train_sampler()dataloader_params["drop_last"] = self.args.dataloader_drop_lastdataloader_params["worker_init_fn"] = seed_worker# 返回由 accelerator 處理過的 DataLoaderreturn self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params))
- func _get_eval_sampler # 獲取評估數據的采樣器def _get_eval_sampler(self, eval_dataset: Dataset) -> Optional[torch.utils.data.Sampler]:# 廢棄的代碼if self.args.use_legacy_prediction_loop:# 如果是在TPU上運行,返回 SequentialDistributedSamplerif is_torch_tpu_available():return SequentialDistributedSampler(eval_dataset, num_replicas=xm.xrt_world_size(), rank=xm.get_ordinal())# 如果是在Sagemaker多處理器環境中運行,返回SequentialDistributedSamplerelif is_sagemaker_mp_enabled():return SequentialDistributedSampler(eval_dataset,num_replicas=smp.dp_size(),rank=smp.dp_rank(),batch_size=self.args.per_device_eval_batch_size,)# 其他情況下,返回順序采樣器else:return SequentialSampler(eval_dataset)# 如果是單機環境,返回順序采樣器;否則,返回 Noneif self.args.world_size <= 1:return SequentialSampler(eval_dataset)else:return None
- func get_eval_dataloader # 獲取評估數據的 DataLoaderdef get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None) -> DataLoader:"""返回評估[`~torch.utils.data.DataLoader`]。如果想注入一些自定義行為,可以在子類中重寫此方法。Args:eval_dataset (`torch.utils.data.Dataset`, *optional*):如果提供,將覆蓋`self.eval_dataset`。如果它是一個[`~datasets.Dataset`],自動刪除模型的`forward()`方法不接受的列。必須實現`__len__`。"""# 如果評估集為空,則拋出 ValueErrorif eval_dataset is None and self.eval_dataset is None:raise ValueError("Trainer: evaluation requires an eval_dataset.")# 創建評估數據集和數據整理器eval_dataset = eval_dataset if eval_dataset is not None else self.eval_datasetdata_collator = self.data_collator# 如果評估集是數據集的實例,移除未使用的列if is_datasets_available() and isinstance(eval_dataset, datasets.Dataset):eval_dataset = self._remove_unused_columns(eval_dataset, description="evaluation")# 否則,使用數據整理器移除未使用的列else:data_collator = self._get_collator_with_removed_columns(data_collator, description="evaluation")# 定義 DataLoader 參數dataloader_params = {"batch_size": self.args.eval_batch_size,"collate_fn": data_collator,"num_workers": self.args.dataloader_num_workers,"pin_memory": self.args.dataloader_pin_memory,}# 如果評估集不是迭代的數據集,設定采樣器和其他參數if not isinstance(eval_dataset, torch.utils.data.IterableDataset):dataloader_params["sampler"] = self._get_eval_sampler(eval_dataset)dataloader_params["drop_last"] = self.args.dataloader_drop_last# 返回由 accelerator 處理過的 DataLoaderreturn self.accelerator.prepare(DataLoader(eval_dataset, **dataloader_params))
- func get_test_dataloader def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader:"""返回測試集的數據加載器 [`~torch.utils.data.DataLoader`]如果需要插入一些自定義行為,可以在子類中重寫此方法Args:test_dataset (`torch.utils.data.Dataset`, *optional*):要使用的測試數據集。如果它是一個 [`~datasets.Dataset`],則自動刪除 `model.forward()` 方法不接受的列。它必須實現 `__len__`"""data_collator = self.data_collator # 獲取數據處理器# 如果datasets庫可用且test_dataset是datasets.Dataset類型,移除不必要的列if is_datasets_available() and isinstance(test_dataset, datasets.Dataset):test_dataset = self._remove_unused_columns(test_dataset, description="test")else:data_collator = self._get_collator_with_removed_columns(data_collator, description="test")# 定義數據加載器參數dataloader_params = {"batch_size": self.args.eval_batch_size, # 批大小"collate_fn": data_collator, # 數據處理函數"num_workers": self.args.dataloader_num_workers, # 工作線程數量"pin_memory": self.args.dataloader_pin_memory, # 是否將數據加載器的數據放在固定的內存區域}# 如果test_dataset不是可迭代數據集,添加采樣器和drop_last參數if not isinstance(test_dataset, torch.utils.data.IterableDataset):dataloader_params["sampler"] = self._get_eval_sampler(test_dataset) # 添加采樣器dataloader_params["drop_last"] = self.args.dataloader_drop_last # 是否丟棄最后不完整的批次# 返回加速器準備好的數據加載器return self.accelerator.prepare(DataLoader(test_dataset, **dataloader_params))
4.1.4 一系列優化器函數的實現
- func create_optimizer_and_scheduler def create_optimizer_and_scheduler(self, num_training_steps: int):"""設置優化器和學習率調度器我們提供一個合理的默認值,工作得很好。如果你想使用其他的,你可以在Trainer的init中通過`optimizers`傳遞一個元組,或者在子類中重寫此方法(或`create_optimizer`和/或`create_scheduler`)。"""self.create_optimizer() # 創建優化器# 如果SageMaker版本大于等于1.10且啟用了fp16,解包優化器if IS_SAGEMAKER_MP_POST_1_10 and smp.state.cfg.fp16:optimizer = self.optimizer.optimizerelse:optimizer = self.optimizerself.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer) # 創建學習率調度器
- func create_optimizer def create_optimizer(self):"""設置優化器。我們提供一個合理的默認值,工作得很好。如果你想使用其他的,你可以在Trainer的init中通過`optimizers`傳遞一個元組,或者在子類中重寫此方法。"""# 根據是否啟用了SageMaker模型并行,選擇不同的模型opt_model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model# 如果優化器為空,初始化一個新的優化器if self.optimizer is None:# 獲取待優化參數,并區分是否需要權重衰減decay_parameters = get_parameter_names(opt_model, ALL_LAYERNORM_LAYERS)decay_parameters = [name for name in decay_parameters if "bias" not in name]optimizer_grouped_parameters = [{"params": [p for n, p in opt_model.named_parameters() if (n in decay_parameters and p.requires_grad)],"weight_decay": self.args.weight_decay, # 權重衰減},{"params": [p for n, p in opt_model.named_parameters() if (n not in decay_parameters and p.requires_grad)],"weight_decay": 0.0, # 不需要權重衰減},]# 獲取優化器類和參數optimizer_cls, optimizer_kwargs = Trainer.get_optimizer_cls_and_kwargs(self.args)# 如果啟用了簡單的分片DDP,使用OSS作為優化器,否則使用獲取的優化器if self.sharded_ddp == ShardedDDPOption.SIMPLE:self.optimizer = OSS(params=optimizer_grouped_parameters,optim=optimizer_cls,**optimizer_kwargs,)else:self.optimizer = optimizer_cls(optimizer_grouped_parameters, **optimizer_kwargs)if optimizer_cls.__name__ == "Adam8bit":import bitsandbytesmanager = bitsandbytes.optim.GlobalOptimManager.get_instance()skipped = 0for module in opt_model.modules():if isinstance(module, nn.Embedding):skipped += sum({p.data_ptr(): p.numel() for p in module.parameters()}.values())logger.info(f"skipped {module}: {skipped/2**20}M params")manager.register_module_override(module, "weight", {"optim_bits": 32})logger.debug(f"bitsandbytes: will optimize {module} in fp32")logger.info(f"skipped: {skipped/2**20}M params")# 如果啟用了SageMaker模型并行,使用SageMaker的分布式優化器if is_sagemaker_mp_enabled():self.optimizer = smp.DistributedOptimizer(self.optimizer)return self.optimizer
- func get_optimizer_cls_and_kwargs
根據提供的參數,選擇并配置合適的優化器,以便在模型訓練中使用
4.1.5 學習率相關函數的實現
- func create_scheduler # 定義創建學習率調度器的函數def create_scheduler(self, num_training_steps: int, optimizer: torch.optim.Optimizer = None):"""設置調度器。訓練器的優化器必須在調用此方法之前已經設置好,或者作為參數傳遞。Args:num_training_steps (int): 要進行的訓練步數。"""# 如果調度器還未設置if self.lr_scheduler is None:# 使用 get_scheduler 函數創建調度器self.lr_scheduler = get_scheduler(self.args.lr_scheduler_type,optimizer=self.optimizer if optimizer is None else optimizer,num_warmup_steps=self.args.get_warmup_steps(num_training_steps),num_training_steps=num_training_steps,)# 返回創建的學習率調度器return self.lr_scheduler
- func num_examples
- func _hp_search_setup
- func _report_to_hp_search
- func _tune_save_checkpoint
- func call_model_init
- func torch_jit_model_eval
4.1.6 分布式訓練相關函數的實現
- func ipex_optimize_model
首先檢查了 Intel PyTorch Extension (IPEX) 是否可用。IPEX 是一個基于 Intel oneAPI Deep Neural Network Library (oneDNN) 的 PyTorch 擴展庫,可以幫助在 Intel 的硬件(如 CPU)上更高效地運行 PyTorch 程序
如果處于訓練模式,函數會使用 IPEX 對模型和優化器進行優化;如果處于非訓練模式(例如評估或測試),則僅對模型進行優化 - func_wrap_model
根據參數設置,可能會首先使用 IPEX 對模型進行優化。
如果啟用了 Sagemaker 的模型并行,會將模型包裝為 Sagemaker 的 DistributedModel。模型并行是一種訓練大型模型的技術,它將模型的部分放在不同的 GPU 上,以克服單個 GPU 內存限制
如果模型已經被包裝(可能在之前的步驟中被包裝),則直接返回該模型
使用 NVIDIA APEX(一種可以提高 GPU 利用率和擴展訓練的庫)進行混合精度訓練。這主要針對 PyTorch 版本小于1.6的情況,因為 PyTorch 1.6 及以上版本已經內置了混合精度訓練的支持
如果啟用了多 GPU 訓練,且模型不是8bit模型(即該模型不支持 int8 類型),則使用 PyTorch 的 DataParallel 對模型進行數據并行處理。數據并行是一種將輸入數據分塊在多個 GPU 上并行處理的技術,可以有效地利用多個 GPU 進行訓練。
如果啟用了 JIT 模式評估,則對模型進行 JIT 編譯。PyTorch 的 JIT 編譯器可以將模型編譯為中間表示(IR),然后在運行時對其進行優化,從而提高模型的運行效率。
如果不是訓練模式(例如評估或測試),則在這個階段返回模型,否則繼續對模型進行進一步的包裝 - func auto_wrapper_callable
- func patched_optimizer_step
4.1.7 主要訓練入口:func train和func_inner_training_loop
- func train """主要訓練入口"""def train(self,# 可選參數,接收字符串或布爾類型,代表從哪個檢查點恢復訓練resume_from_checkpoint: Optional[Union[str, bool]] = None,# 可選參數,接收Optuna的Trial實例或者包含超參數的字典 trial: Union["optuna.Trial", Dict[str, Any]] = None,# 可選參數,接收一個字符串列表,代表在模型輸出中需要忽略的鍵值 ignore_keys_for_eval: Optional[List[str]] = None, **kwargs, # 接收其他關鍵字參數,用于隱藏已棄用的參數):# 如果resume_from_checkpoint為False,將其設置為Noneif resume_from_checkpoint is False: resume_from_checkpoint = None# 內存指標 - 必須盡早設置self._memory_tracker.start()args = self.args# 設置訓練狀態為Trueself.is_in_train = True # do_train可能未設置,但仍然可能調用.train(),所以下面的操作是為了避免這種情況if (args.fp16_full_eval or args.bf16_full_eval) and not args.do_train:self._move_model_to_device(self.model, args.device)# 如果關鍵字參數中包含model_pathif "model_path" in kwargs: # 將model_path的值賦給resume_from_checkpoint并在kwargs中刪除model_pathresume_from_checkpoint = kwargs.pop("model_path") warnings.warn("`model_path` is deprecated and will be removed in a future version. Use `resume_from_checkpoint` ""instead.", # 發出關于model_path將在未來版本中刪除的警告FutureWarning,)# 如果還有未處理的關鍵字參數if len(kwargs) > 0: raise TypeError(f"train() received got unexpected keyword arguments: {', '.join(list(kwargs.keys()))}.") # 拋出類型錯誤# 這可能會改變隨機種子,因此需要先運行self._hp_search_setup(trial)self._train_batch_size = self.args.train_batch_size # 設置訓練批次大小# 重載模型model_reloaded = Falseif self.model_init is not None: # 如果模型初始化方法存在# 在實例化模型時,必須先設置隨機種子enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed)# 使用試驗的超參數初始化模型self.model = self.call_model_init(trial)# 將模型重載標記設置為True model_reloaded = True # 重新初始化優化器和調度器self.optimizer, self.lr_scheduler = None, None# 加載可能存在的模型檢查點# 如果resume_from_checkpoint是bool類型且值為Trueif isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint: # 從輸出目錄中獲取最新的檢查點 resume_from_checkpoint = get_last_checkpoint(args.output_dir) # 如果沒有找到有效的檢查點 if resume_from_checkpoint is None: raise ValueError(f"No valid checkpoint found in output directory ({args.output_dir})") # 拋出值錯誤# 如果resume_from_checkpoint不為None,并且SageMaker MP和DeepSpeed沒有啟用if resume_from_checkpoint is not None and not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled:# 從檢查點恢復模型self._load_from_checkpoint(resume_from_checkpoint) # 如果模型已經重載,將其放在正確的設備上并更新self.model_wrappedif model_reloaded:if self.place_model_on_device:self._move_model_to_device(self.model, args.device)self.model_wrapped = self.model# 查找可執行的批次大小inner_training_loop = find_executable_batch_size(self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size)# 進行內部訓練循環return inner_training_loop(args=args,resume_from_checkpoint=resume_from_checkpoint,trial=trial,ignore_keys_for_eval=ignore_keys_for_eval,)
- func_inner_training_loop
首先,代碼計算了每個epoch中的訓練步驟數量(steps_in_epoch),這可以是數據加載器的長度,或者是最大步數乘以梯度累積步數。
然后,它會處理開始新的訓練epoch,包括可能的從檢查點恢復訓練的步驟。
代碼遍歷了每個訓練步驟,每個步驟接收輸入數據,并進行以下操作:
- 在每個需要的步驟上同步隨機數生成器的狀態
- 跳過已經訓練過的步驟(如果從檢查點恢復訓練)
- 調用回調函數處理步驟的開始
- 執行訓練步驟,并計算訓練損失
- 如果損失是NaN或Inf(無窮),則根據前面記錄的損失進行調整
- 計算浮點運算的數量
- 如果達到了梯度累積的步驟,或者是最后一步,會進行以下操作:
- 執行梯度裁剪(如果需要)
- 執行優化器步驟,并判斷優化器是否真正執行了步驟
- 如果優化器步驟執行了,進行學習率調度(除了在使用ReduceLROnPlateau學習率調度器的情況下,它需要在生成度量之后才執行調度)
- 模型的梯度清零
- 更新全局步驟和epoch數
- 調用回調函數處理步驟的結束
- 有條件地記錄、保存和評估模型
- 如果訓練應該停止,或者已經完成了所有的步驟,則退出循環
在每個epoch結束時,代碼處理epoch的結束,可能會記錄、保存和評估模型,檢查是否有配置的TPU,并決定是否應該停止整個訓練
4.1.8 對模型的加載、檢查、評估、保存
- func_get_output_dir
- func_load_from_checkpoint
- func_load_best_model
- func_issue_warnings_after_load
- func_maybe_log_save_evaluate
這個函數主要執行的是在訓練過程中的日志記錄、模型評估和模型保存的操作。主要步驟包括:
- func_load_rng_state
- func_save_checkpoint
- func_load_optimizer_and_scheduler
用于從給定的檢查點位置加載優化器和學習率調度器的狀態
這通常在訓練的中斷后恢復訓練時使用,以確保訓練可以從上次停止的地方繼續。在加載狀態時,需要考慮一些因素,例如是否啟用了DeepSpeed,是否啟用了SageMaker多處理,是否可用TPU,是否啟用了全尺寸數據并行(FSDP)等。各種情況需要采用不同的方式來加載狀態 - func opt_load_hook
- func opt_load_hook
- func hyperparameter_search
用于啟動超參數搜索。可以使用不同的后端進行搜索,包括optuna、Ray Tune或SigOpt,默認使用optuna
該方法接收一個定義超參數搜索空間的函數,一個計算目標函數的函數,試驗次數,優化方向(最小化或最大化),使用的后端,定義試驗名稱的函數,以及其他參數。這個方法用于尋找最佳的超參數組合,以使模型的性能達到最優 - func log
- func _prepare_input
- func _prepare_inputs
- func compute_loss_context_manager
- func autocast_smart_context_manager
4.1.9 一個訓練步驟的實現:前向后向傳播、計算損失
- training_step (第2661行-2660行)
一個訓練步驟的實現,它涵蓋了一個批量數據的前向和后向傳播
# `training_step`函數表示訓練過程中的一步操作,涵蓋了模型的前向和后向傳播def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:# 將模型設置為訓練模式,這對于某些層(如Dropout或BatchNorm)的行為有影響,因為它們在訓練和評估階段的行為是不同的model.train() # 調用一個輔助方法準備模型的輸入,具體的實現取決于模型的需求inputs = self._prepare_inputs(inputs) # 如果啟用了 SageMaker Model Parallelism,則使用 `smp_forward_backward` 在多個 GPU 上執行前向和后向操作# 然后減小損失,并將其從計算圖中分離if is_sagemaker_mp_enabled():loss_mb = smp_forward_backward(model, inputs, self.args.gradient_accumulation_steps)return loss_mb.reduce_mean().detach().to(self.args.device)# 計算損失值with self.compute_loss_context_manager():loss = self.compute_loss(model, inputs)# 如果使用的 GPU 數量大于 1,則對損失值取平均,以處理多 GPU 并行訓練if self.args.n_gpu > 1:loss = loss.mean() # mean() to average on multi-gpu parallel training# 根據是否進行梯度縮放,選擇不同的后向傳播方式if self.do_grad_scaling:self.scaler.scale(loss).backward() # 使用梯度縮放進行后向傳播,可以防止在混合精度訓練中出現梯度下溢elif self.use_apex:with amp.scale_loss(loss, self.optimizer) as scaled_loss: # 如果使用了APEX工具進行混合精度訓練,則需要對損失進行縮放后再進行后向傳播scaled_loss.backward()else:self.accelerator.backward(loss) # 使用加速器進行后向傳播,適用于沒有使用梯度縮放和APEX的情況# 返回損失值,如果設置了梯度累積步驟,則需要將損失值除以梯度累積步驟數return loss.detach() / self.args.gradient_accumulation_steps- compute_loss
計算損失
# `compute_loss`函數用于計算模型的損失值def compute_loss(self, model, inputs, return_outputs=False):# 如果存在標簽平滑處理器且輸入中有標簽,則將標簽從輸入中移除if self.label_smoother is not None and "labels" in inputs:labels = inputs.pop("labels")else:labels = None# 使用模型進行前向傳播,得到輸出outputs = model(**inputs)# 如果存在之前的狀態信息,保存它# TODO: 這部分需要在未來進行清理和優化if self.args.past_index >= 0:self._past = outputs[self.args.past_index]# 如果標簽存在,使用標簽平滑處理器計算損失if labels is not None:if unwrap_model(model)._get_name() in MODEL_FOR_CAUSAL_LM_MAPPING_NAMES.values():loss = self.label_smoother(outputs, labels, shift_labels=True)else:loss = self.label_smoother(outputs, labels)else:# 如果輸出是一個字典,但并未包含損失,那么拋出錯誤if isinstance(outputs, dict) and "loss" not in outputs:raise ValueError("The model did not return a loss from the inputs, only the following keys: "f"{','.join(outputs.keys())}. For reference, the inputs it received are {','.join(inputs.keys())}.")# 我們并未直接使用.outputs,因為模型可能返回的是元組,而非ModelOutputloss = outputs["loss"] if isinstance(outputs, dict) else outputs[0]# 如果`return_outputs`為真,返回損失和輸出;否則只返回損失return (loss, outputs) if return_outputs else loss- func is_local_process_zero
- func is_world_process_zero
- func save_model
此函數用于保存模型。如果給出了輸出目錄,則將在該目錄中保存模型,否則將在args.output_dir中保存模型。保存操作依賴于環境,例如,如果是在TPU上,將會調用`_save_tpu`。如果是在SageMaker多處理中,則會保存模型的狀態字典。另外,此函數也考慮了`ShardedDDPOption`的設置等。最后,如果設置了`args.push_to_hub`,那么在用戶調用`save_model`時,模型會被推送到Hub - func _save_tpu
在TPU上保存模型的專用函數 - func _save
保存模型的基本函數。這個函數不檢查進程是否為零,因為只有在進程為零的情況下才會調用此函數 - func store_flos
存儲進入模型的浮點運算數。如果模型在分布式模式下運行,該函數會將當前浮點運算數的總數加到`state.total_flos`上,然后將當前浮點運算數歸零。在非分布式模式下,也執行相同的操作,只是不需要分布式廣播浮點運算數 - func_sorted_checkpoints
返回排序后的檢查點列表。使用修改時間或檢查點編號進行排序,然后返回路徑列表。如果設置了最佳模型檢查點,那么確保我們不會刪除最佳模型 - func_rotate_checkpoints
- func evaluate
運行評估并返回指標。需要用戶提供計算指標的方法,因為它們是任務依賴的。你也可以重寫此方法以注入自定義行為。函數返回包含評估損失和可能從預測中計算出的指標的字典。該字典也包含來自訓練狀態的epoch編號 - func predict def predict(self, test_dataset: Dataset, ignore_keys: Optional[List[str]] = None, metric_key_prefix: str = "test" ) -> PredictionOutput:# 設置內存跟蹤器,盡早啟動self._memory_tracker.start()# 獲取測試數據集的數據加載器test_dataloader = self.get_test_dataloader(test_dataset)# 記錄開始時間start_time = time.time()# 選擇預測循環或評估循環,這取決于args中的use_legacy_prediction_loop參數eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop# 運行選定的循環,并獲得預測或評估輸出output = eval_loop(test_dataloader, description="Prediction", ignore_keys=ignore_keys, metric_key_prefix=metric_key_prefix)# 計算總批次大小,包括所有的并行處理單元total_batch_size = self.args.eval_batch_size * self.args.world_size# 如果度量指標中包含jit編譯時間,那么將這段時間加到開始時間中if f"{metric_key_prefix}_jit_compilation_time" in output.metrics:start_time += output.metrics[f"{metric_key_prefix}_jit_compilation_time"]# 更新度量指標,包括預測速度相關的指標output.metrics.update(speed_metrics(metric_key_prefix,start_time,num_samples=output.num_samples,num_steps=math.ceil(output.num_samples / total_batch_size),))# 使用回調處理器進行預測后的操作,并更新控制狀態self.control = self.callback_handler.on_predict(self.args, self.state, self.control, output.metrics)# 停止內存跟蹤器,并更新相關度量指標self._memory_tracker.stop_and_update_metrics(output.metrics)# 返回預測結果,包括預測值,標簽(如果存在)和度量指標return PredictionOutput(predictions=output.predictions, label_ids=output.label_ids, metrics=output.metrics)
- func evaluation_loop
- func_nested_gather
- func_pad_across_processes
- func prediction_step
- func floating_point_ops
- func init_git_repo
- func create_model_card
- func_push_from_checkpoint
- func push_to_hub
- func prediction_loop
- func_gather_and_numpify
- func_add_sm_patterns_to_gitignore
- func create_accelerator_and_postp
// 待更
參考文獻與推薦閱讀
附錄:創作/修改記錄
分詞代碼的實現:tokenization_chatglm.py
quantization:模型量化——減小模型大小和推理時間
把原有的“第五部分 基于LangChain + ChatGLM-6B的本地知識庫的應用實現”,獨立成文為:給LLM裝上知識:從LangChain+LLM的本地知識庫問答到LLM與知識圖譜的結合
總結
以上是生活随笔為你收集整理的从零实现Transformer的简易版与强大版:从300多行到3000多行的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CNN+DNN训练,过了100个epoc
- 下一篇: 中间件的地图发布数据源详解