【Pytorch神经网络实战案例】28 GitSet模型进行步态与身份识别(CASIA-B数据集)
生活随笔
收集整理的這篇文章主要介紹了
【Pytorch神经网络实战案例】28 GitSet模型进行步态与身份识别(CASIA-B数据集)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1 CASIA-B數據集
本例使用的是預處理后的CASIA-B數據集, 數據集下載網址如下。
? ? http://www.cbsr.ia.ac.cn/china/Gait%20Databases%20cH.asp
該數據集是一個大規模的、多視角的步態庫。其中包括124個人,每個人有11個視角(0,18,36,...,180),在3種行走條件(普通、穿大衣、攜帶包裹)下采集。
1.1 CASIA-B數據集的兩種形式
CASIA-B數據集有視頻和輪廓兩種形式。
1.1.1 案例講解
本例直接使用輪廓數據集進行訓練, 如圖2-13(a)所示。
在本例中, 對CASIA-B的輪廓數據集做二次處理, 將圖片中人物的頂端和底部背景去掉,方便模型的訓練預處理后的數據集如圖2-13(b)所示。
1.2 數據集的目錄結構
2 代碼實戰 GitSet模型進行步態與身份識別(CASIA-B數據集)
代碼總覽https://blog.csdn.net/qq_39237205/article/details/124199534
2.1 代碼實戰:定義函數,加載文件夾的文件名稱---GaitSet_DataLoader.py(第一部分)
import numpy as np # 引入基礎庫 import os import torch.utils.data as tordata from PIL import Image from tqdm import tqdm import random# 1.1定義函數,加載文件夾的文件名稱# load_data函數, 分為3個步驟:# def load_data(dataset_path,imgresize,label_train_num,label_shuffle): # 完成了整體數據集的封裝# 主要分為三個步驟# ①以人物作為標簽,將完整的數據集分為兩部分,分別用于訓練和測試。# ②分別根據訓練集和測試集中的人物標簽遍歷文件夾,獲得對應的圖片文件名稱。# ③用torch.utils.data接口將圖片文件名稱轉化為數據集, 使其能夠將圖片載入并返回。label_str = sorted(os.listdir(dataset_path)) # 以人物為標簽# 將不完整的樣本忽略,只載入完整樣本removelist = ['005','026','037','079','109','088','068','048'] # 對數據集中樣本不完整的人物標簽進行過濾,留下可用樣本。代碼中不完整的人物標簽可以通過調用load_dir函數來查找。for removename in removelist:if removename in label_str:label_str.remove(removename)print("label_str",label_str) # -start--------根據亂序標志來處理樣本標簽順序,并將其分為訓練集和測試集----label_index = np.arange(len(label_str)) # 序列數組if label_shuffle:np.random.seed(0)# 打亂數組順序label_shuffle_index = np.random.permutation( len(label_str) )train_list = label_shuffle_index[0:label_train_num]test_list = label_shuffle_index[label_train_num:]else:train_list = label_index[0:label_train_num]test_list = label_index[label_train_num:] # -end--------根據亂序標志來處理樣本標簽順序,并將其分為訓練集和測試集----print("train_list",test_list)# 加載人物列表中的圖片文件名稱data_seq_dir,data_label,meta_data = load_dir(dataset_path,train_list,label_str) # 代碼調用load_dir函數,將標簽列表所對應的圖片文件名稱載入。①test_data_seq_dir, test_data_label, test_meta_data = load_dir(dataset_path, test_list, label_str) # 代碼調用load_dir函數,將標簽列表所對應的圖片文件名稱載入。②# 將圖片文件名稱轉化為數據集train_source = DataSet(data_seq_dir, data_label, meta_data, imgresize,True) # 調用自定義類DataSet, 返回PyTorch支持的數據集對象,且只對訓練集進行緩存處理,測試集不做緩存處理。①# test數據不緩存test_source = DataSet(test_data_seq_dir, test_data_label, test_meta_data, imgresize, False) # 調用自定義類DataSet, 返回PyTorch支持的數據集對象,且只對訓練集進行緩存處理,測試集不做緩存處理。②return train_source,test_source2.2?代碼實戰:實現load_dir函數加載圖片文件名稱---GaitSet_DataLoader.py(第二部分)
# 1.2 實現load_dir函數加載圖片文件名稱, def load_dir(dataset_path,label_index,label_str):# 在load_dir函數中, 通過文件夾的逐級遍歷, 將標簽列表中每個人物的圖片文件名稱載入。# 該函數返回3個列表對象:圖片文件名稱、圖片文件名稱對應的標簽索引、圖片文件名稱對應的元數據(人物、行走條件、拍攝角度)data_seq_dir,data_label,meta_data = [],[],[]for i_label in label_index: # 獲取樣本個體label_path = os.path.join(dataset_path, label_str[i_label]) # 拼接目錄for _seq_type in sorted(os.listdir(label_path)): # 獲取樣本類型,普通條件、穿大衣、攜帶物品seq_type_path = os.path.join(label_path, _seq_type) # 拼接目錄for _view in sorted(os.listdir(seq_type_path)): # 獲取拍攝角度_seq_dir = os.path.join(seq_type_path, _view) # 拼接圖片目錄if len(os.listdir(_seq_dir)) > 0: # 有圖片data_seq_dir.append(_seq_dir) # 圖片目錄data_label.append(i_label) # 圖片目錄對應的標簽meta_data.append((label_str[i_label], _seq_type, _view))else:print("No files:", _seq_dir) # 輸出數據集中樣本不完整的標簽。# 當發現某個標簽文件夾中沒有圖片時會將該標簽輸出。在使用時,可以先用load_dir函數將整個數據集遍歷一遍, 并根據輸出樣本不完整的標簽,回填到第18行代碼。return data_seq_dir, data_label, meta_data # 返回結果2.3?實現定義數據類DataSet---GaitSet_DataLoader.py(第三部分)
# 1.3 實現定義數據類DataSet # PyTorch提供了一個torch.utils.data接口,可以用來對數據集進行封裝。 # 在實現時,只需要繼承torch.utils.data.Dataset類,并重載其__getitem__方法。 # 在使用時,框架會向getitem方法傳入索引index。在__getitem__方法內部,根據指定index加載數據。 class DataSet(tordata.DataLoader):def __init__(self,data_seq_dir,data_label,meta_data,imgresize,cache=True): # 初始化self.data_seq_dir = data_seq_dir # 存儲圖片文件名稱self.data = [None] * len(self.data_seq_dir) # 存放圖片self.cache = cache # 緩存標志self.meta_data = meta_data # 數據的元信息self.data_label = np.asarray(data_label) # 存放標簽self.imgresize = int(imgresize) # 載入的圖片大小self.cut_padding = int(float(imgresize)/64*10) # 指定圖片裁剪的大小def load_all_data(self): # 加載所有數據for i in tqdm(range(len(self.data_seq_dir))):self.__getitem__(i)def __loader__(self,path): # 讀取圖片并裁剪frame_imgs = self.img2xarray(path)/255.0# 將圖片橫軸方向的前10列與后10列去掉frame_imgs = frame_imgs[:,:,self.cut_padding:-self.cut_padding]return frame_imgsdef __getitem__(self, index): # 加載指定索引數據if self.data[index] is None: # 第一次加載data = self.__loader__(self.data_seq_dir[index])else:data = self.data[index]if self.cache : # 保存到緩存里self.data[index] = datareturn data,self.meta_data[index],self.data_label[index]def img2xarray(self,file_path): # 讀取指定路徑的數據frame_list = [] # 存放圖片數據imgs = sorted(list(os.listdir(file_path)))for _img in imgs : # 讀取圖片,放到數組里_img_path = os.path.join(file_path, _img)if os.path.isfile(_img_path):img = np.asarray(Image.open(_img_path).resize((self.imgresize, self.imgresize)))if len(img.shape) == 3: # 加載預處理后的圖片frame_list.append(img[..., 0])else:frame_list.append(img)return np.asarray(frame_list, dtype=np.float) # [幀數,高,寬]def __len__(self): # 計算數據集長度return len(self.data_seq_dir)2.4 代碼實戰:測試數據集---train.py(第一部分)
# 1.4 測試數據集 # 在完成數據集的制作之后,對其進行測試。 # 將樣本文件夾perdata放到當前目錄下,并編寫代碼生成數據集對象。 # 從數據集對象中取出一條數據,并顯示該數據的詳細內容。 from GaitSet_DataLoader import load_data # 加載項目模塊# 輸出當前CPU-GPU print("torch V",torch.__version__,"cuda V",torch.version.cuda) pathstr = './data/perdata/perdata' label_train_num = 10 # 訓練集的個數。剩下是測試集 batch_size = (3, 6) frame_num = 8 hidden_dim = 64 # label_train_num = 70 # 訓練數據集的個數,剩下的是測試數據庫dataconf = { # 方便導入參數'dataset_path':pathstr,'imgresize':'64','label_train_num':label_train_num,'label_shuffle':True, } print("加載訓練數據...") train_source,test_cource = load_data(**dataconf) # 一次全載入,經過load_data()分別生成訓練和測試數據集對象。 print("訓練數據集長度",len(train_source)) # label_num * type10* view11 # 顯示數據集里面的標簽 train_label_set = set(train_source.data_label) print("數據集里面的標簽:",train_label_set)dataimg,matedata,lebelimg = train_source.__getitem__(4) # 從數據集中獲取一條數據,并顯示其詳細信息。print("圖片樣本數據形狀:", dataimg.shape," 數據的元信息:", matedata," 數據標簽索引:",lebelimg)plt.imshow(dataimg[0]) # 顯示圖片 plt.axis('off') # 不顯示坐標軸 plt.show()def imshow(img):print("圖片形狀",np.shape(img))npimg = img.numpy()plt.axis('off')plt.imshow(np.transpose(npimg, (1, 2, 0)))plt.show()imshow(torchvision.utils.make_grid(torch.from_numpy(dataimg[-10:]).unsqueeze(1),nrow=10)) # 顯示十張圖片2.5 代碼實戰:實現自定義采集器---GaitSet_DataLoader.py(第四部分)
# 1.5 實現自定義采集器 # 步態識別模型需要通過三元損失進行訓練。三元損失可以輔助模型特征提取的取向,使相同標簽的特征距離更近,不同標簽的特征距離更遠。 # 由于三元損失需要輸入的批次數據中,要包含不同標簽(這樣才可以使用矩陣方式進行正/負樣本的采樣),需要額外對數據集進行處理。 # 這里使用自定義采樣器完成含有不同標簽數據的采樣功能。# torch.utils.data.sampler類需要配合torch.utils.data.Data Loader模塊一起使用。# torch.utils.data.DataLoader是PyTorch中的數據集處理接口。# 根據torch.utils.data.sampler類的采樣索引,在數據源中取出指定的數據,并放到collate_fn中進行二次處理,最終返回所需要的批次數據。# 實現自定義采樣器TripletSampler類,來從數據集中選取不同標簽的索引,并將其返回。# 再將兩個collate_fn函數collate_fn_for_train、collate_fn_for_test分別用于對訓練數據和測試數據的二次處理。class TripletSample(tordata.sampler.Sampler): # 繼承torch.utils.data.sampler類,實現自定義采樣器。# TripletSampler類的實現,在該類的初始化函數中,支持兩個參數傳入:數集與批次參數。其中批次參數包含兩個維度的批次大小,分別是標簽個數與樣本個數。def __init__(self,dataset,batch_size):self.dataset = dataset # 獲得數據集self.batch_size = batch_size # 獲得批次參數,形狀為(標簽個數,樣本個數)self.label_set = list(set(dataset.data_label)) # 標簽集合def __iter__(self): # 實現采樣器的取值過程:從數據集中隨機抽取指定個數的標簽,并在每個標簽中抽取指定個數的樣本,最終以生成器的形式返回。while(True):sample_indices = []# 隨機抽取指定個數的標簽label_list = random.sample(self.label_set,self.batch_size[0])# 在每個標簽中抽取指定個數的樣本for _label in label_list: # 按照標簽個數循環data_index = np.where(self.dataset.data_label == _label)[0]index = np.random.choice(data_index,self.batch_size[1],replace=False)sample_indices += index.tolist()yield np.asarray(sample_indices) # 以生成器的形式返回def __len__(self):return len(self.dataset) # 計算長度# 用于訓練數據的采樣器處理函數 def collate_fn_train(batch,frame_num):# collate_fn_train函數會對采樣器傳入的批次數據進行重組,并對每條數據按照指定幀數frame_num進行抽取。# 同時也要保證每條數據的帖數都大于等于幀數frame_num。如果幀數小于frame_num,則為其添加重復幀。batch_data, batch_label,batch_meta = [],[],[]batch_size = len(batch) #獲得數據條數for i in range(batch_size) : # 依次對每條數據進行處理batch_label.append(batch[i][2]) # 添加數據的標簽batch_meta.append(batch[i][1]) # 添加數據的元信息data = batch[i][0] # 獲取該數據的樣本信息if data.shape[0] < frame_num: # 如果幀數較少,則隨機加入幾個# 復制幀,用于幀數很少的情況multy = (frame_num - data.shape[0])//data.shape[0] + 1# 額外隨機加入的幀的個數choicenum = (frame_num - data.shape[0])%data.shape[0]choice_index = np.random(data.shape[0],choicenum,replace = False)choice_index = list(range(0,data.shape[9])) * multy + choice_index.tolist()else: # 隨機抽取指定個數的幀choice_index = np.random.choice(data.shape[0],frame_num,replace = False)batch_data.append(data[choice_index]) # 增加指定個數的幀數據# 重新組合合成用于訓練的樣本數據batch = [np.asarray(batch_data),batch_meta,batch_label]return batchdef collate_fn_for_test(batch,frame_num): # 用于測試數據的采樣器處理函數# collate_fn_for_test函數會對采樣器傳入的批次數據進行重組,并按照批次數據中最大幀數進行補0對齊。# 同時也要保證母條數據的幀數都大于等于幀數frame_num。如果幀數小于frame_num,則為其添加重復幀。batch_size = len(batch) # 獲得數據的條數batch_frames = np.zeros(batch_size,np.int)batch_data,batch_label,batch_meta = [],[],[]for i in range(batch_size): # 依次對每條數據進行處理batch_label.append(batch[i][2]) # 添加數據的標簽batch_meta.append(batch[i][1]) # 添加數據的元信息data = batch[i][0] # 獲取該數據的幀樣本信息if data.shape[0] < frame_num: # 如果幀數較少,隨機加入幾個print(batch_meta, data.shape[0])multy = (frame_num - data.shape[0]) // data.shape[0] + 1choicenum = (frame_num - data.shape[0]) % data.shape[0]choice_index = np.random.choice(data.shape[0], choicenum, replace=False)choice_index = list(range(0, data.shape[0])) * multy + choice_index.tolist()data = np.asarray(data[choice_index])batch_frames[i] = data.shape[0] # 保證所有的都大于等于frame_numbatch_data.append(data)max_frame = np.max(batch_frames) # 獲得最大的幀數# 對其他幀進行補0填充batch_data = np.asarray([np.pad(batch_data[i], ((0, max_frame - batch_data[i].shape[0]), (0, 0), (0, 0)),'constant', constant_values=0)for i in range(batch_size)])# 重新組合成用于訓練的樣本數據batch = [batch_data, batch_meta, batch_label]return batch2.6?測試采樣器---train.py(第二部分)
# 1.6 測試采樣器 from GaitSet_DataLoader import TripletSample,collate_fn_trainbatch_size = (4,8) # 定義批次(4個標簽,每個標簽8個數據) frame_num = 32 # 定義幀數 num_workers = torch.cuda.device_count() # 設置采樣器的線程數# 在設置數據加載器額外啟動進程的數量時,最好要與GPU數量匹配,即一個進程服務于一個GPU。如果額外啟動進程的數量遠遠大于GPU數量,則性能瓶頸主要會卡在GPU運行的地方,起不到提升效率的作用。 print("當前GPU數量:",num_workers) if num_workers <= 1 : # 如果只有一塊GPU,或者沒有GPU,則使用主線程處理num_workers = 0 print("數據加載器額外啟動進程的數量",num_workers)# 實例化采樣器:得到對象triplet_sampler。 triplet_sampler = TripletSample(train_source,batch_size) # 初始化采樣器的處理函數:用偏函數的方法對采樣器的處理函數進行初始化。 collate_train = partial(collate_fn_train,frame_num=frame_num)# 定義數據加載器:每次迭代,按照采樣器的索引在train_source中取出數據 # 將對象triplet_sampler和采樣器的處理函數collate_train傳入tordata.DataLoader,得到一個可用于訓練的數據加載器對象train_loader。 # 同時對數據加載器額外啟動進程的數量進行了設置,如果額外啟動進程的數量num_workers是0,則在加載數據時不額外啟動其他進程。 train_loader = tordata.DataLoader(dataset=train_source,batch_sampler=triplet_sampler, collate_fn=collate_train,num_workers=num_workers)# 從數據加載器中取出一條數據 batch_data,batch_meta,batch_label = next(iter(train_loader)) print("該批次數據的總長度:",len(batch_data)) # 輸出該數據的詳細信息 print("每條數據的形狀為",batch_data.shape) print(batch_label) # 輸出該數據的標簽2.7 代碼實戰:定義基礎卷積類---GaitSet.py(第一部分)
import torch import torch.nn as nn import torch.autograd as autograd import torch.nn.functional as F# 搭建GaitSet模型: 分為兩部分:基礎卷積(BasicConv2d) 類和GaitSetNet類。# 1.7 定義基礎卷積類:對原始卷積函數進行封裝。在卷積結束后,用Mish激活函數和批量正則化處理對特征進行二次處理。 class BasicConv2d(nn.Module):def __init__(self,in_channels,out_channels,kernel_size,**kwargs):super(BasicConv2d,self).__init__()self.conv = nn.Conv2d(in_channels,out_channels,kernel_size,bias=False,**kwargs) # 卷積操作self.BatchNorm = nn.BatchNorm2d(out_channels) # BN操作def forward(self,x): # 自定義前向傳播方法x = self.conv(x)x = x * ( torch.tanh(F.softplus(x))) # 實現Mish激活函數:PyTorch沒有現成的Mish激活函數,手動實現Mish激活函數,并對其進行調用。return self.BatchNorm(x) # 返回卷積結果2.8 代碼實戰:定義GaitSetNet類---GaitSet.py(第二部分)
# 1.8 定義GaitSetNet類: # ①實現3個MGP。 # ②對MGP的結果進行HPM處理。每層MGP的結構是由兩個卷積層加一次下采樣組成的。在主分支下采樣之后,與輔助分支所提取的幀級特征加和,傳入下一個MGP中。 class GaitSetNet(nn.Module):def __init__(self, hidden_dim, frame_num):super(GaitSetNet, self).__init__()self.hidden_dim = hidden_dim # 輸出的特征維度# 定義MGP部分cnls = [1, 32, 64, 128] # 定義卷積層通道數量self.set_layer1 = BasicConv2d(cnls[0], cnls[1], 5, padding=2)self.set_layer2 = BasicConv2d(cnls[1], cnls[1], 3, padding=1)self.set_layer1_down = BasicConv2d(cnls[1], cnls[1], 2, stride=2) # 下采樣操作,通過步長為2的2x2卷積實現。self.set_layer3 = BasicConv2d(cnls[1], cnls[2], 3, padding=1)self.set_layer4 = BasicConv2d(cnls[2], cnls[2], 3, padding=1)self.set_layer2_down = BasicConv2d(cnls[2], cnls[2], 2, stride=2)# 下采樣操作,通過步長為2的2x2卷積實現。self.gl_layer2_down = BasicConv2d(cnls[2], cnls[2], 2, stride=2)# 下采樣操作,通過步長為2的2x2卷積實現。self.set_layer5 = BasicConv2d(cnls[2], cnls[3], 3, padding=1)self.set_layer6 = BasicConv2d(cnls[3], cnls[3], 3, padding=1)self.gl_layer1 = BasicConv2d(cnls[1], cnls[2], 3, padding=1)self.gl_layer2 = BasicConv2d(cnls[2], cnls[2], 3, padding=1)self.gl_layer3 = BasicConv2d(cnls[2], cnls[3], 3, padding=1)self.gl_layer4 = BasicConv2d(cnls[3], cnls[3], 3, padding=1)self.bin_num = [1, 2, 4, 8, 16] # 定義MGP部分self.fc_bin = nn.ParameterList([nn.Parameter(nn.init.xavier_uniform_(torch.zeros(sum(self.bin_num) * 2, 128, hidden_dim)))])def frame_max(self, x, n): # 用最大特征方法提取幀級特征:# 調用torch.max函數,實現從形狀[批次個數,幀數,通道數,高度,寬度]的特征中,沿著幀維度,提取最大值,得到形狀[批次個數,通道數,高度,寬度]的特征提取幀級特征的過程。return torch.max(x.view(n, -1, x.shape[1], x.shape[2], x.shape[3]), 1)[0] # 取max后的值def forward(self, xinput): # 定義前向處理方法n = xinput.size()[0] # 形狀為[批次個數,幀數,高,寬]x = xinput.reshape(-1, 1, xinput.shape[-2], xinput.shape[-1])del xinput # 刪除不用的變量# MGP 第一層x = self.set_layer1(x)x = self.set_layer2(x)x = self.set_layer1_down(x)gl = self.gl_layer1(self.frame_max(x, n)) # 將每一層的幀取最大值# MGP 第二層gl = self.gl_layer2(gl)gl = self.gl_layer2_down(gl)x = self.set_layer3(x)x = self.set_layer4(x)x = self.set_layer2_down(x)# MGP 第三層gl = self.gl_layer3(gl + self.frame_max(x, n))gl = self.gl_layer4(gl)x = self.set_layer5(x)x = self.set_layer6(x)x = self.frame_max(x, n)gl = gl + x# srart-------HPM處理:按照定義的特征尺度self.bin_num,將輸入特征分成不同尺度,并對每個尺度的特征進行均值和最大化計算,從而組合成新的特征,放到列表feature中。feature = list() # 用于存放HPM特征n, c, h, w = gl.size()for num_bin in self.bin_num:z = x.view(n, c, num_bin, -1)z = z.mean(3) + z.max(3)[0]feature.append(z)z = gl.view(n, c, num_bin, -1)z = z.mean(3) + z.max(3)[0]feature.append(z)# end-------HPM處理:按照定義的特征尺度self.bin_num,將輸入特征分成不同尺度,并對每個尺度的特征進行均值和最大化計算,從而組合成新的特征,放到列表feature中。# 對HPM特征中的特征維度進行轉化# srart-------將每個特征維度由128轉化為指定的輸出的特征維度hidden_dim。因為輸入數據是三維的,無法直接使用全連接API,所以使用矩陣相乘的方式實現三維數據按照最后一個維度進行全連接的效果。feature = torch.cat(feature, 2).permute(2, 0, 1).contiguous() # 62 n cfeature = feature.matmul(self.fc_bin[0])feature = feature.permute(1, 0, 2).contiguous()# end-------將每個特征維度由128轉化為指定的輸出的特征維度hidden_dim。因為輸入數據是三維的,無法直接使用全連接API,所以使用矩陣相乘的方式實現三維數據按照最后一個維度進行全連接的效果。return feature # 返回結果2.8.1 提取幀級特征的過程
2.9?自定義三元損失類---GaitSet.py(第三部分)
# 1.9 實現 自定義三元損失類 # 定義三元損失(TripletLoss)類, 實現三元損失的計算。具體步驟: # ①對輸入樣本中的標簽進行每兩個一組自由組合,生成標簽矩陣,從標簽矩陣中得到正/負樣本對的掩碼 # ②對輸入樣本中的特征進行每兩個一組自由組合,生成特征矩陣,計算出特征矩陣的歐氏距離。 # ③按照正/負樣本對的掩碼,對帶有距離的特征矩陣進行提取,得到正/負兩種標簽的距離。 # ④將正/負兩種標簽的距離相減,再減去間隔值,得到三元損失。 class TripletLoss(nn.Module): # 定義三元損失類def __init__(self,batch_size,hard_or_full,margin): # 初始化super(TripletLoss, self).__init__()self.batch_size = batch_sizeself.margin =margin # 正/負樣本的三元損失間隔self.hard_or_full = hard_or_full # 三元損失方式def forward(self,feature,label): # 定義前向傳播方法:# 接收的參數feature為模型根據輸入樣本所計算出來的特征。該參數的形狀為[n.m.d],n:HPM處理時的尺度個數62。m:樣本個數32。d:維度256。# 在計算過程中,將三元損失看作n份,用矩陣的方式對每份m個樣本、d維度特征做三元損失計算,最后將這n份平均。n,m,d = feature.size() # 形狀為[n,m,d]# 生成標簽矩陣,并從中找出正/負樣本對的編碼,輸出形狀[n,m,m]并且展開hp_mask = (label.unsqueeze(1) == label.unsqueeze(2)).view(-1)hn_mask = (label.unsqueeze(1) != label.unsqueeze(2)).view(-1)dist = self.batch_dist(feature) # 計算出特征矩陣的距離mean_dist = dist.mean(1).mean(1) # 計算所有的平均距離dist = dist.view(-1)# start-----計算三元損失的hard模式hard_hp_dist = torch.max(torch.masked_select(dist, hp_mask).view(n, m, -1), 2)[0]hard_hn_dist = torch.min(torch.masked_select(dist, hn_mask).view(n, m, -1), 2)[0]hard_loss_metric = F.relu(self.margin + hard_hp_dist - hard_hn_dist).view(n, -1) # 要讓間隔最小化,到0為止# 對三元損失取均值,得到最終的hard模式loss[n]hard_loss_metric_mean = torch.mean(hard_loss_metric,1)# end-----計算三元損失的hard模式# start-----計算三元損失的full模式# 計算三元損失的full模型full_hp_dist = torch.masked_select(dist, hp_mask).view(n, m, -1, 1) # 按照編碼得到所有正向樣本距離[n,m,正樣本個數,1] [62, 32, 8, 1]full_hn_dist = torch.masked_select(dist, hn_mask).view(n, m, 1, -1) # 照編碼得到所有負向樣本距離[n,m,1,負樣本個數] [62, 32, 1, 24]full_loss_metric = F.relu(self.margin + full_hp_dist - full_hn_dist).view(n, -1) # 讓正/負間隔最小化,到0為止 [62,32*8*24]# 計算[n]中每個三元損失的和full_loss_metric_sum = full_loss_metric.sum(1) # 計算[62]中每個loss的和# 計算[n]中每個三元損失的個數(去掉矩陣對角線以及符合條件的三元損失)full_loss_num = (full_loss_metric != 0).sum(1).float() # 計算[62]中每個loss的個數# 計算均值full_loss_metric_mean = full_loss_metric_sum / full_loss_num # 計算平均值full_loss_metric_mean[full_loss_num == 0] = 0 # 將無效值設為0# end-----計算三元損失的full模式return full_loss_metric_mean, hard_loss_metric_mean, mean_dist, full_loss_num # ,lossdef batch_dist(self, x): # 計算特征矩陣的距離x2 = torch.sum(x ** 2, 2) # 平方和 [62, 32]# dist [62, 32, 32]dist = x2.unsqueeze(2) + x2.unsqueeze(2).transpose(1, 2) - 2 * torch.matmul(x, x.transpose(1, 2)) # 計算特征矩陣的距離dist = torch.sqrt(F.relu(dist)) # 對結果進行開平方return distdef ts2var(x):return autograd.Variable(x).cuda()def np2var(x):return ts2var(torch.from_numpy(x))2.10 代碼實戰:訓練模型并保存權重文件---train.py(第三部分)
# 1.10 訓練模型并保存權重文件:實例化模型類,并遍歷數據加載器,進行訓練。 from GaitSet import GaitSetNet, TripletLoss, np2varhidden_dim = 256 # 定義樣本的輸出維度 encoder = GaitSetNet(hidden_dim, frame_num).float() encoder = nn.DataParallel(encoder) # 使用多卡并行訓練 encoder.cuda() # 將模型轉儲到GPU encoder.train() # 設置模型為訓練模型optimizer = Ranger(encoder.parameters(), lr=0.004) # 定義Ranger優化器TripletLossmode = 'full' # 設置三元損失的模式 triplet_loss = TripletLoss(int(np.prod(batch_size)), TripletLossmode, margin=0.2) # 實例化三元損失 triplet_loss = nn.DataParallel(triplet_loss) # 使用多卡并行訓練 triplet_loss.cuda() # 將模型轉儲到GPUckp = 'checkpoint' # 設置模型名稱 os.makedirs(ckp, exist_ok=True) save_name = '_'.join(map(str, [hidden_dim, int(np.prod(batch_size)),frame_num, 'full']))ckpfiles = sorted(os.listdir(ckp)) # 載入預訓練模型 if len(ckpfiles) > 1:modecpk = os.path.join(ckp, ckpfiles[-2])optcpk = os.path.join(ckp, ckpfiles[-1])encoder.module.load_state_dict(torch.load(modecpk)) # 加載模型文件optimizer.load_state_dict(torch.load(optcpk))print("load cpk !!! ", modecpk) # 定義訓練參數 hard_loss_metric = [] full_loss_metric = [] full_loss_num = [] dist_list = [] mean_dist = 0.01 restore_iter = 0 total_iter = 1000 # 迭代次數 lastloss = 65535 # 初始的損失值 trainloss = []_time1 = datetime.now() # 計算迭代時間 for batch_data, batch_meta, batch_label in train_loader:restore_iter += 1optimizer.zero_grad() # 梯度清零batch_data = np2var(batch_data).float() # torch.cuda.DoubleTensor變為torch.cuda.FloatTensorfeature = encoder(batch_data) # 將標簽轉為張量# 將標簽轉化為張量target_label = np2var(np.array(batch_label)).long() # len=32triplet_feature = feature.permute(1, 0, 2).contiguous() # 對特征結果進行變形,形狀變為[62, 32, 256]triplet_label = target_label.unsqueeze(0).repeat(triplet_feature.size(0), 1) # 復制12份標簽,[62, 32]# 計算三元損失(full_loss_metric_, hard_loss_metric_, mean_dist_, full_loss_num_) = triplet_loss(triplet_feature, triplet_label)if triplet_loss.module.hard_or_full == 'full': #提取損失值loss = full_loss_metric_.mean()else:loss = hard_loss_metric_.mean()trainloss.append(loss.data.cpu().numpy()) # 保存損失值hard_loss_metric.append(hard_loss_metric_.mean().data.cpu().numpy())full_loss_metric.append(full_loss_metric_.mean().data.cpu().numpy())full_loss_num.append(full_loss_num_.mean().data.cpu().numpy())dist_list.append(mean_dist_.mean().data.cpu().numpy())if loss > 1e-9: # 若損失值過小,則不參加反向傳播loss.backward()optimizer.step()else:print("損失值過小:", loss)if restore_iter % 1000 == 0:print("restore_iter 1000 time:", datetime.now() - _time1)_time1 = datetime.now()if restore_iter % 100 == 0: # 輸出訓練結果print('iter {}:'.format(restore_iter), end='')print(', hard_loss_metric={0:.8f}'.format(np.mean(hard_loss_metric)), end='')print(', full_loss_metric={0:.8f}'.format(np.mean(full_loss_metric)), end='')print(', full_loss_num={0:.8f}'.format(np.mean(full_loss_num)), end='')print(', mean_dist={0:.8f}'.format(np.mean(dist_list)), end='')print(', lr=%f' % optimizer.param_groups[0]['lr'], end='')print(', hard or full=%r' % TripletLossmode)if lastloss > np.mean(trainloss): # 保存模型print("lastloss:", lastloss, " loss:", np.mean(trainloss), "need save!")lastloss = np.mean(trainloss)modecpk = os.path.join(ckp,'{}-{:0>5}-encoder.pt'.format(save_name, restore_iter))optcpk = os.path.join(ckp,'{}-{:0>5}-optimizer.pt'.format(save_name, restore_iter))torch.save(encoder.module.state_dict(), modecpk) # 一定要用encoder對象的module中的參數進行保存。否則模型數的名字中會含有“module”字符串,使其不能被非并行的模型載入。torch.save(optimizer.state_dict(), optcpk)else:print("lastloss:", lastloss, " loss:", np.mean(trainloss), "don't save")print("__________________")sys.stdout.flush()hard_loss_metric.clear()full_loss_metric.clear()full_loss_num.clear()dist_list.clear()trainloss.clear()if restore_iter == total_iter: # 如果滿足迭代次數,則訓練結束break2.11 代碼實戰:測試模型---GaitSet_test.py(全)
import os import numpy as np from datetime import datetime from functools import partial from tqdm import tqdm import torch.nn as nn import torch.nn.functional as F import torch import torch.utils.data as tordata from GaitSet_DataLoader import load_data,collate_fn_for_test from GaitSet import GaitSetNet,np2var # 為了測試模型識別步態的效果不依賴于拍攝角度和行走條件,可以多角度識別人物步分別取3組行走條件(普通、穿大衣、攜帶包裹)的樣本輸入模型,查看該模型所計算出的生征與其他行走條件的匹配程度。 # 1.11 測試模型 print("torch v:",torch.__version__,"cuda v:",torch.version.cuda)pathstr = './data/perdata/perdata' label_train_num = 70 # 訓練數據集的個數,剩下是測試數據集 batch_size = (8,16) frame_num = 30 hidden_dim = 256# 設置處理流程 num_workers = torch.cuda.device_count() print("cuda.device_count",num_workers) if num_workers <= 1: # 僅有一塊GPU或沒有GPU,則使用CPUnum_workers = 0 print("num_workers",num_workers)dataconf = { # 初始化數據集參數'dataset_path':pathstr,'imgresize':'64','label_train_num':label_train_num, # 訓練數據集的個數,剩下的是測試數據集'label_shuffle':True, } train_source,test_source = load_data(**dataconf)sampler_batch_size = 4 # 定義采樣批次 # 初始化采樣數據的二次處理函數 collate_train = partial(collate_fn_for_test,frame_num=frame_num) # 定義數據加載器:每次迭代,按照采樣器的索引在test_source中取出數據 test_loader = tordata.DataLoader(dataset=test_source,batch_size=sampler_batch_size,sampler=tordata.sampler.SequentialSampler(test_source),collate_fn=collate_train,num_workers=num_workers)# 實例化模型 encoder = GaitSetNet(hidden_dim,frame_num).float() encoder = nn.DataParallel(encoder) encoder.cuda() encoder.eval()ckp = './checkpoint' # 設置模型文件路徑 save_name = '_'.join(map(str,[hidden_dim,int(np.prod( batch_size )),frame_num,'full'])) ckpfiles = sorted(os.listdir(ckp)) # 加載模型 print("ckpfiles::::",ckpfiles) if len(ckpfiles) > 1:# modecpk = ckp + '/'+ckpfiles[-1]modecpk = os.path.join(ckp,ckpfiles[-1])encoder.module.load_state_dict(torch.load(modecpk), False) # 加載模型文件print("load cpk !!! ", modecpk) else:print("No cpk!!!")def cuda_dist(x,y): # 計算距離x = torch.from_numpy(x).cuda()y = torch.from_numpy(y).cuda()dist = torch.sum(x ** 2, 1).unsqueeze(1) + torch.sum(y ** 2, 1).unsqueeze(1).transpose(0, 1) - 2 * torch.matmul(x, y.transpose(0, 1))dist = torch.sqrt(F.relu(dist))return distdef de_diag(acc,each_angle=False): # 計算多角度準確率,計算與其他拍攝角度相關的準確率result = np.sum(acc - np.diag(np.diag(acc)), 1) / 10.0if not each_angle:result = np.mean(result)return resultdef evaluation(data): # 評估模型函數feature, meta, label = dataview, seq_type = [], []for i in meta:view.append(i[2])seq_type.append(i[1])label = np.array(label)view_list = list(set(view))view_list.sort()view_num = len(view_list)probe_seq = [['nm-05', 'nm-06'], ['bg-01', 'bg-02'], ['cl-01', 'cl-02']] # 定義采集數據的行走條件gallery_seq = [['nm-01', 'nm-02', 'nm-03', 'nm-04']] # 定義比較數據的行走條件num_rank = 5 # 取前5個距離最近的數據acc = np.zeros([len(probe_seq), view_num, view_num, num_rank])for (p, probe_s) in enumerate(probe_seq): # 依次將采集的數據與比較數據相比for gallery_s in gallery_seq:# Start---獲取指定條件的樣本特征后,按照采集數據特征與比較數據特之間的距離大小匹配對應的標簽,并計算其準確率。# 步驟如下:# ①計算采集數據特征與比較數據特征之間的距離。# ②對距離進行排序,返回最小的前5個排序索引。# ③按照索引從比較數據中取出前5個標簽,并與采集數據中的標簽做比較。# ④將比較結果的正確數量累加起來,使每個樣本對應5個記錄,分別代表前5個果中的識別正確個數。如[True,True,True,False,False],# 累加后結果為[1,2,3,3,3],表明離采集數據最近的前3個樣本特征中識別出來3個正確結果,前5個樣本特征中識別出來3個正確結果。# ⑤將累加結果與0比較,并判斷每個排名中大于0的個數。# ⑥將排名1-5的識別正確個數分別除以采集樣本個數,再乘以100,便得到每個排名的準確率for (v1, probe_view) in enumerate(view_list):for (v2, gallery_view) in enumerate(view_list): # 遍歷所有視角gseq_mask = np.isin(seq_type, gallery_s) & np.isin(view, [gallery_view])gallery_x = feature[gseq_mask, :] # 取出樣本特征gallery_y = label[gseq_mask] # 取出標簽pseq_mask = np.isin(seq_type, probe_s) & np.isin(view, [probe_view])probe_x = feature[pseq_mask, :] # 取出樣本特征probe_y = label[pseq_mask] # 取出標簽if len(probe_x) > 0 and len(gallery_x) > 0:dist = cuda_dist(probe_x, gallery_x) # 計算特征之間的距離idx = dist.sort(1)[1].cpu().numpy() # 對距離按照由小到大排序,返回排序后的索引(【0】是排序后的值)# 分別計算前五個結果的精確率:步驟③~⑥rank_data = np.round(np.sum(np.cumsum(np.reshape(probe_y,[-1,1]) == gallery_y[idx[:,0:num_rank]],1)>0,0)*100/dist.shape[0],2)# End---獲取指定條件的樣本特征后,按照采集數據特征與比較數據特之間的距離大小匹配對應的標簽,并計算其準確率。acc[p, v1, v2, 0:len(rank_data)] = rank_datareturn accprint('test_loader', len(test_loader)) time = datetime.now() print('開始評估模型...') feature_list = list() view_list = list() seq_type_list = list() label_list = list() batch_meta_list = []# 在遍歷數據集前加入了withtorch.nograd()語句。該語句可以使模型在運行時,不額外創建梯度相關的內存。 # 在顯存不足的情況下,使用withtorch.nogradO語句非常重要,它可以節省系統資源。 # 雖然在實例化模型時,使用了模型的eval方法來設置模型的使用方式,但這僅注意是修改模型中具有狀態分支的處理流程(如dropout或BN等),并不會省去創建顯存存放梯度的開銷。 with torch.no_grad():for i, x in tqdm(enumerate(test_loader)): # 遍歷數據集batch_data, batch_meta, batch_label = xbatch_data = np2var(batch_data).float() # [2, 212, 64, 44]feature = encoder(batch_data) # 將數據載入模型 [4, 62, 64]feature_list.append(feature.view(feature.shape[0], -1).data.cpu().numpy()) # 保存特征結果,共sampler_batch_size 個特征batch_meta_list += batch_metalabel_list += batch_label # 保存樣本標簽# 將樣本特征、標簽以及對應的元信息組合起來 test = (np.concatenate(feature_list, 0), batch_meta_list, label_list) acc = evaluation(test) # 對組合數據進行評估 print('評估完成. 耗時:', datetime.now() - time)for i in range(1): # 計算第一個的精確率print('===Rank-%d 準確率===' % (i + 1))print('攜帶包裹: %.3f,\t普通: %.3f,\t穿大衣: %.3f' % (np.mean(acc[0, :, :, i]),np.mean(acc[1, :, :, i]),np.mean(acc[2, :, :, i])))for i in range(1): # 計算第一個的精確率(除去自身的行走條件)print('===Rank-%d 準確率(除去自身的行走條件)===' % (i + 1))print('攜帶包裹: %.3f,\t普通: %.3f,\t穿大衣: %.3f' % (de_diag(acc[0, :, :, i]),de_diag(acc[1, :, :, i]),de_diag(acc[2, :, :, i])))np.set_printoptions(precision=2, floatmode='fixed') # 設置輸出精度 for i in range(1): # 顯示多拍攝角度的詳細結果print('===Rank-%d 的每個角度準確率 (除去自身的行走條件)===' % (i + 1))print('攜帶包裹:', de_diag(acc[0, :, :, i], True))print('普通:', de_diag(acc[1, :, :, i], True))print('穿大衣:', de_diag(acc[2, :, :, i], True))總結
以上是生活随笔為你收集整理的【Pytorch神经网络实战案例】28 GitSet模型进行步态与身份识别(CASIA-B数据集)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Pytorch神经网络实战案例】18
- 下一篇: python tkinter 输入数字