学习资源站

RT-DETR改进策略【独家融合改进】模型轻量化二次改进:StarNet+FreqFusion,极限降参,适用专栏内所有轻量化模型-

RT-DETR改进策略【独家融合改进】| 模型轻量化二次改进:StarNet + FreqFusion,极限降参,适用专栏内所有轻量化模型

一、本文介绍

本文记录的是 利用StarNet和FreqFusion对RT-DETR的模型结构进行改进 StarNet 的轻量化设计为 FreqFusion 提供了一个简洁高效的特征提取基础,使得 FreqFusion 能够 在相对简单的特征空间中进行融合操作,提高融合的效率和效果 。同时, FreqFusion 的特征融合能力又弥补了 RT-DETR 在特征融合方面可能存在的不足,两者相互配合, 实现了高效的轻量化特征融合,使模型在轻量化的同时保持良好的性能。



二、🌟

Rewrite the Stars

StarNet 是一种基于 星操作 (star operation)的轻量化神经网络模型,以下是其相关介绍:

2.1 出发点

  • “星操作”的优势适配 :研究发现“星操作”(element - wise乘法)在网络设计中具有潜力,它能在不增加网络宽度的情况下将输入映射到高维非线性特征空间,类似核技巧。这种特性使得基于“星操作”的网络可能更适合高效、紧凑的网络结构,而不是传统的大型模型,从而启发了StarNet的设计。
  • 现有高效网络的局限性 :尽管已有多种提升网络效率的方法,如深度可分离卷积、特征复用和重参数化等,但“星操作”提供了一种新的思路,可以在低维空间计算的同时考虑极高维的特征,为高效网络设计提供了独特优势,促使研究人员探索基于“星操作”的轻量化模型。

2.2 原理

2.2.1 高维特征映射

  • 单一层的“星操作” :在神经网络的单一层中,“星操作”通常写为 ( W 1 T X + B 1 ) ∗ ( W 2 T X + B 2 ) (W_{1}^{T}X + B_{1})*(W_{2}^{T}X + B_{2}) ( W 1 T X + B 1 ) ( W 2 T X + B 2 ) ,经过改写和分析,可以发现它能将输入特征在 d d d 维空间中通过计算高效的操作,实现到 ( d + 2 ) ( d + 1 ) 2 ≈ ( d 2 ) 2 \frac{(d + 2)(d + 1)}{2}\approx(\frac{d}{\sqrt{2}})^{2} 2 ( d + 2 ) ( d + 1 ) ( 2 d ) 2 (考虑 d ≫ 2 d\gg2 d 2 )隐式维度特征空间的表示,每个项(除了部分特殊项)都与输入呈现非线性关联,即实现了高维特征映射。
  • 多层的“星操作” :当堆叠多层时,以初始网络层宽度为 d d d 为例,经过 l l l 层的“星操作”,可以隐式获得属于 R ( d 2 ) 2 l \mathbb{R}^{(\frac{d}{\sqrt{2}})^{2^{l}}} R ( 2 d ) 2 l 的特征空间。

例如,一个10层宽度为128的各向同性网络,通过“星操作”获得的隐式特征维度数近似为 901024 901024 901024 ,可合理近似为无限维度,从而实现了指数级的隐式维度增加。

2.2.2 与核函数的类比

  • “星操作”实现非线性高维的方式与传统神经网络通过增加网络宽度不同,它类似于核函数(特别是多项式核函数)对不同通道特征进行成对乘法操作。这种类比进一步说明了“星操作”在特征空间变换上的原理优势。

2.3 结构

2.3.1 整体架构

StarNet 是一个 4 阶段的分层架构。

在这里插入图片描述

2.3.2 下采样层

利用 卷积层 进行下采样。

2.3.3 特征提取模块

采用修改后的demo block进行特征提取。在每个block中,受MobileNeXt启发,在末尾包含深度可分离卷积;将demo block中的GELU激活函数替换为 ReLU6 ,遵循MobileNetv2的设计;

网络宽度在每个阶段翻倍,通道扩展因子固定为4;同时,为了满足效率要求,用批归一化(Batch Normalization)替代层归一化(Layer Normalization),并放在深度可分离卷积之后(在推理时可融合)。

通过改变block数量和输入嵌入通道数来构建不同大小的 StarNet

在这里插入图片描述

2.4 优势

  • 性能优异
    • 实验验证 :在ImageNet - 1K验证集上,StarNet - S4相对于EdgeViT - XS在top - 1准确率上提高了 0.9 % 0.9\% 0.9% ,同时在iPhone13和CPU上运行速度快 3 3 3 倍,在GPU上快 2 2 2 倍。与其他精心设计的高效模型相比,如MobileNetv3、EdgeViT、FasterNet等,StarNet在性能上也具有竞争力。
    • 设计简洁高效 StarNet 设计简洁,没有复杂的设计和精细调整的超参数,却能取得优异的性能,这体现了 星操作 在网络设计中的有效性,也证明了基于“星操作”的轻量化设计理念的优势。
  • 低延迟 :在不同的硬件平台上,包括移动设备(如iPhone系列)和服务器端的CPU、GPU,StarNet都展现出了较低的延迟。例如在iPhone13上,StarNet - S1能在 0.7 0.7 0.7 秒内达到 73.5 % 73.5\% 73.5% 的top - 1准确率,与MobileOne - S0相比,在相同延迟下准确率提高了 2.1 % 2.1\% 2.1% 。这种低延迟特性使得 StarNet 在实际应用中具有很大的优势,特别是对于对实时性要求较高的任务。
  • 通用性和可扩展性 StarNet 的设计基于通用的神经网络架构原则,并通过 星操作 进行了优化。这种设计使得它具有较好的通用性,可以应用于各种计算机视觉任务。同时,通过调整模型的深度和宽度等参数,可以方便地对模型进行扩展,以适应不同的应用场景和性能需求。

论文: https://arxiv.org/pdf/2403.19967
源码: https://github.com/ma-xu/Rewrite-the-Stars

三、FreqFusion介绍

Frequency-aware Feature Fusion for Dense Image Prediction

FreqFusion 是一种旨在 解决密集图像预测任务中特征融合问题 的方法,以下从其结构设计的出发点、结构、原理和作用等方面进行详细介绍:

3.1 出发点

标准特征融合技术存在两个问题,即 类别内不一致性 边界位移

例如,同一物体不同部分的特征差异大导致类别内不一致;简单插值使特征过度平滑导致边界位移,且下层次特征的详细边界信息未被充分利用。

3.2 结构

自适应低通滤波器(ALPF)生成器 偏移生成器 自适应高通滤波器(AHPF)生成器 三个关键组件构成。

在这里插入图片描述

3.3 原理

  1. 首先进行 初始融合 将低层次和高层次特征压缩并融合 ,为三个生成器提供输入。
    • 简单初始融合存在不足,一是 采用简单插值上采样压缩特征导致边界模糊
    • 二是 ALPF生成器 依赖高频信息,但 传统卷积层只能捕获固定高频模式
    • 为此进行了增强,利用 ALPF生成器 生成 初始低通滤波器 上采样压缩的高层次特征 ,并采用 AHPF生成器 提取特征图中的高频分量
  2. ALPF生成器 以初始融合的 z l z^{l} z l 为输入,通过 3×3卷积层 Softmax层 预测 空间变化的低通滤波器 。接着使用 亚像素上采样技术 ,将低通滤波器重构成4组,得到4组低通滤波后的特征,再重新排列形成 上采样后的特征
  3. 偏移生成器 根据 局部相似度 计算偏移量,用于重采样特征像素, 用具有高类别内相似度的附近特征替换高层次特征中的不一致特征。
  4. AHPF生成器 预测并应用空间变化的高通滤波器到低层次特征,以 增强下采样过程中丢失的高频细节信息,从而更准确地描绘边界。

在这里插入图片描述

3.4 作用

FreqFusion 通过自适应地用空间变化的低通滤波器平滑高层次特征、重采样附近类别一致的特征来替换高层次特征中的不一致特征、增强低层次特征的高频边界细节,来解决类别不一致性和边界位移问题,从而恢复具有一致类别信息和清晰边界的融合特征。提高了特征一致性和边界清晰度,在各种密集预测任务中取得了显著的性能提升。

论文: https://arxiv.org/pdf/2408.12879
源码: https://github.com/Linwei-Chen/FreqFusion

四、StarNet和FreqFusion模块的实现代码

StarNet模块 的实现代码如下:

 

"""
Implementation of Prof-of-Concept Network: StarNet.

We make StarNet as simple as possible [to show the key contribution of element-wise multiplication]:
    - like NO layer-scale in network design,
    - and NO EMA during training,
    - which would improve the performance further.

Created by: Xu Ma (Email: ma.xu1@northeastern.edu)
Modified Date: Mar/29/2024
"""
import torch
import torch.nn as nn
from timm.models.layers import DropPath, trunc_normal_

__all__ = ['starnet_s050', 'starnet_s100', 'starnet_s150', 'starnet_s1', 'starnet_s2', 'starnet_s3', 'starnet_s4']

model_urls = {
    "starnet_s1": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s1.pth.tar",
    "starnet_s2": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s2.pth.tar",
    "starnet_s3": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s3.pth.tar",
    "starnet_s4": "https://github.com/ma-xu/Rewrite-the-Stars/releases/download/checkpoints_v1/starnet_s4.pth.tar",
}

class ConvBN(torch.nn.Sequential):
    def __init__(self, in_planes, out_planes, kernel_size=1, stride=1, padding=0, dilation=1, groups=1, with_bn=True):
        super().__init__()
        self.add_module('conv', torch.nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, dilation, groups))
        if with_bn:
            self.add_module('bn', torch.nn.BatchNorm2d(out_planes))
            torch.nn.init.constant_(self.bn.weight, 1)
            torch.nn.init.constant_(self.bn.bias, 0)

class Block(nn.Module):
    def __init__(self, dim, mlp_ratio=3, drop_path=0.):
        super().__init__()
        self.dwconv = ConvBN(dim, dim, 7, 1, (7 - 1) // 2, groups=dim, with_bn=True)
        self.f1 = ConvBN(dim, mlp_ratio * dim, 1, with_bn=False)
        self.f2 = ConvBN(dim, mlp_ratio * dim, 1, with_bn=False)
        self.g = ConvBN(mlp_ratio * dim, dim, 1, with_bn=True)
        self.dwconv2 = ConvBN(dim, dim, 7, 1, (7 - 1) // 2, groups=dim, with_bn=False)
        self.act = nn.ReLU6()
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()

    def forward(self, x):
        input = x
        x = self.dwconv(x)
        x1, x2 = self.f1(x), self.f2(x)
        x = self.act(x1) * x2
        x = self.dwconv2(self.g(x))
        x = input + self.drop_path(x)
        return x

class StarNet(nn.Module):
    def __init__(self, base_dim=32, depths=[3, 3, 12, 5], mlp_ratio=4, drop_path_rate=0.0, num_classes=1000, **kwargs):
        super().__init__()
        self.num_classes = num_classes
        self.in_channel = 32
        # stem layer
        self.stem = nn.Sequential(ConvBN(3, self.in_channel, kernel_size=3, stride=2, padding=1), nn.ReLU6())
        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))] # stochastic depth
        # build stages
        self.stages = nn.ModuleList()
        cur = 0
        for i_layer in range(len(depths)):
            embed_dim = base_dim * 2 ** i_layer
            down_sampler = ConvBN(self.in_channel, embed_dim, 3, 2, 1)
            self.in_channel = embed_dim
            blocks = [Block(self.in_channel, mlp_ratio, dpr[cur + i]) for i in range(depths[i_layer])]
            cur += depths[i_layer]
            self.stages.append(nn.Sequential(down_sampler, *blocks))
        
        self.channel = [i.size(1) for i in self.forward(torch.randn(1, 3, 640, 640))]
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear or nn.Conv2d):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm or nn.BatchNorm2d):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward(self, x):
        features = []
        x = self.stem(x)
        features.append(x)
        for stage in self.stages:
            x = stage(x)
            features.append(x)
        return features

def starnet_s1(pretrained=False, **kwargs):
    model = StarNet(24, [2, 2, 8, 3], **kwargs)
    if pretrained:
        url = model_urls['starnet_s1']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
        model.load_state_dict(checkpoint["state_dict"], strict=False)
    return model

def starnet_s2(pretrained=False, **kwargs):
    model = StarNet(32, [1, 2, 6, 2], **kwargs)
    if pretrained:
        url = model_urls['starnet_s2']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
        model.load_state_dict(checkpoint["state_dict"], strict=False)
    return model

def starnet_s3(pretrained=False, **kwargs):
    model = StarNet(32, [2, 2, 8, 4], **kwargs)
    if pretrained:
        url = model_urls['starnet_s3']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
        model.load_state_dict(checkpoint["state_dict"], strict=False)
    return model

def starnet_s4(pretrained=False, **kwargs):
    model = StarNet(32, [3, 3, 12, 5], **kwargs)
    if pretrained:
        url = model_urls['starnet_s4']
        checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
        model.load_state_dict(checkpoint["state_dict"], strict=False)
    return model

# very small networks #

def starnet_s050(pretrained=False, **kwargs):
    return StarNet(16, [1, 1, 3, 1], 3, **kwargs)

def starnet_s100(pretrained=False, **kwargs):
    return StarNet(20, [1, 2, 4, 1], 4, **kwargs)

def starnet_s150(pretrained=False, **kwargs):
    return StarNet(24, [1, 2, 4, 2], 3, **kwargs)

FreqFusion模块 的实现代码如下:

# TPAMI 2024:Frequency-aware Feature Fusion for Dense Image Prediction

import torch
import torch.nn as nn
import torch.nn.functional as F
try:
    from mmcv.ops.carafe import normal_init, xavier_init, carafe
except ImportError:
    pass
from torch.utils.checkpoint import checkpoint
import warnings
import numpy as np

__all__ = ['FreqFusion']

def normal_init(module, mean=0, std=1, bias=0):
    if hasattr(module, 'weight') and module.weight is not None:
        nn.init.normal_(module.weight, mean, std)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)

def constant_init(module, val, bias=0):
    if hasattr(module, 'weight') and module.weight is not None:
        nn.init.constant_(module.weight, val)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)

def resize(input,
           size=None,
           scale_factor=None,
           mode='nearest',
           align_corners=None,
           warning=True):
    if warning:
        if size is not None and align_corners:
            input_h, input_w = tuple(int(x) for x in input.shape[2:])
            output_h, output_w = tuple(int(x) for x in size)
            if output_h > input_h or output_w > input_w:
                if ((output_h > 1 and output_w > 1 and input_h > 1
                     and input_w > 1) and (output_h - 1) % (input_h - 1)
                        and (output_w - 1) % (input_w - 1)):
                    warnings.warn(
                        f'When align_corners={align_corners}, '
                        'the output would more aligned if '
                        f'input size {(input_h, input_w)} is `x+1` and '
                        f'out size {(output_h, output_w)} is `nx+1`')
    return F.interpolate(input, size, scale_factor, mode, align_corners)

def hamming2D(M, N):
    """
    生成二维Hamming窗

    参数:
    - M:窗口的行数
    - N:窗口的列数

    返回:
    - 二维Hamming窗
    """
    # 生成水平和垂直方向上的Hamming窗
    # hamming_x = np.blackman(M)
    # hamming_x = np.kaiser(M)
    hamming_x = np.hamming(M)
    hamming_y = np.hamming(N)
    # 通过外积生成二维Hamming窗
    hamming_2d = np.outer(hamming_x, hamming_y)
    return hamming_2d

class FreqFusion(nn.Module):
    def __init__(self,
                channels,
                scale_factor=1,
                lowpass_kernel=5,
                highpass_kernel=3,
                up_group=1,
                encoder_kernel=3,
                encoder_dilation=1,
                compressed_channels=64,        
                align_corners=False,
                upsample_mode='nearest',
                feature_resample=False, # use offset generator or not
                feature_resample_group=4,
                comp_feat_upsample=True, # use ALPF & AHPF for init upsampling
                use_high_pass=True,
                use_low_pass=True,
                hr_residual=True,
                semi_conv=True,
                hamming_window=True, # for regularization, do not matter really
                feature_resample_norm=True,
                **kwargs):
        super().__init__()
        hr_channels, lr_channels = channels
        self.scale_factor = scale_factor
        self.lowpass_kernel = lowpass_kernel
        self.highpass_kernel = highpass_kernel
        self.up_group = up_group
        self.encoder_kernel = encoder_kernel
        self.encoder_dilation = encoder_dilation
        self.compressed_channels = (hr_channels + lr_channels) // 8
        self.hr_channel_compressor = nn.Conv2d(hr_channels, self.compressed_channels,1)
        self.lr_channel_compressor = nn.Conv2d(lr_channels, self.compressed_channels,1)
        self.content_encoder = nn.Conv2d( # ALPF generator
            self.compressed_channels,
            lowpass_kernel ** 2 * self.up_group * self.scale_factor * self.scale_factor,
            self.encoder_kernel,
            padding=int((self.encoder_kernel - 1) * self.encoder_dilation / 2),
            dilation=self.encoder_dilation,
            groups=1)
        
        self.align_corners = align_corners
        self.upsample_mode = upsample_mode
        self.hr_residual = hr_residual
        self.use_high_pass = use_high_pass
        self.use_low_pass = use_low_pass
        self.semi_conv = semi_conv
        self.feature_resample = feature_resample
        self.comp_feat_upsample = comp_feat_upsample
        if self.feature_resample:
            self.dysampler = LocalSimGuidedSampler(in_channels=compressed_channels, scale=2, style='lp', groups=feature_resample_group, use_direct_scale=True, kernel_size=encoder_kernel, norm=feature_resample_norm)
        if self.use_high_pass:
            self.content_encoder2 = nn.Conv2d( # AHPF generator
                self.compressed_channels,
                highpass_kernel ** 2 * self.up_group * self.scale_factor * self.scale_factor,
                self.encoder_kernel,
                padding=int((self.encoder_kernel - 1) * self.encoder_dilation / 2),
                dilation=self.encoder_dilation,
                groups=1)
        self.hamming_window = hamming_window
        lowpass_pad=0
        highpass_pad=0
        if self.hamming_window:
            self.register_buffer('hamming_lowpass', torch.FloatTensor(hamming2D(lowpass_kernel + 2 * lowpass_pad, lowpass_kernel + 2 * lowpass_pad))[None, None,])
            self.register_buffer('hamming_highpass', torch.FloatTensor(hamming2D(highpass_kernel + 2 * highpass_pad, highpass_kernel + 2 * highpass_pad))[None, None,])
        else:
            self.register_buffer('hamming_lowpass', torch.FloatTensor([1.0]))
            self.register_buffer('hamming_highpass', torch.FloatTensor([1.0]))
        self.init_weights()

    def init_weights(self):
        for m in self.modules():
            # print(m)
            if isinstance(m, nn.Conv2d):
                xavier_init(m, distribution='uniform')
        normal_init(self.content_encoder, std=0.001)
        if self.use_high_pass:
            normal_init(self.content_encoder2, std=0.001)

    def kernel_normalizer(self, mask, kernel, scale_factor=None, hamming=1):
        if scale_factor is not None:
            mask = F.pixel_shuffle(mask, self.scale_factor)
        n, mask_c, h, w = mask.size()
        mask_channel = int(mask_c / float(kernel**2))
        # mask = mask.view(n, mask_channel, -1, h, w)
        # mask = F.softmax(mask, dim=2, dtype=mask.dtype)
        # mask = mask.view(n, mask_c, h, w).contiguous()

        mask = mask.view(n, mask_channel, -1, h, w)
        mask = F.softmax(mask, dim=2, dtype=mask.dtype)
        mask = mask.view(n, mask_channel, kernel, kernel, h, w)
        mask = mask.permute(0, 1, 4, 5, 2, 3).view(n, -1, kernel, kernel)
        # mask = F.pad(mask, pad=[padding] * 4, mode=self.padding_mode) # kernel + 2 * padding
        mask = mask * hamming
        mask /= mask.sum(dim=(-1, -2), keepdims=True)
        # print(hamming)
        # print(mask.shape)
        mask = mask.view(n, mask_channel, h, w, -1)
        mask =  mask.permute(0, 1, 4, 2, 3).view(n, -1, h, w).contiguous()
        return mask

    def forward(self, x, use_checkpoint=False):
        hr_feat, lr_feat = x
        if use_checkpoint:
            return checkpoint(self._forward, hr_feat, lr_feat)
        else:
            return self._forward(hr_feat, lr_feat)

    def _forward(self, hr_feat, lr_feat):
        compressed_hr_feat = self.hr_channel_compressor(hr_feat)
        compressed_lr_feat = self.lr_channel_compressor(lr_feat)
        if self.semi_conv:
            if self.comp_feat_upsample:
                if self.use_high_pass:
                    mask_hr_hr_feat = self.content_encoder2(compressed_hr_feat)
                    mask_hr_init = self.kernel_normalizer(mask_hr_hr_feat, self.highpass_kernel, hamming=self.hamming_highpass)
                    compressed_hr_feat = compressed_hr_feat + compressed_hr_feat - carafe(compressed_hr_feat, mask_hr_init.to(compressed_hr_feat.dtype), self.highpass_kernel, self.up_group, 1)
                    
                    mask_lr_hr_feat = self.content_encoder(compressed_hr_feat)
                    mask_lr_init = self.kernel_normalizer(mask_lr_hr_feat, self.lowpass_kernel, hamming=self.hamming_lowpass)
                    
                    mask_lr_lr_feat_lr = self.content_encoder(compressed_lr_feat)
                    mask_lr_lr_feat = F.interpolate(
                        carafe(mask_lr_lr_feat_lr, mask_lr_init.to(compressed_hr_feat.dtype), self.lowpass_kernel, self.up_group, 2), size=compressed_hr_feat.shape[-2:], mode='nearest')
                    mask_lr = mask_lr_hr_feat + mask_lr_lr_feat

                    mask_lr_init = self.kernel_normalizer(mask_lr, self.lowpass_kernel, hamming=self.hamming_lowpass)
                    mask_hr_lr_feat = F.interpolate(
                        carafe(self.content_encoder2(compressed_lr_feat), mask_lr_init.to(compressed_hr_feat.dtype), self.lowpass_kernel, self.up_group, 2), size=compressed_hr_feat.shape[-2:], mode='nearest')
                    mask_hr = mask_hr_hr_feat + mask_hr_lr_feat
                else: raise NotImplementedError
            else:
                mask_lr = self.content_encoder(compressed_hr_feat) + F.interpolate(self.content_encoder(compressed_lr_feat), size=compressed_hr_feat.shape[-2:], mode='nearest')
                if self.use_high_pass:
                    mask_hr = self.content_encoder2(compressed_hr_feat) + F.interpolate(self.content_encoder2(compressed_lr_feat), size=compressed_hr_feat.shape[-2:], mode='nearest')
        else:
            compressed_x = F.interpolate(compressed_lr_feat, size=compressed_hr_feat.shape[-2:], mode='nearest') + compressed_hr_feat
            mask_lr = self.content_encoder(compressed_x)
            if self.use_high_pass: 
                mask_hr = self.content_encoder2(compressed_x)
        
        mask_lr = self.kernel_normalizer(mask_lr, self.lowpass_kernel, hamming=self.hamming_lowpass)
        if self.semi_conv:
                lr_feat = carafe(lr_feat, mask_lr.to(compressed_hr_feat.dtype), self.lowpass_kernel, self.up_group, 2)
        else:
            lr_feat = resize(
                input=lr_feat,
                size=hr_feat.shape[2:],
                mode=self.upsample_mode,
                align_corners=None if self.upsample_mode == 'nearest' else self.align_corners)
            lr_feat = carafe(lr_feat, mask_lr, self.lowpass_kernel, self.up_group, 1)

        if self.use_high_pass:
            mask_hr = self.kernel_normalizer(mask_hr, self.highpass_kernel, hamming=self.hamming_highpass)
            if self.hr_residual:
                # print('using hr_residual')
                hr_feat_hf = hr_feat - carafe(hr_feat, mask_hr.to(compressed_hr_feat.dtype), self.highpass_kernel, self.up_group, 1)
                hr_feat = hr_feat_hf + hr_feat
            else:
                hr_feat = hr_feat_hf

        if self.feature_resample:
            # print(lr_feat.shape)
            lr_feat = self.dysampler(hr_x=compressed_hr_feat, 
                                     lr_x=compressed_lr_feat, feat2sample=lr_feat)
                
        # return  mask_lr, hr_feat, lr_feat
        return hr_feat + lr_feat

class LocalSimGuidedSampler(nn.Module):
    """
    offset generator in FreqFusion
    """
    def __init__(self, in_channels, scale=2, style='lp', groups=4, use_direct_scale=True, kernel_size=1, local_window=3, sim_type='cos', norm=True, direction_feat='sim_concat'):
        super().__init__()
        assert scale==2
        assert style=='lp'

        self.scale = scale
        self.style = style
        self.groups = groups
        self.local_window = local_window
        self.sim_type = sim_type
        self.direction_feat = direction_feat

        if style == 'pl':
            assert in_channels >= scale ** 2 and in_channels % scale ** 2 == 0
        assert in_channels >= groups and in_channels % groups == 0

        if style == 'pl':
            in_channels = in_channels // scale ** 2
            out_channels = 2 * groups
        else:
            out_channels = 2 * groups * scale ** 2
        if self.direction_feat == 'sim':
            self.offset = nn.Conv2d(local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
        elif self.direction_feat == 'sim_concat':
            self.offset = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
        else: raise NotImplementedError
        normal_init(self.offset, std=0.001)
        if use_direct_scale:
            if self.direction_feat == 'sim':
                self.direct_scale = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
            elif self.direction_feat == 'sim_concat':
                self.direct_scale = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
            else: raise NotImplementedError
            constant_init(self.direct_scale, val=0.)

        out_channels = 2 * groups
        if self.direction_feat == 'sim':
            self.hr_offset = nn.Conv2d(local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
        elif self.direction_feat == 'sim_concat':
            self.hr_offset = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
        else: raise NotImplementedError
        normal_init(self.hr_offset, std=0.001)
        
        if use_direct_scale:
            if self.direction_feat == 'sim':
                self.hr_direct_scale = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
            elif self.direction_feat == 'sim_concat':
                self.hr_direct_scale = nn.Conv2d(in_channels + local_window**2 - 1, out_channels, kernel_size=kernel_size, padding=kernel_size//2)
            else: raise NotImplementedError
            constant_init(self.hr_direct_scale, val=0.)

        self.norm = norm
        if self.norm:
            self.norm_hr = nn.GroupNorm(in_channels // 8, in_channels)
            self.norm_lr = nn.GroupNorm(in_channels // 8, in_channels)
        else:
            self.norm_hr = nn.Identity()
            self.norm_lr = nn.Identity()
        self.register_buffer('init_pos', self._init_pos())

    def _init_pos(self):
        h = torch.arange((-self.scale + 1) / 2, (self.scale - 1) / 2 + 1) / self.scale
        return torch.stack(torch.meshgrid([h, h])).transpose(1, 2).repeat(1, self.groups, 1).reshape(1, -1, 1, 1)
    
    def sample(self, x, offset, scale=None):
        if scale is None: scale = self.scale
        B, _, H, W = offset.shape
        offset = offset.view(B, 2, -1, H, W)
        coords_h = torch.arange(H) + 0.5
        coords_w = torch.arange(W) + 0.5
        coords = torch.stack(torch.meshgrid([coords_w, coords_h])
                             ).transpose(1, 2).unsqueeze(1).unsqueeze(0).type(x.dtype).to(x.device)
        normalizer = torch.tensor([W, H], dtype=x.dtype, device=x.device).view(1, 2, 1, 1, 1)
        coords = 2 * (coords + offset) / normalizer - 1
        coords = F.pixel_shuffle(coords.view(B, -1, H, W), scale).view(
            B, 2, -1, scale * H, scale * W).permute(0, 2, 3, 4, 1).contiguous().flatten(0, 1)
        return F.grid_sample(x.reshape(B * self.groups, -1, x.size(-2), x.size(-1)), coords, mode='bilinear',
                             align_corners=False, padding_mode="border").view(B, -1, scale * H, scale * W)
    
    def forward(self, hr_x, lr_x, feat2sample):
        hr_x = self.norm_hr(hr_x)
        lr_x = self.norm_lr(lr_x)

        if self.direction_feat == 'sim':
            hr_sim = compute_similarity(hr_x, self.local_window, dilation=2, sim='cos')
            lr_sim = compute_similarity(lr_x, self.local_window, dilation=2, sim='cos')
        elif self.direction_feat == 'sim_concat':
            hr_sim = torch.cat([hr_x, compute_similarity(hr_x, self.local_window, dilation=2, sim='cos')], dim=1)
            lr_sim = torch.cat([lr_x, compute_similarity(lr_x, self.local_window, dilation=2, sim='cos')], dim=1)
            hr_x, lr_x = hr_sim, lr_sim
        # offset = self.get_offset(hr_x, lr_x)
        offset = self.get_offset_lp(hr_x, lr_x, hr_sim, lr_sim)
        return self.sample(feat2sample, offset)
    
    # def get_offset_lp(self, hr_x, lr_x):
    def get_offset_lp(self, hr_x, lr_x, hr_sim, lr_sim):
        if hasattr(self, 'direct_scale'):
            # offset = (self.offset(lr_x) + F.pixel_unshuffle(self.hr_offset(hr_x), self.scale)) * (self.direct_scale(lr_x) + F.pixel_unshuffle(self.hr_direct_scale(hr_x), self.scale)).sigmoid() + self.init_pos
            offset = (self.offset(lr_sim) + F.pixel_unshuffle(self.hr_offset(hr_sim), self.scale)) * (self.direct_scale(lr_x) + F.pixel_unshuffle(self.hr_direct_scale(hr_x), self.scale)).sigmoid() + self.init_pos
            # offset = (self.offset(lr_sim) + F.pixel_unshuffle(self.hr_offset(hr_sim), self.scale)) * (self.direct_scale(lr_sim) + F.pixel_unshuffle(self.hr_direct_scale(hr_sim), self.scale)).sigmoid() + self.init_pos
        else:
            offset =  (self.offset(lr_x) + F.pixel_unshuffle(self.hr_offset(hr_x), self.scale)) * 0.25 + self.init_pos
        return offset

    def get_offset(self, hr_x, lr_x):
        if self.style == 'pl':
            raise NotImplementedError
        return self.get_offset_lp(hr_x, lr_x)

def compute_similarity(input_tensor, k=3, dilation=1, sim='cos'):
    """
    计算输入张量中每一点与周围KxK范围内的点的余弦相似度。

    参数:
    - input_tensor: 输入张量,形状为[B, C, H, W]
    - k: 范围大小,表示周围KxK范围内的点

    返回:
    - 输出张量,形状为[B, KxK-1, H, W]
    """
    B, C, H, W = input_tensor.shape
    # 使用零填充来处理边界情况
    # padded_input = F.pad(input_tensor, (k // 2, k // 2, k // 2, k // 2), mode='constant', value=0)

    # 展平输入张量中每个点及其周围KxK范围内的点
    unfold_tensor = F.unfold(input_tensor, k, padding=(k // 2) * dilation, dilation=dilation) # B, CxKxK, HW
    # print(unfold_tensor.shape)
    unfold_tensor = unfold_tensor.reshape(B, C, k**2, H, W)

    # 计算余弦相似度
    if sim == 'cos':
        similarity = F.cosine_similarity(unfold_tensor[:, :, k * k // 2:k * k // 2 + 1], unfold_tensor[:, :, :], dim=1)
    elif sim == 'dot':
        similarity = unfold_tensor[:, :, k * k // 2:k * k // 2 + 1] * unfold_tensor[:, :, :]
        similarity = similarity.sum(dim=1)
    else:
        raise NotImplementedError

    # 移除中心点的余弦相似度,得到[KxK-1]的结果
    similarity = torch.cat((similarity[:, :k * k // 2], similarity[:, k * k // 2 + 1:]), dim=1)

    # 将结果重塑回[B, KxK-1, H, W]的形状
    similarity = similarity.view(B, k * k - 1, H, W)
    return similarity

四、修改步骤

StarNet配置步骤:

FreqFusion配置步骤:


五、yaml模型文件

5.1 模型改进⭐

在代码配置完成后,配置模型的YAML文件。

此处以 ultralytics/cfg/models/rt-detr/rtdetr-l.yaml 为例,在同目录下创建一个用于自己数据集训练的模型文件 rtdetr-l-StarNet-FreqFusion.yaml

rtdetr-l.yaml 中的内容复制到 rtdetr-StarNet-FreqFusion.yaml 文件下,修改 nc 数量等于自己数据中目标的数量。

📌 模型的修改方法是将 骨干网络 替换成 starnet_s050 ,颈部中加入 FreqFusion

# Ultralytics YOLO 🚀, AGPL-3.0 license
# RT-DETR-l object detection model with P3-P5 outputs. For details see https://docs.ultralytics.com/models/rtdetr

# Parameters
nc: 1  # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n-cls.yaml' will call yolov8-cls.yaml with scale 'n'
  # [depth, width, max_channels]
  l: [1.00, 1.00, 1024]

backbone:
  # [from, repeats, module, args]
  - [-1, 1, starnet_s050, []]  # 4

head:
  - [-1, 1, Conv, [256, 1, 1, None, 1, 1, False]] # 5 input_proj.2
  - [-1, 1, AIFI, [1024, 8]]
  - [-1, 1, Conv, [256, 1, 1]] # 7, Y5, lateral_convs.0

  - [2, 1, Conv, [256]] # 8-P3/8
  - [3, 1, Conv, [256]] # 9-P4/16
  - [7, 1, Conv, [256]] # 10-P5/32

  - [[9, -1], 1, FreqFusion, []] # cat backbone P4
  - [-1, 3, RepC3, [256]] # 12, fpn_blocks.0
  - [-1, 1, Conv, [256, 1, 1]] # 13, Y4, lateral_convs.1

  - [[8, -1], 1, FreqFusion, []] # cat backbone P3
  - [-1, 3, RepC3, [256]] # X3 (15), fpn_blocks.1

  - [-1, 1, Conv, [256, 3, 2]] # 16, downsample_convs.0
  - [[-1, 13], 1, Concat, [1]] # cat Y4
  - [-1, 3, RepC3, [256]] # F4 (18), pan_blocks.0

  - [-1, 1, Conv, [256, 3, 2]] # 19, downsample_convs.1
  - [[-1, 7], 1, Concat, [1]] # cat Y5
  - [-1, 3, RepC3, [256]] # F5 (21), pan_blocks.1

  - [[15, 18, 21], 1, RTDETRDecoder, [nc]] # Detect(P3, P4, P5)


六、成功运行结果

分别打印网络模型可以看到 StarNet和FreqFusion模块 已经加入到模型中,并可以进行训练了。

rtdetr-StarNet-FreqFusion

rtdetr-StarNet-FreqFusion summary: 490 layers, 18,716,999 parameters, 18,716,999 gradients

                   from  n    params  module                                       arguments                     
  0                  -1  1    413472  starnet_s050                                 []                            
  1                  -1  1     33280  ultralytics.nn.modules.conv.Conv             [128, 256, 1, 1, None, 1, 1, False]
  2                  -1  1    789760  ultralytics.nn.modules.transformer.AIFI      [256, 1024, 8]                
  3                  -1  1     66048  ultralytics.nn.modules.conv.Conv             [256, 256, 1, 1]              
  4                   2  1      8704  ultralytics.nn.modules.conv.Conv             [32, 256]                     
  5                   3  1     16896  ultralytics.nn.modules.conv.Conv             [64, 256]                     
  6                   7  1     66048  ultralytics.nn.modules.conv.Conv             [256, 256]                    
  7             [9, -1]  1     52514  ultralytics.nn.AddModules.FreqFusion.FreqFusion[[256, 256]]                  
  8                  -1  3   2101248  ultralytics.nn.modules.block.RepC3           [256, 256, 3]                 
  9                  -1  1     66048  ultralytics.nn.modules.conv.Conv             [256, 256, 1, 1]              
 10             [8, -1]  1     52514  ultralytics.nn.AddModules.FreqFusion.FreqFusion[[256, 256]]                  
 11                  -1  3   2101248  ultralytics.nn.modules.block.RepC3           [256, 256, 3]                 
 12                  -1  1    590336  ultralytics.nn.modules.conv.Conv             [256, 256, 3, 2]              
 13            [-1, 13]  1         0  ultralytics.nn.modules.conv.Concat           [1]                           
 14                  -1  3   2232320  ultralytics.nn.modules.block.RepC3           [512, 256, 3]                 
 15                  -1  1    590336  ultralytics.nn.modules.conv.Conv             [256, 256, 3, 2]              
 16             [-1, 7]  1         0  ultralytics.nn.modules.conv.Concat           [1]                           
 17                  -1  3   2232320  ultralytics.nn.modules.block.RepC3           [512, 256, 3]                 
 18        [15, 18, 21]  1   7303907  ultralytics.nn.modules.head.RTDETRDecoder    [1, [256, 256, 256]]          
rtdetr-StarNet-FreqFusion summary: 490 layers, 18,716,999 parameters, 18,716,999 gradients