[Semantic Segmentation Series]: FCN in Practice (with complete, runnable PyTorch code)
- Preface
- Full FCN pipeline code
- Building the model (model)
- Data processing (dataloader)
- Evaluation metrics (metric)
- Training pipeline (train)
- Model testing (test)
- Results
- Closing remarks
Preface
FCN theory article: [Semantic Segmentation Series]: FCN Theory - carpell - 博客园 (cnblogs)
Code repository, download to reproduce: fouen6/FCN_semantic-segmentation
This article is part of my semantic segmentation series. If you are interested in the field, have a look at the series, where the classic models and their code are explained in detail, reproducible code included!
The previous article walked through the theory behind FCN, so you should already have a fairly solid understanding of how it works. This article will guide you through reproducing your own semantic segmentation model, diving deep into the code. If anything is explained incorrectly, corrections are very welcome!
In my experience, building any deep learning model boils down to five parts: constructing the model, processing the dataset, defining the evaluation metrics, the training loop, and testing. It is almost a boilerplate pattern for deep learning code. This article follows the same structure, and I encourage you to dig into the code and study it closely.
**Remember:** if you only understand the theory but not the code, even a good, novel idea will be hard to turn into reality. So please go through the code in depth, and ideally reproduce it yourself while following this article.
Full FCN pipeline code
Building the model (model)
Following the original paper, we use VGG as the feature extraction network. If you are not yet familiar with VGG, you can first read my article explaining the VGG network.
As we know, VGG is built from repeated blocks, so we split it into 5 stages at the positions of the maxpool layers. This lets us plug in VGG backbones of different depths: anything from VGG11 to VGG19 works, because the overall structure is the same.
backbone = get_backbone(backbone=backbone, pretrained=True)
features = list(backbone.features.children())
pool_indices = [i + 1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)]
pool_indices = [0] + pool_indices + [len(features)]
# split into stages
self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]])
self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]])
self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]])
self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]])
self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]])
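The code above calls a get_backbone helper that is not shown in this snippet. As a minimal sketch, assuming it simply wraps the torchvision VGG family (the repository's own implementation may differ), it could look like this:
import torchvision.models as models
def get_backbone(backbone='vgg16', pretrained=True):
    # Illustrative sketch: return a torchvision VGG whose .features we slice into stages.
    # Note: pretrained= is deprecated in newer torchvision in favor of weights=.
    vgg_factories = {
        'vgg11': models.vgg11,
        'vgg13': models.vgg13,
        'vgg16': models.vgg16,
        'vgg19': models.vgg19,
    }
    assert backbone in vgg_factories, f'unsupported backbone: {backbone}'
    return vgg_factories[backbone](pretrained=pretrained)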
One very important detail: we initialize the transposed (de)convolutions with bilinear interpolation weights. Initialized this way, the model produces reasonable upsampled outputs right from the start of training, and the weights are then refined as training proceeds.
def _make_bilinear_weights(size,num_channels):
factor = (size+1)//2
if size % 2 == 1:
center = factor - 1
else:
center = factor - 0.5
og = torch.FloatTensor(size, size)
for i in range(size):
for j in range(size):
og[i, j] = (1-abs((i-center)/factor)) * (1-abs((j-center)/factor))
filter = torch.zeros(num_channels,num_channels,size,size)
for i in range(num_channels):
filter[i,i] = og
return filter
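As a quick sanity check (purely illustrative), a transposed convolution initialized with these weights behaves like bilinear upsampling; for example, a constant input map stays exactly constant in the interior after 2x upsampling:
import torch
import torch.nn as nn
up = nn.ConvTranspose2d(3, 3, kernel_size=4, stride=2, padding=1, bias=False)
up.weight.data = _make_bilinear_weights(4, 3)  # 2x bilinear kernel for 3 channels
x = torch.ones(1, 3, 8, 8)
with torch.no_grad():
    y = up(x)
print(y.shape)              # torch.Size([1, 3, 16, 16])
print(y[0, 0, 4:12, 4:12])  # all interior values are exactly 1.0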
Now we can assemble the FCN32s model. First we load a pretrained VGG16 (any other VGG variant works too, pick whichever you like). We only keep the layers before the fully connected classifier and split them into stages, then build the FCN head: a 7x7 convolution, equivalent to a fully connected layer over a 7x7 receptive field, outputting 4096 channels; a 1x1 convolution to further mix the semantic features; and a final 1x1 convolution that outputs the class scores at every spatial position (e.g. 21 classes). FCN32s upsamples this final score map by 32x in a single step, so the result is rather coarse and the segmentation quality is limited.
One small detail here: x = x[:, :, :input_size[0], :input_size[1]] crops the output back to the input size, because the upsampling path may overshoot the original resolution by a little (for example the output should be 224 but comes out as 225), so we crop to guarantee the output matches the input size.
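For reference, the output height of a transposed convolution is (H_in - 1) * stride - 2 * padding + kernel_size. For the 32x upsampling layer used below (kernel_size=64, stride=32, padding=16), a 224x224 input shrinks to 7x7 after the five poolings and comes back to exactly 224, so in that case the crop is a no-op; it only matters when the pooling and upsampling sizes do not line up perfectly:
h_out = (7 - 1) * 32 - 2 * 16 + 64   # = 224, matching the 224x224 input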
class FCN32s(nn.Module):
def __init__(self,num_classes = 21,backbone='vgg16'):
super(FCN32s, self).__init__()
self.num_classes = num_classes
backbone = get_backbone(backbone=backbone, pretrained=True)
features = list(backbone.features.children())
pool_indices = [i + 1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)]
pool_indices = [0] + pool_indices + [len(features)]
# split into stages
self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]])
self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]])
self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]])
self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]])
self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]])
self.fcn_head = nn.Sequential(
nn.Conv2d(512, 4096, kernel_size=7,padding=3),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Conv2d(4096,4096,kernel_size=1),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Conv2d(4096, self.num_classes, kernel_size=1),
)
self.upsample32 = nn.ConvTranspose2d(self.num_classes,self.num_classes,kernel_size = 64,stride = 32,padding = 16,bias = False)
for m in self.modules():
if isinstance(m, nn.ConvTranspose2d):
m.weight.data.zero_()
m.weight.data = _make_bilinear_weights(m.kernel_size[0], m.out_channels)
def forward(self, x):
input_size = x.size()[2:]
x = self.stage1(x)
x = self.stage2(x)
x = self.stage3(x)
x = self.stage4(x)
x = self.stage5(x)
x = self.fcn_head(x)
x = self.upsample32(x)
x = x[:, :, :input_size[0], :input_size[1]]
return x
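A quick shape check of FCN32s (illustrative; instantiating it downloads the pretrained VGG16 weights the first time):
import torch
model = FCN32s(num_classes=21, backbone='vgg16')
x = torch.randn(1, 3, 320, 320)
with torch.no_grad():
    out = model(x)
print(out.shape)   # torch.Size([1, 21, 320, 320]): per-pixel class scores at the input resolution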
Next is FCN16s. We take the output of stage4 as pool4 and pass it through a 1x1 convolution that turns its channels into class scores, so it can later be fused with the final output via a skip connection. The output of fcn_head is first upsampled 2x to match the shape of pool4, the two are added together (the skip connection), and the sum is then upsampled 16x back to the input image size.
class FCN16s(nn.Module):
def __init__(self,num_classes = 21,backbone='vgg16'):
super(FCN16s, self).__init__()
self.num_classes = num_classes
backbone = get_backbone(backbone=backbone, pretrained=True)
features = list(backbone.features.children())
pool_indices = [i + 1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)]
pool_indices = [0] + pool_indices + [len(features)]
# split into stages
self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]])
self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]])
self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]])
self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]])
self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]])
self.fcn_head = nn.Sequential(
nn.Conv2d(512, 4096, kernel_size=7,padding=3),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Conv2d(4096,4096,kernel_size=1),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Conv2d(4096, self.num_classes, kernel_size=1),
)
self.pool4_score = nn.Conv2d(512,self.num_classes, kernel_size=1)
self.upsample2 = nn.ConvTranspose2d(self.num_classes,self.num_classes,kernel_size = 4,stride = 2,padding = 1,
bias = False)
self.upsample16 = nn.ConvTranspose2d(self.num_classes, self.num_classes, kernel_size=32, stride=16, padding=8,
bias=False)
for m in self.modules():
if isinstance(m, nn.ConvTranspose2d):
m.weight.data.zero_()
m.weight.data = _make_bilinear_weights(m.kernel_size[0], m.out_channels)
def forward(self, x):
input_size = x.size()[2:]
x = self.stage1(x)
x = self.stage2(x)
x = self.stage3(x)
x = self.stage4(x)
pool4 = x
x = self.stage5(pool4)
x = self.fcn_head(x)
x = self.upsample2(x)
pool4_score = self.pool4_score(pool4)
pool4_score = pool4_score[:, :, :x.size()[2], :x.size()[3]]
x = x + pool4_score
x = self.upsample16(x)
x = x[:, :, :input_size[0], :input_size[1]]
return x
Then FCN8s. We take the outputs of stage3 and stage4 as pool3 and pool4, and pass each through a 1x1 convolution that turns their channels into class scores so they can be fused with the final output via skip connections. The output of fcn_head is first upsampled 2x to match the shape of pool4 and added to it; the result is upsampled another 2x to the shape of pool3 and added to it; finally the sum is upsampled 8x back to the input image size.
class FCN8s(nn.Module):
def __init__(self,num_classes = 21,backbone='vgg16'):
super(FCN8s, self).__init__()
self.num_classes = num_classes
backbone = get_backbone(backbone=backbone,pretrained=True)
features = list(backbone.features.children())
pool_indices = [i +1 for i, layer in enumerate(features) if isinstance(layer, nn.MaxPool2d)]
pool_indices = [0] + pool_indices + [len(features)]
# split into stages
self.stage1 = nn.Sequential(*features[pool_indices[0]:pool_indices[1]])
self.stage2 = nn.Sequential(*features[pool_indices[1]:pool_indices[2]])
self.stage3 = nn.Sequential(*features[pool_indices[2]:pool_indices[3]])
self.stage4 = nn.Sequential(*features[pool_indices[3]:pool_indices[4]])
self.stage5 = nn.Sequential(*features[pool_indices[4]:pool_indices[5]])
self.fcn_head = nn.Sequential(
nn.Conv2d(512, 4096, kernel_size=7,padding=3),
nn.ReLU(inplace=True),
nn.Dropout2d(),
nn.Conv2d(4096,4096,kernel_size=1),
nn.ReLU(inplace=True),
nn.Dropout2d(),
nn.Conv2d(4096, self.num_classes, kernel_size=1),
)
self.pool3_score = nn.Conv2d(256,self.num_classes, kernel_size=1)
self.pool4_score = nn.Conv2d(512, self.num_classes, kernel_size=1)
self.upsample2_1 = nn.ConvTranspose2d(self.num_classes,self.num_classes,kernel_size = 4,stride = 2,padding = 1,
bias = False)
self.upsample2_2 = nn.ConvTranspose2d(self.num_classes, self.num_classes, kernel_size=4, stride=2, padding=1,
bias=False)
self.upsample8 = nn.ConvTranspose2d(self.num_classes, self.num_classes, kernel_size=16, stride=8, padding=4,
bias=False)
for m in self.modules():
if isinstance(m, nn.ConvTranspose2d):
m.weight.data.zero_()
m.weight.data=_make_bilinear_weights(m.kernel_size[0],m.out_channels)
def forward(self, x):
input_size = x.size()[2:]
x = self.stage1(x)
x = self.stage2(x)
pool3 = self.stage3(x)
pool4 = self.stage4(pool3)
x = self.stage5(pool4)
x = self.fcn_head(x)
x = self.upsample2_1(x)
pool4_score = self.pool4_score(pool4)
pool4_score = pool4_score[:, :, :x.size()[2], :x.size()[3]]
x = x + pool4_score
x = self.upsample2_2(x)
pool3_score = self.pool3_score(pool3)
pool3_score = pool3_score[:, :, :x.size()[2], :x.size()[3]]
x = x + pool3_score
x = self.upsample8(x)
x = x[:, :, :input_size[0], :input_size[1]]
return x
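The training and testing scripts below import a get_model factory from model.py, which is not shown above. A minimal sketch of what it could look like (the repository's own version may differ):
def get_model(head='fcn8s', backbone='vgg16', num_classes=21):
    # Illustrative sketch: map a head name to the corresponding FCN class.
    heads = {'fcn32s': FCN32s, 'fcn16s': FCN16s, 'fcn8s': FCN8s}
    assert head in heads, f'unsupported head: {head}'
    return heads[head](num_classes=num_classes, backbone=backbone)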
Data processing (dataloader)
Dataset: VOC2012
Download link: The PASCAL Visual Object Classes Challenge 2012 (VOC2012)
Download it from there, the roughly 2 GB archive.
I have already written a dedicated post that introduces the datasets commonly used for semantic segmentation and walks through their processing code in detail. If you are not familiar with these datasets, have a look at the semantic segmentation series first! Here I will simply attach the code.
import torch
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import os
import random
import torchvision.transforms as T
VOC_CLASSES = [
'background','aeroplane','bicycle','bird','boat','bottle','bus',
'car','cat','chair','cow','diningtable','dog','horse',
'motorbike','person','potted plant','sheep','sofa','train','tv/monitor'
]
class VOCSegmentation(Dataset):
def __init__(self, root, split='train', img_size=320, augment=True):
super(VOCSegmentation, self).__init__()
self.root = root
self.split = split
self.img_size = img_size
self.augment = augment
img_dir = os.path.join(root, 'JPEGImages')
mask_dir = os.path.join(root, 'SegmentationClass')
split_file = os.path.join(root, 'ImageSets', 'Segmentation', f'{split}.txt')
if not os.path.exists(split_file):
raise FileNotFoundError(split_file)
with open(split_file, 'r') as f:
file_names = [x.strip() for x in f.readlines()]
self.images = [os.path.join(img_dir, x + '.jpg') for x in file_names]
self.masks = [os.path.join(mask_dir, x + '.png') for x in file_names]
assert len(self.images) == len(self.masks)
print(f" {split} set loaded: {len(self.images)} samples")
self.normalize = T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
def __getitem__(self, index):
img = Image.open(self.images[index]).convert('RGB')
mask = Image.open(self.masks[index])  # P-mode mask (class ids 0-20, 255 = void/boundary)
# Resize
img = img.resize((self.img_size, self.img_size), Image.BILINEAR)
mask = mask.resize((self.img_size, self.img_size), Image.NEAREST)
# convert to tensors
img = T.functional.to_tensor(img)
mask = torch.from_numpy(np.array(mask)).long()  # 0-20, plus 255 for void pixels (ignored by the loss)
# data augmentation
if self.augment:
if random.random() > 0.5:
img = T.functional.hflip(img)
mask = T.functional.hflip(mask)
if random.random() > 0.5:
img = T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2)(img)
img = self.normalize(img)
return img, mask
def __len__(self):
return len(self.images)
def get_dataloader(data_path, batch_size=4, img_size=320, num_workers=4):
train_dataset = VOCSegmentation(root=data_path, split='train', img_size=img_size, augment=True)
val_dataset = VOCSegmentation(root=data_path, split='val', img_size=img_size, augment=False)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, pin_memory=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size, pin_memory=True, num_workers=num_workers)
return train_loader, val_loader
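A small usage check of the dataloader (assuming the VOC2012 directory layout described above; the path is illustrative):
if __name__ == '__main__':
    train_loader, val_loader = get_dataloader('./datasets/VOC2012', batch_size=4, img_size=320)
    images, masks = next(iter(train_loader))
    print(images.shape, masks.shape)  # torch.Size([4, 3, 320, 320]) torch.Size([4, 320, 320])
    print(masks.unique())             # class ids in 0..20, plus 255 for void/boundary pixels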
Evaluation metrics (metric)
For semantic segmentation we use the following evaluation metrics: PA (pixel accuracy), CPA (class pixel accuracy), MPA (mean class pixel accuracy), IoU (intersection over union), mIoU (mean IoU), FWIoU (frequency-weighted IoU), and mF1 (mean F1 score).
I have already written a dedicated post that explains these metrics in depth along with their implementation. If any of them are unfamiliar, have a look at the semantic segmentation series first! Here I will simply attach the code.
import numpy as np
__all__ = ['SegmentationMetric']
class SegmentationMetric(object):
def __init__(self, numClass):
self.numClass = numClass
self.confusionMatrix = np.zeros((self.numClass,) * 2)
def genConfusionMatrix(self, imgPredict, imgLabel):
mask = (imgLabel >= 0) & (imgLabel < self.numClass)
label = self.numClass * imgLabel[mask] + imgPredict[mask]
count = np.bincount(label, minlength=self.numClass ** 2)
confusionMatrix = count.reshape(self.numClass, self.numClass)
return confusionMatrix
def addBatch(self, imgPredict, imgLabel):
assert imgPredict.shape == imgLabel.shape
self.confusionMatrix += self.genConfusionMatrix(imgPredict, imgLabel)
return self.confusionMatrix
def pixelAccuracy(self):
acc = np.diag(self.confusionMatrix).sum() / self.confusionMatrix.sum()
return acc
def classPixelAccuracy(self):
denominator = self.confusionMatrix.sum(axis=1)
denominator = np.where(denominator == 0, 1e-12, denominator)
classAcc = np.diag(self.confusionMatrix) / denominator
return classAcc
def meanPixelAccuracy(self):
classAcc = self.classPixelAccuracy()
meanAcc = np.nanmean(classAcc)
return meanAcc
def IntersectionOverUnion(self):
intersection = np.diag(self.confusionMatrix)
union = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag(
self.confusionMatrix)
union = np.where(union == 0, 1e-12, union)
IoU = intersection / union
return IoU
def meanIntersectionOverUnion(self):
mIoU = np.nanmean(self.IntersectionOverUnion())
return mIoU
def Frequency_Weighted_Intersection_over_Union(self):
denominator1 = np.sum(self.confusionMatrix)
denominator1 = np.where(denominator1 == 0, 1e-12, denominator1)
freq = np.sum(self.confusionMatrix, axis=1) / denominator1
denominator2 = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag(
self.confusionMatrix)
denominator2 = np.where(denominator2 == 0, 1e-12, denominator2)
iu = np.diag(self.confusionMatrix) / denominator2
FWIoU = (freq[freq > 0] * iu[freq > 0]).sum()
return FWIoU
def classF1Score(self):
tp = np.diag(self.confusionMatrix)
fp = self.confusionMatrix.sum(axis=0) - tp
fn = self.confusionMatrix.sum(axis=1) - tp
precision = tp / (tp + fp + 1e-12)
recall = tp / (tp + fn + 1e-12)
f1 = 2 * precision * recall / (precision + recall + 1e-12)
return f1
def meanF1Score(self):
f1 = self.classF1Score()
mean_f1 = np.nanmean(f1)
return mean_f1
def reset(self):
self.confusionMatrix = np.zeros((self.numClass, self.numClass))
def get_scores(self):
scores = {
'Pixel Accuracy': self.pixelAccuracy(),
'Class Pixel Accuracy': self.classPixelAccuracy(),
'Intersection over Union': self.IntersectionOverUnion(),
'Class F1 Score': self.classF1Score(),
'Frequency Weighted Intersection over Union': self.Frequency_Weighted_Intersection_over_Union(),
'Mean Pixel Accuracy': self.meanPixelAccuracy(),
'Mean Intersection over Union(mIoU)': self.meanIntersectionOverUnion(),
'Mean F1 Score': self.meanF1Score()
}
return scores
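A minimal usage example of SegmentationMetric with toy predictions and labels (illustrative only):
if __name__ == '__main__':
    metric = SegmentationMetric(numClass=3)
    pred = np.array([[0, 1], [2, 2]])
    label = np.array([[0, 1], [2, 1]])
    metric.addBatch(pred, label)
    for name, value in metric.get_scores().items():
        print(name, value)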
Training pipeline (train)
At this point all the preparation is done and we can start training the model.
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--data_root', type=str, default='./datasets/VOC2012', help='Dataset root path')
parser.add_argument('--classes_name', type=str, default='VOC', help='Dataset class names')
parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model')
parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head')
parser.add_argument('--num_classes', type=int, default=21, help='Number of classes')
parser.add_argument('--epochs', type=int, default=50, help='Epochs')
parser.add_argument('--lr', type=float, default=0.005, help='Learning rate')
parser.add_argument('--momentum', type=float, default=0.9, help='Momentum')
parser.add_argument('--weight-decay', type=float, default=1e-4, help='Weight decay')
parser.add_argument('--batch_size', type=int, default=8, help='Batch size')
parser.add_argument('--checkpoint', type=str, default='./checkpoint', help='Checkpoint directory')
parser.add_argument('--resume', type=str, default=None, help='Resume checkpoint path')
return parser.parse_args()
First, the parameter settings. They are usually placed at the top like this so readers can quickly grasp the core configuration of the code. We have the dataset root (data_root); the dataset name (classes_name), which is not really used yet since we only work with VOC; the backbone choice (backbone), where you can pick VGG networks of different depths; the segmentation head (head), where you can pick one of the FCN variants; the number of classes (num_classes); the number of training epochs, which you can set a bit high since the best-performing model is saved during training; the learning rate (lr), momentum (momentum), and weight decay (weight-decay), which are hyperparameters you should experiment with to get a feel for; the batch size (batch_size), which depends on your hardware and is usually a multiple of 2; the checkpoint directory (checkpoint); and the resume path (resume) for continuing from a previous checkpoint.
def train(args):
if not os.path.exists(args.checkpoint):
os.makedirs(args.checkpoint)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
n_gpu = torch.cuda.device_count()
print(f"Device: {device}, GPUs available: {n_gpu}")
# Dataloader
train_loader, val_loader = get_dataloader(args.data_root, batch_size=args.batch_size)
train_dataset_size = len(train_loader.dataset)
val_dataset_size = len(val_loader.dataset)
print(f"Train samples: {train_dataset_size}, Val samples: {val_dataset_size}")
# Model
model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes)
model.to(device)
# Loss + Optimizer + Scheduler
criterion = nn.CrossEntropyLoss(ignore_index=255)
#optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
scaler = torch.cuda.amp.GradScaler()
# Resume
start_epoch = 0
best_miou = 0.0
if args.resume and os.path.isfile(args.resume):
print(f"Loading checkpoint '{args.resume}'")
checkpoint = torch.load(args.resume)
start_epoch = checkpoint['epoch']
best_miou = checkpoint['best_miou']
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
print(f"Loaded checkpoint (epoch {start_epoch})")
# Training history
history = {
'train_loss': [],
'val_loss': [],
'pixel_accuracy': [],
'miou': []
}
print(f" Start training ({args.head})")
for epoch in range(start_epoch, args.epochs):
model.train()
train_loss = 0.0
t0 = time.time()
for images, masks in tqdm(train_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Train]'):
images = images.to(device)
masks = masks.to(device)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
outputs = model(images)
loss = criterion(outputs, masks)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
train_loss += loss.item() * images.size(0)
train_loss /= train_dataset_size
history['train_loss'].append(train_loss)
# Validation
model.eval()
val_loss = 0.0
evaluator = SegmentationMetric(args.num_classes)
with torch.no_grad():
for images, masks in tqdm(val_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Val]'):
images = images.to(device)
masks = masks.to(device)
outputs = model(images)
loss = criterion(outputs, masks)
val_loss += loss.item() * images.size(0)
predictions = torch.argmax(outputs, dim=1)
if isinstance(predictions, torch.Tensor):
predictions = predictions.cpu().numpy()
if isinstance(masks, torch.Tensor):
masks = masks.cpu().numpy()
evaluator.addBatch(predictions, masks)
val_loss /= val_dataset_size
history['val_loss'].append(val_loss)
scores = evaluator.get_scores()
print(f"\n Validation Epoch {epoch+1}:")
for k, v in scores.items():
if isinstance(v, np.ndarray):
print(f"{k}: {np.round(v, 3)}")
else:
print(f"{k}: {v:.4f}")
history['pixel_accuracy'].append(scores['Pixel Accuracy'])
history['miou'].append(scores['Mean Intersection over Union(mIoU)'])
# Save best
if scores['Mean Intersection over Union(mIoU)'] > best_miou:
best_miou = scores['Mean Intersection over Union(mIoU)']
torch.save({
'epoch': epoch + 1,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict(),
'best_miou': best_miou,
}, os.path.join(args.checkpoint, f'{args.head}_best.pth'))
print(f"Saved best model! mIoU: {best_miou:.4f}")
scheduler.step()
print(f" Epoch time: {time.time() - t0:.2f}s\n")
print(" Training complete!")
Now the training loop itself. Training loops follow a pattern too, and there are several angles from which you can make yours better.
First we decide which device the code runs on; in practice you almost always want a GPU. Then we load the processed data (the dataloader part) and the model we built (the model part). Next come the loss function, optimizer, and scheduler, which are the key ingredients. There are many loss functions to choose from, and the choice affects how well the model converges; for semantic segmentation the standard choice is cross-entropy loss. There are also many optimizers (SGD, Adam, and so on), all worth trying. The scheduler is the learning-rate policy: we want a large learning rate early on and a smaller one towards the end of training, which speeds up convergence and avoids oscillation. Finally there is the scaler for AMP (automatic mixed precision), which saves memory so that the model can be trained even on a modest machine.
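To make the scheduler concrete: with the default lr=0.005, StepLR(step_size=10, gamma=0.1) divides the learning rate by 10 every 10 epochs. A tiny standalone illustration (dummy parameters, not part of the training script):
import torch
opt = torch.optim.SGD([torch.zeros(1, requires_grad=True)], lr=0.005)
sched = torch.optim.lr_scheduler.StepLR(opt, step_size=10, gamma=0.1)
for epoch in range(50):
    if epoch % 10 == 0:
        print(epoch, sched.get_last_lr())  # [0.005], [0.0005], [5e-05], [5e-06], [5e-07]
    opt.step()
    sched.step()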
There is also checkpoint resuming: if training is interrupted unexpectedly after many hours, we can continue from the last checkpoint instead of starting over. Then comes the actual training: load a batch, compute the loss between the model prediction and the mask, backpropagate the gradients, and update the model parameters. After each epoch we evaluate the model with our metrics; we track best_mIoU and re-save the model weights whenever it improves.
When training finishes, we are left with the weights file of the best model.
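The train() function also records a history dict (train_loss, val_loss, pixel_accuracy, miou) that the script itself never plots. If you want to visualize the training curves afterwards, a small matplotlib helper (not part of the original code) could look like this:
import matplotlib.pyplot as plt
def plot_history(history, save_path='training_curves.png'):
    # Illustrative helper: plot the loss and metric curves collected by train().
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
    ax1.plot(history['train_loss'], label='train loss')
    ax1.plot(history['val_loss'], label='val loss')
    ax1.set_xlabel('epoch')
    ax1.legend()
    ax2.plot(history['miou'], label='mIoU')
    ax2.plot(history['pixel_accuracy'], label='pixel accuracy')
    ax2.set_xlabel('epoch')
    ax2.legend()
    fig.tight_layout()
    fig.savefig(save_path)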
Full code:
import argparse
import os
import time
from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
from datasets.VOC_dataloader import get_dataloader
from model import get_model
from metric import SegmentationMetric
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--data_root', type=str, default='./datasets/VOC2012', help='Dataset root path')
parser.add_argument('--classes_name', type=str, default='VOC', help='Dataset class names')
parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model')
parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head')
parser.add_argument('--num_classes', type=int, default=21, help='Number of classes')
parser.add_argument('--epochs', type=int, default=50, help='Epochs')
parser.add_argument('--lr', type=float, default=0.005, help='Learning rate')
parser.add_argument('--momentum', type=float, default=0.9, help='Momentum')
parser.add_argument('--weight-decay', type=float, default=1e-4, help='Weight decay')
parser.add_argument('--batch_size', type=int, default=8, help='Batch size')
parser.add_argument('--checkpoint', type=str, default='./checkpoint', help='Checkpoint directory')
parser.add_argument('--resume', type=str, default=None, help='Resume checkpoint path')
return parser.parse_args()
def train(args):
if not os.path.exists(args.checkpoint):
os.makedirs(args.checkpoint)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
n_gpu = torch.cuda.device_count()
print(f"Device: {device}, GPUs available: {n_gpu}")
# Dataloader
train_loader, val_loader = get_dataloader(args.data_root, batch_size=args.batch_size)
train_dataset_size = len(train_loader.dataset)
val_dataset_size = len(val_loader.dataset)
print(f"Train samples: {train_dataset_size}, Val samples: {val_dataset_size}")
# Model
model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes)
model.to(device)
# Loss + Optimizer + Scheduler
criterion = nn.CrossEntropyLoss(ignore_index=255)
#optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
scaler = torch.cuda.amp.GradScaler()
# Resume
start_epoch = 0
best_miou = 0.0
if args.resume and os.path.isfile(args.resume):
print(f"Loading checkpoint '{args.resume}'")
checkpoint = torch.load(args.resume)
start_epoch = checkpoint['epoch']
best_miou = checkpoint['best_miou']
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
print(f"Loaded checkpoint (epoch {start_epoch})")
# Training history
history = {
'train_loss': [],
'val_loss': [],
'pixel_accuracy': [],
'miou': []
}
print(f" Start training ({args.head})")
for epoch in range(start_epoch, args.epochs):
model.train()
train_loss = 0.0
t0 = time.time()
for images, masks in tqdm(train_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Train]'):
images = images.to(device)
masks = masks.to(device)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
outputs = model(images)
loss = criterion(outputs, masks)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
train_loss += loss.item() * images.size(0)
train_loss /= train_dataset_size
history['train_loss'].append(train_loss)
# Validation
model.eval()
val_loss = 0.0
evaluator = SegmentationMetric(args.num_classes)
with torch.no_grad():
for images, masks in tqdm(val_loader, desc=f'Epoch {epoch+1}/{args.epochs} [Val]'):
images = images.to(device)
masks = masks.to(device)
outputs = model(images)
loss = criterion(outputs, masks)
val_loss += loss.item() * images.size(0)
predictions = torch.argmax(outputs, dim=1)
if isinstance(predictions, torch.Tensor):
predictions = predictions.cpu().numpy()
if isinstance(masks, torch.Tensor):
masks = masks.cpu().numpy()
evaluator.addBatch(predictions, masks)
val_loss /= val_dataset_size
history['val_loss'].append(val_loss)
scores = evaluator.get_scores()
print(f"\n Validation Epoch {epoch+1}:")
for k, v in scores.items():
if isinstance(v, np.ndarray):
print(f"{k}: {np.round(v, 3)}")
else:
print(f"{k}: {v:.4f}")
history['pixel_accuracy'].append(scores['Pixel Accuracy'])
history['miou'].append(scores['Mean Intersection over Union(mIoU)'])
# Save best
if scores['Mean Intersection over Union(mIoU)'] > best_miou:
best_miou = scores['Mean Intersection over Union(mIoU)']
torch.save({
'epoch': epoch + 1,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict(),
'best_miou': best_miou,
}, os.path.join(args.checkpoint, f'{args.head}_best.pth'))
print(f"Saved best model! mIoU: {best_miou:.4f}")
scheduler.step()
print(f" Epoch time: {time.time() - t0:.2f}s\n")
print(" Training complete!")
if __name__ == '__main__':
args = parse_arguments()
train(args)
Model testing (test)
This is the final step: testing the model.
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--image_dir', type=str, default='./datasets/test', help='Input image or folder')
parser.add_argument('--checkpoint', type=str, default='./checkpoint/fcn8s_best.pth', help='Checkpoint path')
parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model')
parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head')
parser.add_argument('--num_classes', type=int, default=21, help='Number of classes')
parser.add_argument('--save_dir', type=str, default='./predictions', help='Directory to save results')
parser.add_argument('--overlay', type=bool, default=True, help='Save overlay image')
return parser.parse_args()
Let's again look at the parameter settings: the image file or folder to test (image_dir), the weights file saved during training (checkpoint), the backbone (backbone), the segmentation head (head), the number of classes (num_classes), the folder where results are saved (save_dir), and one very useful flag, overlay, which controls whether the predicted mask is blended on top of the original image so you can judge the segmentation quality more easily. (Note that argparse's type=bool treats any non-empty string as True, so in practice overlay stays enabled unless you change the default.)
def load_image(image_path):
image = Image.open(image_path).convert('RGB')
transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
return transform(image).unsqueeze(0), image # tensor, PIL image
# convert a class-id mask to a color image (using VOC_COLORMAP)
def mask_to_color(mask):
color_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)
for label in range(len(VOC_COLORMAP)):
color_mask[mask == label] = VOC_COLORMAP[label]
return color_mask
def save_mask(mask, save_path):
color_mask = mask_to_color(mask)
Image.fromarray(color_mask).save(save_path)
def overlay_mask_on_image(raw_image, mask, alpha=0.6):
mask_color = mask_to_color(mask)
mask_pil = Image.fromarray(mask_color)
mask_pil = mask_pil.resize(raw_image.size, resample=Image.NEAREST)
blended = Image.blend(raw_image, mask_pil, alpha=alpha)
return blended
Next, the helper functions used at test time. Obviously we first need to load the images, and note one detail: the images are normalized when loaded. Why? Because the images were normalized during training, so test images must go through exactly the same preprocessing. Then, for better visualization, the predicted mask is converted to a color image using the colors in VOC_COLORMAP. Finally, overlay_mask_on_image blends the colorized prediction with the original image, which makes the result much more intuitive.
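mask_to_color relies on VOC_COLORMAP, which is imported from the datasets package alongside VOC_CLASSES and is not listed in this post. It is just the standard 21-entry PASCAL VOC palette; for reference, a sketch of it (one RGB triple per class, in the same order as VOC_CLASSES):
VOC_COLORMAP = [
    [0, 0, 0],        # background
    [128, 0, 0],      # aeroplane
    [0, 128, 0],      # bicycle
    [128, 128, 0],    # bird
    [0, 0, 128],      # boat
    [128, 0, 128],    # bottle
    [0, 128, 128],    # bus
    [128, 128, 128],  # car
    [64, 0, 0],       # cat
    [192, 0, 0],      # chair
    [64, 128, 0],     # cow
    [192, 128, 0],    # diningtable
    [64, 0, 128],     # dog
    [192, 0, 128],    # horse
    [64, 128, 128],   # motorbike
    [192, 128, 128],  # person
    [0, 64, 0],       # potted plant
    [128, 64, 0],     # sheep
    [0, 192, 0],      # sofa
    [128, 192, 0],    # train
    [0, 64, 128],     # tv/monitor
]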
def predict(args):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")
# model
model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes)
checkpoint = torch.load(args.checkpoint, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()
os.makedirs(args.save_dir, exist_ok=True)
# predict a single image or a whole folder
if os.path.isdir(args.image_dir):
image_list = [os.path.join(args.image_dir, f) for f in os.listdir(args.image_dir) if f.lower().endswith(('jpg', 'png', 'jpeg'))]
else:
image_list = [args.image_dir]
print(f" Found {len(image_list)} images to predict.")
for img_path in tqdm(image_list):
img_tensor, raw_img = load_image(img_path)
img_tensor = img_tensor.to(device)
with torch.no_grad():
output = model(img_tensor)
pred = torch.argmax(output.squeeze(), dim=0).cpu().numpy()
# save the mask
base_name = os.path.basename(img_path).split('.')[0]
mask_save_path = os.path.join(args.save_dir, f"{base_name}_mask.png")
save_mask(pred, mask_save_path)
# save the overlay
if args.overlay:
overlay_img = overlay_mask_on_image(raw_img, pred)
overlay_save_path = os.path.join(args.save_dir, f"{base_name}_overlay.png")
overlay_img.save(overlay_save_path)
print(f"Saved: {mask_save_path}")
if args.overlay:
print(f"Saved overlay: {overlay_save_path}")
print(" Prediction done!")
Then the prediction step. The flow is similar to training, except there is no backpropagation or parameter update: we simply run the model, take the prediction, and save it.
First pick the device (usually a GPU), then load the data and the model, and finally predict and save the results. The code here is fairly self-explanatory, so reading it directly should be enough.
Full code:
import argparse
import os
import torch
import numpy as np
from PIL import Image
from tqdm import tqdm
from model import get_model
import torchvision.transforms as T
from datasets import *
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--image_dir', type=str, default='./datasets/test', help='Input image or folder')
parser.add_argument('--checkpoint', type=str, default='./checkpoint/fcn8s_best.pth', help='Checkpoint path')
parser.add_argument('--backbone', type=str, default='vgg16', help='Backbone model')
parser.add_argument('--head', type=str, default='fcn8s', help='Segmentation head')
parser.add_argument('--num_classes', type=int, default=21, help='Number of classes')
parser.add_argument('--save_dir', type=str, default='./predictions', help='Directory to save results')
parser.add_argument('--overlay', type=bool, default=True, help='Save overlay image')
return parser.parse_args()
def load_image(image_path):
image = Image.open(image_path).convert('RGB')
transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
return transform(image).unsqueeze(0), image # tensor, PIL image
# convert a class-id mask to a color image (using VOC_COLORMAP)
def mask_to_color(mask):
color_mask = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)
for label in range(len(VOC_COLORMAP)):
color_mask[mask == label] = VOC_COLORMAP[label]
return color_mask
def save_mask(mask, save_path):
color_mask = mask_to_color(mask)
Image.fromarray(color_mask).save(save_path)
def overlay_mask_on_image(raw_image, mask, alpha=0.6):
mask_color = mask_to_color(mask)
mask_pil = Image.fromarray(mask_color)
mask_pil = mask_pil.resize(raw_image.size, resample=Image.NEAREST)
blended = Image.blend(raw_image, mask_pil, alpha=alpha)
return blended
def predict(args):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")
# model
model = get_model(args.head, backbone=args.backbone, num_classes=args.num_classes)
checkpoint = torch.load(args.checkpoint, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()
os.makedirs(args.save_dir, exist_ok=True)
# predict a single image or a whole folder
if os.path.isdir(args.image_dir):
image_list = [os.path.join(args.image_dir, f) for f in os.listdir(args.image_dir) if f.lower().endswith(('jpg', 'png', 'jpeg'))]
else:
image_list = [args.image_dir]
print(f" Found {len(image_list)} images to predict.")
for img_path in tqdm(image_list):
img_tensor, raw_img = load_image(img_path)
img_tensor = img_tensor.to(device)
with torch.no_grad():
output = model(img_tensor)
pred = torch.argmax(output.squeeze(), dim=0).cpu().numpy()
# save the mask
base_name = os.path.basename(img_path).split('.')[0]
mask_save_path = os.path.join(args.save_dir, f"{base_name}_mask.png")
save_mask(pred, mask_save_path)
# save the overlay
if args.overlay:
overlay_img = overlay_mask_on_image(raw_img, pred)
overlay_save_path = os.path.join(args.save_dir, f"{base_name}_overlay.png")
overlay_img.save(overlay_save_path)
print(f"Saved: {mask_save_path}")
if args.overlay:
print(f"Saved overlay: {overlay_save_path}")
print(" Prediction done!")
if __name__ == '__main__':
args = parse_arguments()
predict(args)
Results
I trained for only 50 epochs and the results are reasonable; the result images are shown below.
Closing remarks
I hope the content above is helpful to you. If anything is wrong, corrections are welcome!
If you found it useful, a like, comment, and follow would be greatly appreciated. Thank you!
If you think the article is good and want to repost it, feel free to do so; just please credit the original source. Thanks!