A tutorial on using and extending nnU-Net.
This article shows how to introduce the ConvNeXt block into nnU-Net.
1. ConvNeXt
The ConvNeXt authors argue that much of the Transformer's strength comes from its overall block design rather than from multi-head self-attention itself, so they design the ConvNeXt block using plain convolutions while following the Transformer's design principles.
Code reference: https://github.com/facebookresearch/ConvNeXt
The structure of the ConvNeXt block is shown below: the Transformer's token mixer (self-attention) is replaced by a 7×7 depthwise convolution, followed by an inverted-bottleneck MLP:

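Since the block is easiest to read next to code, here is a minimal sketch of its forward path (the same order of operations as the Block class in the reference code below; LayerScale and DropPath are omitted, and the tensor sizes are only illustrative):
import torch
import torch.nn as nn
dim = 96                                                             # illustrative channel count
dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)   # 7x7 depthwise conv (spatial mixing)
norm = nn.LayerNorm(dim, eps=1e-6)                                   # LayerNorm applied in channels-last layout
pwconv1, act, pwconv2 = nn.Linear(dim, 4 * dim), nn.GELU(), nn.Linear(4 * dim, dim)  # inverted bottleneck
x = torch.randn(1, dim, 56, 56)                                      # (N, C, H, W)
y = dwconv(x).permute(0, 2, 3, 1)                                    # -> (N, H, W, C)
y = pwconv2(act(pwconv1(norm(y))))                                   # 1x1 convs implemented as Linear layers
y = x + y.permute(0, 3, 1, 2)                                        # residual connection, back to (N, C, H, W)
print(y.shape)                                                       # torch.Size([1, 96, 56, 56])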
The reference code is as follows:
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import torch
import torch.nn as nn
import torch.nn.functional as F
from timm.models.layers import trunc_normal_, DropPath
from timm.models.registry import register_model
class Block(nn.Module):
r""" ConvNeXt Block. There are two equivalent implementations:
(1) DwConv -> LayerNorm (channels_first) -> 1x1 Conv -> GELU -> 1x1 Conv; all in (N, C, H, W)
(2) DwConv -> Permute to (N, H, W, C); LayerNorm (channels_last) -> Linear -> GELU -> Linear; Permute back
We use (2) as we find it slightly faster in PyTorch
Args:
dim (int): Number of input channels.
drop_path (float): Stochastic depth rate. Default: 0.0
layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
"""
def __init__(self, dim, drop_path=0., layer_scale_init_value=1e-6):
super().__init__()
self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim) # depthwise conv
self.norm = LayerNorm(dim, eps=1e-6)
self.pwconv1 = nn.Linear(dim, 4 * dim) # pointwise/1x1 convs, implemented with linear layers
self.act = nn.GELU()
self.pwconv2 = nn.Linear(4 * dim, dim)
self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((dim)),
requires_grad=True) if layer_scale_init_value > 0 else None
self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
def forward(self, x):
input = x
x = self.dwconv(x)
x = x.permute(0, 2, 3, 1) # (N, C, H, W) -> (N, H, W, C)
x = self.norm(x)
x = self.pwconv1(x)
x = self.act(x)
x = self.pwconv2(x)
if self.gamma is not None:
x = self.gamma * x
x = x.permute(0, 3, 1, 2) # (N, H, W, C) -> (N, C, H, W)
x = input + self.drop_path(x)
return x
class ConvNeXt(nn.Module):
r""" ConvNeXt
A PyTorch impl of : `A ConvNet for the 2020s` -
https://arxiv.org/pdf/2201.03545.pdf
Args:
in_chans (int): Number of input image channels. Default: 3
num_classes (int): Number of classes for classification head. Default: 1000
depths (tuple(int)): Number of blocks at each stage. Default: [3, 3, 9, 3]
dims (int): Feature dimension at each stage. Default: [96, 192, 384, 768]
drop_path_rate (float): Stochastic depth rate. Default: 0.
layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
head_init_scale (float): Init scaling value for classifier weights and biases. Default: 1.
"""
def __init__(self, in_chans=3, num_classes=1000,
depths=[3, 3, 9, 3], dims=[96, 192, 384, 768], drop_path_rate=0.,
layer_scale_init_value=1e-6, head_init_scale=1.,
):
super().__init__()
self.downsample_layers = nn.ModuleList() # stem and 3 intermediate downsampling conv layers
stem = nn.Sequential(
nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
LayerNorm(dims[0], eps=1e-6, data_format="channels_first")
)
self.downsample_layers.append(stem)
for i in range(3):
downsample_layer = nn.Sequential(
LayerNorm(dims[i], eps=1e-6, data_format="channels_first"),
nn.Conv2d(dims[i], dims[i+1], kernel_size=2, stride=2),
)
self.downsample_layers.append(downsample_layer)
self.stages = nn.ModuleList() # 4 feature resolution stages, each consisting of multiple residual blocks
dp_rates=[x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
cur = 0
for i in range(4):
stage = nn.Sequential(
*[Block(dim=dims[i], drop_path=dp_rates[cur + j],
layer_scale_init_value=layer_scale_init_value) for j in range(depths[i])]
)
self.stages.append(stage)
cur += depths[i]
self.norm = nn.LayerNorm(dims[-1], eps=1e-6) # final norm layer
self.head = nn.Linear(dims[-1], num_classes)
self.apply(self._init_weights)
self.head.weight.data.mul_(head_init_scale)
self.head.bias.data.mul_(head_init_scale)
def _init_weights(self, m):
if isinstance(m, (nn.Conv2d, nn.Linear)):
trunc_normal_(m.weight, std=.02)
nn.init.constant_(m.bias, 0)
def forward_features(self, x):
for i in range(4):
x = self.downsample_layers[i](x)
x = self.stages[i](x)
return self.norm(x.mean([-2, -1])) # global average pooling, (N, C, H, W) -> (N, C)
def forward(self, x):
x = self.forward_features(x)
x = self.head(x)
return x
class LayerNorm(nn.Module):
r""" LayerNorm that supports two data formats: channels_last (default) or channels_first.
The ordering of the dimensions in the inputs. channels_last corresponds to inputs with
shape (batch_size, height, width, channels) while channels_first corresponds to inputs
with shape (batch_size, channels, height, width).
"""
def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
super().__init__()
self.weight = nn.Parameter(torch.ones(normalized_shape))
self.bias = nn.Parameter(torch.zeros(normalized_shape))
self.eps = eps
self.data_format = data_format
if self.data_format not in ["channels_last", "channels_first"]:
raise NotImplementedError
self.normalized_shape = (normalized_shape, )
def forward(self, x):
if self.data_format == "channels_last":
return F.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)
elif self.data_format == "channels_first":
u = x.mean(1, keepdim=True)
s = (x - u).pow(2).mean(1, keepdim=True)
x = (x - u) / torch.sqrt(s + self.eps)
x = self.weight[:, None, None] * x + self.bias[:, None, None]
return x
model_urls = {
"convnext_tiny_1k": "https://dl.fbaipublicfiles.com/convnext/convnext_tiny_1k_224_ema.pth",
"convnext_small_1k": "https://dl.fbaipublicfiles.com/convnext/convnext_small_1k_224_ema.pth",
"convnext_base_1k": "https://dl.fbaipublicfiles.com/convnext/convnext_base_1k_224_ema.pth",
"convnext_large_1k": "https://dl.fbaipublicfiles.com/convnext/convnext_large_1k_224_ema.pth",
"convnext_tiny_22k": "https://dl.fbaipublicfiles.com/convnext/convnext_tiny_22k_224.pth",
"convnext_small_22k": "https://dl.fbaipublicfiles.com/convnext/convnext_small_22k_224.pth",
"convnext_base_22k": "https://dl.fbaipublicfiles.com/convnext/convnext_base_22k_224.pth",
"convnext_large_22k": "https://dl.fbaipublicfiles.com/convnext/convnext_large_22k_224.pth",
"convnext_xlarge_22k": "https://dl.fbaipublicfiles.com/convnext/convnext_xlarge_22k_224.pth",
}
@register_model
def convnext_tiny(pretrained=False,in_22k=False, **kwargs):
model = ConvNeXt(depths=[3, 3, 9, 3], dims=[96, 192, 384, 768], **kwargs)
if pretrained:
url = model_urls['convnext_tiny_22k'] if in_22k else model_urls['convnext_tiny_1k']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu", check_hash=True)
model.load_state_dict(checkpoint["model"])
return model
@register_model
def convnext_small(pretrained=False,in_22k=False, **kwargs):
model = ConvNeXt(depths=[3, 3, 27, 3], dims=[96, 192, 384, 768], **kwargs)
if pretrained:
url = model_urls['convnext_small_22k'] if in_22k else model_urls['convnext_small_1k']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["model"])
return model
@register_model
def convnext_base(pretrained=False, in_22k=False, **kwargs):
model = ConvNeXt(depths=[3, 3, 27, 3], dims=[128, 256, 512, 1024], **kwargs)
if pretrained:
url = model_urls['convnext_base_22k'] if in_22k else model_urls['convnext_base_1k']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["model"])
return model
@register_model
def convnext_large(pretrained=False, in_22k=False, **kwargs):
model = ConvNeXt(depths=[3, 3, 27, 3], dims=[192, 384, 768, 1536], **kwargs)
if pretrained:
url = model_urls['convnext_large_22k'] if in_22k else model_urls['convnext_large_1k']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["model"])
return model
@register_model
def convnext_xlarge(pretrained=False, in_22k=False, **kwargs):
model = ConvNeXt(depths=[3, 3, 27, 3], dims=[256, 512, 1024, 2048], **kwargs)
if pretrained:
assert in_22k, "only ImageNet-22K pre-trained ConvNeXt-XL is available; please set in_22k=True"
url = model_urls['convnext_xlarge_22k']
checkpoint = torch.hub.load_state_dict_from_url(url=url, map_location="cpu")
model.load_state_dict(checkpoint["model"])
return model
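A quick way to check that the reference model builds and runs (a minimal sketch; it assumes the code above has been saved as convnext.py, a file name chosen here only for illustration, and that timm is installed):
import torch
from convnext import convnext_tiny                         # hypothetical module name for the file above
model = convnext_tiny(pretrained=False, num_classes=10)    # randomly initialized, 10-class head
x = torch.randn(2, 3, 224, 224)                            # (N, C, H, W)
logits = model(x)
print(logits.shape)                                        # torch.Size([2, 10])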
2. Adding ConvNeXt to nnU-Net
As mentioned in the previous tutorial, training your own network with nnU-Net requires two steps: modify the network definition in dynamic-network-architectures, and point the dataset's plans file at the new class.
1) Network structure modification
Create a new file convnextunet.py in the architectures directory of dynamic-network-architectures, as shown below:

The file contents are as follows:
from typing import Union, Type, List, Tuple
import numpy as np
import torch
import torch.nn.functional as F
from dynamic_network_architectures.building_blocks.helper import convert_conv_op_to_dim
from dynamic_network_architectures.initialization.weight_init import InitWeights_He
from torch import nn
from torch.nn.modules.conv import _ConvNd
from torch.nn.modules.dropout import _DropoutNd
from dynamic_network_architectures.building_blocks.helper import maybe_convert_scalar_to_list, get_matching_pool_op
from timm.models.layers import DropPath, trunc_normal_
from dynamic_network_architectures.building_blocks.helper import get_matching_convtransp
from dynamic_network_architectures.building_blocks.simple_conv_blocks import ConvDropoutNormReLU  # needed by the unmodified StackedConvBlocks kept below
class ConvNextPlainConvUNet(nn.Module):
def __init__(self,
input_channels: int,
n_stages: int,
features_per_stage: Union[int, List[int], Tuple[int, ...]],
conv_op: Type[_ConvNd],
kernel_sizes: Union[int, List[int], Tuple[int, ...]],
strides: Union[int, List[int], Tuple[int, ...]],
n_conv_per_stage: Union[int, List[int], Tuple[int, ...]],
num_classes: int,
n_conv_per_stage_decoder: Union[int, Tuple[int, ...], List[int]],
conv_bias: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
deep_supervision: bool = False,
nonlin_first: bool = False
):
"""
nonlin_first: if True you get conv -> nonlin -> norm. Else it's conv -> norm -> nonlin
"""
super().__init__()
if isinstance(n_conv_per_stage, int):
n_conv_per_stage = [n_conv_per_stage] * n_stages
if isinstance(n_conv_per_stage_decoder, int):
n_conv_per_stage_decoder = [n_conv_per_stage_decoder] * (n_stages - 1)
assert len(n_conv_per_stage) == n_stages, "n_conv_per_stage must have as many entries as we have " \
f"resolution stages. here: {n_stages}. " \
f"n_conv_per_stage: {n_conv_per_stage}"
assert len(n_conv_per_stage_decoder) == (n_stages - 1), "n_conv_per_stage_decoder must have one less entries " \
f"as we have resolution stages. here: {n_stages} " \
f"stages, so it should have {n_stages - 1} entries. " \
f"n_conv_per_stage_decoder: {n_conv_per_stage_decoder}"
self.encoder = PlainConvEncoder(input_channels, n_stages, features_per_stage, conv_op, kernel_sizes, strides,
n_conv_per_stage, conv_bias, norm_op, norm_op_kwargs, dropout_op,
dropout_op_kwargs, nonlin, nonlin_kwargs, return_skips=True,
nonlin_first=nonlin_first)
self.decoder = UNetDecoder(self.encoder, num_classes, n_conv_per_stage_decoder, deep_supervision,
nonlin_first=nonlin_first)
print('............using convnext unet......................')
def forward(self, x):
skips = self.encoder(x)
# for k in skips:
# print(k.shape)
# exit(0)
return self.decoder(skips)
def compute_conv_feature_map_size(self, input_size):
assert len(input_size) == convert_conv_op_to_dim(self.encoder.conv_op), "just give the image size without color/feature channels or " \
"batch channel. Do not give input_size=(b, c, x, y(, z)). " \
"Give input_size=(x, y(, z))!"
return self.encoder.compute_conv_feature_map_size(input_size) + self.decoder.compute_conv_feature_map_size(input_size)
@staticmethod
def initialize(module):
InitWeights_He(1e-2)(module)
class PlainConvEncoder(nn.Module):
def __init__(self,
input_channels: int,
n_stages: int,
features_per_stage: Union[int, List[int], Tuple[int, ...]],
conv_op: Type[_ConvNd],
kernel_sizes: Union[int, List[int], Tuple[int, ...]],
strides: Union[int, List[int], Tuple[int, ...]],
n_conv_per_stage: Union[int, List[int], Tuple[int, ...]],
conv_bias: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
return_skips: bool = False,
nonlin_first: bool = False,
pool: str = 'conv'
):
super().__init__()
if isinstance(kernel_sizes, int):
kernel_sizes = [kernel_sizes] * n_stages
if isinstance(features_per_stage, int):
features_per_stage = [features_per_stage] * n_stages
if isinstance(n_conv_per_stage, int):
n_conv_per_stage = [n_conv_per_stage] * n_stages
if isinstance(strides, int):
strides = [strides] * n_stages
assert len(kernel_sizes) == n_stages, "kernel_sizes must have as many entries as we have resolution stages (n_stages)"
assert len(n_conv_per_stage) == n_stages, "n_conv_per_stage must have as many entries as we have resolution stages (n_stages)"
assert len(features_per_stage) == n_stages, "features_per_stage must have as many entries as we have resolution stages (n_stages)"
assert len(strides) == n_stages, "strides must have as many entries as we have resolution stages (n_stages). " \
"Important: first entry is recommended to be 1, else we run strided conv drectly on the input"
stages = []
for s in range(n_stages):
stage_modules = []
if pool == 'max' or pool == 'avg':
if (isinstance(strides[s], int) and strides[s] != 1) or \
isinstance(strides[s], (tuple, list)) and any([i != 1 for i in strides[s]]):
stage_modules.append(get_matching_pool_op(conv_op, pool_type=pool)(kernel_size=strides[s], stride=strides[s]))
conv_stride = 1
elif pool == 'conv':
conv_stride = strides[s]
else:
raise RuntimeError()
# if s < 2:
# stage_modules.append(StackedConvBlocks(
# n_conv_per_stage[s], conv_op, input_channels, features_per_stage[s], kernel_sizes[s], conv_stride,
# conv_bias, norm_op, norm_op_kwargs, dropout_op, dropout_op_kwargs, nonlin, nonlin_kwargs, nonlin_first
# ))
# else:
stage_modules.append(ConvNextStackedConvBlocks(
n_conv_per_stage[s], conv_op, input_channels, features_per_stage[s], kernel_sizes[s], conv_stride,
conv_bias, norm_op, norm_op_kwargs, dropout_op, dropout_op_kwargs, nonlin, nonlin_kwargs, nonlin_first
))
stages.append(nn.Sequential(*stage_modules))
input_channels = features_per_stage[s]
self.stages = nn.Sequential(*stages)
self.output_channels = features_per_stage
self.strides = [maybe_convert_scalar_to_list(conv_op, i) for i in strides]
self.return_skips = return_skips
# we store some things that a potential decoder needs
self.conv_op = conv_op
self.norm_op = norm_op
self.norm_op_kwargs = norm_op_kwargs
self.nonlin = nonlin
self.nonlin_kwargs = nonlin_kwargs
self.dropout_op = dropout_op
self.dropout_op_kwargs = dropout_op_kwargs
self.conv_bias = conv_bias
self.kernel_sizes = kernel_sizes
def forward(self, x):
ret = []
for s in self.stages:
x = s(x)
ret.append(x)
if self.return_skips:
return ret
else:
return ret[-1]
def compute_conv_feature_map_size(self, input_size):
output = np.int64(0)
for s in range(len(self.stages)):
if isinstance(self.stages[s], nn.Sequential):
for sq in self.stages[s]:
if hasattr(sq, 'compute_conv_feature_map_size'):
output += self.stages[s][-1].compute_conv_feature_map_size(input_size)
else:
output += self.stages[s].compute_conv_feature_map_size(input_size)
input_size = [i // j for i, j in zip(input_size, self.strides[s])]
return output
class UNetDecoder(nn.Module):
def __init__(self,
encoder: Union[PlainConvEncoder],
num_classes: int,
n_conv_per_stage: Union[int, Tuple[int, ...], List[int]],
deep_supervision,
nonlin_first: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
conv_bias: bool = None
):
"""
This class needs the skips of the encoder as input in its forward.
the encoder goes all the way to the bottleneck, so that's where the decoder picks up. stages in the decoder
are sorted by order of computation, so the first stage has the lowest resolution and takes the bottleneck
features and the lowest skip as inputs
the decoder has two (three) parts in each stage:
1) conv transpose to upsample the feature maps of the stage below it (or the bottleneck in case of the first stage)
2) n_conv_per_stage conv blocks to let the two inputs get to know each other and merge
3) (optional if deep_supervision=True) a segmentation output Todo: enable upsample logits?
:param encoder:
:param num_classes:
:param n_conv_per_stage:
:param deep_supervision:
"""
super().__init__()
self.deep_supervision = deep_supervision
self.encoder = encoder
self.num_classes = num_classes
n_stages_encoder = len(encoder.output_channels)
if isinstance(n_conv_per_stage, int):
n_conv_per_stage = [n_conv_per_stage] * (n_stages_encoder - 1)
assert len(n_conv_per_stage) == n_stages_encoder - 1, "n_conv_per_stage must have as many entries as we have " \
"resolution stages - 1 (n_stages in encoder - 1), " \
"here: %d" % n_stages_encoder
transpconv_op = get_matching_convtransp(conv_op=encoder.conv_op)
conv_bias = encoder.conv_bias if conv_bias is None else conv_bias
norm_op = encoder.norm_op if norm_op is None else norm_op
norm_op_kwargs = encoder.norm_op_kwargs if norm_op_kwargs is None else norm_op_kwargs
dropout_op = encoder.dropout_op if dropout_op is None else dropout_op
dropout_op_kwargs = encoder.dropout_op_kwargs if dropout_op_kwargs is None else dropout_op_kwargs
nonlin = encoder.nonlin if nonlin is None else nonlin
nonlin_kwargs = encoder.nonlin_kwargs if nonlin_kwargs is None else nonlin_kwargs
# we start with the bottleneck and work out way up
stages = []
transpconvs = []
seg_layers = []
for s in range(1, n_stages_encoder):
input_features_below = encoder.output_channels[-s]
input_features_skip = encoder.output_channels[-(s + 1)]
stride_for_transpconv = encoder.strides[-s]
transpconvs.append(transpconv_op(
input_features_below, input_features_skip, stride_for_transpconv, stride_for_transpconv,
bias=conv_bias
))
# input features to conv is 2x input_features_skip (concat input_features_skip with transpconv output)
# if s < 2:
stages.append(ConvNextStackedConvBlocks(
n_conv_per_stage[s-1], encoder.conv_op, 2 * input_features_skip, input_features_skip,
encoder.kernel_sizes[-(s + 1)], 1,
conv_bias,
norm_op,
norm_op_kwargs,
dropout_op,
dropout_op_kwargs,
nonlin,
nonlin_kwargs,
nonlin_first
))
# else:
# stages.append(StackedConvBlocks(
# n_conv_per_stage[s-1], encoder.conv_op, 2 * input_features_skip, input_features_skip,
# encoder.kernel_sizes[-(s + 1)], 1,
# conv_bias,
# norm_op,
# norm_op_kwargs,
# dropout_op,
# dropout_op_kwargs,
# nonlin,
# nonlin_kwargs,
# nonlin_first
# ))
# we always build the deep supervision outputs so that we can always load parameters. If we don't do this
# then a model trained with deep_supervision=True could not easily be loaded at inference time where
# deep supervision is not needed. It's just a convenience thing
seg_layers.append(encoder.conv_op(input_features_skip, num_classes, 1, 1, 0, bias=True))
self.stages = nn.ModuleList(stages)
self.transpconvs = nn.ModuleList(transpconvs)
self.seg_layers = nn.ModuleList(seg_layers)
def forward(self, skips):
"""
we expect to get the skips in the order they were computed, so the bottleneck should be the last entry
:param skips:
:return:
"""
lres_input = skips[-1]
seg_outputs = []
for s in range(len(self.stages)):
# print(lres_input.shape)
x = self.transpconvs[s](lres_input)
# print(x.shape, skips[-(s+2)].shape)
x = torch.cat((x, skips[-(s+2)]), 1)
x = self.stages[s](x)
if self.deep_supervision:
seg_outputs.append(self.seg_layers[s](x))
elif s == (len(self.stages) - 1):
seg_outputs.append(self.seg_layers[-1](x))
lres_input = x
# invert seg outputs so that the largest segmentation prediction is returned first
seg_outputs = seg_outputs[::-1]
if not self.deep_supervision:
r = seg_outputs[0]
else:
r = seg_outputs
return r
def compute_conv_feature_map_size(self, input_size):
"""
IMPORTANT: input_size is the input_size of the encoder!
:param input_size:
:return:
"""
# first we need to compute the skip sizes. Skip bottleneck because all output feature maps of our ops will at
# least have the size of the skip above that (therefore -1)
skip_sizes = []
for s in range(len(self.encoder.strides) - 1):
skip_sizes.append([i // j for i, j in zip(input_size, self.encoder.strides[s])])
input_size = skip_sizes[-1]
# print(skip_sizes)
assert len(skip_sizes) == len(self.stages)
# our ops are the other way around, so let's match things up
output = np.int64(0)
for s in range(len(self.stages)):
# print(skip_sizes[-(s+1)], self.encoder.output_channels[-(s+2)])
# conv blocks
output += self.stages[s].compute_conv_feature_map_size(skip_sizes[-(s+1)])
# trans conv
output += np.prod([self.encoder.output_channels[-(s+2)], *skip_sizes[-(s+1)]], dtype=np.int64)
# segmentation
if self.deep_supervision or (s == (len(self.stages) - 1)):
output += np.prod([self.num_classes, *skip_sizes[-(s+1)]], dtype=np.int64)
return output
class StackedConvBlocks(nn.Module):
def __init__(self,
num_convs: int,
conv_op: Type[_ConvNd],
input_channels: int,
output_channels: Union[int, List[int], Tuple[int, ...]],
kernel_size: Union[int, List[int], Tuple[int, ...]],
initial_stride: Union[int, List[int], Tuple[int, ...]],
conv_bias: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
nonlin_first: bool = False
):
"""
:param conv_op:
:param num_convs:
:param input_channels:
:param output_channels: can be int or a list/tuple of int. If list/tuple are provided, each entry is for
one conv. The length of the list/tuple must then naturally be num_convs
:param kernel_size:
:param initial_stride:
:param conv_bias:
:param norm_op:
:param norm_op_kwargs:
:param dropout_op:
:param dropout_op_kwargs:
:param nonlin:
:param nonlin_kwargs:
"""
super().__init__()
if not isinstance(output_channels, (tuple, list)):
output_channels = [output_channels] * num_convs
self.convs = nn.Sequential(
ConvDropoutNormReLU(
conv_op, input_channels, output_channels[0], kernel_size, initial_stride, conv_bias, norm_op,
norm_op_kwargs, dropout_op, dropout_op_kwargs, nonlin, nonlin_kwargs, nonlin_first
),
*[
ConvDropoutNormReLU(
conv_op, output_channels[i - 1], output_channels[i], kernel_size, 1, conv_bias, norm_op,
norm_op_kwargs, dropout_op, dropout_op_kwargs, nonlin, nonlin_kwargs, nonlin_first
)
for i in range(1, num_convs)
]
)
self.output_channels = output_channels[-1]
self.initial_stride = maybe_convert_scalar_to_list(conv_op, initial_stride)
def forward(self, x):
return self.convs(x)
def compute_conv_feature_map_size(self, input_size):
assert len(input_size) == len(self.initial_stride), "just give the image size without color/feature channels or " \
"batch channel. Do not give input_size=(b, c, x, y(, z)). " \
"Give input_size=(x, y(, z))!"
output = self.convs[0].compute_conv_feature_map_size(input_size)
size_after_stride = [i // j for i, j in zip(input_size, self.initial_stride)]
for b in self.convs[1:]:
output += b.compute_conv_feature_map_size(size_after_stride)
return output
class ConvNextStackedConvBlocks(nn.Module):
def __init__(self,
num_convs: int,
conv_op: Type[_ConvNd],
input_channels: int,
output_channels: Union[int, List[int], Tuple[int, ...]],
kernel_size: Union[int, List[int], Tuple[int, ...]],
initial_stride: Union[int, List[int], Tuple[int, ...]],
conv_bias: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
nonlin_first: bool = False
):
"""
:param conv_op:
:param num_convs:
:param input_channels:
:param output_channels: can be int or a list/tuple of int. If list/tuple are provided, each entry is for
one conv. The length of the list/tuple must then naturally be num_convs
:param kernel_size:
:param initial_stride:
:param conv_bias:
:param norm_op:
:param norm_op_kwargs:
:param dropout_op:
:param dropout_op_kwargs:
:param nonlin:
:param nonlin_kwargs:
"""
super().__init__()
if not isinstance(output_channels, (tuple, list)):
output_channels = [output_channels] * num_convs
self.convs = nn.Sequential(
ConvDropoutNorm(
conv_op, input_channels, output_channels[0], kernel_size, initial_stride, conv_bias, norm_op,
norm_op_kwargs, dropout_op, dropout_op_kwargs, nonlin, nonlin_kwargs, nonlin_first
),
*[
Block(
conv_op, output_channels[i - 1], output_channels[i], kernel_size, 1, conv_bias, norm_op,
norm_op_kwargs, dropout_op, dropout_op_kwargs, nonlin, nonlin_kwargs, nonlin_first
)
for i in range(1, num_convs)
]
)
self.output_channels = output_channels[-1]
self.initial_stride = maybe_convert_scalar_to_list(conv_op, initial_stride)
def forward(self, x):
return self.convs(x)
def compute_conv_feature_map_size(self, input_size):
assert len(input_size) == len(self.initial_stride), "just give the image size without color/feature channels or " \
"batch channel. Do not give input_size=(b, c, x, y(, z)). " \
"Give input_size=(x, y(, z))!"
output = self.convs[0].compute_conv_feature_map_size(input_size)
size_after_stride = [i // j for i, j in zip(input_size, self.initial_stride)]
for b in self.convs[1:]:
output += b.compute_conv_feature_map_size(size_after_stride)
return output
class ConvDropoutNorm(nn.Module):
def __init__(self,
conv_op: Type[_ConvNd],
input_channels: int,
output_channels: int,
kernel_size: Union[int, List[int], Tuple[int, ...]],
stride: Union[int, List[int], Tuple[int, ...]],
conv_bias: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
nonlin_first: bool = False
):
super(ConvDropoutNorm, self).__init__()
self.input_channels = input_channels
self.output_channels = output_channels
stride = maybe_convert_scalar_to_list(conv_op, stride)
self.stride = stride
kernel_size = maybe_convert_scalar_to_list(conv_op, kernel_size)
if norm_op_kwargs is None:
norm_op_kwargs = {}
if nonlin_kwargs is None:
nonlin_kwargs = {}
ops = []
self.conv = conv_op(
input_channels,
output_channels,
kernel_size,
stride,
padding=[(i - 1) // 2 for i in kernel_size],
dilation=1,
bias=conv_bias,
)
ops.append(self.conv)
if dropout_op is not None:
self.dropout = dropout_op(**dropout_op_kwargs)
ops.append(self.dropout)
if norm_op is not None:
self.norm = norm_op(output_channels, **norm_op_kwargs)
ops.append(self.norm)
self.all_modules = nn.Sequential(*ops)
def forward(self, x):
return self.all_modules(x)
def compute_conv_feature_map_size(self, input_size):
assert len(input_size) == len(self.stride), "just give the image size without color/feature channels or " \
"batch channel. Do not give input_size=(b, c, x, y(, z)). " \
"Give input_size=(x, y(, z))!"
output_size = [i // j for i, j in zip(input_size, self.stride)] # we always do same padding
return np.prod([self.output_channels, *output_size], dtype=np.int64)
class Block(nn.Module):
def __init__(self,
conv_op: Type[_ConvNd],
input_channels: int,
output_channels: int,
kernel_size: Union[int, List[int], Tuple[int, ...]],
stride: Union[int, List[int], Tuple[int, ...]],
conv_bias: bool = False,
norm_op: Union[None, Type[nn.Module]] = None,
norm_op_kwargs: dict = None,
dropout_op: Union[None, Type[_DropoutNd]] = None,
dropout_op_kwargs: dict = None,
nonlin: Union[None, Type[torch.nn.Module]] = None,
nonlin_kwargs: dict = None,
nonlin_first: bool = False,
drop_path=0.,
layer_scale_init_value=1e-6,
):
super().__init__()
self.is_3d = True
if conv_op == torch.nn.modules.conv.Conv2d:
self.is_3d = False
self.dwconv = nn.Conv2d(input_channels, output_channels, kernel_size=7, padding=3, groups=input_channels) # depthwise conv
self.norm = nn.LayerNorm(output_channels, eps=1e-6)
self.pwconv1 = nn.Linear(output_channels, 4 * output_channels) # pointwise/1x1 convs, implemented with linear layers
self.act = nn.GELU()
self.pwconv2 = nn.Linear(4 * output_channels, output_channels)
self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((output_channels)),
requires_grad=True) if layer_scale_init_value > 0 else None
self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
elif conv_op == torch.nn.modules.conv.Conv3d:
self.dwconv = nn.Conv3d(input_channels, output_channels, kernel_size=7, padding=3, groups=input_channels) # depthwise conv
self.norm = nn.LayerNorm(output_channels, eps=1e-6)
self.pwconv1 = nn.Linear(output_channels, 4 * output_channels) # pointwise/1x1 convs, implemented with linear layers
self.act = nn.GELU()
self.pwconv2 = nn.Linear(4 * output_channels, output_channels)
self.gamma = nn.Parameter(layer_scale_init_value * torch.ones((output_channels)),
requires_grad=True) if layer_scale_init_value > 0 else None
self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
def forward(self, x):
input = x
if self.is_3d:
x = self.dwconv(x)
x = x.permute(0, 2, 3, 4, 1) # (N, C, H, W, D) -> (N, H, W, D, C)
x = self.norm(x)
x = self.pwconv1(x)
x = self.act(x)
x = self.pwconv2(x)
if self.gamma is not None:
x = self.gamma * x
x = x.permute(0, 4, 1, 2, 3) # (N, H, W, D, C) -> (N, C, H, W, D)
x = input + self.drop_path(x)
else:
x = self.dwconv(x)
x = x.permute(0, 2, 3, 1) # (N, C, H, W) -> (N, H, W, C)
x = self.norm(x)
x = self.pwconv1(x)
x = self.act(x)
x = self.pwconv2(x)
if self.gamma is not None:
x = self.gamma * x
x = x.permute(0, 3, 1, 2) # (N, H, W, C) -> (N, C, H, W)
x = input + self.drop_path(x)
return x
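Before plugging the network into nnU-Net, it is worth checking that ConvNextPlainConvUNet builds and runs on a dummy 2D input. The snippet below is only a sketch and assumes the file was saved as described above; the stage count, feature widths, strides and patch size are illustrative placeholders, not the values nnU-Net will write into the plans:
import torch
from torch import nn
from dynamic_network_architectures.architectures.convnextunet import ConvNextPlainConvUNet
net = ConvNextPlainConvUNet(
    input_channels=1, n_stages=4,
    features_per_stage=[32, 64, 128, 256],
    conv_op=nn.Conv2d,
    kernel_sizes=[3, 3, 3, 3],
    strides=[1, 2, 2, 2],
    n_conv_per_stage=[2, 2, 2, 2],
    num_classes=3,
    n_conv_per_stage_decoder=[2, 2, 2],
    conv_bias=True,
    norm_op=nn.InstanceNorm2d, norm_op_kwargs={'eps': 1e-5, 'affine': True},
    nonlin=nn.LeakyReLU, nonlin_kwargs={'inplace': True},
    deep_supervision=True,
)
x = torch.randn(2, 1, 64, 64)      # dummy (N, C, H, W) patch
outputs = net(x)                   # with deep supervision: list of outputs, highest resolution first
print([o.shape for o in outputs])  # torch.Size([2, 3, 64, 64]), torch.Size([2, 3, 32, 32]), torch.Size([2, 3, 16, 16])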
2) Configuration file modification
With the model code in place, we again use the Task04_Hippocampus dataset from the previous tutorial for verification (if you have not followed that tutorial, run the data preparation and preprocessing yourself). Edit the configuration file nnUNet\nnUNet_preprocessed\Dataset004_Hippocampus\nnUNetPlans.json and change network_class_name to dynamic_network_architectures.architectures.convnextunet.ConvNextPlainConvUNet, as shown below:

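If you would rather script this change than edit the JSON by hand, a small helper like the following can be used. It is only a sketch: the path must be adapted to your own nnUNet_preprocessed location, and it simply rewrites every network_class_name key it finds, so it does not depend on the exact nesting used by your nnU-Net version:
import json
plans_path = r"nnUNet\nnUNet_preprocessed\Dataset004_Hippocampus\nnUNetPlans.json"  # adjust to your setup
new_class = "dynamic_network_architectures.architectures.convnextunet.ConvNextPlainConvUNet"
def replace_network_class(node):
    # walk the JSON tree and overwrite every "network_class_name" entry
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "network_class_name":
                node[key] = new_class
            else:
                replace_network_class(value)
    elif isinstance(node, list):
        for item in node:
            replace_network_class(item)
with open(plans_path) as f:
    plans = json.load(f)
replace_network_class(plans)
with open(plans_path, "w") as f:
    json.dump(plans, f, indent=4)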
3. Model training
With the network and the dataset configuration file modified, we can start training. The dataset is still Task04_Hippocampus. The code above supports both 2d and 3d models, so the following training commands can be used:
nnUNetv2_train 4 2d 0
nnUNetv2_train 4 2d 1
nnUNetv2_train 4 2d 2
nnUNetv2_train 4 2d 3
nnUNetv2_train 4 2d 4
nnUNetv2_train 4 3d_fullres 0
nnUNetv2_train 4 3d_fullres 1
nnUNetv2_train 4 3d_fullres 2
nnUNetv2_train 4 3d_fullres 3
nnUNetv2_train 4 3d_fullres 4
As shown below, the 2d model trains successfully:

Now train 3d_fullres as well:

Because nnU-Net training takes a long time and experimental resources were limited, the full training runs were not completed; this tutorial only covers the code changes and verifying that training starts and runs correctly.