import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from my_dataset import ImageSegmentationDataset # 自定义数据集
from NestedUNet import NestedUNet # 模型定义文件
# 定义超参数
batch_size = 1
learning_rate = 1e-4
num_epochs = 200
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# 计算新尺寸,将原始尺寸除以 2
new_height = 2048 // 2
new_width = 3072 // 2
# 数据预处理和数据增强
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 将图像大小调整为原始尺寸的一半
transforms.ToTensor() # 转换为 PyTorch 张量
])
# 加载数据
train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
mask_dir='./dataset/train/masks',
transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# 初始化模型、损失函数、优化器
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# 训练循环
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
for images, masks in train_loader:
images, masks = images.to(device), masks.to(device)
# 确保目标张量的形状为 [batch_size, height, width]
masks = torch.squeeze(masks, dim=1) # 去除通道维度
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, masks)
loss.backward()
optimizer.step()
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
# 保存训练好的模型
torch.save(model.state_dict(), './model.pth')
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 将图像大小调整为原始尺寸的一半
transforms.ToTensor() # 转换为 PyTorch 张量
])
(new_height, new_width)
,这里是对原始尺寸 (2048, 3072)
进行缩小。train_dataset = ImageSegmentationDataset(image_dir='./dataset/train/images',
mask_dir='./dataset/train/masks',
transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
model = NestedUNet(num_classes=2, input_channels=3).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
for images, masks in train_loader:
images, masks = images.to(device), masks.to(device)
# 确保目标张量的形状为 [batch_size, height, width]
masks = torch.squeeze(masks, dim=1) # 去除通道维度
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, masks)
loss.backward()
optimizer.step()
print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
在图像分割任务中:
[batch_size, channels, height, width]
,例如 [1, 3, 1024, 1536]
。[batch_size, 1, height, width]
,例如 [1, 1, 1024, 1536]
。然而,nn.CrossEntropyLoss
函数要求目标掩码的形状为 [batch_size, height, width]
,即不包含通道维度。
因此,需要使用 torch.squeeze
函数去除掩码的通道维度:
masks = torch.squeeze(masks, dim=1)
这将掩码的形状从 [batch_size, 1, height, width]
变为 [batch_size, height, width]
,满足损失函数的要求。
[batch_size, num_classes, height, width]
,例如 [1, 2, 1024, 1536]
。torch.save(model.state_dict(), './model.pth')
model.pth
,方便后续加载和推理。这段代码的主要功能是加载一个预训练的 NestedUNet 模型,使用它对指定目录下的图像进行分割,并将结果保存到输出目录。代码的执行流程如下:
import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
from NestedUNet import NestedUNet # 模型定义文件
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# 加载模型
def load_model(model, path):
if not os.path.exists(path):
raise FileNotFoundError(f"Model file not found: {path}")
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
return model
# 进行推理
def segment_images(model, image_dir, output_dir):
# 计算新尺寸,将原始尺寸除以 2
new_height = 2048 // 2
new_width = 3072 // 2
# 数据预处理和数据增强
transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 将图像大小调整为原始尺寸的一半
transforms.ToTensor() # 转换为 PyTorch 张量
])
os.makedirs(output_dir, exist_ok=True)
for filename in os.listdir(image_dir):
if filename.endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(image_dir, filename)
image = Image.open(filepath).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device) # 增加批量维度
with torch.no_grad():
outputs = model(input_tensor)
prediction = torch.argmax(outputs, dim=1).squeeze(0) # 获取分割结果
# 保存分割结果
output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)
# 将类别值映射到 0-255 范围
pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
# 主执行代码
if __name__ == "__main__":
model = NestedUNet(num_classes=2, input_channels=3).to(device)
model = load_model(model, './model.pth') # 加载预训练模型
# 定义输入目录和输出目录
input_dirs = [
'./dataset/1-2000',
'./dataset/2001-4000',
'./dataset/4001-6000',
'./dataset/6001-8000',
'./dataset/8001-9663'
]
base_output_dir = './dataset/segmentation_results' # 基础输出结果目录
for input_dir in input_dirs:
output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
segment_images(model, input_dir, output_dir)
print(f"Segmentation results saved to: {base_output_dir}")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
torch.cuda.is_available()
检查)来选择计算设备。如果有 GPU 可用,代码会选择 GPU,否则使用 CPU。def load_model(model, path):
if not os.path.exists(path):
raise FileNotFoundError(f"Model file not found: {path}")
model.load_state_dict(torch.load(path, map_location=device))
model.eval()
return model
load_model
函数:
torch.load()
加载预训练的模型参数。model.eval()
将模型设置为评估模式(禁用 dropout 等操作)。def segment_images(model, image_dir, output_dir):
new_height = 2048 // 2
new_width = 3072 // 2
2048 // 2
和 3072 // 2
)。transform = transforms.Compose([
transforms.Resize((new_height, new_width)), # 调整图像大小
transforms.ToTensor() # 转换为 PyTorch 张量
])
Resize
变换调整为新尺寸。ToTensor()
转换为 PyTorch 张量格式,方便输入到模型。for filename in os.listdir(image_dir):
if filename.endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(image_dir, filename)
image = Image.open(filepath).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device) # 增加批量维度
image_dir
目录中的所有图像文件(支持 .png
、.jpg
和 .jpeg
格式)。transform
转换为张量,并添加一个批量维度(unsqueeze(0)
),使形状变为 [1, C, H, W]
(适配模型输入)。with torch.no_grad():
outputs = model(input_tensor)
prediction = torch.argmax(outputs, dim=1).squeeze(0) # 获取分割结果
torch.no_grad()
禁用梯度计算,节省内存和加速推理。model(input_tensor)
会返回模型的输出(每个像素的类别概率)。torch.argmax(outputs, dim=1)
:对每个像素,取类别概率最大的一项作为预测类别。squeeze(0)
:去除批量维度,得到的 prediction
形状为 [H, W]
。output_filename = filename.split('.')[0] + '_segmentation.png'
output_path = os.path.join(output_dir, output_filename)
pred_img = prediction.cpu().numpy().astype(np.uint8) * 255
Image.fromarray(pred_img).save(output_path)
output_filename
:为每个输出图像文件命名,格式为原始文件名加 _segmentation.png
。prediction.cpu().numpy()
:将预测结果从 GPU 移到 CPU,并转换为 NumPy 数组。astype(np.uint8) * 255
:将预测类别(0 或 1)映射到灰度值(0 或 255),使得结果可以保存为黑白图片。pred_img
保存为 PNG 格式。if __name__ == "__main__":
model = NestedUNet(num_classes=2, input_channels=3).to(device)
model = load_model(model, './model.pth') # 加载预训练模型
input_dirs = [
'./dataset/1-2000',
'./dataset/2001-4000',
'./dataset/4001-6000',
'./dataset/6001-8000',
'./dataset/8001-9663'
]
base_output_dir = './dataset/segmentation_results' # 基础输出结果目录
for input_dir in input_dirs:
output_dir = os.path.join(base_output_dir, os.path.basename(input_dir))
segment_images(model, input_dir, output_dir)
print(f"Segmentation results saved to: {base_output_dir}")
NestedUNet
模型并加载权重。input_dirs
)的列表,每个文件夹包含待分割的图像。import os
import numpy as np
import torch
from PIL import Image
class ImageSegmentationDataset:
def __init__(self, image_dir, mask_dir, transform=None):
self.image_dir = image_dir
self.mask_dir = mask_dir
self.transform = transform
self.image_files = sorted(os.listdir(image_dir)) # 获取图片文件列表并排序
def __getitem__(self, idx):
# 获取图像文件名
image_file = self.image_files[idx]
image_path = os.path.join(self.image_dir, image_file)
# 构建掩膜文件名,假设掩膜文件以 "_mask" 结尾
mask_file = image_file.replace(".jpg", "_mask.png")
mask_path = os.path.join(self.mask_dir, mask_file)
# 加载图像和掩膜
image = Image.open(image_path).convert('RGB')
mask = Image.open(mask_path).convert('L') # 灰度图
# 如果有 transform(数据增强等),应用 transform
if self.transform:
image = self.transform(image)
mask = self.transform(mask)
mask = torch.tensor(np.array(mask, dtype=np.int64))
return image, mask
def __len__(self):
# 返回数据集中图像文件的数量
return len(self.image_files)
__init__
构造函数def __init__(self, image_dir, mask_dir, transform=None):
self.image_dir = image_dir
self.mask_dir = mask_dir
self.transform = transform
self.image_files = sorted(os.listdir(image_dir)) # 获取图片文件列表并排序
image_dir
:图像存放的目录路径。mask_dir
:掩膜图像存放的目录路径。每张图像都会有一个对应的掩膜图像,掩膜是标注了目标区域的图像。transform
:如果有数据预处理或数据增强的操作,可以传递给 transform
。例如,可能会进行图像大小调整、归一化等。image_files
:获取 image_dir
中的所有文件名并进行排序,确保图像的顺序与掩膜的顺序一致。__getitem__
方法def __getitem__(self, idx):
# 获取图像文件名
image_file = self.image_files[idx]
image_path = os.path.join(self.image_dir, image_file)
# 构建掩膜文件名,假设掩膜文件以 "_mask" 结尾
mask_file = image_file.replace(".jpg", "_mask.png")
mask_path = os.path.join(self.mask_dir, mask_file)
# 加载图像和掩膜
image = Image.open(image_path).convert('RGB')
mask = Image.open(mask_path).convert('L') # 灰度图
idx
:传入的索引,表示要加载数据集中的哪一张图片及其对应的掩膜。
image_file
:根据 idx
获取当前图像文件的名称。
image_path
:根据文件名构建图像的完整路径。
mask_file
:假设掩膜图像与原图像的文件名一致,只是在原文件名的基础上加上 _mask
后缀(假设原图为 .jpg
,掩膜图像为 .png
)。可以根据需要修改这个规则。
mask_path
:根据掩膜文件名构建掩膜图像的完整路径。
加载图像和掩膜:
Pillow
的 Image.open()
函数加载图像,并使用 .convert('RGB')
确保图像是三通道的 RGB 格式。.convert('L')
,使其成为单通道的灰度图像。if self.transform:
image = self.transform(image)
mask = self.transform(mask)
transform
(例如数据增强或预处理操作),则对图像和掩膜应用该操作。通常,这里会进行如调整大小、归一化、数据增强等操作。mask = torch.tensor(np.array(mask, dtype=np.int64))
Pillow
图像对象转换为 NumPy 数组。int64
。这里使用 int64
是因为通常分割任务的标签是整数类型(比如每个像素对应的类别 ID)。__len__
方法def __len__(self):
# 返回数据集中图像文件的数量
return len(self.image_files)
这段代码实现了一个名为 Nested U-Net 的深度学习模型,主要用于 图像分割 任务。Nested U-Net 是在传统 U-Net 基础上改进的一种结构,通过增加嵌套跳跃连接(nested skip connections),进一步提升了模型的分割精度。下面我会详细解释代码中的各个部分,特别是每个模块的作用。
class VGGBlock(nn.Module):
def __init__(self, in_channels, middle_channels, out_channels):
super().__init__()
self.relu = nn.ReLU(inplace=True)
self.conv1 = nn.Conv2d(in_channels, middle_channels, 3, padding=1)
self.bn1 = nn.BatchNorm2d(middle_channels)
self.conv2 = nn.Conv2d(middle_channels, out_channels, 3, padding=1)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, x):
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
return out
conv1
和 conv2
,都使用 3x3
的卷积核,并且使用了 padding=1 保证输出尺寸和输入相同。bn1
和 bn2
,用于加速训练并稳定模型。这个块被多次重复调用,构成 U-Net 和 Nested U-Net 的基础卷积操作。
class NestedUNet(nn.Module):
def __init__(self, num_classes=2, input_channels=2, deep_supervision=False, **kwargs):
super().__init__()
nb_filter = [32, 64, 128, 256, 512]
self.deep_supervision = deep_supervision
self.pool = nn.MaxPool2d(2, 2)
self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
# 定义每一层的卷积模块
self.conv0_0 = VGGBlock(input_channels, nb_filter[0], nb_filter[0])
self.conv1_0 = VGGBlock(nb_filter[0], nb_filter[1], nb_filter[1])
self.conv2_0 = VGGBlock(nb_filter[1], nb_filter[2], nb_filter[2])
self.conv3_0 = VGGBlock(nb_filter[2], nb_filter[3], nb_filter[3])
self.conv4_0 = VGGBlock(nb_filter[3], nb_filter[4], nb_filter[4])
# 定义嵌套的卷积模块(即跳跃连接)
self.conv0_1 = VGGBlock(nb_filter[0]+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_1 = VGGBlock(nb_filter[1]+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv2_1 = VGGBlock(nb_filter[2]+nb_filter[3], nb_filter[2], nb_filter[2])
self.conv3_1 = VGGBlock(nb_filter[3]+nb_filter[4], nb_filter[3], nb_filter[3])
self.conv0_2 = VGGBlock(nb_filter[0]*2+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_2 = VGGBlock(nb_filter[1]*2+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv2_2 = VGGBlock(nb_filter[2]*2+nb_filter[3], nb_filter[2], nb_filter[2])
self.conv0_3 = VGGBlock(nb_filter[0]*3+nb_filter[1], nb_filter[0], nb_filter[0])
self.conv1_3 = VGGBlock(nb_filter[1]*3+nb_filter[2], nb_filter[1], nb_filter[1])
self.conv0_4 = VGGBlock(nb_filter[0]*4+nb_filter[1], nb_filter[0], nb_filter[0])
# 最终的输出层,支持深度监督(Deep Supervision)
if self.deep_supervision:
self.final1 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final2 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final3 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
self.final4 = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
else:
self.final = nn.Conv2d(nb_filter[0], num_classes, kernel_size=1)
卷积层:模型的每一层都是由 VGGBlock
组成。每一层的输出通道数逐渐增加(32, 64, 128, 256, 512),然后在之后的嵌套层中通过跳跃连接进一步进行融合。
跳跃连接:这种设计是 Nested U-Net 的关键,每一层的输出不仅用于下一层,还和其他层的输出拼接(concatenate)。这种设计帮助保留了更多的细节信息,并改善了分割精度。
上采样(Upsampling):通过 Upsample
将图像尺寸增大,并进行跳跃连接后,再进行卷积操作。
深度监督(Deep Supervision):通过在多个阶段产生输出,增强模型的学习效果。这是 Nested U-Net 的一个特点,可以让模型在不同的深度层次上进行监督,提高性能。
def forward(self, input):
# 各种卷积操作
x0_0 = self.conv0_0(input)
x1_0 = self.conv1_0(self.pool(x0_0))
x0_1 = self.conv0_1(torch.cat([x0_0, self.up(x1_0)], 1))
x2_0 = self.conv2_0(self.pool(x1_0))
x1_1 = self.conv1_1(torch.cat([x1_0, self.up(x2_0)], 1))
x0_2 = self.conv0_2(torch.cat([x0_0, x0_1, self.up(x1_1)], 1))
# 继续进行嵌套连接和卷积操作,直到最后一层
x3_0 = self.conv3_0(self.pool(x2_0))
x2_1 = self.conv2_1(torch.cat([x2_0, self.up(x3_0)], 1))
x1_2 = self.conv1_2(torch.cat([x1_0, x1_1, self.up(x2_1)], 1))
x0_3 = self.conv0_3(torch.cat([x0_0, x0_1, x0_2, self.up(x1_2)], 1))
x4_0 = self.conv4_0(self.pool(x3_0))
x3_1 = self.conv3_1(torch.cat([x3_0, self.up(x4_0)], 1))
x2_2 = self.conv2_2(torch.cat([x2_0, x2_1, self.up(x3_1)], 1))
x1_3 = self.conv1_3(torch.cat([x1_0, x1_1, x1_2, self.up(x2_2)], 1))
x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.up(x1_3)], 1))
if self.deep_supervision:
output1 = self.final1(x0_1)
output2 = self.final2(x0_2)
output3 = self.final3(x0_3)
output4 = self.final4(x0_4)
return [output1, output2, output3, output4]
else:
output = self.final(x0_4)
return output
self.pool
进行下采样(池化),通过 self.up
进行上采样(反卷积),并通过拼接(torch.cat
)将不同层的输出合并起来。此文由 Mix Space 同步更新至 xLog
原始链接为 https://blog.kanes.top/posts/ArtificialIntelligence/AnalysisofImageSegmentationCode