RT-DETR Improvement Strategy [Model Lightweighting] | Replacing the Backbone with MobileViT v1 for Efficient Encoding and Fusion of Local and Global Information
1. Introduction
This article documents a lightweight improvement of RT-DETR for object detection based on MobileViT V1. The MobileViT block is designed to model local and global information in the input tensor with relatively few parameters, combining the strengths of convolutions and Transformers to achieve effective information encoding and fusion. Following the original paper, three variants are configured here, MobileViT-S, MobileViT-XS, and MobileViT-XXS, to suit different needs.
| Model | Params | FLOPs | Inference Time |
|---|---|---|---|
| rtdetr-l | 32.8M | 108.0GFLOPs | 11.6ms |
| Improved | 12.0M | 32.3GFLOPs | 8.9ms |
2. MobileViT V1 Lightweight Model Design
MOBILEVIT: LIGHT-WEIGHT, GENERAL-PURPOSE, AND MOBILE-FRIENDLY VISION TRANSFORMER
2.1 Motivation
- Lightweight convolutional neural networks (CNNs) are widely used in mobile vision tasks but are spatially local. Self-attention-based vision Transformers (ViTs) can learn global representations but are heavy models. A lightweight, low-latency network for mobile vision tasks is therefore needed that combines the strengths of both.
2.2 Principle
- Treat Transformers as convolutions: learn global representations while implicitly incorporating convolution-like properties (such as spatial bias), learn those representations with simple training recipes (e.g., basic data augmentation), and integrate easily with downstream architectures.
2.3 Structure
2.3.1 MobileViT Block
- For an input tensor $x \in \mathbb{R}^{H \times W \times C}$, an $n \times n$ standard convolution is applied first, followed by a point-wise ($1 \times 1$) convolution, yielding $X_L \in \mathbb{R}^{H \times W \times d}$.
- $X_L$ is unfolded into $N$ non-overlapping flattened patches $X_U \in \mathbb{R}^{P \times N \times d}$, and Transformer blocks are applied to obtain $X_G \in \mathbb{R}^{P \times N \times d}$.
- $X_G$ is folded back into $X_F \in \mathbb{R}^{H \times W \times d}$, projected to a low-dimensional space with a point-wise convolution, concatenated with $x$, and the concatenated features are fused with another $n \times n$ convolution (see the shape sketch below).
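As a minimal, shape-only sketch of the unfold → Transformer → fold bookkeeping (the complete module follows in Section 3; the sizes B = 2, d = 144, H = W = 28 and the 2×2 patch are illustrative assumptions):

```python
import torch
import torch.nn.functional as F

B, d, H, W, ph, pw = 2, 144, 28, 28, 2, 2        # illustrative sizes only
x_l = torch.randn(B, d, H, W)                    # X_L after the n x n and 1 x 1 convolutions

# Unfold into N non-overlapping patches of P = ph*pw pixels: [B, d, H, W] -> [B*P, N, d]
patches = F.unfold(x_l, kernel_size=(ph, pw), stride=(ph, pw))    # [B, d*P, N]
N = patches.shape[-1]                                             # N = (H/ph) * (W/pw) = 196
patches = patches.reshape(B, d, ph * pw, N).permute(0, 2, 3, 1)   # [B, P, N, d]
tokens = patches.reshape(B * ph * pw, N, d)                       # X_U fed to the Transformer

# ... Transformer blocks operate on [B*P, N, d] and keep the shape unchanged ...

# Fold back: [B*P, N, d] -> [B, d, H, W]
tokens = tokens.reshape(B, ph * pw, N, d).permute(0, 3, 1, 2)     # [B, d, P, N]
x_g = F.fold(tokens.reshape(B, d * ph * pw, N),
             output_size=(H, W), kernel_size=(ph, pw), stride=(ph, pw))
print(x_g.shape)  # torch.Size([2, 144, 28, 28])
```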
2.3.2 Overall Architecture
- Inspired by lightweight CNNs, the network comes in different configurations for different parameter budgets. The initial layer is a strided $3 \times 3$ standard convolution, followed by MobileNetv2 blocks and MobileViT blocks. Swish is used as the activation function. In the MobileViT block, $n = 3$; since the spatial dimensions of the feature maps are typically multiples of 2 and $h, w \leq n$, the patch size is set to $h = w = 2$.
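The per-stage channel widths of the three published variants can be read directly from the configuration dictionaries in the Section 3 listing; a small inspection snippet (assuming that listing has been saved as MobileViTV1.py in the working directory) would be:

```python
# Assumes the Section 3 code has been saved as MobileViTV1.py next to this script.
from MobileViTV1 import get_config

for mode in ("xx_small", "x_small", "small"):
    cfg = get_config(mode)
    widths = [cfg[f"layer{i}"]["out_channels"] for i in range(1, 6)]
    print(f"{mode:>9}: {widths}")

# Expected output (values taken from the config dictionaries below):
#  xx_small: [16, 24, 48, 64, 80]
#   x_small: [32, 48, 64, 80, 96]
#     small: [32, 64, 96, 128, 160]
```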
2.4 Advantages
- Better performance: for a given parameter budget, MobileViT outperforms existing lightweight CNNs across different mobile vision tasks. For example, on ImageNet-1k, at roughly 6 million parameters MobileViT achieves 3.2% higher top-1 accuracy than MobileNetv3.
- Strong generalization: generalization here refers to the gap between training and evaluation metrics. Compared with previous ViT variants, MobileViT shows better generalization capability.
- Good robustness: it is not sensitive to hyper-parameters such as data augmentation and L2 regularization; it trains well with basic data augmentation and is relatively insensitive to L2 regularization.
- Computational cost: theoretically, the multi-head self-attention in MobileViT costs $O(N^{2} P d)$, which is less efficient than the $O(N^{2} d)$ of ViTs, but it is more efficient in practice. For example, on ImageNet-1k, MobileViT uses 2x fewer FLOPs than DeiT while being 1.8% more accurate.
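As a rough plug-in of illustrative sizes (the 28×28 stage with 2×2 patches, i.e. $P = 4$ and $N = 196$):

$$
\underbrace{O(N^{2} P d)}_{\text{MobileViT}} = O(196^{2} \cdot 4 \cdot d)
\qquad \text{vs.} \qquad
\underbrace{O(N^{2} d)}_{\text{ViT}} = O(196^{2} \cdot d),
$$

i.e. a factor of $P$ more in theory for the same $N$ and $d$.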
Paper: https://arxiv.org/pdf/2110.02178
Source code: https://github.com/apple/ml-cvnets
3. Implementation Code of the MobileViTV1 Module
The implementation code of the MobileViTV1 module is as follows:
"""
original code from apple:
https://github.com/apple/ml-cvnets/blob/main/cvnets/models/classification/mobilevit.py
"""
import math
import numpy as np
import torch
import torch.nn as nn
from torch import Tensor
from torch.nn import functional as F
from typing import Tuple, Dict, Sequence, Union, Optional
__all__ = ['mobile_vit_small', 'mobile_vit_x_small', 'mobile_vit_xx_small']
def make_divisible(
v: Union[float, int],
divisor: Optional[int] = 8,
min_value: Optional[Union[float, int]] = None,
) -> Union[float, int]:
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
:param v:
:param divisor:
:param min_value:
:return:
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
def bound_fn(
min_val: Union[float, int], max_val: Union[float, int], value: Union[float, int]
) -> Union[float, int]:
return max(min_val, min(max_val, value))
def get_config(mode: str = "xxs") -> dict:
width_multiplier = 0.5
ffn_multiplier = 2
layer_0_dim = bound_fn(min_val=16, max_val=64, value=32 * width_multiplier)
layer_0_dim = int(make_divisible(layer_0_dim, divisor=8, min_value=16))
# print("layer_0_dim: ", layer_0_dim)
if mode == "xx_small":
mv2_exp_mult = 2
config = {
"layer1": {
"out_channels": 16,
"expand_ratio": mv2_exp_mult,
"num_blocks": 1,
"stride": 1,
"block_type": "mv2",
},
"layer2": {
"out_channels": 24,
"expand_ratio": mv2_exp_mult,
"num_blocks": 3,
"stride": 2,
"block_type": "mv2",
},
"layer3": { # 28x28
"out_channels": 48,
"transformer_channels": 64,
"ffn_dim": 128,
"transformer_blocks": 2,
"patch_h": 2, # 8,
"patch_w": 2, # 8,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"layer4": { # 14x14
"out_channels": 64,
"transformer_channels": 80,
"ffn_dim": 160,
"transformer_blocks": 4,
"patch_h": 2, # 4,
"patch_w": 2, # 4,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"layer5": { # 7x7
"out_channels": 80,
"transformer_channels": 96,
"ffn_dim": 192,
"transformer_blocks": 3,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"last_layer_exp_factor": 4,
"cls_dropout": 0.1
}
elif mode == "x_small":
mv2_exp_mult = 4
config = {
"layer1": {
"out_channels": 32,
"expand_ratio": mv2_exp_mult,
"num_blocks": 1,
"stride": 1,
"block_type": "mv2",
},
"layer2": {
"out_channels": 48,
"expand_ratio": mv2_exp_mult,
"num_blocks": 3,
"stride": 2,
"block_type": "mv2",
},
"layer3": { # 28x28
"out_channels": 64,
"transformer_channels": 96,
"ffn_dim": 192,
"transformer_blocks": 2,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"layer4": { # 14x14
"out_channels": 80,
"transformer_channels": 120,
"ffn_dim": 240,
"transformer_blocks": 4,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"layer5": { # 7x7
"out_channels": 96,
"transformer_channels": 144,
"ffn_dim": 288,
"transformer_blocks": 3,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"last_layer_exp_factor": 4,
"cls_dropout": 0.1
}
elif mode == "small":
mv2_exp_mult = 4
config = {
"layer1": {
"out_channels": 32,
"expand_ratio": mv2_exp_mult,
"num_blocks": 1,
"stride": 1,
"block_type": "mv2",
},
"layer2": {
"out_channels": 64,
"expand_ratio": mv2_exp_mult,
"num_blocks": 3,
"stride": 2,
"block_type": "mv2",
},
"layer3": { # 28x28
"out_channels": 96,
"transformer_channels": 144,
"ffn_dim": 288,
"transformer_blocks": 2,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"layer4": { # 14x14
"out_channels": 128,
"transformer_channels": 192,
"ffn_dim": 384,
"transformer_blocks": 4,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"layer5": { # 7x7
"out_channels": 160,
"transformer_channels": 240,
"ffn_dim": 480,
"transformer_blocks": 3,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"num_heads": 4,
"block_type": "mobilevit",
},
"last_layer_exp_factor": 4,
"cls_dropout": 0.1
}
elif mode == "2xx_small":
mv2_exp_mult = 2
config = {
"layer0": {
"img_channels": 3,
"out_channels": layer_0_dim,
},
"layer1": {
"out_channels": int(make_divisible(64 * width_multiplier, divisor=16)),
"expand_ratio": mv2_exp_mult,
"num_blocks": 1,
"stride": 1,
"block_type": "mv2",
},
"layer2": {
"out_channels": int(make_divisible(128 * width_multiplier, divisor=8)),
"expand_ratio": mv2_exp_mult,
"num_blocks": 2,
"stride": 2,
"block_type": "mv2",
},
"layer3": { # 28x28
"out_channels": int(make_divisible(256 * width_multiplier, divisor=8)),
"attn_unit_dim": int(make_divisible(128 * width_multiplier, divisor=8)),
"ffn_multiplier": ffn_multiplier,
"attn_blocks": 2,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"block_type": "mobilevit",
},
"layer4": { # 14x14
"out_channels": int(make_divisible(384 * width_multiplier, divisor=8)),
"attn_unit_dim": int(make_divisible(192 * width_multiplier, divisor=8)),
"ffn_multiplier": ffn_multiplier,
"attn_blocks": 4,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"block_type": "mobilevit",
},
"layer5": { # 7x7
"out_channels": int(make_divisible(512 * width_multiplier, divisor=8)),
"attn_unit_dim": int(make_divisible(256 * width_multiplier, divisor=8)),
"ffn_multiplier": ffn_multiplier,
"attn_blocks": 3,
"patch_h": 2,
"patch_w": 2,
"stride": 2,
"mv_expand_ratio": mv2_exp_mult,
"block_type": "mobilevit",
},
"last_layer_exp_factor": 4,
}
else:
raise NotImplementedError
for k in ["layer1", "layer2", "layer3", "layer4", "layer5"]:
config[k].update({"dropout": 0.1, "ffn_dropout": 0.0, "attn_dropout": 0.0})
return config
class ConvLayer(nn.Module):
"""
Applies a 2D convolution over an input
Args:
in_channels (int): :math:`C_{in}` from an expected input of size :math:`(N, C_{in}, H_{in}, W_{in})`
out_channels (int): :math:`C_{out}` from an expected output of size :math:`(N, C_{out}, H_{out}, W_{out})`
kernel_size (Union[int, Tuple[int, int]]): Kernel size for convolution.
stride (Union[int, Tuple[int, int]]): Stride for convolution. Default: 1
groups (Optional[int]): Number of groups in convolution. Default: 1
bias (Optional[bool]): Use bias. Default: ``False``
use_norm (Optional[bool]): Use normalization layer after convolution. Default: ``True``
use_act (Optional[bool]): Use activation layer after convolution (or convolution and normalization).
Default: ``True``
Shape:
- Input: :math:`(N, C_{in}, H_{in}, W_{in})`
- Output: :math:`(N, C_{out}, H_{out}, W_{out})`
.. note::
For depth-wise convolution, `groups=C_{in}=C_{out}`.
"""
def __init__(
self,
        in_channels: int,  # number of input channels
        out_channels: int,  # number of output channels
        kernel_size: Union[int, Tuple[int, int]],  # convolution kernel size
        stride: Optional[Union[int, Tuple[int, int]]] = 1,  # stride
        groups: Optional[int] = 1,  # number of groups for grouped convolution
        bias: Optional[bool] = False,  # whether to use a bias term
        use_norm: Optional[bool] = True,  # whether to apply normalization
        use_act: Optional[bool] = True,  # whether to apply an activation function
) -> None:
super().__init__()
if isinstance(kernel_size, int):
kernel_size = (kernel_size, kernel_size)
if isinstance(stride, int):
stride = (stride, stride)
assert isinstance(kernel_size, Tuple)
assert isinstance(stride, Tuple)
padding = (
int((kernel_size[0] - 1) / 2),
int((kernel_size[1] - 1) / 2),
)
block = nn.Sequential()
conv_layer = nn.Conv2d(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
groups=groups,
padding=padding,
bias=bias
)
block.add_module(name="conv", module=conv_layer)
if use_norm:
norm_layer = nn.BatchNorm2d(num_features=out_channels, momentum=0.1) # BatchNorm2d
block.add_module(name="norm", module=norm_layer)
if use_act:
act_layer = nn.SiLU() # Swish activation
block.add_module(name="act", module=act_layer)
self.block = block
def forward(self, x: Tensor) -> Tensor:
return self.block(x)
class MultiHeadAttention(nn.Module):
"""
This layer applies a multi-head self- or cross-attention as described in
`Attention is all you need <https://arxiv.org/abs/1706.03762>`_ paper
Args:
embed_dim (int): :math:`C_{in}` from an expected input of size :math:`(N, P, C_{in})`
num_heads (int): Number of heads in multi-head attention
attn_dropout (float): Attention dropout. Default: 0.0
bias (bool): Use bias or not. Default: ``True``
Shape:
- Input: :math:`(N, P, C_{in})` where :math:`N` is batch size, :math:`P` is number of patches,
and :math:`C_{in}` is input embedding dim
- Output: same shape as the input
"""
def __init__(
self,
embed_dim: int,
num_heads: int,
attn_dropout: float = 0.0,
bias: bool = True,
*args,
**kwargs
) -> None:
super().__init__()
if embed_dim % num_heads != 0:
raise ValueError(
"Embedding dim must be divisible by number of heads in {}. Got: embed_dim={} and num_heads={}".format(
self.__class__.__name__, embed_dim, num_heads
)
)
self.qkv_proj = nn.Linear(in_features=embed_dim, out_features=3 * embed_dim, bias=bias)
self.attn_dropout = nn.Dropout(p=attn_dropout)
self.out_proj = nn.Linear(in_features=embed_dim, out_features=embed_dim, bias=bias)
self.head_dim = embed_dim // num_heads
self.scaling = self.head_dim ** -0.5
self.softmax = nn.Softmax(dim=-1)
self.num_heads = num_heads
self.embed_dim = embed_dim
def forward(self, x_q: Tensor) -> Tensor:
# [N, P, C]
b_sz, n_patches, in_channels = x_q.shape
# self-attention
# [N, P, C] -> [N, P, 3C] -> [N, P, 3, h, c] where C = hc
qkv = self.qkv_proj(x_q).reshape(b_sz, n_patches, 3, self.num_heads, -1)
# [N, P, 3, h, c] -> [N, h, 3, P, C]
qkv = qkv.transpose(1, 3).contiguous()
# [N, h, 3, P, C] -> [N, h, P, C] x 3
query, key, value = qkv[:, :, 0], qkv[:, :, 1], qkv[:, :, 2]
query = query * self.scaling
# [N h, P, c] -> [N, h, c, P]
key = key.transpose(-1, -2)
# QK^T
# [N, h, P, c] x [N, h, c, P] -> [N, h, P, P]
attn = torch.matmul(query, key)
attn = self.softmax(attn)
attn = self.attn_dropout(attn)
# weighted sum
# [N, h, P, P] x [N, h, P, c] -> [N, h, P, c]
out = torch.matmul(attn, value)
# [N, h, P, c] -> [N, P, h, c] -> [N, P, C]
out = out.transpose(1, 2).reshape(b_sz, n_patches, -1)
out = self.out_proj(out)
return out
class TransformerEncoder(nn.Module):
"""
This class defines the pre-norm `Transformer encoder <https://arxiv.org/abs/1706.03762>`_
Args:
embed_dim (int): :math:`C_{in}` from an expected input of size :math:`(N, P, C_{in})`
ffn_latent_dim (int): Inner dimension of the FFN
num_heads (int) : Number of heads in multi-head attention. Default: 8
attn_dropout (float): Dropout rate for attention in multi-head attention. Default: 0.0
dropout (float): Dropout rate. Default: 0.0
ffn_dropout (float): Dropout between FFN layers. Default: 0.0
Shape:
- Input: :math:`(N, P, C_{in})` where :math:`N` is batch size, :math:`P` is number of patches,
and :math:`C_{in}` is input embedding dim
- Output: same shape as the input
"""
def __init__(
self,
embed_dim: int,
ffn_latent_dim: int,
num_heads: Optional[int] = 8,
attn_dropout: Optional[float] = 0.0,
dropout: Optional[float] = 0.0,
ffn_dropout: Optional[float] = 0.0,
*args,
**kwargs
) -> None:
super().__init__()
attn_unit = MultiHeadAttention(
embed_dim,
num_heads,
attn_dropout=attn_dropout,
bias=True
)
self.pre_norm_mha = nn.Sequential(
nn.LayerNorm(embed_dim),
attn_unit,
nn.Dropout(p=dropout)
)
self.pre_norm_ffn = nn.Sequential(
nn.LayerNorm(embed_dim),
nn.Linear(in_features=embed_dim, out_features=ffn_latent_dim, bias=True),
nn.SiLU(),
nn.Dropout(p=ffn_dropout),
nn.Linear(in_features=ffn_latent_dim, out_features=embed_dim, bias=True),
nn.Dropout(p=dropout)
)
self.embed_dim = embed_dim
self.ffn_dim = ffn_latent_dim
self.ffn_dropout = ffn_dropout
self.std_dropout = dropout
def forward(self, x: Tensor) -> Tensor:
# multi-head attention
res = x
x = self.pre_norm_mha(x)
x = x + res
# feed forward network
x = x + self.pre_norm_ffn(x)
return x
class LinearSelfAttention(nn.Module):
"""
This layer applies a self-attention with linear complexity, as described in `MobileViTv2 <https://arxiv.org/abs/2206.02680>`_ paper.
This layer can be used for self- as well as cross-attention.
Args:
opts: command line arguments
embed_dim (int): :math:`C` from an expected input of size :math:`(N, C, H, W)`
attn_dropout (Optional[float]): Dropout value for context scores. Default: 0.0
bias (Optional[bool]): Use bias in learnable layers. Default: True
Shape:
- Input: :math:`(N, C, P, N)` where :math:`N` is the batch size, :math:`C` is the input channels,
:math:`P` is the number of pixels in the patch, and :math:`N` is the number of patches
- Output: same as the input
.. note::
For MobileViTv2, we unfold the feature map [B, C, H, W] into [B, C, P, N] where P is the number of pixels
in a patch and N is the number of patches. Because channel is the first dimension in this unfolded tensor,
we use point-wise convolution (instead of a linear layer). This avoids a transpose operation (which may be
expensive on resource-constrained devices) that may be required to convert the unfolded tensor from
channel-first to channel-last format in case of a linear layer.
"""
def __init__(self,
embed_dim: int,
attn_dropout: Optional[float] = 0.0,
bias: Optional[bool] = True,
*args,
**kwargs) -> None:
super().__init__()
self.attn_dropout = nn.Dropout(p=attn_dropout)
self.qkv_proj = ConvLayer(
in_channels=embed_dim,
out_channels=embed_dim * 2 + 1,
kernel_size=1,
bias=bias,
use_norm=False,
use_act=False
)
self.out_proj = ConvLayer(
in_channels=embed_dim,
out_channels=embed_dim,
bias=bias,
kernel_size=1,
use_norm=False,
use_act=False,
)
self.embed_dim = embed_dim
def forward(self, x: Tensor, x_prev: Optional[Tensor] = None, *args, **kwargs) -> Tensor:
if x_prev is None:
return self._forward_self_attn(x, *args, **kwargs)
else:
return self._forward_cross_attn(x, x_prev, *args, **kwargs)
def _forward_self_attn(self, x: Tensor, *args, **kwargs) -> Tensor:
# [B, C, P, N] --> [B, h + 2d, P, N]
qkv = self.qkv_proj(x)
# [B, h + 2d, P, N] --> [B, h, P, N], [B, d, P, N], [B, 1, P, N]
# Query --> [B, 1, P ,N]
# Value, key --> [B, d, P, N]
query, key, value = torch.split(
qkv, [1, self.embed_dim, self.embed_dim], dim=1
)
        # apply softmax along the patch (last) dimension
context_scores = F.softmax(query, dim=-1)
context_scores = self.attn_dropout(context_scores)
# Compute context vector
# [B, d, P, N] x [B, 1, P, N] -> [B, d, P, N]
context_vector = key * context_scores
# [B, d, P, N] --> [B, d, P, 1]
context_vector = context_vector.sum(dim=-1, keepdim=True)
# combine context vector with values
# [B, d, P, N] * [B, d, P, 1] --> [B, d, P, N]
out = F.relu(value) * context_vector.expand_as(value)
out = self.out_proj(out)
return out
def _forward_cross_attn(
self, x: Tensor, x_prev: Optional[Tensor] = None, *args, **kwargs):
# x --> [B, C, P, N]
# x_prev --> [B, C, P, N]
batch_size, in_dim, kv_patch_area, kv_num_patches = x.shape
q_patch_area, q_num_patches = x.shape[-2:]
assert (
kv_patch_area == q_patch_area
), "The number of patches in the query and key-value tensors must be the same"
# compute query, key, and value
# [B, C, P, M] --> [B, 1 + d, P, M]
qk = F.conv2d(
x_prev,
weight=self.qkv_proj.block.conv.weight[: self.embed_dim + 1, ...],
bias=self.qkv_proj.block.conv.bias[: self.embed_dim + 1, ...],
)
# [B, 1 + d, P, M] --> [B, 1, P, M], [B, d, P, M]
query, key = torch.split(qk, split_size_or_sections=[1, self.embed_dim], dim=1)
# [B, C, P, N] --> [B, d, P, N]
value = F.conv2d(
x,
weight=self.qkv_proj.block.conv.weight[self.embed_dim + 1:, ...],
bias=self.qkv_proj.block.conv.bias[self.embed_dim + 1:, ...],
)
context_scores = F.softmax(query, dim=-1)
context_scores = self.attn_dropout(context_scores)
context_vector = key * context_scores
context_vector = torch.sum(context_vector, dim=-1, keepdim=True)
out = F.relu(value) * context_vector.expand_as(value)
out = self.out_proj(out)
return out
class LinearAttnFFN(nn.Module):
"""
This class defines the pre-norm transformer encoder with linear self-attention in `MobileViTv2 <https://arxiv.org/abs/2206.02680>`_ paper
Args:
embed_dim (int): :math:`C_{in}` from an expected input of size :math:`(B, C_{in}, P, N)`
ffn_latent_dim (int): Inner dimension of the FFN
attn_dropout (Optional[float]): Dropout rate for attention in multi-head attention. Default: 0.0
dropout (Optional[float]): Dropout rate. Default: 0.0
ffn_dropout (Optional[float]): Dropout between FFN layers. Default: 0.0
norm_layer (Optional[str]): Normalization layer. Default: layer_norm_2d
Shape:
- Input: :math:`(B, C_{in}, P, N)` where :math:`B` is batch size, :math:`C_{in}` is input embedding dim,
:math:`P` is number of pixels in a patch, and :math:`N` is number of patches,
- Output: same shape as the input
"""
def __init__(
self,
embed_dim: int,
ffn_latent_dim: int,
attn_dropout: Optional[float] = 0.0,
dropout: Optional[float] = 0.1,
ffn_dropout: Optional[float] = 0.0,
*args,
**kwargs
) -> None:
super().__init__()
attn_unit = LinearSelfAttention(
embed_dim=embed_dim, attn_dropout=attn_dropout, bias=True
)
self.pre_norm_attn = nn.Sequential(
nn.GroupNorm(num_channels=embed_dim, num_groups=1),
attn_unit,
nn.Dropout(p=dropout)
)
self.pre_norm_ffn = nn.Sequential(
nn.GroupNorm(num_channels=embed_dim, num_groups=1),
ConvLayer(
in_channels=embed_dim,
out_channels=ffn_latent_dim,
kernel_size=1,
stride=1,
bias=True,
use_norm=False,
use_act=True,
),
nn.Dropout(p=ffn_dropout),
ConvLayer(
in_channels=ffn_latent_dim,
out_channels=embed_dim,
kernel_size=1,
stride=1,
bias=True,
use_norm=False,
use_act=False,
),
nn.Dropout(p=dropout)
)
self.embed_dim = embed_dim
self.ffn_dim = ffn_latent_dim
self.ffn_dropout = ffn_dropout
self.std_dropout = dropout
def forward(self,
x: Tensor, x_prev: Optional[Tensor] = None, *args, **kwargs
) -> Tensor:
if x_prev is None:
# self-attention
x = x + self.pre_norm_attn(x)
else:
# cross-attention
res = x
x = self.pre_norm_attn[0](x) # norm
x = self.pre_norm_attn[1](x, x_prev) # attn
x = self.pre_norm_attn[2](x) # drop
x = x + res # residual
x = x + self.pre_norm_ffn(x)
return x
class Identity(nn.Module):
"""
This is a place-holder and returns the same tensor.
"""
def __init__(self):
super(Identity, self).__init__()
def forward(self, x: Tensor) -> Tensor:
return x
def profile_module(self, x: Tensor) -> Tuple[Tensor, float, float]:
return x, 0.0, 0.0
class InvertedResidual(nn.Module):
"""
This class implements the inverted residual block, as described in `MobileNetv2 <https://arxiv.org/abs/1801.04381>`_ paper
Args:
in_channels (int): :math:`C_{in}` from an expected input of size :math:`(N, C_{in}, H_{in}, W_{in})`
out_channels (int): :math:`C_{out}` from an expected output of size :math:`(N, C_{out}, H_{out}, W_{out)`
stride (int): Use convolutions with a stride. Default: 1
expand_ratio (Union[int, float]): Expand the input channels by this factor in depth-wise conv
skip_connection (Optional[bool]): Use skip-connection. Default: True
Shape:
- Input: :math:`(N, C_{in}, H_{in}, W_{in})`
- Output: :math:`(N, C_{out}, H_{out}, W_{out})`
.. note::
If `in_channels =! out_channels` and `stride > 1`, we set `skip_connection=False`
"""
def __init__(
self,
in_channels: int,
out_channels: int,
stride: int,
        expand_ratio: Union[int, float],  # expansion factor for the hidden (depth-wise) channels
        skip_connection: Optional[bool] = True,  # whether to use a skip connection
) -> None:
assert stride in [1, 2]
hidden_dim = make_divisible(int(round(in_channels * expand_ratio)), 8)
super().__init__()
block = nn.Sequential()
if expand_ratio != 1:
block.add_module(
name="exp_1x1",
module=ConvLayer(
in_channels=in_channels,
out_channels=hidden_dim,
kernel_size=1
),
)
block.add_module(
name="conv_3x3",
module=ConvLayer(
in_channels=hidden_dim,
out_channels=hidden_dim,
stride=stride,
kernel_size=3,
groups=hidden_dim # depth-wise convolution
),
)
block.add_module(
name="red_1x1",
module=ConvLayer(
in_channels=hidden_dim,
out_channels=out_channels,
kernel_size=1,
                use_act=False,  # no activation after the last 1x1 projection
use_norm=True,
),
)
self.block = block
self.in_channels = in_channels
self.out_channels = out_channels
self.exp = expand_ratio
self.stride = stride
self.use_res_connect = (
self.stride == 1 and in_channels == out_channels and skip_connection
)
def forward(self, x: Tensor, *args, **kwargs) -> Tensor:
        if self.use_res_connect:  # use the residual connection when shapes allow it
return x + self.block(x)
else:
return self.block(x)
class MobileViTBlock(nn.Module):
"""
This class defines the `MobileViT block <https://arxiv.org/abs/2110.02178?context=cs.LG>`_
Args:
opts: command line arguments
in_channels (int): :math:`C_{in}` from an expected input of size :math:`(N, C_{in}, H, W)`
transformer_dim (int): Input dimension to the transformer unit
ffn_dim (int): Dimension of the FFN block
n_transformer_blocks (int): Number of transformer blocks. Default: 2
head_dim (int): Head dimension in the multi-head attention. Default: 32
attn_dropout (float): Dropout in multi-head attention. Default: 0.0
dropout (float): Dropout rate. Default: 0.0
ffn_dropout (float): Dropout between FFN layers in transformer. Default: 0.0
patch_h (int): Patch height for unfolding operation. Default: 8
patch_w (int): Patch width for unfolding operation. Default: 8
transformer_norm_layer (Optional[str]): Normalization layer in the transformer block. Default: layer_norm
conv_ksize (int): Kernel size to learn local representations in MobileViT block. Default: 3
no_fusion (Optional[bool]): Do not combine the input and output feature maps. Default: False
"""
def __init__(
self,
        in_channels: int,  # number of input channels
        transformer_dim: int,  # embedding dimension of the tokens fed to the Transformer
        ffn_dim: int,  # hidden dimension of the feed-forward network
        n_transformer_blocks: int = 2,  # number of Transformer blocks
head_dim: int = 32,
attn_dropout: float = 0.0,
dropout: float = 0.0,
ffn_dropout: float = 0.0,
patch_h: int = 8,
patch_w: int = 8,
        conv_ksize: Optional[int] = 3,  # kernel size of the local-representation convolution
*args,
**kwargs
) -> None:
super().__init__()
conv_3x3_in = ConvLayer(
in_channels=in_channels,
out_channels=in_channels,
kernel_size=conv_ksize,
stride=1
)
conv_1x1_in = ConvLayer(
in_channels=in_channels,
out_channels=transformer_dim,
kernel_size=1,
stride=1,
use_norm=False,
use_act=False
)
conv_1x1_out = ConvLayer(
in_channels=transformer_dim,
out_channels=in_channels,
kernel_size=1,
stride=1
)
conv_3x3_out = ConvLayer(
in_channels=2 * in_channels,
out_channels=in_channels,
kernel_size=conv_ksize,
stride=1
)
self.local_rep = nn.Sequential()
self.local_rep.add_module(name="conv_3x3", module=conv_3x3_in)
self.local_rep.add_module(name="conv_1x1", module=conv_1x1_in)
        assert transformer_dim % head_dim == 0  # transformer_dim must be divisible by head_dim
num_heads = transformer_dim // head_dim
global_rep = [
TransformerEncoder(
embed_dim=transformer_dim,
ffn_latent_dim=ffn_dim,
num_heads=num_heads,
attn_dropout=attn_dropout,
dropout=dropout,
ffn_dropout=ffn_dropout
)
for _ in range(n_transformer_blocks)
]
global_rep.append(nn.LayerNorm(transformer_dim))
self.global_rep = nn.Sequential(*global_rep)
self.conv_proj = conv_1x1_out
self.fusion = conv_3x3_out
self.patch_h = patch_h
self.patch_w = patch_w
self.patch_area = self.patch_w * self.patch_h
self.cnn_in_dim = in_channels
self.cnn_out_dim = transformer_dim
self.n_heads = num_heads
self.ffn_dim = ffn_dim
self.dropout = dropout
self.attn_dropout = attn_dropout
self.ffn_dropout = ffn_dropout
self.n_blocks = n_transformer_blocks
self.conv_ksize = conv_ksize
def unfolding(self, x: Tensor) -> Tuple[Tensor, Dict]:
patch_w, patch_h = self.patch_w, self.patch_h
patch_area = patch_w * patch_h
batch_size, in_channels, orig_h, orig_w = x.shape
        new_h = int(math.ceil(orig_h / self.patch_h) * self.patch_h)  # used below to decide whether interpolation is needed
        new_w = int(math.ceil(orig_w / self.patch_w) * self.patch_w)  # used below to decide whether interpolation is needed
interpolate = False
if new_w != orig_w or new_h != orig_h:
# Note: Padding can be done, but then it needs to be handled in attention function.
x = F.interpolate(x, size=(new_h, new_w), mode="bilinear", align_corners=False)
interpolate = True
# number of patches along width and height
num_patch_w = new_w // patch_w # n_w
num_patch_h = new_h // patch_h # n_h
num_patches = num_patch_h * num_patch_w # N
# [B, C, H, W] -> [B * C * n_h, p_h, n_w, p_w]
x = x.reshape(batch_size * in_channels * num_patch_h, patch_h, num_patch_w, patch_w)
# [B * C * n_h, p_h, n_w, p_w] -> [B * C * n_h, n_w, p_h, p_w]
x = x.transpose(1, 2)
# [B * C * n_h, n_w, p_h, p_w] -> [B, C, N, P] where P = p_h * p_w and N = n_h * n_w
x = x.reshape(batch_size, in_channels, num_patches, patch_area)
# [B, C, N, P] -> [B, P, N, C]
x = x.transpose(1, 3)
# [B, P, N, C] -> [BP, N, C]
x = x.reshape(batch_size * patch_area, num_patches, -1)
info_dict = {
"orig_size": (orig_h, orig_w),
"batch_size": batch_size,
"interpolate": interpolate,
"total_patches": num_patches,
"num_patches_w": num_patch_w,
"num_patches_h": num_patch_h,
}
return x, info_dict
def folding(self, x: Tensor, info_dict: Dict) -> Tensor:
n_dim = x.dim()
assert n_dim == 3, "Tensor should be of shape BPxNxC. Got: {}".format(
x.shape
)
        # [BP, N, C] --> [B, P, N, C]
        # make x contiguous so it can be reshaped
        x = x.contiguous().view(
            # first dimension: batch size
            info_dict["batch_size"],
            # second dimension: number of pixels per patch
            self.patch_area,
            # third dimension: total number of patches per image
            info_dict["total_patches"],
            # keep the last (channel) dimension unchanged
            -1
        )
batch_size, pixels, num_patches, channels = x.size()
num_patch_h = info_dict["num_patches_h"]
num_patch_w = info_dict["num_patches_w"]
# [B, P, N, C] -> [B, C, N, P]
x = x.transpose(1, 3)
# [B, C, N, P] -> [B*C*n_h, n_w, p_h, p_w]
x = x.reshape(batch_size * channels * num_patch_h, num_patch_w, self.patch_h, self.patch_w)
# [B*C*n_h, n_w, p_h, p_w] -> [B*C*n_h, p_h, n_w, p_w]
x = x.transpose(1, 2)
# [B*C*n_h, p_h, n_w, p_w] -> [B, C, H, W]
x = x.reshape(batch_size, channels, num_patch_h * self.patch_h, num_patch_w * self.patch_w)
if info_dict["interpolate"]:
x = F.interpolate(
x,
size=info_dict["orig_size"],
mode="bilinear",
align_corners=False,
)
return x
def forward(self, x: Tensor) -> Tensor:
res = x
fm = self.local_rep(x) # [4, 64, 28, 28]
# convert feature map to patches
patches, info_dict = self.unfolding(fm) # [16, 196, 64]
# print(patches.shape)
# learn global representations
for transformer_layer in self.global_rep:
patches = transformer_layer(patches)
# [B x Patch x Patches x C] -> [B x C x Patches x Patch]
        # Patch  = number of pixels per patch (P)
        # Patches = number of patches (N)
fm = self.folding(x=patches, info_dict=info_dict)
fm = self.conv_proj(fm)
fm = self.fusion(torch.cat((res, fm), dim=1))
return fm
class MobileViTBlockV2(nn.Module):
"""
This class defines the `MobileViTv2 <https://arxiv.org/abs/2206.02680>`_ block
Args:
opts: command line arguments
in_channels (int): :math:`C_{in}` from an expected input of size :math:`(N, C_{in}, H, W)`
attn_unit_dim (int): Input dimension to the attention unit
ffn_multiplier (int): Expand the input dimensions by this factor in FFN. Default is 2.
n_attn_blocks (Optional[int]): Number of attention units. Default: 2
attn_dropout (Optional[float]): Dropout in multi-head attention. Default: 0.0
dropout (Optional[float]): Dropout rate. Default: 0.0
ffn_dropout (Optional[float]): Dropout between FFN layers in transformer. Default: 0.0
patch_h (Optional[int]): Patch height for unfolding operation. Default: 8
patch_w (Optional[int]): Patch width for unfolding operation. Default: 8
conv_ksize (Optional[int]): Kernel size to learn local representations in MobileViT block. Default: 3
dilation (Optional[int]): Dilation rate in convolutions. Default: 1
attn_norm_layer (Optional[str]): Normalization layer in the attention block. Default: layer_norm_2d
"""
def __init__(self,
in_channels: int,
attn_unit_dim: int,
ffn_multiplier: Optional[Union[Sequence[Union[int, float]], int, float]] = 2.0,
n_transformer_blocks: Optional[int] = 2,
attn_dropout: Optional[float] = 0.0,
dropout: Optional[float] = 0.0,
ffn_dropout: Optional[float] = 0.0,
patch_h: Optional[int] = 8,
patch_w: Optional[int] = 8,
conv_ksize: Optional[int] = 3,
*args,
**kwargs) -> None:
super(MobileViTBlockV2, self).__init__()
cnn_out_dim = attn_unit_dim
conv_3x3_in = ConvLayer(
in_channels=in_channels,
out_channels=in_channels,
kernel_size=conv_ksize,
stride=1,
use_norm=True,
use_act=True,
groups=in_channels,
)
conv_1x1_in = ConvLayer(
in_channels=in_channels,
out_channels=cnn_out_dim,
kernel_size=1,
stride=1,
use_norm=False,
use_act=False,
)
self.local_rep = nn.Sequential(conv_3x3_in, conv_1x1_in)
self.global_rep, attn_unit_dim = self._build_attn_layer(
d_model=attn_unit_dim,
ffn_mult=ffn_multiplier,
n_layers=n_transformer_blocks,
attn_dropout=attn_dropout,
dropout=dropout,
ffn_dropout=ffn_dropout,
)
self.conv_proj = ConvLayer(
in_channels=cnn_out_dim,
out_channels=in_channels,
kernel_size=1,
stride=1,
use_norm=True,
use_act=False,
)
self.patch_h = patch_h
self.patch_w = patch_w
self.patch_area = self.patch_w * self.patch_h
self.cnn_in_dim = in_channels
self.cnn_out_dim = cnn_out_dim
self.transformer_in_dim = attn_unit_dim
self.dropout = dropout
self.attn_dropout = attn_dropout
self.ffn_dropout = ffn_dropout
self.n_blocks = n_transformer_blocks
self.conv_ksize = conv_ksize
def _build_attn_layer(self,
d_model: int,
ffn_mult: Union[Sequence, int, float],
n_layers: int,
attn_dropout: float,
dropout: float,
ffn_dropout: float,
attn_norm_layer: str = "layer_norm_2d",
*args,
**kwargs) -> Tuple[nn.Module, int]:
if isinstance(ffn_mult, Sequence) and len(ffn_mult) == 2:
ffn_dims = (
np.linspace(ffn_mult[0], ffn_mult[1], n_layers, dtype=float) * d_model
)
elif isinstance(ffn_mult, Sequence) and len(ffn_mult) == 1:
ffn_dims = [ffn_mult[0] * d_model] * n_layers
elif isinstance(ffn_mult, (int, float)):
ffn_dims = [ffn_mult * d_model] * n_layers
else:
raise NotImplementedError
ffn_dims = [int((d // 16) * 16) for d in ffn_dims]
global_rep = [
LinearAttnFFN(
embed_dim=d_model,
ffn_latent_dim=ffn_dims[block_idx],
attn_dropout=attn_dropout,
dropout=dropout,
ffn_dropout=ffn_dropout,
)
for block_idx in range(n_layers)
]
global_rep.append(nn.GroupNorm(1, d_model))
return nn.Sequential(*global_rep), d_model
def forward(
self, x: Union[Tensor, Tuple[Tensor]], *args, **kwargs
) -> Union[Tensor, Tuple[Tensor, Tensor]]:
if isinstance(x, Tuple) and len(x) == 2:
# for spatio-temporal data (e.g., videos)
return self.forward_temporal(x=x[0], x_prev=x[1])
elif isinstance(x, Tensor):
# for image data
return self.forward_spatial(x)
else:
raise NotImplementedError
def forward_spatial(self, x: Tensor, *args, **kwargs) -> Tensor:
x = self.resize_input_if_needed(x)
# learn global representations on all patches
fm = self.local_rep(x)
patches, output_size = self.unfolding_pytorch(fm)
# print(f"original x.shape = {patches.shape}")
patches = self.global_rep(patches)
# [B x Patch x Patches x C] --> [B x C x Patches x Patch]
fm = self.folding_pytorch(patches=patches, output_size=output_size)
fm = self.conv_proj(fm)
return fm
def forward_temporal(
self, x: Tensor, x_prev: Optional[Tensor] = None
) -> Union[Tensor, Tuple[Tensor, Tensor]]:
x = self.resize_input_if_needed(x)
fm = self.local_rep(x)
patches, output_size = self.unfolding_pytorch(fm)
for global_layer in self.global_rep:
if isinstance(global_layer, LinearAttnFFN):
patches = global_layer(x=patches, x_prev=x_prev)
else:
patches = global_layer(patches)
fm = self.folding_pytorch(patches=patches, output_size=output_size)
fm = self.conv_proj(fm)
return fm, patches
def resize_input_if_needed(self, x: Tensor) -> Tensor:
# print(f"original x.shape = {x.shape}")
batch_size, in_channels, orig_h, orig_w = x.shape
if orig_h % self.patch_h != 0 or orig_w % self.patch_w != 0:
new_h = int(math.ceil(orig_h / self.patch_h) * self.patch_h)
new_w = int(math.ceil(orig_w / self.patch_w) * self.patch_w)
x = F.interpolate(
x, size=(new_h, new_w), mode="bilinear", align_corners=True
)
# print(f"changed x.shape = {x.shape}")
return x
def unfolding_pytorch(self, feature_map: Tensor) -> Tuple[Tensor, Tuple[int, int]]:
batch_size, in_channels, img_h, img_w = feature_map.shape
# [B, C, H, W] --> [B, C, P, N]
patches = F.unfold(
feature_map,
kernel_size=(self.patch_h, self.patch_w),
stride=(self.patch_h, self.patch_w),
)
patches = patches.reshape(
batch_size, in_channels, self.patch_h * self.patch_w, -1
)
return patches, (img_h, img_w)
def folding_pytorch(self, patches: Tensor, output_size: Tuple[int, int]) -> Tensor:
batch_size, in_dim, patch_size, n_patches = patches.shape
# [B, C, P, N]
patches = patches.reshape(batch_size, in_dim * patch_size, n_patches)
feature_map = F.fold(
patches,
output_size=output_size,
kernel_size=(self.patch_h, self.patch_w),
stride=(self.patch_h, self.patch_w),
)
return feature_map
class MobileViT(nn.Module):
"""
This class implements the `MobileViT architecture <https://arxiv.org/abs/2110.02178?context=cs.LG>`_
"""
def __init__(self, model_cfg: Dict, num_classes: int = 1000):
super().__init__()
image_channels = 3
out_channels = 16
self.conv_1 = ConvLayer(
in_channels=image_channels,
out_channels=out_channels,
kernel_size=3,
stride=2
)
self.layer_1, out_channels = self._make_layer(input_channel=out_channels, cfg=model_cfg["layer1"])
self.layer_2, out_channels = self._make_layer(input_channel=out_channels, cfg=model_cfg["layer2"])
self.layer_3, out_channels = self._make_layer(input_channel=out_channels, cfg=model_cfg["layer3"])
self.layer_4, out_channels = self._make_layer(input_channel=out_channels, cfg=model_cfg["layer4"])
self.layer_5, out_channels = self._make_layer(input_channel=out_channels, cfg=model_cfg["layer5"])
exp_channels = min(model_cfg["last_layer_exp_factor"] * out_channels, 960)
self.conv_1x1_exp = ConvLayer(
in_channels=out_channels,
out_channels=exp_channels,
kernel_size=1
)
        self.classifier = nn.Sequential()  # may be frozen when fine-tuning the network
self.classifier.add_module(name="global_pool", module=nn.AdaptiveAvgPool2d(1))
self.classifier.add_module(name="flatten", module=nn.Flatten())
if 0.0 < model_cfg["cls_dropout"] < 1.0:
self.classifier.add_module(name="dropout", module=nn.Dropout(p=model_cfg["cls_dropout"]))
self.classifier.add_module(name="fc", module=nn.Linear(in_features=exp_channels, out_features=num_classes))
# weight init
self.apply(self.init_parameters)
self.width_list = [i.size(1) for i in self.forward(torch.randn(1, 3, 640, 640))]
def _make_layer(self, input_channel, cfg: Dict) -> Tuple[nn.Sequential, int]:
block_type = cfg.get("block_type", "mobilevit")
if block_type.lower() == "mobilevit":
return self._make_mit_layer(input_channel=input_channel, cfg=cfg)
else:
return self._make_mobilenet_layer(input_channel=input_channel, cfg=cfg)
@staticmethod
def _make_mobilenet_layer(input_channel: int, cfg: Dict) -> Tuple[nn.Sequential, int]:
output_channels = cfg.get("out_channels")
num_blocks = cfg.get("num_blocks", 2)
expand_ratio = cfg.get("expand_ratio", 4)
block = []
for i in range(num_blocks):
stride = cfg.get("stride", 1) if i == 0 else 1
layer = InvertedResidual(
in_channels=input_channel,
out_channels=output_channels,
stride=stride,
expand_ratio=expand_ratio
)
block.append(layer)
input_channel = output_channels
return nn.Sequential(*block), input_channel
@staticmethod
def _make_mit_layer(input_channel: int, cfg: Dict):
# def _make_mit_layer(input_channel: int, cfg: Dict) -> [nn.Sequential, int]:
stride = cfg.get("stride", 1)
block = []
if stride == 2:
layer = InvertedResidual(
in_channels=input_channel,
out_channels=cfg.get("out_channels"),
stride=stride,
expand_ratio=cfg.get("mv_expand_ratio", 4)
)
block.append(layer)
input_channel = cfg.get("out_channels")
transformer_dim = cfg["transformer_channels"]
ffn_dim = cfg.get("ffn_dim")
num_heads = cfg.get("num_heads", 4)
head_dim = transformer_dim // num_heads
if transformer_dim % head_dim != 0:
raise ValueError("Transformer input dimension should be divisible by head dimension. "
"Got {} and {}.".format(transformer_dim, head_dim))
block.append(MobileViTBlock(
in_channels=input_channel,
transformer_dim=transformer_dim,
ffn_dim=ffn_dim,
n_transformer_blocks=cfg.get("transformer_blocks", 1),
patch_h=cfg.get("patch_h", 2),
patch_w=cfg.get("patch_w", 2),
dropout=cfg.get("dropout", 0.1),
ffn_dropout=cfg.get("ffn_dropout", 0.0),
attn_dropout=cfg.get("attn_dropout", 0.1),
head_dim=head_dim,
conv_ksize=3
))
return nn.Sequential(*block), input_channel
@staticmethod
def init_parameters(m):
if isinstance(m, nn.Conv2d):
if m.weight is not None:
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.LayerNorm, nn.BatchNorm2d)):
if m.weight is not None:
nn.init.ones_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.Linear,)):
if m.weight is not None:
nn.init.trunc_normal_(m.weight, mean=0.0, std=0.02)
if m.bias is not None:
nn.init.zeros_(m.bias)
else:
pass
def forward(self, x):
unique_tensors = {}
x = self.conv_1(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
x = self.layer_1(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
x = self.layer_2(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
x = self.layer_3(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
x = self.layer_4(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
x = self.layer_5(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
x = self.conv_1x1_exp(x)
width, height = x.shape[2], x.shape[3]
unique_tensors[(width, height)] = x
result_list = list(unique_tensors.values())[-4:]
return result_list
def mobile_vit_xx_small(num_classes: int = 1000):
# pretrain weight link
# https://docs-assets.developer.apple.com/ml-research/models/cvnets/classification/mobilevit_xxs.pt
config = get_config("xx_small")
m = MobileViT(config, num_classes=num_classes)
return m
def mobile_vit_x_small(num_classes: int = 1000):
# pretrain weight link
# https://docs-assets.developer.apple.com/ml-research/models/cvnets/classification/mobilevit_xs.pt
config = get_config("x_small")
m = MobileViT(config, num_classes=num_classes)
return m
def mobile_vit_small(num_classes: int = 1000):
# pretrain weight link
# https://docs-assets.developer.apple.com/ml-research/models/cvnets/classification/mobilevit_s.pt
config = get_config("small")
m = MobileViT(config, num_classes=num_classes)
return m
if __name__ == "__main__":
# Generating Sample image
image_size = (1, 3, 640, 640)
image = torch.rand(*image_size)
# Model
model = mobile_vit_xx_small()
    out = model(image)  # forward() returns a list with the last four feature maps
    for feat in out:
        print(feat.size())
4. Modification Steps
4.1 Modification 1
① In the ultralytics/nn/ directory, create a new AddModules folder to hold the module code.
② In the AddModules folder, create MobileViTV1.py and paste the code from Section 3 into it.
4.2 Modification 2
In the AddModules folder, create __init__.py (skip this if it already exists) and import the module in it:
from .MobileViTV1 import *
4.3 Modification 3
In the ultralytics/nn/modules/tasks.py file, the new module classes need to be registered.
① First, import the modules:
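The original post shows the import as a screenshot; a minimal sketch, assuming the folder and file names created in 4.1 and 4.2, is:

```python
# Hypothetical import line -- adjust the package path to where AddModules actually lives.
from ultralytics.nn.AddModules import mobile_vit_small, mobile_vit_x_small, mobile_vit_xx_small
```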
② In the predict function of the BaseModel class, remove the embed parameter at the two places where it appears:
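The two locations are shown as screenshots in the original post. As a rough, version-dependent sketch (the stock signature below is an assumption; match it against your local ultralytics source), the change removes embed from both the signature and the call into _predict_once:

```python
# Sketch only -- adapt to your ultralytics version.
# Before:
#     def predict(self, x, profile=False, visualize=False, augment=False, embed=None):
#         ...
#         return self._predict_once(x, profile, visualize, embed)
# After:
def predict(self, x, profile=False, visualize=False, augment=False):
    """Forward pass with the embed argument removed (sketch)."""
    if augment:
        return self._predict_augment(x)
    return self._predict_once(x, profile, visualize)
```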
③ In the _predict_once function of the BaseModel class, replace the code with the following:
def _predict_once(self, x, profile=False, visualize=False):
"""
Perform a forward pass through the network.
Args:
x (torch.Tensor): The input tensor to the model.
profile (bool): Print the computation time of each layer if True, defaults to False.
visualize (bool): Save the feature maps of the model if True, defaults to False.
Returns:
(torch.Tensor): The last output of the model.
"""
y, dt = [], [] # outputs
for m in self.model:
if m.f != -1: # if not from previous layer
x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers
if profile:
self._profile_one_layer(m, x, dt)
x = m(x) # run
y.append(x if m.i in self.save else None) # save output
if visualize:
feature_visualization(x, m.type, m.i, save_dir=visualize)
return x
④ Replace the predict function of the RTDETRDetectionModel class in full:
def predict(self, x, profile=False, visualize=False, batch=None, augment=False):
"""
Perform a forward pass through the model.
Args:
x (torch.Tensor): The input tensor.
profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.
visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.
batch (dict, optional): Ground truth data for evaluation. Defaults to None.
augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.
Returns:
(torch.Tensor): Model's output tensor.
"""
y, dt = [], [] # outputs
for m in self.model[:-1]: # except the head part
if m.f != -1: # if not from previous layer
x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers
if profile:
self._profile_one_layer(m, x, dt)
if hasattr(m, 'backbone'):
x = m(x)
for _ in range(5 - len(x)):
x.insert(0, None)
for i_idx, i in enumerate(x):
if i_idx in self.save:
y.append(i)
else:
y.append(None)
# for i in x:
# if i is not None:
# print(i.size())
x = x[-1]
else:
x = m(x) # run
y.append(x if m.i in self.save else None) # save output
if visualize:
feature_visualization(x, m.type, m.i, save_dir=visualize)
head = self.model[-1]
x = head([y[j] for j in head.f], batch) # head inference
return x
⑤ In the parse_model function, replace the code at the corresponding location with the following:
if verbose:
LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10} {'module':<45}{'arguments':<30}")
ch = [ch]
layers, save, c2 = [], [], ch[-1] # layers, savelist, ch out
is_backbone = False
for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']): # from, number, module, args
try:
if m == 'node_mode':
m = d[m]
if len(args) > 0:
if args[0] == 'head_channel':
args[0] = int(d[args[0]])
t = m
m = getattr(torch.nn, m[3:]) if 'nn.' in m else globals()[m] # get module
except:
pass
for j, a in enumerate(args):
if isinstance(a, str):
with contextlib.suppress(ValueError):
try:
args[j] = locals()[a] if a in locals() else ast.literal_eval(a)
except:
args[j] = a
⑥ In the parse_model function, add the following code:
elif m in {
mobile_vit_small, mobile_vit_x_small, mobile_vit_xx_small,
}:
m = m(*args)
c2 = m.width_list
⑦ In the parse_model function, replace the code at the corresponding location with the following:
if isinstance(c2, list):
is_backbone = True
m_ = m
m_.backbone = True
else:
m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args) # module
t = str(m)[8:-2].replace('__main__.', '') # module type
m_.np = sum(x.numel() for x in m_.parameters()) # number params
m_.i, m_.f, m_.type = i + 4 if is_backbone else i, f, t # attach index, 'from' index, type
if verbose:
LOGGER.info(f'{i:>3}{str(f):>20}{n_:>3}{m_.np:10.0f} {t:<45}{str(args):<30}') # print
save.extend(x % (i + 4 if is_backbone else i) for x in ([f] if isinstance(f, int) else f) if x != -1) # append to savelist
layers.append(m_)
if i == 0:
ch = []
if isinstance(c2, list):
ch.extend(c2)
for _ in range(5 - len(ch)):
ch.insert(0, 0)
else:
ch.append(c2)
return nn.Sequential(*layers), sorted(save)
⑧ In ultralytics\nn\autobackend.py, replace the forward function of the AutoBackend class in full with the following:
def forward(self, im, augment=False, visualize=False):
"""
Runs inference on the YOLOv8 MultiBackend model.
Args:
im (torch.Tensor): The image tensor to perform inference on.
augment (bool): whether to perform data augmentation during inference, defaults to False
visualize (bool): whether to visualize the output predictions, defaults to False
Returns:
(tuple): Tuple containing the raw output tensor, and processed output for visualization (if visualize=True)
"""
b, ch, h, w = im.shape # batch, channel, height, width
if self.fp16 and im.dtype != torch.float16:
im = im.half() # to FP16
if self.nhwc:
im = im.permute(0, 2, 3, 1) # torch BCHW to numpy BHWC shape(1,320,192,3)
if self.pt or self.nn_module: # PyTorch
y = self.model(im, augment=augment, visualize=visualize) if augment or visualize else self.model(im)
elif self.jit: # TorchScript
y = self.model(im)
elif self.dnn: # ONNX OpenCV DNN
im = im.cpu().numpy() # torch to numpy
self.net.setInput(im)
y = self.net.forward()
elif self.onnx: # ONNX Runtime
im = im.cpu().numpy() # torch to numpy
y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})
elif self.xml: # OpenVINO
im = im.cpu().numpy() # FP32
y = list(self.ov_compiled_model(im).values())
elif self.engine: # TensorRT
if self.dynamic and im.shape != self.bindings['images'].shape:
i = self.model.get_binding_index('images')
self.context.set_binding_shape(i, im.shape) # reshape if dynamic
self.bindings['images'] = self.bindings['images']._replace(shape=im.shape)
for name in self.output_names:
i = self.model.get_binding_index(name)
self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))
s = self.bindings['images'].shape
assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
self.binding_addrs['images'] = int(im.data_ptr())
self.context.execute_v2(list(self.binding_addrs.values()))
y = [self.bindings[x].data for x in sorted(self.output_names)]
elif self.coreml: # CoreML
im = im[0].cpu().numpy()
im_pil = Image.fromarray((im * 255).astype('uint8'))
# im = im.resize((192, 320), Image.BILINEAR)
y = self.model.predict({'image': im_pil}) # coordinates are xywh normalized
if 'confidence' in y:
raise TypeError('Ultralytics only supports inference of non-pipelined CoreML models exported with '
f"'nms=False', but 'model={w}' has an NMS pipeline created by an 'nms=True' export.")
# TODO: CoreML NMS inference handling
# from ultralytics.utils.ops import xywh2xyxy
# box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]]) # xyxy pixels
# conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)
# y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
elif len(y) == 1: # classification model
y = list(y.values())
elif len(y) == 2: # segmentation model
y = list(reversed(y.values())) # reversed for segmentation models (pred, proto)
elif self.paddle: # PaddlePaddle
im = im.cpu().numpy().astype(np.float32)
self.input_handle.copy_from_cpu(im)
self.predictor.run()
y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]
elif self.ncnn: # ncnn
mat_in = self.pyncnn.Mat(im[0].cpu().numpy())
ex = self.net.create_extractor()
input_names, output_names = self.net.input_names(), self.net.output_names()
ex.input(input_names[0], mat_in)
y = []
for output_name in output_names:
mat_out = self.pyncnn.Mat()
ex.extract(output_name, mat_out)
y.append(np.array(mat_out)[None])
elif self.triton: # NVIDIA Triton Inference Server
im = im.cpu().numpy() # torch to numpy
y = self.model(im)
else: # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
im = im.cpu().numpy()
if self.saved_model: # SavedModel
y = self.model(im, training=False) if self.keras else self.model(im)
if not isinstance(y, list):
y = [y]
elif self.pb: # GraphDef
y = self.frozen_func(x=self.tf.constant(im))
if len(y) == 2 and len(self.names) == 999: # segments and names not defined
ip, ib = (0, 1) if len(y[0].shape) == 4 else (1, 0) # index of protos, boxes
nc = y[ib].shape[1] - y[ip].shape[3] - 4 # y = (1, 160, 160, 32), (1, 116, 8400)
self.names = {i: f'class{i}' for i in range(nc)}
else: # Lite or Edge TPU
details = self.input_details[0]
integer = details['dtype'] in (np.int8, np.int16) # is TFLite quantized int8 or int16 model
if integer:
scale, zero_point = details['quantization']
im = (im / scale + zero_point).astype(details['dtype']) # de-scale
self.interpreter.set_tensor(details['index'], im)
self.interpreter.invoke()
y = []
for output in self.output_details:
x = self.interpreter.get_tensor(output['index'])
if integer:
scale, zero_point = output['quantization']
x = (x.astype(np.float32) - zero_point) * scale # re-scale
if x.ndim > 2: # if task is not classification
# Denormalize xywh by image size. See https://github.com/ultralytics/ultralytics/pull/1695
# xywh are normalized in TFLite/EdgeTPU to mitigate quantization error of integer models
x[:, [0, 2]] *= w
x[:, [1, 3]] *= h
y.append(x)
# TF segment fixes: export is reversed vs ONNX export and protos are transposed
if len(y) == 2: # segment with (det, proto) output order reversed
if len(y[1].shape) != 4:
y = list(reversed(y)) # should be y = (1, 116, 8400), (1, 160, 160, 32)
y[1] = np.transpose(y[1], (0, 3, 1, 2)) # should be y = (1, 116, 8400), (1, 32, 160, 160)
y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]
# for x in y:
# print(type(x), len(x)) if isinstance(x, (list, tuple)) else print(type(x), x.shape) # debug shapes
if isinstance(y, (list, tuple)):
return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
else:
return self.from_numpy(y)
With this, the modifications are complete and you can configure the model and start training.
5. YAML Model File
5.1 Model Improvement ⭐
After the code changes are in place, configure the model's YAML file. Taking ultralytics/cfg/models/rt-detr/rtdetr-l.yaml as an example, create a model file for your own dataset, rtdetr-MobileViTV1.yaml, in the same directory. Copy the contents of rtdetr-l.yaml into rtdetr-MobileViTV1.yaml and set nc to the number of classes in your dataset.
📌 The modification replaces the backbone with mobile_vit_small.
# Ultralytics YOLO 🚀, AGPL-3.0 license
# RT-DETR-l object detection model with P3-P5 outputs. For details see https://docs.ultralytics.com/models/rtdetr
# Parameters
nc: 1 # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n-cls.yaml' will call yolov8-cls.yaml with scale 'n'
# [depth, width, max_channels]
l: [1.00, 1.00, 1024]
backbone:
# [from, repeats, module, args]
- [-1, 1, mobile_vit_small, []] # 4
head:
- [-1, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 5 input_proj.2
- [-1, 1, AIFI, [1024, 8]] # 6
- [-1, 1, Conv, [256, 1, 1]] # 7, Y5, lateral_convs.0
- [-1, 1, nn.Upsample, [None, 2, 'nearest']] # 8
- [3, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 9 input_proj.1
- [[-2, -1], 1, Concat, [1]] # 10
- [-1, 3, RepC3, [256]] # 11, fpn_blocks.0
- [-1, 1, Conv, [256, 1, 1]] # 12, Y4, lateral_convs.1
- [-1, 1, nn.Upsample, [None, 2, 'nearest']] # 13
- [2, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 14 input_proj.0
- [[-2, -1], 1, Concat, [1]] # 15 cat backbone P4
- [-1, 3, RepC3, [256]] # X3 (16), fpn_blocks.1
- [-1, 1, Conv, [256, 3, 2]] # 17, downsample_convs.0
- [[-1, 12], 1, Concat, [1]] # 18 cat Y4
- [-1, 3, RepC3, [256]] # F4 (19), pan_blocks.0
- [-1, 1, Conv, [256, 3, 2]] # 20, downsample_convs.1
- [[-1, 7], 1, Concat, [1]] # 21 cat Y5
- [-1, 3, RepC3, [256]] # F5 (22), pan_blocks.1
- [[16, 19, 22], 1, RTDETRDecoder, [nc]] # Detect(P3, P4, P5)
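With the YAML in place, training proceeds as usual; a minimal sketch (the dataset YAML path and hyper-parameters below are placeholders):

```python
from ultralytics import RTDETR

# Placeholder paths/hyper-parameters -- adjust to your project layout and dataset.
model = RTDETR("ultralytics/cfg/models/rt-detr/rtdetr-MobileViTV1.yaml")
model.train(data="path/to/your_dataset.yaml", epochs=100, imgsz=640, batch=4)
```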
6. Successful Run Results
Printing the network shows that the MobileViTV1 module has been added to the model, and training can proceed.
rtdetr-MobileViTV1 :
rtdetr-MobileViTv1 summary: 713 layers, 24,137,067 parameters, 24,137,067 gradients, 84.7 GFLOPs
from n params module arguments
0 -1 1 5578632 mobile_vit_small []
1 -1 1 164352 ultralytics.nn.modules.conv.Conv [640, 256, 1, 1, None, 1, 1, False]
2 -1 1 789760 ultralytics.nn.modules.transformer.AIFI [256, 1024, 8]
3 -1 1 66048 ultralytics.nn.modules.conv.Conv [256, 256, 1, 1]
4 -1 1 0 torch.nn.modules.upsampling.Upsample [None, 2, 'nearest']
5 3 1 33280 ultralytics.nn.modules.conv.Conv [128, 256, 1, 1, None, 1, 1, False]
6 [-2, -1] 1 0 ultralytics.nn.modules.conv.Concat [1]
7 -1 3 2232320 ultralytics.nn.modules.block.RepC3 [512, 256, 3]
8 -1 1 66048 ultralytics.nn.modules.conv.Conv [256, 256, 1, 1]
9 -1 1 0 torch.nn.modules.upsampling.Upsample [None, 2, 'nearest']
10 2 1 25088 ultralytics.nn.modules.conv.Conv [96, 256, 1, 1, None, 1, 1, False]
11 [-2, -1] 1 0 ultralytics.nn.modules.conv.Concat [1]
12 -1 3 2232320 ultralytics.nn.modules.block.RepC3 [512, 256, 3]
13 -1 1 590336 ultralytics.nn.modules.conv.Conv [256, 256, 3, 2]
14 [-1, 12] 1 0 ultralytics.nn.modules.conv.Concat [1]
15 -1 3 2232320 ultralytics.nn.modules.block.RepC3 [512, 256, 3]
16 -1 1 590336 ultralytics.nn.modules.conv.Conv [256, 256, 3, 2]
17 [-1, 7] 1 0 ultralytics.nn.modules.conv.Concat [1]
18 -1 3 2232320 ultralytics.nn.modules.block.RepC3 [512, 256, 3]
19 [16, 19, 22] 1 7303907 ultralytics.nn.modules.head.RTDETRDecoder [1, [256, 256, 256]]
rtdetr-MobileViTv1 summary: 713 layers, 24,137,067 parameters, 24,137,067 gradients, 84.7 GFLOPs