RT-DETR改进策略【独家融合改进】| 模型轻量化二次改进:StarNet + FreqFusion,极限降参,适用专栏内所有轻量化模型
一、本文介绍
本文记录的是
利用StarNet和FreqFusion对RT-DETR的模型结构进行改进
。
StarNet
的轻量化设计为
FreqFusion
提供了一个简洁高效的特征提取基础,使得
FreqFusion
能够
在相对简单的特征空间中进行融合操作,提高融合的效率和效果
。同时,
FreqFusion
的特征融合能力又弥补了
RT-DETR
在特征融合方面可能存在的不足,两者相互配合,
实现了高效的轻量化特征融合,使模型在轻量化的同时保持良好的性能。
二、🌟
Rewrite the Stars
StarNet
是一种基于
星操作
(star operation)的轻量化神经网络模型,以下是其相关介绍:
2.1 出发点
- “星操作”的优势适配 :研究发现“星操作”(element - wise乘法)在网络设计中具有潜力,它能在不增加网络宽度的情况下将输入映射到高维非线性特征空间,类似核技巧。这种特性使得基于“星操作”的网络可能更适合高效、紧凑的网络结构,而不是传统的大型模型,从而启发了StarNet的设计。
- 现有高效网络的局限性 :尽管已有多种提升网络效率的方法,如深度可分离卷积、特征复用和重参数化等,但“星操作”提供了一种新的思路,可以在低维空间计算的同时考虑极高维的特征,为高效网络设计提供了独特优势,促使研究人员探索基于“星操作”的轻量化模型。
2.2 原理
2.2.1 高维特征映射
- 单一层的“星操作” :在神经网络的单一层中,“星操作”通常写为 ( W 1 T X + B 1 ) ∗ ( W 2 T X + B 2 ) (W_{1}^{T}X + B_{1})*(W_{2}^{T}X + B_{2}) ( W 1 T X + B 1 ) ∗ ( W 2 T X + B 2 ) ,经过改写和分析,可以发现它能将输入特征在 d d d 维空间中通过计算高效的操作,实现到 ( d + 2 ) ( d + 1 ) 2 ≈ ( d 2 ) 2 \frac{(d + 2)(d + 1)}{2}\approx(\frac{d}{\sqrt{2}})^{2} 2 ( d + 2 ) ( d + 1 ) ≈ ( 2 d ) 2 (考虑 d ≫ 2 d\gg2 d ≫ 2 )隐式维度特征空间的表示,每个项(除了部分特殊项)都与输入呈现非线性关联,即实现了高维特征映射。
- 多层的“星操作” :当堆叠多层时,以初始网络层宽度为 d d d 为例,经过 l l l 层的“星操作”,可以隐式获得属于 R ( d 2 ) 2 l \mathbb{R}^{(\frac{d}{\sqrt{2}})^{2^{l}}} R ( 2 d ) 2 l 的特征空间。
例如,一个10层宽度为128的各向同性网络,通过“星操作”获得的隐式特征维度数近似为 901024 901024 901024 ,可合理近似为无限维度,从而实现了指数级的隐式维度增加。
2.2.2 与核函数的类比
- “星操作”实现非线性高维的方式与传统神经网络通过增加网络宽度不同,它类似于核函数(特别是多项式核函数)对不同通道特征进行成对乘法操作。这种类比进一步说明了“星操作”在特征空间变换上的原理优势。
2.3 结构
2.3.1 整体架构
StarNet
是一个
4
阶段的分层架构。
2.3.2 下采样层
利用
卷积层
进行下采样。
2.3.3 特征提取模块
采用修改后的demo block进行特征提取。在每个block中,受MobileNeXt启发,在末尾包含深度可分离卷积;将demo block中的GELU激活函数替换为
ReLU6
,遵循MobileNetv2的设计;
网络宽度在每个阶段翻倍,通道扩展因子固定为4;同时,为了满足效率要求,用批归一化(Batch Normalization)替代层归一化(Layer Normalization),并放在深度可分离卷积之后(在推理时可融合)。
通过改变block数量和输入嵌入通道数来构建不同大小的
StarNet
。
2.4 优势
-
性能优异
- 实验验证 :在ImageNet - 1K验证集上,StarNet - S4相对于EdgeViT - XS在top - 1准确率上提高了 0.9 % 0.9\% 0.9% ,同时在iPhone13和CPU上运行速度快 3 3 3 倍,在GPU上快 2 2 2 倍。与其他精心设计的高效模型相比,如MobileNetv3、EdgeViT、FasterNet等,StarNet在性能上也具有竞争力。
-
设计简洁高效
:
StarNet设计简洁,没有复杂的设计和精细调整的超参数,却能取得优异的性能,这体现了 星操作 在网络设计中的有效性,也证明了基于“星操作”的轻量化设计理念的优势。
-
低延迟
:在不同的硬件平台上,包括移动设备(如iPhone系列)和服务器端的CPU、GPU,StarNet都展现出了较低的延迟。例如在iPhone13上,StarNet - S1能在
0.7
0.7
0.7
秒内达到
73.5
%
73.5\%
73.5%
的top - 1准确率,与MobileOne - S0相比,在相同延迟下准确率提高了
2.1
%
2.1\%
2.1%
。这种低延迟特性使得
StarNet在实际应用中具有很大的优势,特别是对于对实时性要求较高的任务。 -
通用性和可扩展性
:
StarNet的设计基于通用的神经网络架构原则,并通过 星操作 进行了优化。这种设计使得它具有较好的通用性,可以应用于各种计算机视觉任务。同时,通过调整模型的深度和宽度等参数,可以方便地对模型进行扩展,以适应不同的应用场景和性能需求。
论文: https://arxiv.org/pdf/2403.19967
源码: https://github.com/ma-xu/Rewrite-the-Stars
三、FreqFusion介绍
Frequency-aware Feature Fusion for Dense Image Prediction
FreqFusion
是一种旨在
解决密集图像预测任务中特征融合问题
的方法,以下从其结构设计的出发点、结构、原理和作用等方面进行详细介绍:
3.1 出发点
标准特征融合技术存在两个问题,即 类别内不一致性 和 边界位移 。
例如,同一物体不同部分的特征差异大导致类别内不一致;简单插值使特征过度平滑导致边界位移,且下层次特征的详细边界信息未被充分利用。
3.2 结构
由
自适应低通滤波器(ALPF)生成器
、
偏移生成器
和
自适应高通滤波器(AHPF)生成器
三个关键组件构成。
3.3 原理
-
首先进行
初始融合
,
将低层次和高层次特征压缩并融合
,为三个生成器提供输入。
- 简单初始融合存在不足,一是 采用简单插值上采样压缩特征导致边界模糊 ;
-
二是
ALPF生成器依赖高频信息,但 传统卷积层只能捕获固定高频模式 。 -
为此进行了增强,利用
ALPF生成器生成初始低通滤波器来 上采样压缩的高层次特征 ,并采用AHPF生成器提取特征图中的高频分量 。
-
ALPF生成器以初始融合的 z l z^{l} z l 为输入,通过3×3卷积层和Softmax层预测 空间变化的低通滤波器 。接着使用亚像素上采样技术,将低通滤波器重构成4组,得到4组低通滤波后的特征,再重新排列形成 上采样后的特征 。 -
偏移生成器根据 局部相似度 计算偏移量,用于重采样特征像素, 用具有高类别内相似度的附近特征替换高层次特征中的不一致特征。 -
AHPF生成器预测并应用空间变化的高通滤波器到低层次特征,以 增强下采样过程中丢失的高频细节信息,从而更准确地描绘边界。
3.4 作用
FreqFusion
通过自适应地用空间变化的低通滤波器平滑高层次特征、重采样附近类别一致的特征来替换高层次特征中的不一致特征、增强低层次特征的高频边界细节,来解决类别不一致性和边界位移问题,从而恢复具有一致类别信息和清晰边界的融合特征。提高了特征一致性和边界清晰度,在各种密集预测任务中取得了显著的性能提升。
论文: https://arxiv.org/pdf/2408.12879
源码: https://github.com/Linwei-Chen/FreqFusion
四、StarNet和FreqFusion模块的实现代码
StarNet模块
的实现代码如下:
"""
Implementation of Prof-of-Concept Network: StarNet.
We make StarNet as simple as possible [to show the key contribution of element-wise multiplication]:
- like NO layer-scale in network design,
- and NO EMA during training,
- which would improve the performance further.
Created by: Xu Ma (Email: ma.xu1@northeastern.edu)
Modified Date: Mar/29/2024
"""
import torch
import torch.nn as nn
from timm.models.layers import DropPath, trunc_normal_
__all__ = ['starnet_s050', 'starnet_s100', 'starnet_s150', 'starnet_s1', 'starnet_s2', 'starnet_s3', 'starnet_s4']
model_urls = {
"starnet_s1": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s1.pth.tar",
"starnet_s2": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s2.pth.tar",
"starnet_s3": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s3.pth.tar",
"starnet_s4": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s4.pth.tar",
}
class ConvBN(torch.nn.Sequential):
def __init__(self, in_planes, out_planes, kernel_size=1, stride=1, padding=0, dilation=1, groups=1, with_bn=True):
super().__init__()
self.add_module('conv', torch.nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, dilation, groups))
if with_bn:
self.add_module('bn', torch.nn.BatchNorm2d(out_planes))
torch.nn.init.constant_(self.bn.weight, 1)
torch.nn.init.constant_(self.bn.bias, 0)
class Block(nn.Module):
def __init__(self, dim, mlp_ratio=3, drop_path=0.):
super().__init__()
self.dwconv = ConvBN(dim, dim, 7, 1, (7 - 1) // 2, groups=dim, with_bn=True)
self.f1 = ConvBN(dim, mlp_ratio * dim, 1, with_bn=False)
self.f2 = ConvBN(dim, mlp_ratio * dim, 1, with_bn=False)
self.g = ConvBN(mlp_ratio * dim, dim, 1, with_bn=True)
self.dwconv2 = ConvBN(dim, dim, 7, 1, (7 - 1) // 2, groups=dim, with_bn=False)
self.act = nn.ReLU6()
self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
def forward(self, x):
input = x
x = self.dwconv(x)
x1, x2 = self.f1(x), self.f2(x)
x = self.act(x1) * x2
x = self.dwconv2(self.g(x))
x = input + self.drop_path(x)
return x
class StarNet(nn.Module):
def __init__(self, base_dim=32, depths=[3, 3, 12, 5], mlp_ratio=4, drop_path_rate=0.0, num_classes=1000, **kwargs):
super().__init__()
self.num_classes = num_classes
self.in_channel = 32
# stem layer
self.stem = nn.Sequential(ConvBN(3, self.in_channel, kernel_size=3, stride=2, padding=1), nn.ReLU6())
dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))] # stochastic depth
# build stages
self.stages = nn.ModuleList()
cur = 0
for i_layer in range(len(depths)):
embed_dim = base_dim * 2 ** i_layer
down_sampler = ConvBN(self.in_channel, embed_dim, 3, 2, 1)
self.in_channel = embed_dim
blocks = [Block(self.in_channel, mlp_ratio, dpr[cur + i]) for i in range(depths[i_layer])]
cur += depths[i_layer]
self.stages.append(nn.Sequential(down_sampler, *blocks))
self.channel = [i.size(1) for i in self.forward(torch.randn(1, 3, 640, 640))]
self.apply(self._init_weights)
def _init_weights(self, m):
if isinstance(m, nn.Linear or nn.Conv2d):
trunc_normal_(m.weight, std=.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm or nn.BatchNorm2d):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
def forward(self, x):
features = []
x = self.stem(x)
features.append(x)
for stage in self.stages:
x = stage(x)
features.append(x)
return features
def starnet_s1(pretrained=False, **kwargs):
model = StarNet(24, [2, 2, 8, 3], **kwargs)
if pretrained:
url = model_urls['starnet_s1']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["state_dict"], strict=False)
return model
def starnet_s2(pretrained=False, **kwargs):
model = StarNet(32, [1, 2, 6, 2], **kwargs)
if pretrained:
url = model_urls['starnet_s2']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["state_dict"], strict=False)
return model
def starnet_s3(pretrained=False, **kwargs):
model = StarNet(32, [2, 2, 8, 4], **kwargs)
if pretrained:
url = model_urls['starnet_s3']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["state_dict"], strict=False)
return model
def starnet_s4(pretrained=False, **kwargs):
model = StarNet(32, [3, 3, 12, 5], **kwargs)
if pretrained:
url = model_urls['starnet_s4']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["state_dict"], strict=False)
return model
# very small networks #
def starnet_s050(pretrained=False, **kwargs):
return StarNet(16, [1, 1, 3, 1], 3, **kwargs)
def starnet_s100(pretrained=False, **kwargs):
return StarNet(20, [1, 2, 4, 1], 4, **kwargs)
def starnet_s150(pretrained=False, **kwargs):
return StarNet(24, [1, 2, 4, 2], 3, **kwargs)
FreqFusion模块
的实现代码如下:
# TPAMI 2024:Frequency-aware Feature Fusion for Dense Image Prediction
import torch
import torch.nn as nn
import torch.nn.functional as F
try:
from mmcv.ops.carafe import normal_init, xavier_init, carafe
except ImportError:
pass
from torch.utils.checkpoint import checkpoint
import warnings
import numpy as np
__all__ = ['FreqFusion']
def normal_init(module, mean=0, std=1, bias=0):
if hasattr(module, 'weight') and module.weight is not None:
nn.init.normal_(module.weight, mean, std)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def constant_init(module, val, bias=0):
if hasattr(module, 'weight') and module.weight is not None:
nn.init.constant_(module.weight, val)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def resize(input,
size=None,
scale_factor=None,
mode='nearest',
align_corners=None,
warning=True):
if warning:
if size is not None and align_corners:
input_h, input_w = tuple(int(x) for x in input.shape[2:])
output_h, output_w = tuple(int(x) for x in size)
if output_h > input_h or output_w > input_w:
if ((output_h > 1 and output_w > 1 and input_h > 1
and input_w > 1) and (output_h - 1) % (input_h - 1)
and (output_w - 1) % (input_w - 1)):
warnings.warn(
f'When align_corners={align_corners}, '
'the output would more aligned if '
f'input size {(input_h, input_w)} is `x+1` and '
f'out size {(output_h, output_w)} is `nx+1`')
return F.interpolate(input, size, scale_factor, mode, align_corners)
def hamming2D(M, N):
"""
生成二维Hamming窗
参数:
- M:窗口的行数
- N:窗口的列数
返回:
- 二维Hamming窗
"""
# 生成水平和垂直方向上的Hamming窗
# hamming_x = np.blackman(M)
# hamming_x = np.kaiser(M)
hamming_x = np.hamming(M)
hamming_y = np.hamming(N)
# 通过外积生成二维Hamming窗
hamming_2d = np.outer(hamming_x, hamming_y)
return hamming_2d
class FreqFusion(nn.Module):
def __init__(self,
channels,
scale_factor=1,
lowpass_kernel=5,
highpass_kernel=3,
up_group=1,
encoder_kernel=3,
encoder_dilation=1,
compressed_channels=64,
align_corners=False,
upsample_mode='nearest',
feature_resample=False, # use offset generator or not
feature_resample_group=4,
comp_feat_upsample=True, # use ALPF & AHPF for init upsampling
use_high_pass=True,
use_low_pass=True,
hr_residual=True,
semi_conv=True,
hamming_window=True, # for regularization, do not matter really
feature_resample_norm=True,
**kwargs):
super().__init__()
hr_channels, lr_channels = channels
self.scale_factor = scale_factor
self.lowpass_kernel = lowpass_kernel
self.highpass_kernel = highpass_kernel
self.up_group = up_group
self.encoder_kernel = encoder_kernel
self.encoder_dilation = encoder_dilation
self.compressed_channels = (hr_channels + lr_channels) // 8
self.hr_channel_compressor = nn.Conv2d(hr_channels, self.compressed_channels,1)
self.lr_channel_compressor = nn.Conv2d(lr_channels, self.compressed_channels,1)
self.content_encoder = nn.Conv2d( # ALPF generator
self.compressed_channels,
lowpass_kernel ** 2 * self.up_group * self.scale_factor * self.scale_factor,
self.encoder_kernel,
padding=int((self.encoder_kernel - 1) * self.encoder_dilation / 2),
dilation=self.encoder_dilation,
groups=1)
self.align_corners = align_corners
self.upsample_mode = upsample_mode
self.hr_residual = hr_residual
self.use_high_pass = use_high_pass
self.use_low_pass = use_low_pass
self.semi_conv = semi_conv
self.feature_resample = feature_resample
self.comp_feat_upsample = comp_feat_upsample
if self.feature_resample:
self.dysampler = LocalSimGuidedSampler(in_channels=compressed_channels, scale=2, style='lp', groups=feature_resample_group, use_direct_scale=True, kernel_size=encoder_kernel, norm=feature_resample_norm)
if self.use_high_pass:
self.content_encoder2 = nn.Conv2d( # AHPF generator
self.compressed_channels,
highpass_kernel ** 2 * self.up_group * self.scale_factor * self.scale_factor,
self.encoder_kernel,
padding=int((self.encoder_kernel - 1) * self.encoder_dilation / 2),
dilation=self.encoder_dilation,
groups=1)
self.hamming_window = hamming_window
lowpass_pad=0
highpass_pad=0
if self.hamming_window:
self.register_buffer('hamming_lowpass', torch.FloatTensor(hamming2D(lowpass_kernel + 2 * lowpass_pad, lowpass_kernel + 2 * lowpass_pad))[None, None,])
self.register_buffer('hamming_highpass', torch.FloatTensor(hamming2D(highpass_kernel + 2 * highpass_pad, highpass_kernel + 2 * highpass_pad))[None, None,])
else:
self.register_buffer('hamming_lowpass', torch.FloatTensor([1.0]))
self.register_buffer('hamming_highpass', torch.FloatTensor([1.0]))
self.init_weights()
def init_weights(self):
for m in self.modules():
# print(m)
if isinstance(m, nn.Conv2d):
xavier_init(m, distribution='uniform')
normal_init(self.content_encoder, std=0.001)
if self.use_high_pass:
normal_init(self.content_encoder2, std=0.001)
def kernel_normalizer(self, mask, kernel, scale_factor=None, hamming=1):
if scale_factor is not None:
mask = F.pixel_shuffle(mask, self.scale_factor)
n, mask_c, h, w = mask.size()
mask_channel = int(mask_c / float(kernel**2))
# mask = mask.view(n, mask_channel, -1, h, w)
# mask = F.softmax(mask, dim=2, dtype=mask.dtype)
# mask = mask.view(n, mask_c, h, w).contiguous()
mask = mask.view(n, mask_channel, -1, h, w)
mask = F.softmax(mask, dim=2, dtype=mask.dtype)
mask = mask.view(n, mask_channel, kernel, kernel, h, w)
mask = mask.permute(0, 1, 4, 5, 2, 3).view(n, -1, kernel, kernel)
# mask = F.pad(mask, pad=[padding] * 4, mode=self.padding_mode) # kernel + 2 * padding
mask = mask * hamming
mask /= mask.sum(dim=(-1, -2), keepdims=True)
# print(hamming)
# print(mask.shape)
mask = mask.view(n, mask_channel, h, w, -1)
mask = mask.permute(0, 1, 4, 2, 3).view(n, -1, h, w).contiguous()
return mask
def forward(self, x, use_checkpoint=False):
hr_feat, lr_feat = x
if use_checkpoint:
return checkpoint(self._forward, hr_feat, lr_feat)
else:
return self._forward(hr_feat, lr_feat)
def _forward(self, hr_feat, lr_feat):
compressed_hr_feat = self.hr_channel_compressor(hr_feat)
compressed_lr_feat = self.lr_channel_compressor(lr_feat)
if self.semi_conv:
if self.comp_feat_upsample:
if self.use_high_pass:
mask_hr_hr_feat = self.content_encoder2(compressed_hr_feat)
mask_hr_init = self.kernel_normalizer(mask_hr_hr_feat, self.highpass_kernel, hamming=self.hamming_highpass)
compressed_hr_feat = compressed_hr_feat + compressed_hr_feat - carafe(compressed_hr_feat, mask_hr_init.to(compressed_hr_feat.dtype), self.highpass_kernel, self.up_group, 1)
mask_lr_hr_feat = self.content_encoder(compressed_hr_feat)
mask_lr_init = self.kernel_normalizer(mask_lr_hr_feat, self.lowpass_kernel, hamming=self.hamming_lowpass)
mask_lr_lr_feat_lr = self.content_encoder(compressed_lr_feat)
mask_lr_lr_feat = F.interpolate(
carafe(mask_lr_lr_feat_lr, mask_lr_init.to(compressed_hr_feat.dtype), self.lowpass_kernel, self.up_group, 2), size=compressed_hr_feat.shape[-2:], mode='nearest')
mask_lr = mask_lr_hr_feat + mask_lr_lr_feat
mask_lr_init = self.kernel_normalizer(mask_lr, self.lowpass_kernel, hamming=self.hamming_lowpass)
mask_hr_lr_feat = F.interpolate(
carafe(self.content_encoder2(compressed_lr_feat), mask_lr_init.to(compressed_hr_feat.dtype), self.lowpass_kernel, self.up_group, 2), size=compressed_hr_feat.shape[-2:], mode='nearest')
mask_hr = mask_hr_hr_feat + mask_hr_lr_feat
else: raise NotImplementedError
else:
mask_lr = self.content_encoder(compressed_hr_feat) + F.interpolate(self.content_encoder(compressed_lr_feat), size=compressed_hr_feat.shape[-2:], mode='nearest')
if self.use_high_pass:
mask_hr = self.content_encoder2(compressed_hr_feat) + F.interpolate(self.content_encoder2(compressed_lr_feat), size=compressed_hr_feat.shape[-2:], mode='nearest')
else:
compressed_x = F.interpolate(compressed_lr_feat, size=compressed_hr_feat.shape[-2:], mode='nearest') + compressed_hr_feat
mask_lr = self.content_encoder(compressed_x)
if self.use_high_pass:
mask_hr = self.content_encoder2(compressed_x)
mask_lr = self.kernel_normalizer(mask_lr, self.lowpass_kernel, hamming=self.hamming_lowpass)
if self.semi_conv:
lr_feat = carafe(lr_feat, mask_lr.to(compressed_hr_feat.dtype), self.lowpass_kernel, self.up_group, 2)
else:
lr_feat = resize(
input=lr_feat,
size=hr_feat.shape[2:],
mode=self.upsample_mode,
align_corners=None if self.upsample_mode == 'nearest' else self.align_corners)
lr_feat = carafe(lr_feat, mask_lr, self.lowpass_kernel, self.up_group, 1)
if self.use_high_pass:
mask_hr = self.kernel_normalizer(mask_hr, self.highpass_kernel, hamming=self.hamming_highpass)
if self.hr_residual:
# print('using hr_residual')
hr_feat_hf = hr_feat - carafe(hr_feat, mask_hr.to(compressed_hr_feat.dtype), self.highpass_kernel, self.up_group, 1)
hr_feat = hr_feat_hf + hr_feat
else:
hr_feat = hr_feat_hf
if self.feature_resample:
# print(lr_feat.shape)
lr_feat = self.dysampler(hr_x=compressed_hr_feat,
lr_x=compressed_lr_feat, feat2sample=lr_feat)
# return mask_lr, hr_feat, lr_feat
return hr_feat + lr_feat
class LocalSimGuidedSampler(nn.Module):
"""
offset generator in FreqFusion
"""
def __init__(self, in_channels, scale=2, style='lp', groups=4, use_direct_scale=True, kernel_size=1, local_window=3, sim_type='cos', norm=True, direction_feat='sim_concat'):
super().__init__()
assert scale==2
assert style=='lp'
self.scale = scale
self.style = style
self.groups = groups
self.local_window = local_window
self.sim_type = sim_type
self.direction_feat = direction_feat
if style == 'pl':
assert in_channels >= scale ** 2 and in_channels % scale ** 2 == 0
assert in_channels >= groups and in_channels % groups == 0
if style == 'pl':
in_channels = in_channels // scale ** 2
out_channels = 2 * groups
else:
out_channels = 2 * groups * scale ** 2
if self.direction_feat == 'sim':
self.offset = nn.Conv2d(local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
elif self.direction_feat == 'sim_concat':
self.offset = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
else: raise NotImplementedError
normal_init(self.offset, std=0.001)
if use_direct_scale:
if self.direction_feat == 'sim':
self.direct_scale = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
elif self.direction_feat == 'sim_concat':
self.direct_scale = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
else: raise NotImplementedError
constant_init(self.direct_scale, val=0.)
out_channels = 2 * groups
if self.direction_feat == 'sim':
self.hr_offset = nn.Conv2d(local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
elif self.direction_feat == 'sim_concat':
self.hr_offset = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
else: raise NotImplementedError
normal_init(self.hr_offset, std=0.001)
if use_direct_scale:
if self.direction_feat == 'sim':
self.hr_direct_scale = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
elif self.direction_feat == 'sim_concat':
self.hr_direct_scale = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
else: raise NotImplementedError
constant_init(self.hr_direct_scale, val=0.)
self.norm = norm
if self.norm:
self.norm_hr = nn.GroupNorm(in_channels // 8, in_channels)
self.norm_lr = nn.GroupNorm(in_channels // 8, in_channels)
else:
self.norm_hr = nn.Identity()
self.norm_lr = nn.Identity()
self.register_buffer('init_pos', self._init_pos())
def _init_pos(self):
h = torch.arange((-self.scale + 1) / 2, (self.scale - 1) / 2 + 1) / self.scale
return torch.stack(torch.meshgrid([h, h])).transpose(1, 2).repeat(1, self.groups, 1).reshape(1, -1, 1, 1)
def sample(self, x, offset, scale=None):
if scale is None: scale = self.scale
B, _, H, W = offset.shape
offset = offset.view(B, 2, -1, H, W)
coords_h = torch.arange(H) + 0.5
coords_w = torch.arange(W) + 0.5
coords = torch.stack(torch.meshgrid([coords_w, coords_h])
).transpose(1, 2).unsqueeze(1).unsqueeze(0).type(x.dtype).to(x.device)
normalizer = torch.tensor([W, H], dtype=x.dtype, device=x.device).view(1, 2, 1, 1, 1)
coords = 2 * (coords + offset) / normalizer - 1
coords = F.pixel_shuffle(coords.view(B, -1, H, W), scale).view(
B, 2, -1, scale * H, scale * W).permute(0, 2, 3, 4, 1).contiguous().flatten(0, 1)
return F.grid_sample(x.reshape(B * self.groups, -1, x.size(-2), x.size(-1)), coords, mode='bilinear',
align_corners=False, padding_mode="border").view(B, -1, scale * H, scale * W)
def forward(self, hr_x, lr_x, feat2sample):
hr_x = self.norm_hr(hr_x)
lr_x = self.norm_lr(lr_x)
if self.direction_feat == 'sim':
hr_sim = compute_similarity(hr_x, self.local_window, dilation=2, sim='cos')
lr_sim = compute_similarity(lr_x, self.local_window, dilation=2, sim='cos')
elif self.direction_feat == 'sim_concat':
hr_sim = torch.cat([hr_x, compute_similarity(hr_x, self.local_window, dilation=2, sim='cos')], dim=1)
lr_sim = torch.cat([lr_x, compute_similarity(lr_x, self.local_window, dilation=2, sim='cos')], dim=1)
hr_x, lr_x = hr_sim, lr_sim
# offset = self.get_offset(hr_x, lr_x)
offset = self.get_offset_lp(hr_x, lr_x, hr_sim, lr_sim)
return self.sample(feat2sample, offset)
# def get_offset_lp(self, hr_x, lr_x):
def get_offset_lp(self, hr_x, lr_x, hr_sim, lr_sim):
if hasattr(self, 'direct_scale'):
# offset = (self.offset(lr_x) + F.pixel_unshuffle(self.hr_offset(hr_x), self.scale)) * (self.direct_scale(lr_x) + F.pixel_unshuffle(self.hr_direct_scale(hr_x), self.scale)).sigmoid() + self.init_pos
offset = (self.offset(lr_sim) + F.pixel_unshuffle(self.hr_offset(hr_sim), self.scale)) * (self.direct_scale(lr_x) + F.pixel_unshuffle(self.hr_direct_scale(hr_x), self.scale)).sigmoid() + self.init_pos
# offset = (self.offset(lr_sim) + F.pixel_unshuffle(self.hr_offset(hr_sim), self.scale)) * (self.direct_scale(lr_sim) + F.pixel_unshuffle(self.hr_direct_scale(hr_sim), self.scale)).sigmoid() + self.init_pos
else:
offset = (self.offset(lr_x) + F.pixel_unshuffle(self.hr_offset(hr_x), self.scale)) * 0.25 + self.init_pos
return offset
def get_offset(self, hr_x, lr_x):
if self.style == 'pl':
raise NotImplementedError
return self.get_offset_lp(hr_x, lr_x)
def compute_similarity(input_tensor, k=3, dilation=1, sim='cos'):
"""
计算输入张量中每一点与周围KxK范围内的点的余弦相似度。
参数:
- input_tensor: 输入张量,形状为[B, C, H, W]
- k: 范围大小,表示周围KxK范围内的点
返回:
- 输出张量,形状为[B, KxK-1, H, W]
"""
B, C, H, W = input_tensor.shape
# 使用零填充来处理边界情况
# padded_input = F.pad(input_tensor, (k // 2, k // 2, k // 2, k // 2), mode='constant', value=0)
# 展平输入张量中每个点及其周围KxK范围内的点
unfold_tensor = F.unfold(input_tensor, k, padding=(k // 2) * dilation, dilation=dilation) # B, CxKxK, HW
# print(unfold_tensor.shape)
unfold_tensor = unfold_tensor.reshape(B, C, k**2, H, W)
# 计算余弦相似度
if sim == 'cos':
similarity = F.cosine_similarity(unfold_tensor[:, :, k * k // 2:k * k // 2 + 1], unfold_tensor[:, :, :], dim=1)
elif sim == 'dot':
similarity = unfold_tensor[:, :, k * k // 2:k * k // 2 + 1] * unfold_tensor[:, :, :]
similarity = similarity.sum(dim=1)
else:
raise NotImplementedError
# 移除中心点的余弦相似度,得到[KxK-1]的结果
similarity = torch.cat((similarity[:, :k * k // 2], similarity[:, k * k // 2 + 1:]), dim=1)
# 将结果重塑回[B, KxK-1, H, W]的形状
similarity = similarity.view(B, k * k - 1, H, W)
return similarity
四、修改步骤
StarNet配置步骤:
FreqFusion配置步骤:
五、yaml模型文件
5.1 模型改进⭐
在代码配置完成后,配置模型的YAML文件。
此处以
ultralytics/cfg/models/rt-detr/rtdetr-l.yaml
为例,在同目录下创建一个用于自己数据集训练的模型文件
rtdetr-l-StarNet-FreqFusion.yaml
。
将
rtdetr-l.yaml
中的内容复制到
rtdetr-StarNet-FreqFusion.yaml
文件下,修改
nc
数量等于自己数据中目标的数量。
📌 模型的修改方法是将
骨干网络
替换成
starnet_s050
,颈部中加入
FreqFusion
。
# Ultralytics YOLO 🚀, AGPL-3.0 license
# RT-DETR-l object detection model with P3-P5 outputs. For details see https://docs.ultralytics.com/models/rtdetr
# Parameters
nc: 1 # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n-cls.yaml' will call yolov8-cls.yaml with scale 'n'
# [depth, width, max_channels]
l: [1.00, 1.00, 1024]
backbone:
# [from, repeats, module, args]
- [-1, 1, starnet_s050, []] # 4
head:
- [-1, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 5 input_proj.2
- [-1, 1, AIFI, [1024, 8]]
- [-1, 1, Conv, [256, 1, 1]] # 7, Y5, lateral_convs.0
- [2, 1, Conv, [256]] # 8-P3/8
- [3, 1, Conv, [256]] # 9-P4/16
- [7, 1, Conv, [256]] # 10-P5/32
- [[9, -1], 1, FreqFusion, []] # cat backbone P4
- [-1, 3, RepC3, [256]] # 12, fpn_blocks.0
- [-1, 1, Conv, [256, 1, 1]] # 13, Y4, lateral_convs.1
- [[8, -1], 1, FreqFusion, []] # cat backbone P3
- [-1, 3, RepC3, [256]] # X3 (15), fpn_blocks.1
- [-1, 1, Conv, [256, 3, 2]] # 16, downsample_convs.0
- [[-1, 13], 1, Concat, [1]] # cat Y4
- [-1, 3, RepC3, [256]] # F4 (18), pan_blocks.0
- [-1, 1, Conv, [256, 3, 2]] # 19, downsample_convs.1
- [[-1, 7], 1, Concat, [1]] # cat Y5
- [-1, 3, RepC3, [256]] # F5 (21), pan_blocks.1
- [[15, 18, 21], 1, RTDETRDecoder, [nc]] # Detect(P3, P4, P5)
六、成功运行结果
分别打印网络模型可以看到
StarNet和FreqFusion模块
已经加入到模型中,并可以进行训练了。
rtdetr-StarNet-FreqFusion :
rtdetr-StarNet-FreqFusion summary: 490 layers, 18,716,999 parameters, 18,716,999 gradients
from n params module arguments
0 -1 1 413472 starnet_s050 []
1 -1 1 33280 ultralytics.nn.modules.conv.Conv [128, 256, 1, 1, None, 1, 1, False]
2 -1 1 789760 ultralytics.nn.modules.transformer.AIFI [256, 1024, 8]
3 -1 1 66048 ultralytics.nn.modules.conv.Conv [256, 256, 1, 1]
4 2 1 8704 ultralytics.nn.modules.conv.Conv [32, 256]
5 3 1 16896 ultralytics.nn.modules.conv.Conv [64, 256]
6 7 1 66048 ultralytics.nn.modules.conv.Conv [256, 256]
7 [9, -1] 1 52514 ultralytics.nn.AddModules.FreqFusion.FreqFusion[[256, 256]]
8 -1 3 2101248 ultralytics.nn.modules.block.RepC3 [256, 256, 3]
9 -1 1 66048 ultralytics.nn.modules.conv.Conv [256, 256, 1, 1]
10 [8, -1] 1 52514 ultralytics.nn.AddModules.FreqFusion.FreqFusion[[256, 256]]
11 -1 3 2101248 ultralytics.nn.modules.block.RepC3 [256, 256, 3]
12 -1 1 590336 ultralytics.nn.modules.conv.Conv [256, 256, 3, 2]
13 [-1, 13] 1 0 ultralytics.nn.modules.conv.Concat [1]
14 -1 3 2232320 ultralytics.nn.modules.block.RepC3 [512, 256, 3]
15 -1 1 590336 ultralytics.nn.modules.conv.Conv [256, 256, 3, 2]
16 [-1, 7] 1 0 ultralytics.nn.modules.conv.Concat [1]
17 -1 3 2232320 ultralytics.nn.modules.block.RepC3 [512, 256, 3]
18 [15, 18, 21] 1 7303907 ultralytics.nn.modules.head.RTDETRDecoder [1, [256, 256, 256]]
rtdetr-StarNet-FreqFusion summary: 490 layers, 18,716,999 parameters, 18,716,999 gradients