Here is the model; I'd really appreciate any suggestions.
class Swish(nn.Module):
    """Swish activation: ``f(x) = x * sigmoid(x)``.

    Stateless, so no ``__init__`` is needed.
    """

    def forward(self, x: Tensor) -> Tensor:
        # Element-wise gate: each value is scaled by its own sigmoid.
        return x * x.sigmoid()
class Conv3DBNActivation(nn.Sequential):
    """``Conv3d -> norm -> activation`` stack.

    Although the class inherits from ``nn.Sequential``, the three layers
    live in a nested ``self.seq_layer`` sequential (see the commented-out
    ``super().__init__(...)`` idea below), so parameter names carry a
    ``seq_layer.`` prefix in the state dict.
    """

    def __init__(
        self,
        in_planes: int,
        out_planes: int,
        *,
        kernel_size: Union[int, Tuple[int, int, int]],
        padding: Union[int, Tuple[int, int, int]],
        stride: Union[int, Tuple[int, int, int]] = 1,
        groups: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        activation_layer: Optional[Callable[..., nn.Module]] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        # Normalise every spatial argument to a 3-tuple.
        kernel_size = _triple(kernel_size)
        stride = _triple(stride)
        padding = _triple(padding)
        # nn.Identity stands in when no norm/activation is requested;
        # it silently accepts the (out_planes, eps=...) arguments below.
        norm_layer = nn.Identity if norm_layer is None else norm_layer
        activation_layer = (nn.Identity if activation_layer is None
                            else activation_layer)
        self.kernel_size = kernel_size
        self.stride = stride
        modules = OrderedDict()
        modules["conv3d"] = nn.Conv3d(in_planes, out_planes,
                                      kernel_size=kernel_size,
                                      stride=stride,
                                      padding=padding,
                                      groups=groups,
                                      **kwargs)
        modules["norm"] = norm_layer(out_planes, eps=0.001)
        modules["act"] = activation_layer()
        self.out_channels = out_planes
        self.seq_layer = nn.Sequential(modules)
        # super(Conv3DBNActivation, self).__init__(modules)

    def forward(self, input):
        # Delegate to the nested sequential.
        return self.seq_layer(input)
class ConvBlock3D(nn.Module):
    """A Conv3DBNActivation preceded by an explicit spatial zero-pad.

    NOTE(review): the caller-supplied *padding* argument is accepted and
    tripled, but then unconditionally replaced with
    ``((kernel_size[0]-1)//2, 0, 0)`` (temporal "same" padding only);
    spatial padding instead comes from ``padding_init(identifier)``.
    Confirm this override is intentional.
    """

    def __init__(
        self,
        in_planes: int,
        out_planes: int,
        *,
        kernel_size: Union[int, Tuple[int, int, int]],
        conv_type: str,
        padding: Union[int, Tuple[int, int, int]] = 0,
        stride: Union[int, Tuple[int, int, int]] = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        activation_layer: Optional[Callable[..., nn.Module]] = None,
        bias: bool = False,
        identifier: str = "",
        **kwargs: Any,
    ) -> None:
        super().__init__()
        kernel_size = _triple(kernel_size)
        stride = _triple(stride)
        padding = _triple(padding)
        # Kept for interface compatibility; never populated here.
        self.conv_2 = None
        # Discard the caller's padding: pad time symmetrically, space not at all.
        padding = ((kernel_size[0] - 1) // 2, 0, 0)
        self.conv_1 = Conv3DBNActivation(in_planes,
                                         out_planes,
                                         kernel_size=kernel_size,
                                         padding=padding,
                                         activation_layer=activation_layer,
                                         norm_layer=norm_layer,
                                         stride=stride,
                                         bias=bias,
                                         **kwargs)
        self.padding = padding
        self.kernel_size = kernel_size
        self.dim_pad = self.kernel_size[0] - 1
        self.stride = stride
        self.conv_type = conv_type
        self.identifier = identifier
        # Spatial (left, right, top, bottom) padding is identifier-driven.
        self.padding_op = nn.ConstantPad3d(padding_init(self.identifier),
                                           value=0.0)

    def forward(self, x: Tensor) -> Tensor:
        # Pad spatially first, then run the conv/norm/act stack.
        return self.conv_1(self.padding_op(x))
class SqueezeExcitation(nn.Module):
    """Squeeze-and-Excitation gate: two 1x1x1 convs producing a
    per-position multiplicative ``scale`` applied to the input.

    NOTE(review): ``the_max_pool`` has kernel_size (1, 1, 1), which is a
    no-op — no spatial squeeze actually happens (the original global
    ``_scale`` pooling is commented out in ``forward``). Confirm this is
    the intended behavior before relying on it.
    """

    def __init__(self, input_channels: int,  # TODO rename activations
                 activation_2: nn.Module,
                 activation_1: nn.Module,
                 conv_type: str,
                 squeeze_factor: int = 4,
                 bias: bool = True) -> None:
        super().__init__()
        se_multiplier = 1
        # Reduced channel count, rounded up to a multiple of 8.
        squeeze_channels = _make_divisible(input_channels
                                           // squeeze_factor
                                           * se_multiplier, 8)
        self.fc1 = ConvBlock3D(input_channels * se_multiplier,
                               squeeze_channels,
                               kernel_size=(1, 1, 1),
                               padding=0,
                               conv_type=conv_type,
                               bias=bias,
                               identifier="fc1")
        self.activation_1 = activation_1()
        self.activation_2 = activation_2()
        # BUGFIX: identifier was a copy-pasted "fc1"; "fc2" resolves to the
        # same zero padding in padding_init, so behavior is unchanged.
        self.fc2 = ConvBlock3D(squeeze_channels,
                               input_channels,
                               kernel_size=(1, 1, 1),
                               padding=0,
                               conv_type=conv_type,
                               bias=bias,
                               identifier="fc2")
        self.the_max_pool = nn.MaxPool3d(kernel_size=(1, 1, 1))  # no-op pool

    def forward(self, input: Tensor) -> Tensor:
        # scale = self._scale(input)
        scale = self.the_max_pool(input)  # identity: kernel is (1, 1, 1)
        scale = self.fc1(scale)
        scale = self.activation_1(scale)
        scale = self.fc2(scale)
        scale = self.activation_2(scale)
        # Gate the input element-wise.
        return scale * input
def _make_divisible(v: float,
divisor: int,
min_value: Optional[int] = None
) -> int:
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
def padding_init(identifier: str) -> Tuple[int, int, int, int]:
    """Return the (left, right, top, bottom) spatial padding for the layer
    named by *identifier*.

    Matching is by substring and the order of checks matters: "shortcut"
    and "expand" are tested before "deep", and inside the depthwise branch
    "b1" is tested before "b2"/"b3"/"b4".

    Raises:
        ValueError: for a "deep" identifier that names none of the known
            blocks b0-b4. (The original code left ``padding_pad`` unbound
            here and crashed with UnboundLocalError on return.)
    """
    if identifier == "conv1":
        return (0, 1, 0, 1)
    if "shortcut" in identifier or "expand" in identifier:
        return (0, 0, 0, 0)
    if "deep" in identifier:
        if "b0" in identifier:
            return (1, 2, 1, 2)
        if "b1" in identifier:
            # Only the first layer of block 1 uses asymmetric padding.
            return (0, 1, 0, 1) if "l0" in identifier else (1, 1, 1, 1)
        if "b2" in identifier or "b3" in identifier:
            return (1, 1, 1, 1)
        if "b4" in identifier:
            return (0, 1, 0, 1) if "l0" in identifier else (2, 2, 2, 2)
        # BUGFIX: fail loudly instead of falling through with padding unbound.
        raise ValueError(f"no padding rule for depthwise identifier {identifier!r}")
    # fc1 / project / conv7 / dense9 / dense10 and everything else: no padding.
    return (0, 0, 0, 0)
class tfAvgPool3D(nn.Module):
    """TF-style 1x3x3 average pool with stride (1, 2, 2).

    Most positions zero-pad the right/bottom spatial edge first (mimicking
    TF "same" padding); the first layer of blocks b0/b1/b4 skips that pad.

    NOTE(review): ``self.avgf`` and ``self.f1`` are set but never used in
    ``forward``; kept for compatibility with existing checkpoints/callers.
    """

    def __init__(self, identifier: str = "") -> None:
        super().__init__()
        self.avgf = nn.AvgPool3d((1, 3, 3), stride=(1, 2, 2))  # unused
        self.avg_nf = nn.AvgPool3d(
            (1, 3, 3),
            stride=(1, 2, 2),
            count_include_pad=False,
            padding=(0, 1, 1))
        self.identifier = identifier
        skip_pad = identifier in ("b0_l0", "b1_l0", "b4_l0")
        self.padding_pad = (0, 0, 0, 0) if skip_pad else (0, 1, 0, 1)
        self.f1 = not skip_pad  # unused flag, mirrors padding choice
        self.padding_op = nn.ConstantPad3d(self.padding_pad, value=0.0)

    def forward(self, x: Tensor) -> Tensor:
        # Explicit right/bottom pad, then the count_include_pad=False pool.
        return self.avg_nf(self.padding_op(x))
class BasicBneck(nn.Module):
    """MobileNet-style inverted-residual bottleneck with SE and ReZero.

    Layout: optional 1x1x1 expand -> depthwise conv -> SE gate -> 1x1x1
    project, added to a (possibly downsampled/projected) residual scaled
    by a learnable ``alpha`` initialised to zero (ReZero).
    """

    def __init__(self,
                 cfg: "CfgNode",
                 conv_type: str,
                 norm_layer: Optional[Callable[..., nn.Module]] = None,
                 activation_layer: Optional[Callable[..., nn.Module]] = None,
                 identifier: str = ""
                 ) -> None:
        super().__init__()
        assert type(cfg.stride) is tuple
        # Temporal stride must be 1; spatial strides must be 1 or 2.
        temporal_ok = cfg.stride[0] == 1
        spatial_ok = (1 <= cfg.stride[1] <= 2) and (1 <= cfg.stride[2] <= 2)
        if not (temporal_ok and spatial_ok):
            raise ValueError('illegal stride value')
        self.res = None
        self.expand = None
        shortcut_layers = []
        # Pointwise expansion, only when the widths differ.
        if cfg.expanded_channels != cfg.out_channels:
            self.expand = ConvBlock3D(
                in_planes=cfg.input_channels,
                out_planes=cfg.expanded_channels,
                kernel_size=(1, 1, 1),
                padding=(0, 0, 0),
                conv_type=conv_type,
                norm_layer=norm_layer,
                activation_layer=activation_layer,
                identifier=f"expand+{identifier}"
            )
        # Depthwise conv (groups == channels).
        self.deep = ConvBlock3D(
            in_planes=cfg.expanded_channels,
            out_planes=cfg.expanded_channels,
            kernel_size=cfg.kernel_size,
            padding=cfg.padding,
            stride=cfg.stride,
            groups=cfg.expanded_channels,
            conv_type=conv_type,
            norm_layer=norm_layer,
            activation_layer=activation_layer,
            identifier=f"deep+{identifier}"
        )
        # Squeeze-and-Excitation gate.
        self.se = SqueezeExcitation(cfg.expanded_channels,
                                    activation_1=activation_layer,
                                    activation_2=nn.Sigmoid,
                                    conv_type=conv_type,
                                    )
        # Pointwise projection back down (no activation).
        self.project = ConvBlock3D(
            cfg.expanded_channels,
            cfg.out_channels,
            kernel_size=(1, 1, 1),
            padding=(0, 0, 0),
            conv_type=conv_type,
            norm_layer=norm_layer,
            activation_layer=nn.Identity,
            identifier=f"project+{identifier}"
        )
        # Shortcut path needed whenever shape or width changes.
        needs_shortcut = not (cfg.stride == (1, 1, 1)
                              and cfg.input_channels == cfg.out_channels)
        if needs_shortcut:
            if cfg.stride != (1, 1, 1):
                shortcut_layers.append(tfAvgPool3D(identifier))
            shortcut_layers.append(ConvBlock3D(
                in_planes=cfg.input_channels,
                out_planes=cfg.out_channels,
                kernel_size=(1, 1, 1),
                padding=(0, 0, 0),
                norm_layer=norm_layer,
                activation_layer=nn.Identity,
                conv_type=conv_type,
                identifier=f"shortcut+{identifier}"
            ))
            self.res = nn.Sequential(*shortcut_layers)
        # ReZero: residual branch starts fully suppressed.
        self.alpha = nn.Parameter(torch.tensor(0.0), requires_grad=True)

    def forward(self, input: Tensor) -> Tensor:
        residual = input if self.res is None else self.res(input)
        x = input if self.expand is None else self.expand(input)
        x = self.project(self.se(self.deep(x)))
        return residual + self.alpha * x
class MoViNet(nn.Module):
    """MoViNet video-classification network: stem conv, stacked
    BasicBneck blocks, head conv, max-pool, and a two-layer classifier.
    """

    def __init__(self,
                 cfg: "CfgNode",
                 num_classes: int = 600,
                 conv_type: str = "3d",
                 ) -> None:
        """Build the network described by *cfg*.

        Args:
            cfg: config node with ``conv1``, ``blocks``, ``conv7`` and
                ``dense9`` sections (plus a ``weights`` URL, currently
                unused — see note below).
            num_classes: number of classes for classification.
            conv_type: type of convolution, either "3d" or "2plus1d".
        """
        super().__init__()
        blocks_dic = OrderedDict()
        norm_layer = nn.BatchNorm3d
        activation_layer = Swish
        # conv1 (stem)
        self.conv1 = ConvBlock3D(
            in_planes=cfg.conv1.input_channels,
            out_planes=cfg.conv1.out_channels,
            kernel_size=cfg.conv1.kernel_size,
            stride=cfg.conv1.stride,
            padding=cfg.conv1.padding,
            conv_type=conv_type,
            norm_layer=norm_layer,
            activation_layer=activation_layer,
            identifier="conv1"
        )
        # Bottleneck blocks, named b{i}_l{j} so padding_init can key off them.
        for i, block in enumerate(cfg.blocks):
            for j, basicblock in enumerate(block):
                blocks_dic[f"b{i}_l{j}"] = BasicBneck(
                    basicblock,
                    conv_type=conv_type,
                    norm_layer=norm_layer,
                    activation_layer=activation_layer,
                    identifier=f"b{i}_l{j}"
                )
        self.blocks = nn.Sequential(blocks_dic)
        # conv7 (head)
        self.conv7 = ConvBlock3D(
            in_planes=cfg.conv7.input_channels,
            out_planes=cfg.conv7.out_channels,
            kernel_size=cfg.conv7.kernel_size,
            stride=cfg.conv7.stride,
            padding=cfg.conv7.padding,
            conv_type=conv_type,
            norm_layer=norm_layer,
            activation_layer=activation_layer,
            identifier="conv7"
        )
        # Classifier: dense9 (hidden) -> Swish -> dropout -> dense10 (logits).
        # NOTE: not invoked by forward() below, which stops after self.avg.
        self.classifier = nn.Sequential(
            ConvBlock3D(cfg.conv7.out_channels,
                        cfg.dense9.hidden_dim,
                        kernel_size=(1, 1, 1),
                        conv_type=conv_type,
                        bias=True,
                        identifier="dense9"),
            Swish(),
            nn.Dropout(p=0.2, inplace=True),
            ConvBlock3D(cfg.dense9.hidden_dim,
                        num_classes,
                        kernel_size=(1, 1, 1),
                        conv_type=conv_type,
                        bias=True,
                        identifier="dense10"),
        )
        # BUGFIX: the original constructor downloaded cfg.weights via
        # torch.hub.load_state_dict_from_url and never applied them
        # (load_state_dict was commented out) — a pure network-side-effect
        # with no functional result.  The dead download is removed; to load
        # pretrained weights, do so explicitly:
        #   self.load_state_dict(torch.hub.load_state_dict_from_url(cfg.weights))
        # NOTE(review): named "avg" but this is MAX pooling over a fixed
        # (16, 11, 11) window — input T/H/W must match; confirm intended.
        self.avg = nn.MaxPool3d((16, 11, 11))

    def forward(self, x: Tensor) -> Tensor:
        x = self.conv1(x)
        x = self.blocks(x)
        x = self.conv7(x)
        x = self.avg(x)
        # Flatten everything after the batch dimension; self.classifier is
        # deliberately not applied here (matching the original behavior).
        return x.flatten(1)