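RT-DETR Improvement Strategy [Backbone] | 2023 U-Net V2 as a Replacement Backbone, Strengthening Detail Feature Extraction and Fusion
1. Introduction
This article documents an RT-DETR object detection improvement based on U-Net V2. It replaces the RT-DETR backbone with U-Net V2. Through its Semantics and Detail Infusion (SDI) module, U-Net V2 gives the backbone a richer feature representation, and its attention modules let the network focus on task-relevant regions of the image, strengthening feature extraction in key regions and thereby improving model accuracy. The six models from the original paper, pvt_v2_b0, pvt_v2_b1, pvt_v2_b2, pvt_v2_b3, pvt_v2_b4, and pvt_v2_b5, are all configured here to suit different requirements.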
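2. U-Net V2 Lightweight Design
U-NET V2: RETHINKING THE SKIP CONNECTIONS OF U-NET FOR MEDICAL IMAGE SEGMENTATION
The U-Net V2 network is described in detail below:
2.1 Motivation
- Improving U-Net's connections: The skip connections used in traditional U-Net-based models are not effective enough at integrating low-level and high-level features. For example, as observed with ResNet, a network can have difficulty learning an identity mapping function even when trained on large-scale image datasets.
- Fusing semantic and detail information: The low-level features extracted by the encoder are rich in detail but lack semantic information and may contain noise, whereas high-level features carry more semantic information but lose details such as object boundaries. Fusing features by simple concatenation relies on the network's learning capacity, which is a challenge when medical imaging data is limited. It may also restrict the contribution of information from different levels, introduce noise, and increase GPU memory consumption and computation.
2.2 Principle
2.2.1 Semantics and Detail Infusion Principle
- For an input image, a deep neural network encoder first extracts multi-level features. Then, for the $i$-th level feature map, features from higher levels (carrying more semantic information) and lower levels (capturing finer details) are explicitly injected through a simple Hadamard product, enhancing both the semantics and the details of the $i$-th level features. Finally, the refined features are passed to the decoder for resolution reconstruction and segmentation.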
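The injection step can be illustrated with a toy example. The sketch below is a minimal pure-Python illustration, not the paper's implementation: it assumes three single-channel 2×2 maps that have already been resized to the same resolution (the names `detail`, `current`, and `semantic` are hypothetical) and multiplies them element-wise, so a position keeps a strong response only when every level supports it.

```python
def hadamard(*maps):
    """Element-wise (Hadamard) product of equally sized 2-D lists."""
    rows, cols = len(maps[0]), len(maps[0][0])
    out = [[1.0] * cols for _ in range(rows)]
    for m in maps:
        for r in range(rows):
            for c in range(cols):
                out[r][c] *= m[r][c]
    return out


# Hypothetical, already-resized feature maps from three levels
detail = [[1.0, 0.5], [0.0, 1.0]]    # lower level: fine detail
current = [[2.0, 2.0], [2.0, 2.0]]   # the level being refined
semantic = [[1.0, 1.0], [1.0, 0.0]]  # higher level: semantics

fused = hadamard(detail, current, semantic)
print(fused)  # [[2.0, 1.0], [0.0, 0.0]]
```

A zero in any one map suppresses that position in the fused result, which is the gating behaviour a product gives and a concatenation does not.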
2.2.2 Attention Mechanism Principle
- In the Semantics and Detail Infusion (SDI) module, spatial and channel attention are first applied to each level of features $f_i^0$ produced by the encoder, so that the features integrate local spatial information and global channel information: $f_i^1 = \phi_i^c\left(\varphi_i^s\left(f_i^0\right)\right)$. A $1 \times 1$ convolution then reduces the number of channels, yielding $f_i^2$.
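2.3 Structure
2.3.1 Overall Architecture
The network consists of three main modules: the encoder, the SDI module, and the decoder.
- Encoder: For an input image $I$ ($I \in R^{H \times W \times C}$), the encoder produces $M$ levels of features. The $i$-th level features are denoted $f_i^0$ ($1 \le i \le M$), and these features are passed to the SDI module for further refinement.
2.3.2 SDI Module
- Spatial and channel attention are applied first, and a $1 \times 1$ convolution adjusts the number of feature channels to obtain $f_i^2$. Then, at each decoder level $i$, the feature maps of every other level $j$ are resized to match the resolution of $f_i^2$, and a $3 \times 3$ convolution smooths the resized feature map $f_{ij}^3$ to obtain $f_{ij}^4$. Finally, an element-wise Hadamard product over all the resized feature maps gives $f_i^5$, which is sent to the $i$-th level decoder.
- Decoder: receives the features processed by the SDI module and performs resolution reconstruction and segmentation.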
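Written out, the SDI steps described above correspond to the following equations. This is a reconstruction from the description in this section rather than a quotation: it assumes that level 1 has the highest resolution, and the operator names $D$, $I$, $U$ (adaptive average pooling, identity, and bilinear upsampling, as used by the `SDI` class in Section 3) and $\theta_{ij}$ (the $3 \times 3$ smoothing convolution) are labels chosen here for illustration.

```latex
\begin{aligned}
f_i^1 &= \phi_i^c\left(\varphi_i^s\left(f_i^0\right)\right), \qquad
f_i^2 = \mathrm{Conv}_{1 \times 1}\left(f_i^1\right) \\
f_{ij}^3 &=
\begin{cases}
D\left(f_j^2, (H_i, W_i)\right) & j < i \\
I\left(f_j^2\right) & j = i \\
U\left(f_j^2, (H_i, W_i)\right) & j > i
\end{cases} \\
f_{ij}^4 &= \theta_{ij}\left(f_{ij}^3\right) \\
f_i^5 &= f_{i1}^4 \odot f_{i2}^4 \odot \cdots \odot f_{iM}^4
\end{aligned}
```

If $\varphi_i^s$ denotes spatial attention and $\phi_i^c$ channel attention, the formula applies spatial attention first, whereas the `UNetV2.forward` code in Section 3 applies channel attention before spatial attention.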
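2.4 Advantages
- Performance advantages
  - Evaluated on several public medical image segmentation datasets for skin lesion segmentation and polyp segmentation, U-Net V2 outperforms existing state-of-the-art methods in segmentation accuracy. For example, on the ISIC 2017 dataset its DSC score is $1.44\%$ higher than U-Net's and its IoU score is $2.36\%$ higher; on the Kvasir-SEG dataset its DSC score is $11.0\%$ higher than U-Net's.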
- Efficiency advantages
  - It remains memory- and compute-efficient. Compared with UNet++ in experiments on an NVIDIA P100 GPU, U-Net V2 has fewer parameters, uses less GPU memory, and achieves better FLOPs and FPS. For example, at the same input size $(1, 3, 256, 256)$, U-Net V2 has $25.02$M parameters versus $29.87$M for UNet++, and uses $411.42$ MB of GPU memory versus $607.31$ MB for UNet++.
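For reference, the DSC (Dice similarity coefficient) and IoU scores quoted above are overlap metrics between a predicted mask and a ground-truth mask. The following is a minimal sketch using their standard definitions on flat binary masks; the function name and the toy masks are illustrative and are not taken from the paper's evaluation code.

```python
def dice_and_iou(pred, target):
    """Return (DSC, IoU) for two equally long binary masks (lists of 0/1)."""
    inter = sum(p & t for p, t in zip(pred, target))
    pred_sum, target_sum = sum(pred), sum(target)
    union = pred_sum + target_sum - inter
    # Two empty masks are treated as a perfect match
    dsc = 2 * inter / (pred_sum + target_sum) if (pred_sum + target_sum) else 1.0
    iou = inter / union if union else 1.0
    return dsc, iou


pred = [1, 1, 1, 0, 0, 0, 0, 0]
target = [1, 1, 0, 1, 0, 0, 0, 0]
dsc, iou = dice_and_iou(pred, target)
print(f"DSC={dsc:.3f}, IoU={iou:.3f}")  # DSC=0.667, IoU=0.500
```

For the same pair of masks DSC is never smaller than IoU, so the two metrics can improve by different margins on the same dataset.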
Paper: https://arxiv.org/pdf/2311.17791
Source code: https://github.com/yaoppeng/U-Net_v2
3. U-Net V2 Module Implementation Code
The implementation code of the UNetV2 module is as follows:
import os.path
import warnings
import torch
import torch.nn as nn
import torch.nn.functional as F
from functools import partial
from timm.models.layers import DropPath, to_2tuple, trunc_normal_
import math
__all__ = ['pvt_v2_b0', 'pvt_v2_b1', 'pvt_v2_b2', 'pvt_v2_b3', 'pvt_v2_b4', 'pvt_v2_b5']
class ChannelAttention(nn.Module):
    def __init__(self, in_planes, ratio=16):
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # Use the `ratio` argument (default 16) for the bottleneck width instead of a hard-coded 16
        self.fc1 = nn.Conv2d(in_planes, in_planes // ratio, 1, bias=False)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Conv2d(in_planes // ratio, in_planes, 1, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))
        out = avg_out + max_out
        return self.sigmoid(out)


class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super().__init__()
        assert kernel_size in (3, 7), 'kernel size must be 3 or 7'
        padding = 3 if kernel_size == 7 else 1
        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Pool over the channel dimension, then predict one spatial weight map
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        x = torch.cat([avg_out, max_out], dim=1)
        x = self.conv1(x)
        return self.sigmoid(x)


class BasicConv2d(nn.Module):
    def __init__(self, in_planes, out_planes, kernel_size, stride=1, padding=0, dilation=1):
        super().__init__()
        self.conv = nn.Conv2d(in_planes, out_planes,
                              kernel_size=kernel_size, stride=stride,
                              padding=padding, dilation=dilation, bias=False)
        self.bn = nn.BatchNorm2d(out_planes)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        # Note: self.relu is defined but not applied here
        x = self.conv(x)
        x = self.bn(x)
        return x
class Encoder(nn.Module):
    def __init__(self, pretrain_path):
        super().__init__()
        self.backbone = pvt_v2_b2()
        if pretrain_path is None:
            warnings.warn('please provide the pretrained pvt model. Not using pretrained model.')
        elif not os.path.isfile(pretrain_path):
            warnings.warn(f'path: {pretrain_path} does not exists. Not using pretrained model.')
        else:
            print(f"using pretrained file: {pretrain_path}")
            save_model = torch.load(pretrain_path)
            model_dict = self.backbone.state_dict()
            # Keep only the checkpoint entries whose keys exist in the backbone
            state_dict = {k: v for k, v in save_model.items() if k in model_dict.keys()}
            model_dict.update(state_dict)
            self.backbone.load_state_dict(model_dict)

    def forward(self, x):
        f1, f2, f3, f4 = self.backbone(x)  # (x: 3, 352, 352)
        return f1, f2, f3, f4


class SDI(nn.Module):
    def __init__(self, channel):
        super().__init__()
        self.convs = nn.ModuleList(
            [nn.Conv2d(channel, channel, kernel_size=3, stride=1, padding=1) for _ in range(4)])

    def forward(self, xs, anchor):
        ans = torch.ones_like(anchor)
        target_size = anchor.shape[-1]
        for i, x in enumerate(xs):
            # Resize every level to the anchor resolution before smoothing
            if x.shape[-1] > target_size:
                x = F.adaptive_avg_pool2d(x, (target_size, target_size))
            elif x.shape[-1] < target_size:
                x = F.interpolate(x, size=(target_size, target_size),
                                  mode='bilinear', align_corners=True)
            # Hadamard product of the smoothed feature maps
            ans = ans * self.convs[i](x)
        return ans
class UNetV2(nn.Module):
    """
    use SpatialAtt + ChannelAtt
    """

    def __init__(self, channel=3, n_classes=1, deep_supervision=True, pretrained_path=None):
        super().__init__()
        self.deep_supervision = deep_supervision
        self.encoder = Encoder(pretrained_path)
        self.ca_1 = ChannelAttention(64)
        self.sa_1 = SpatialAttention()
        self.ca_2 = ChannelAttention(128)
        self.sa_2 = SpatialAttention()
        self.ca_3 = ChannelAttention(320)
        self.sa_3 = SpatialAttention()
        self.ca_4 = ChannelAttention(512)
        self.sa_4 = SpatialAttention()
        self.Translayer_1 = BasicConv2d(64, channel, 1)
        self.Translayer_2 = BasicConv2d(128, channel, 1)
        self.Translayer_3 = BasicConv2d(320, channel, 1)
        self.Translayer_4 = BasicConv2d(512, channel, 1)
        self.sdi_1 = SDI(channel)
        self.sdi_2 = SDI(channel)
        self.sdi_3 = SDI(channel)
        self.sdi_4 = SDI(channel)
        self.seg_outs = nn.ModuleList([
            nn.Conv2d(channel, n_classes, 1, 1) for _ in range(4)])
        self.deconv2 = nn.ConvTranspose2d(channel, channel, kernel_size=4, stride=2, padding=1,
                                          bias=False)
        self.deconv3 = nn.ConvTranspose2d(channel, channel, kernel_size=4, stride=2,
                                          padding=1, bias=False)
        self.deconv4 = nn.ConvTranspose2d(channel, channel, kernel_size=4, stride=2,
                                          padding=1, bias=False)
        self.deconv5 = nn.ConvTranspose2d(channel, channel, kernel_size=4, stride=2,
                                          padding=1, bias=False)
        # Run one dummy forward pass to record the channel count of each output
        self.width_list = [i.size(1) for i in self.forward(torch.randn(1, 3, 640, 640))]

    def forward(self, x):
        seg_outs = []
        f1, f2, f3, f4 = self.encoder(x)

        # Channel attention, spatial attention, then 1x1 conv on every level
        f1 = self.ca_1(f1) * f1
        f1 = self.sa_1(f1) * f1
        f1 = self.Translayer_1(f1)
        f2 = self.ca_2(f2) * f2
        f2 = self.sa_2(f2) * f2
        f2 = self.Translayer_2(f2)
        f3 = self.ca_3(f3) * f3
        f3 = self.sa_3(f3) * f3
        f3 = self.Translayer_3(f3)
        f4 = self.ca_4(f4) * f4
        f4 = self.sa_4(f4) * f4
        f4 = self.Translayer_4(f4)

        # SDI: fuse all levels at the resolution of each anchor level
        f41 = self.sdi_4([f1, f2, f3, f4], f4)
        f31 = self.sdi_3([f1, f2, f3, f4], f3)
        f21 = self.sdi_2([f1, f2, f3, f4], f2)
        f11 = self.sdi_1([f1, f2, f3, f4], f1)

        # Decoder: upsample step by step and add the fused features
        seg_outs.append(self.seg_outs[0](f41))
        y = self.deconv2(f41) + f31
        seg_outs.append(self.seg_outs[1](y))
        y = self.deconv3(y) + f21
        seg_outs.append(self.seg_outs[2](y))
        y = self.deconv4(y) + f11
        seg_outs.append(self.seg_outs[3](y))
        for i, o in enumerate(seg_outs):
            seg_outs[i] = F.interpolate(o, scale_factor=4, mode='bilinear')
        if self.deep_supervision:
            return seg_outs[::-1]
        else:
            return seg_outs[-1]
class Mlp(nn.Module):
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.dwconv = DWConv(hidden_features)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        elif isinstance(m, nn.Conv2d):
            fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            fan_out //= m.groups
            m.weight.data.normal_(0, math.sqrt(2.0 / fan_out))
            if m.bias is not None:
                m.bias.data.zero_()

    def forward(self, x, H, W):
        x = self.fc1(x)
        x = self.dwconv(x, H, W)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x
class Attention(nn.Module):
    def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None, attn_drop=0., proj_drop=0., sr_ratio=1):
        super().__init__()
        assert dim % num_heads == 0, f"dim {dim} should be divided by num_heads {num_heads}."
        self.dim = dim
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = qk_scale or head_dim ** -0.5
        self.q = nn.Linear(dim, dim, bias=qkv_bias)
        self.kv = nn.Linear(dim, dim * 2, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)
        self.sr_ratio = sr_ratio
        if sr_ratio > 1:
            # Spatial reduction of keys/values to cut the attention cost
            self.sr = nn.Conv2d(dim, dim, kernel_size=sr_ratio, stride=sr_ratio)
            self.norm = nn.LayerNorm(dim)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        elif isinstance(m, nn.Conv2d):
            fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            fan_out //= m.groups
            m.weight.data.normal_(0, math.sqrt(2.0 / fan_out))
            if m.bias is not None:
                m.bias.data.zero_()

    def forward(self, x, H, W):
        B, N, C = x.shape
        q = self.q(x).reshape(B, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
        if self.sr_ratio > 1:
            x_ = x.permute(0, 2, 1).reshape(B, C, H, W)
            x_ = self.sr(x_).reshape(B, C, -1).permute(0, 2, 1)
            x_ = self.norm(x_)
            kv = self.kv(x_).reshape(B, -1, 2, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        else:
            kv = self.kv(x).reshape(B, -1, 2, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        k, v = kv[0], kv[1]
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.attn_drop(attn)
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x
class Block(nn.Module):
    def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop=0., attn_drop=0.,
                 drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm, sr_ratio=1):
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(
            dim,
            num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale,
            attn_drop=attn_drop, proj_drop=drop, sr_ratio=sr_ratio)
        # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        elif isinstance(m, nn.Conv2d):
            fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            fan_out //= m.groups
            m.weight.data.normal_(0, math.sqrt(2.0 / fan_out))
            if m.bias is not None:
                m.bias.data.zero_()

    def forward(self, x, H, W):
        x = x + self.drop_path(self.attn(self.norm1(x), H, W))
        x = x + self.drop_path(self.mlp(self.norm2(x), H, W))
        return x
class OverlapPatchEmbed(nn.Module):
    """ Image to Patch Embedding
    """

    def __init__(self, img_size=224, patch_size=7, stride=4, in_chans=3, embed_dim=768):
        super().__init__()
        img_size = to_2tuple(img_size)
        patch_size = to_2tuple(patch_size)
        self.img_size = img_size
        self.patch_size = patch_size
        self.H, self.W = img_size[0] // patch_size[0], img_size[1] // patch_size[1]
        self.num_patches = self.H * self.W
        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=stride,
                              padding=(patch_size[0] // 2, patch_size[1] // 2))
        self.norm = nn.LayerNorm(embed_dim)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        elif isinstance(m, nn.Conv2d):
            fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            fan_out //= m.groups
            m.weight.data.normal_(0, math.sqrt(2.0 / fan_out))
            if m.bias is not None:
                m.bias.data.zero_()

    def forward(self, x):
        x = self.proj(x)
        _, _, H, W = x.shape
        x = x.flatten(2).transpose(1, 2)  # (B, C, H, W) -> (B, H*W, C)
        x = self.norm(x)
        return x, H, W
class PyramidVisionTransformerImpr(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_chans=3, num_classes=1000, embed_dims=[64, 128, 256, 512],
                 num_heads=[1, 2, 4, 8], mlp_ratios=[4, 4, 4, 4], qkv_bias=False, qk_scale=None, drop_rate=0.,
                 attn_drop_rate=0., drop_path_rate=0., norm_layer=nn.LayerNorm,
                 depths=[3, 4, 6, 3], sr_ratios=[8, 4, 2, 1]):
        super().__init__()
        self.num_classes = num_classes
        self.depths = depths

        # patch_embed
        self.patch_embed1 = OverlapPatchEmbed(img_size=img_size, patch_size=7, stride=4, in_chans=in_chans,
                                              embed_dim=embed_dims[0])
        self.patch_embed2 = OverlapPatchEmbed(img_size=img_size // 4, patch_size=3, stride=2, in_chans=embed_dims[0],
                                              embed_dim=embed_dims[1])
        self.patch_embed3 = OverlapPatchEmbed(img_size=img_size // 8, patch_size=3, stride=2, in_chans=embed_dims[1],
                                              embed_dim=embed_dims[2])
        self.patch_embed4 = OverlapPatchEmbed(img_size=img_size // 16, patch_size=3, stride=2, in_chans=embed_dims[2],
                                              embed_dim=embed_dims[3])

        # transformer encoder
        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]  # stochastic depth decay rule
        cur = 0
        self.block1 = nn.ModuleList([Block(
            dim=embed_dims[0], num_heads=num_heads[0], mlp_ratio=mlp_ratios[0], qkv_bias=qkv_bias, qk_scale=qk_scale,
            drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + i], norm_layer=norm_layer,
            sr_ratio=sr_ratios[0])
            for i in range(depths[0])])
        self.norm1 = norm_layer(embed_dims[0])

        cur += depths[0]
        self.block2 = nn.ModuleList([Block(
            dim=embed_dims[1], num_heads=num_heads[1], mlp_ratio=mlp_ratios[1], qkv_bias=qkv_bias, qk_scale=qk_scale,
            drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + i], norm_layer=norm_layer,
            sr_ratio=sr_ratios[1])
            for i in range(depths[1])])
        self.norm2 = norm_layer(embed_dims[1])

        cur += depths[1]
        self.block3 = nn.ModuleList([Block(
            dim=embed_dims[2], num_heads=num_heads[2], mlp_ratio=mlp_ratios[2], qkv_bias=qkv_bias, qk_scale=qk_scale,
            drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + i], norm_layer=norm_layer,
            sr_ratio=sr_ratios[2])
            for i in range(depths[2])])
        self.norm3 = norm_layer(embed_dims[2])

        cur += depths[2]
        self.block4 = nn.ModuleList([Block(
            dim=embed_dims[3], num_heads=num_heads[3], mlp_ratio=mlp_ratios[3], qkv_bias=qkv_bias, qk_scale=qk_scale,
            drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + i], norm_layer=norm_layer,
            sr_ratio=sr_ratios[3])
            for i in range(depths[3])])
        self.norm4 = norm_layer(embed_dims[3])

        # classification head
        # self.head = nn.Linear(embed_dims[3], num_classes) if num_classes > 0 else nn.Identity()

        self.apply(self._init_weights)
        # Run one dummy forward pass to record the channel count of each stage output
        self.width_list = [i.size(1) for i in self.forward(torch.randn(1, 3, 640, 640))]

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)
        elif isinstance(m, nn.Conv2d):
            fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            fan_out //= m.groups
            m.weight.data.normal_(0, math.sqrt(2.0 / fan_out))
            if m.bias is not None:
                m.bias.data.zero_()

    def init_weights(self, pretrained=None):
        if isinstance(pretrained, str):
            logger = 1
            # load_checkpoint(self, pretrained, map_location='cpu', strict=False, logger=logger)

    def reset_drop_path(self, drop_path_rate):
        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(self.depths))]
        cur = 0
        for i in range(self.depths[0]):
            self.block1[i].drop_path.drop_prob = dpr[cur + i]

        cur += self.depths[0]
        for i in range(self.depths[1]):
            self.block2[i].drop_path.drop_prob = dpr[cur + i]

        cur += self.depths[1]
        for i in range(self.depths[2]):
            self.block3[i].drop_path.drop_prob = dpr[cur + i]

        cur += self.depths[2]
        for i in range(self.depths[3]):
            self.block4[i].drop_path.drop_prob = dpr[cur + i]

    def freeze_patch_emb(self):
        self.patch_embed1.requires_grad = False

    @torch.jit.ignore
    def no_weight_decay(self):
        return {'pos_embed1', 'pos_embed2', 'pos_embed3', 'pos_embed4', 'cls_token'}  # has pos_embed may be better

    def get_classifier(self):
        return self.head

    def reset_classifier(self, num_classes, global_pool=''):
        self.num_classes = num_classes
        self.head = nn.Linear(self.embed_dim, num_classes) if num_classes > 0 else nn.Identity()

    # def _get_pos_embed(self, pos_embed, patch_embed, H, W):
    #     if H * W == self.patch_embed1.num_patches:
    #         return pos_embed
    #     else:
    #         return F.interpolate(
    #             pos_embed.reshape(1, patch_embed.H, patch_embed.W, -1).permute(0, 3, 1, 2),
    #             size=(H, W), mode="bilinear").reshape(1, -1, H * W).permute(0, 2, 1)

    def forward_features(self, x):
        B = x.shape[0]
        outs = []

        # stage 1
        x, H, W = self.patch_embed1(x)
        for i, blk in enumerate(self.block1):
            x = blk(x, H, W)
        x = self.norm1(x)
        x = x.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()
        outs.append(x)

        # stage 2
        x, H, W = self.patch_embed2(x)
        for i, blk in enumerate(self.block2):
            x = blk(x, H, W)
        x = self.norm2(x)
        x = x.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()
        outs.append(x)

        # stage 3
        x, H, W = self.patch_embed3(x)
        for i, blk in enumerate(self.block3):
            x = blk(x, H, W)
        x = self.norm3(x)
        x = x.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()
        outs.append(x)

        # stage 4
        x, H, W = self.patch_embed4(x)
        for i, blk in enumerate(self.block4):
            x = blk(x, H, W)
        x = self.norm4(x)
        x = x.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()
        outs.append(x)

        return outs
        # return x.mean(dim=1)

    def forward(self, x):
        x = self.forward_features(x)
        # x = self.head(x)
        return x
class DWConv(nn.Module):
    def __init__(self, dim=768):
        super().__init__()
        self.dwconv = nn.Conv2d(dim, dim, 3, 1, 1, bias=True, groups=dim)

    def forward(self, x, H, W):
        B, N, C = x.shape
        x = x.transpose(1, 2).view(B, C, H, W)  # tokens -> feature map
        x = self.dwconv(x)
        x = x.flatten(2).transpose(1, 2)  # feature map -> tokens
        return x


def _conv_filter(state_dict, patch_size=16):
    """ convert patch embedding weight from manual patchify + linear proj to conv"""
    out_dict = {}
    for k, v in state_dict.items():
        if 'patch_embed.proj.weight' in k:
            v = v.reshape((v.shape[0], 3, patch_size, patch_size))
        out_dict[k] = v
    return out_dict
class pvt_v2_b0(PyramidVisionTransformerImpr):
    def __init__(self, **kwargs):
        super().__init__(
            patch_size=4, embed_dims=[32, 64, 160, 256], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4],
            qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[2, 2, 2, 2], sr_ratios=[8, 4, 2, 1],
            drop_rate=0.0, drop_path_rate=0.1)


class pvt_v2_b1(PyramidVisionTransformerImpr):
    def __init__(self, **kwargs):
        super().__init__(
            patch_size=4, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4],
            qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[2, 2, 2, 2], sr_ratios=[8, 4, 2, 1],
            drop_rate=0.0, drop_path_rate=0.1)


class pvt_v2_b2(PyramidVisionTransformerImpr):
    def __init__(self, **kwargs):
        super().__init__(
            patch_size=4, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4],
            qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[3, 4, 6, 3], sr_ratios=[8, 4, 2, 1],
            drop_rate=0.0, drop_path_rate=0.1)


class pvt_v2_b3(PyramidVisionTransformerImpr):
    def __init__(self, **kwargs):
        super().__init__(
            patch_size=4, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4],
            qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[3, 4, 18, 3], sr_ratios=[8, 4, 2, 1],
            drop_rate=0.0, drop_path_rate=0.1)


class pvt_v2_b4(PyramidVisionTransformerImpr):
    def __init__(self, **kwargs):
        super().__init__(
            patch_size=4, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4],
            qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[3, 8, 27, 3], sr_ratios=[8, 4, 2, 1],
            drop_rate=0.0, drop_path_rate=0.1)


class pvt_v2_b5(PyramidVisionTransformerImpr):
    def __init__(self, **kwargs):
        super().__init__(
            patch_size=4, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[4, 4, 4, 4],
            qkv_bias=True, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[3, 6, 40, 3], sr_ratios=[8, 4, 2, 1],
            drop_rate=0.0, drop_path_rate=0.1)
if __name__ == "__main__":
    # Path from the original author's environment; it is not used below
    pretrained_path = "/afs/crc.nd.edu/user/y/ypeng4/Polyp-PVT_2/pvt_pth/pvt_v2_b2.pth"
    model = pvt_v2_b5()
    x = torch.rand((1, 3, 640, 640))
    ys = model(x)
    print(len(ys))
    for y in ys:
        print(y.shape)
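The spatial sizes printed by the test block above can be predicted by hand from the patch-embedding settings in `PyramidVisionTransformerImpr`: kernel 7 with stride 4 for stage 1, and kernel 3 with stride 2 for stages 2 to 4, each with padding `kernel // 2`. The sketch below applies the standard convolution output-size formula. The helper names `conv_out` and `pvt_stage_sizes` are hypothetical, and the channel widths shown are the `embed_dims` of `pvt_v2_b0`, not those of the `pvt_v2_b5` model built in the test block.

```python
def conv_out(size, kernel, stride):
    """Output length of a convolution with padding = kernel // 2."""
    padding = kernel // 2
    return (size + 2 * padding - kernel) // stride + 1


def pvt_stage_sizes(img_size=640):
    """Spatial size after each of the four overlapping patch embeddings."""
    sizes = []
    size = img_size
    for kernel, stride in [(7, 4), (3, 2), (3, 2), (3, 2)]:
        size = conv_out(size, kernel, stride)
        sizes.append(size)
    return sizes


sizes = pvt_stage_sizes(640)
print(sizes)  # [160, 80, 40, 20]

# Channel widths come from embed_dims; pvt_v2_b0 uses [32, 64, 160, 256]
for channels, s in zip([32, 64, 160, 256], sizes):
    print((1, channels, s, s))
```

The four stages therefore have strides 4, 8, 16, and 32 relative to the input.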
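4. Modification Steps
4.1 Modification 1
① Under the ultralytics/nn/ directory, create a new AddModules folder to hold the module code.
② In the AddModules folder, create UNetV2.py and paste the code from Section 3 into it.
4.2 Modification 2
In the AddModules folder, create __init__.py (skip this if it already exists) and import the module in it: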
from .UNetV2 import *
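4.3 Modification 3
The module classes need to be added in ultralytics/nn/tasks.py.
① First, import the module.
② In the predict function of the BaseModel class, remove the embed parameter in the following two places:
③ In the _predict_once function of the BaseModel class, replace the code with the following: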
def _predict_once(self, x, profile=False, visualize=False):
    """
    Perform a forward pass through the network.

    Args:
        x (torch.Tensor): The input tensor to the model.
        profile (bool): Print the computation time of each layer if True, defaults to False.
        visualize (bool): Save the feature maps of the model if True, defaults to False.

    Returns:
        (torch.Tensor): The last output of the model.
    """
    y, dt = [], []  # outputs
    for m in self.model:
        if m.f != -1:  # if not from previous layer
            x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers
        if profile:
            self._profile_one_layer(m, x, dt)
        x = m(x)  # run
        y.append(x if m.i in self.save else None)  # save output
        if visualize:
            feature_visualization(x, m.type, m.i, save_dir=visualize)
    return x
④ Replace the predict function of the RTDETRDetectionModel class in full:
def predict(self, x, profile=False, visualize=False, batch=None, augment=False):
    """
    Perform a forward pass through the model.

    Args:
        x (torch.Tensor): The input tensor.
        profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.
        visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.
        batch (dict, optional): Ground truth data for evaluation. Defaults to None.
        augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.

    Returns:
        (torch.Tensor): Model's output tensor.
    """
    y, dt = [], []  # outputs
    for m in self.model[:-1]:  # except the head part
        if m.f != -1:  # if not from previous layer
            x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers
        if profile:
            self._profile_one_layer(m, x, dt)
        if hasattr(m, 'backbone'):
            # Multi-output backbone: pad its outputs to 5 slots and store them by index
            x = m(x)
            for _ in range(5 - len(x)):
                x.insert(0, None)
            for i_idx, i in enumerate(x):
                if i_idx in self.save:
                    y.append(i)
                else:
                    y.append(None)
            # for i in x:
            #     if i is not None:
            #         print(i.size())
            x = x[-1]
        else:
            x = m(x)  # run
            y.append(x if m.i in self.save else None)  # save output
        if visualize:
            feature_visualization(x, m.type, m.i, save_dir=visualize)
    head = self.model[-1]
    x = head([y[j] for j in head.f], batch)  # head inference
    return x
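The backbone branch above pads the list returned by the backbone to five entries, so that the layer indices used in the YAML file (`2`, `3`, and the last output) point at fixed pyramid levels. The following is a minimal sketch of that bookkeeping, using strings in place of tensors. The function name is hypothetical, and the `save` set is an assumed example rather than the value computed by Ultralytics.

```python
def store_backbone_outputs(features, save):
    """Mimic the backbone branch: pad to 5 slots, keep only saved indices."""
    x = list(features)
    for _ in range(5 - len(x)):
        x.insert(0, None)  # left-pad so the last feature map sits at index 4
    y = [feat if idx in save else None for idx, feat in enumerate(x)]
    return y, x[-1]  # stored outputs, and the input to the next layer


features = ["f1_stride4", "f2_stride8", "f3_stride16", "f4_stride32"]
y, last = store_backbone_outputs(features, save={2, 3})
print(y)     # [None, None, 'f2_stride8', 'f3_stride16', None]
print(last)  # f4_stride32
```

With this layout, a head layer whose `from` field is `3` reads the stride-16 map, one whose `from` is `2` reads the stride-8 map, and the first head layer (`from` of `-1`) receives the stride-32 map.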
⑤ In the parse_model function, replace the code at the following location with:
    if verbose:
        LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10}  {'module':<45}{'arguments':<30}")
    ch = [ch]
    layers, save, c2 = [], [], ch[-1]  # layers, savelist, ch out
    is_backbone = False
    for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']):  # from, number, module, args
        try:
            if m == 'node_mode':
                m = d[m]
                if len(args) > 0:
                    if args[0] == 'head_channel':
                        args[0] = int(d[args[0]])
            t = m
            m = getattr(torch.nn, m[3:]) if 'nn.' in m else globals()[m]  # get module
        except:
            pass
        for j, a in enumerate(args):
            if isinstance(a, str):
                with contextlib.suppress(ValueError):
                    try:
                        args[j] = locals()[a] if a in locals() else ast.literal_eval(a)
                    except:
                        args[j] = a
⑥ In the parse_model function, add the following code.
        elif m in {
            pvt_v2_b0, pvt_v2_b1, pvt_v2_b2, pvt_v2_b3, pvt_v2_b4, pvt_v2_b5
        }:
            m = m(*args)
            c2 = m.width_list  # list of output channels, one per backbone stage
⑦ In the parse_model function, replace the code at the following location with:
        if isinstance(c2, list):
            # Multi-output backbone: the module was already instantiated above
            is_backbone = True
            m_ = m
            m_.backbone = True
        else:
            m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args)  # module
            t = str(m)[8:-2].replace('__main__.', '')  # module type
        m_.np = sum(x.numel() for x in m_.parameters())  # number params
        m_.i, m_.f, m_.type = i + 4 if is_backbone else i, f, t  # attach index, 'from' index, type
        if verbose:
            LOGGER.info(f'{i:>3}{str(f):>20}{n_:>3}{m_.np:10.0f}  {t:<45}{str(args):<30}')  # print
        save.extend(x % (i + 4 if is_backbone else i) for x in ([f] if isinstance(f, int) else f) if x != -1)  # append to savelist
        layers.append(m_)
        if i == 0:
            ch = []
        if isinstance(c2, list):
            ch.extend(c2)
            for _ in range(5 - len(ch)):
                ch.insert(0, 0)
        else:
            ch.append(c2)
    return nn.Sequential(*layers), sorted(save)
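Two pieces of bookkeeping in the code above are easy to miss: the backbone's `width_list` is left-padded with zeros to five entries, and the index attached to every layer is shifted by 4 once a multi-output backbone has been seen. The following sketch replays that logic for `pvt_v2_b0`, whose `embed_dims` are `[32, 64, 160, 256]` in Section 3. The helper names are hypothetical.

```python
def pad_channels(width_list):
    """Replicate the `ch` handling for a multi-output backbone at layer 0."""
    ch = list(width_list)
    for _ in range(5 - len(ch)):
        ch.insert(0, 0)  # dummy widths for the unused leading slots
    return ch


def layer_index(i, is_backbone=True):
    """Index attached to the layer at YAML position i after the shift."""
    return i + 4 if is_backbone else i


ch = pad_channels([32, 64, 160, 256])
print(ch)              # [0, 32, 64, 160, 256]
print(ch[3], ch[2])    # 160 64
print(layer_index(0))  # 4
print(layer_index(1))  # 5
```

`ch[3]` and `ch[2]` are the input widths of the two `Conv` layers that read from indices `3` and `2`, which agrees with the `[160, 256, ...]` and `[64, 256, ...]` arguments in the printed model summary in Section 6. The shifted index `4` for the backbone matches the `# 4` comment in the YAML file.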
⑧ In the forward function of the AutoBackend class in ultralytics/nn/autobackend.py, replace the code in full with the following:
def forward(self, im, augment=False, visualize=False):
    """
    Runs inference on the YOLOv8 MultiBackend model.

    Args:
        im (torch.Tensor): The image tensor to perform inference on.
        augment (bool): whether to perform data augmentation during inference, defaults to False
        visualize (bool): whether to visualize the output predictions, defaults to False

    Returns:
        (tuple): Tuple containing the raw output tensor, and processed output for visualization (if visualize=True)
    """
    b, ch, h, w = im.shape  # batch, channel, height, width
    if self.fp16 and im.dtype != torch.float16:
        im = im.half()  # to FP16
    if self.nhwc:
        im = im.permute(0, 2, 3, 1)  # torch BCHW to numpy BHWC shape(1,320,192,3)

    if self.pt or self.nn_module:  # PyTorch
        y = self.model(im, augment=augment, visualize=visualize) if augment or visualize else self.model(im)
    elif self.jit:  # TorchScript
        y = self.model(im)
    elif self.dnn:  # ONNX OpenCV DNN
        im = im.cpu().numpy()  # torch to numpy
        self.net.setInput(im)
        y = self.net.forward()
    elif self.onnx:  # ONNX Runtime
        im = im.cpu().numpy()  # torch to numpy
        y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})
    elif self.xml:  # OpenVINO
        im = im.cpu().numpy()  # FP32
        y = list(self.ov_compiled_model(im).values())
    elif self.engine:  # TensorRT
        if self.dynamic and im.shape != self.bindings['images'].shape:
            i = self.model.get_binding_index('images')
            self.context.set_binding_shape(i, im.shape)  # reshape if dynamic
            self.bindings['images'] = self.bindings['images']._replace(shape=im.shape)
            for name in self.output_names:
                i = self.model.get_binding_index(name)
                self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))
        s = self.bindings['images'].shape
        assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
        self.binding_addrs['images'] = int(im.data_ptr())
        self.context.execute_v2(list(self.binding_addrs.values()))
        y = [self.bindings[x].data for x in sorted(self.output_names)]
    elif self.coreml:  # CoreML
        im = im[0].cpu().numpy()
        im_pil = Image.fromarray((im * 255).astype('uint8'))
        # im = im.resize((192, 320), Image.BILINEAR)
        y = self.model.predict({'image': im_pil})  # coordinates are xywh normalized
        if 'confidence' in y:
            raise TypeError('Ultralytics only supports inference of non-pipelined CoreML models exported with '
                            f"'nms=False', but 'model={w}' has an NMS pipeline created by an 'nms=True' export.")
            # TODO: CoreML NMS inference handling
            # from ultralytics.utils.ops import xywh2xyxy
            # box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
            # conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)
            # y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
        elif len(y) == 1:  # classification model
            y = list(y.values())
        elif len(y) == 2:  # segmentation model
            y = list(reversed(y.values()))  # reversed for segmentation models (pred, proto)
    elif self.paddle:  # PaddlePaddle
        im = im.cpu().numpy().astype(np.float32)
        self.input_handle.copy_from_cpu(im)
        self.predictor.run()
        y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]
    elif self.ncnn:  # ncnn
        mat_in = self.pyncnn.Mat(im[0].cpu().numpy())
        ex = self.net.create_extractor()
        input_names, output_names = self.net.input_names(), self.net.output_names()
        ex.input(input_names[0], mat_in)
        y = []
        for output_name in output_names:
            mat_out = self.pyncnn.Mat()
            ex.extract(output_name, mat_out)
            y.append(np.array(mat_out)[None])
    elif self.triton:  # NVIDIA Triton Inference Server
        im = im.cpu().numpy()  # torch to numpy
        y = self.model(im)
    else:  # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
        im = im.cpu().numpy()
        if self.saved_model:  # SavedModel
            y = self.model(im, training=False) if self.keras else self.model(im)
            if not isinstance(y, list):
                y = [y]
        elif self.pb:  # GraphDef
            y = self.frozen_func(x=self.tf.constant(im))
            if len(y) == 2 and len(self.names) == 999:  # segments and names not defined
                ip, ib = (0, 1) if len(y[0].shape) == 4 else (1, 0)  # index of protos, boxes
                nc = y[ib].shape[1] - y[ip].shape[3] - 4  # y = (1, 160, 160, 32), (1, 116, 8400)
                self.names = {i: f'class{i}' for i in range(nc)}
        else:  # Lite or Edge TPU
            details = self.input_details[0]
            integer = details['dtype'] in (np.int8, np.int16)  # is TFLite quantized int8 or int16 model
            if integer:
                scale, zero_point = details['quantization']
                im = (im / scale + zero_point).astype(details['dtype'])  # de-scale
            self.interpreter.set_tensor(details['index'], im)
            self.interpreter.invoke()
            y = []
            for output in self.output_details:
                x = self.interpreter.get_tensor(output['index'])
                if integer:
                    scale, zero_point = output['quantization']
                    x = (x.astype(np.float32) - zero_point) * scale  # re-scale
                if x.ndim > 2:  # if task is not classification
                    # Denormalize xywh by image size. See https://github.com/ultralytics/ultralytics/pull/1695
                    # xywh are normalized in TFLite/EdgeTPU to mitigate quantization error of integer models
                    x[:, [0, 2]] *= w
                    x[:, [1, 3]] *= h
                y.append(x)
        # TF segment fixes: export is reversed vs ONNX export and protos are transposed
        if len(y) == 2:  # segment with (det, proto) output order reversed
            if len(y[1].shape) != 4:
                y = list(reversed(y))  # should be y = (1, 116, 8400), (1, 160, 160, 32)
            y[1] = np.transpose(y[1], (0, 3, 1, 2))  # should be y = (1, 116, 8400), (1, 32, 160, 160)
        y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]

    # for x in y:
    #     print(type(x), len(x)) if isinstance(x, (list, tuple)) else print(type(x), x.shape)  # debug shapes
    if isinstance(y, (list, tuple)):
        return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
    else:
        return self.from_numpy(y)
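The modifications are now complete, and you can configure the model and start training.
5. YAML Model File
5.1 Model Improvement ⭐
After the code is configured, set up the model's YAML file.
Taking ultralytics/cfg/models/rt-detr/rtdetr-l.yaml as an example, create a model file named rtdetr-UNetV2.yaml in the same directory for training on your own dataset.
Copy the contents of rtdetr-l.yaml into rtdetr-UNetV2.yaml and set nc to the number of object classes in your own dataset.
📌 The model is modified by replacing the backbone with pvt_v2_b0.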
# Ultralytics YOLO 🚀, AGPL-3.0 license
# RT-DETR-l object detection model with P3-P5 outputs. For details see https://docs.ultralytics.com/models/rtdetr

# Parameters
nc: 1 # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n-cls.yaml' will call yolov8-cls.yaml with scale 'n'
  # [depth, width, max_channels]
  l: [1.00, 1.00, 1024]

backbone:
  # [from, repeats, module, args]
  - [-1, 1, pvt_v2_b0, []] # 4

head:
  - [-1, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 5 input_proj.2
  - [-1, 1, AIFI, [1024, 8]] # 6
  - [-1, 1, Conv, [256, 1, 1]] # 7, Y5, lateral_convs.0

  - [-1, 1, nn.Upsample, [None, 2, 'nearest']] # 8
  - [3, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 9 input_proj.1
  - [[-2, -1], 1, Concat, [1]] # 10
  - [-1, 3, RepC3, [256]] # 11, fpn_blocks.0
  - [-1, 1, Conv, [256, 1, 1]] # 12, Y4, lateral_convs.1

  - [-1, 1, nn.Upsample, [None, 2, 'nearest']] # 13
  - [2, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 14 input_proj.0
  - [[-2, -1], 1, Concat, [1]] # 15 cat backbone P4
  - [-1, 3, RepC3, [256]] # X3 (16), fpn_blocks.1

  - [-1, 1, Conv, [256, 3, 2]] # 17, downsample_convs.0
  - [[-1, 12], 1, Concat, [1]] # 18 cat Y4
  - [-1, 3, RepC3, [256]] # F4 (19), pan_blocks.0

  - [-1, 1, Conv, [256, 3, 2]] # 20, downsample_convs.1
  - [[-1, 7], 1, Concat, [1]] # 21 cat Y5
  - [-1, 3, RepC3, [256]] # F5 (22), pan_blocks.1

  - [[16, 19, 22], 1, RTDETRDecoder, [nc]] # Detect(P3, P4, P5)
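With the YAML file in place, training can be launched from Python. The sketch below assumes the standard Ultralytics `RTDETR` entry point and the YAML path used in this section. The dataset file `my_dataset.yaml` and the hyperparameter values are placeholders, not values from the original post. The import is deferred into the function so that defining it does not require the package to be installed.

```python
def train_rtdetr_unetv2(data_yaml="my_dataset.yaml", epochs=100, imgsz=640):
    """Build the modified RT-DETR from its YAML file and start training."""
    # Requires the ultralytics source tree modified as described in Section 4
    from ultralytics import RTDETR

    model = RTDETR("ultralytics/cfg/models/rt-detr/rtdetr-UNetV2.yaml")
    return model.train(data=data_yaml, epochs=epochs, imgsz=imgsz)


# Example call (placeholder dataset file):
# train_rtdetr_unetv2("my_dataset.yaml", epochs=100)
```

Run the call from the repository root so that the relative YAML path resolves.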
6. Successful Run Result
Printing the network model shows that the UNetV2 module has been added to the model and that it can now be trained.
rtdetr-UNetV2:
rtdetr-UNetV2 summary: 539 layers, 21,869,891 parameters, 21,869,891 gradients, 69.2 GFLOPs
             from  n   params  module                                     arguments
  0            -1  1  3409760  pvt_v2_b0                                  []
  1            -1  1    66048  ultralytics.nn.modules.conv.Conv           [256, 256, 1, 1, None, 1, 1, False]
  2            -1  1   789760  ultralytics.nn.modules.transformer.AIFI    [256, 1024, 8]
  3            -1  1    66048  ultralytics.nn.modules.conv.Conv           [256, 256, 1, 1]
  4            -1  1        0  torch.nn.modules.upsampling.Upsample       [None, 2, 'nearest']
  5             3  1    41472  ultralytics.nn.modules.conv.Conv           [160, 256, 1, 1, None, 1, 1, False]
  6      [-2, -1]  1        0  ultralytics.nn.modules.conv.Concat         [1]
  7            -1  3  2232320  ultralytics.nn.modules.block.RepC3         [512, 256, 3]
  8            -1  1    66048  ultralytics.nn.modules.conv.Conv           [256, 256, 1, 1]
  9            -1  1        0  torch.nn.modules.upsampling.Upsample       [None, 2, 'nearest']
 10             2  1    16896  ultralytics.nn.modules.conv.Conv           [64, 256, 1, 1, None, 1, 1, False]
 11      [-2, -1]  1        0  ultralytics.nn.modules.conv.Concat         [1]
 12            -1  3  2232320  ultralytics.nn.modules.block.RepC3         [512, 256, 3]
 13            -1  1   590336  ultralytics.nn.modules.conv.Conv           [256, 256, 3, 2]
 14      [-1, 12]  1        0  ultralytics.nn.modules.conv.Concat         [1]
 15            -1  3  2232320  ultralytics.nn.modules.block.RepC3         [512, 256, 3]
 16            -1  1   590336  ultralytics.nn.modules.conv.Conv           [256, 256, 3, 2]
 17       [-1, 7]  1        0  ultralytics.nn.modules.conv.Concat         [1]
 18            -1  3  2232320  ultralytics.nn.modules.block.RepC3         [512, 256, 3]
 19  [16, 19, 22]  1  7303907  ultralytics.nn.modules.head.RTDETRDecoder  [1, [256, 256, 256]]