目录
一 DynamicHead
二 YOLOv8的“新头”之动态头
1 总体修改
2 配置文件
3 训练
其他
一 DynamicHead
官方论文地址:https://arxiv.org/pdf/2106.08322.pdf
官方代码地址:GitCode - 开发者的代码家园
在计算机视觉应用中,目标检测是为了回答“是什么物体位于哪里”的问题的任务。如何提高目标检测头的性能已成为现有目标检测工作中的一个关键问题。
在本文中,提出了一种新的目标检测头,它将尺度感知、空间感知和任务感知的注意统一在一个框架中。提出了一种关注目标检测头的新视角。作为一个插件块,动态头可以灵活地集成到任何现有的目标检测器框架中,并在目标检测头中有效地应用了注意力机制,以提高其性能和效率。
下图说明了的动态头部方法。它包含三种不同的注意机制,每一种机制都侧重于不同的视角:尺度感知注意力、空间感知注意力和任务感知注意力。同时,还可视化了在每个注意模块之后特征图是如何改进的。
下图是动态头的详细设计。
(a) 显示了每个注意力模块的详细实施情况。
(b) 展示了如何将动态头部块应用于one-stage目标检测。
(c) 展示了如何将动态头部块应用于two-stage目标检测。
二 YOLOv8的“新头”之动态头
ultralytics的版本为8.1.47,如下图所示:
1 总体修改
① 添加DynamicHead.py文件
在ultralytics/nn/modules目录下新建DynamicHead.py文件,内容如下所示:
import torch
import math
import torch.nn as nn
import torch.nn.functional as F
from mmcv.ops import ModulatedDeformConv2d
from ultralytics.utils.tal import dist2bbox, make_anchors
__all__ = ['Detect_DynamicHead']
def _make_divisible(v, divisor, min_value=None):
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
class h_swish(nn.Module):
def __init__(self, inplace=False):
super(h_swish, self).__init__()
self.inplace = inplace
def forward(self, x):
return x * F.relu6(x + 3.0, inplace=self.inplace) / 6.0
class h_sigmoid(nn.Module):
def __init__(self, inplace=True, h_max=1):
super(h_sigmoid, self).__init__()
self.relu = nn.ReLU6(inplace=inplace)
self.h_max = h_max
def forward(self, x):
return self.relu(x + 3) * self.h_max / 6
class DYReLU(nn.Module):
def __init__(self, inp, oup, reduction=4, lambda_a=1.0, K2=True, use_bias=True, use_spatial=False,
init_a=[1.0, 0.0], init_b=[0.0, 0.0]):
super(DYReLU, self).__init__()
self.oup = oup
self.lambda_a = lambda_a * 2
self.K2 = K2
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.use_bias = use_bias
if K2:
self.exp = 4 if use_bias else 2
else:
self.exp = 2 if use_bias else 1
self.init_a = init_a
self.init_b = init_b
# determine squeeze
if reduction == 4:
squeeze = inp // reduction
else:
squeeze = _make_divisible(inp // reduction, 4)
# print('reduction: {}, squeeze: {}/{}'.format(reduction, inp, squeeze))
# print('init_a: {}, init_b: {}'.format(self.init_a, self.init_b))
self.fc = nn.Sequential(
nn.Linear(inp, squeeze),
nn.ReLU(inplace=True),
nn.Linear(squeeze, oup * self.exp),
h_sigmoid()
)
if use_spatial:
self.spa = nn.Sequential(
nn.Conv2d(inp, 1, kernel_size=1),
nn.BatchNorm2d(1),
)
else:
self.spa = None
def forward(self, x):
if isinstance(x, list):
x_in = x[0]
x_out = x[1]
else:
x_in = x
x_out = x
b, c, h, w = x_in.size()
y = self.avg_pool(x_in).view(b, c)
y = self.fc(y).view(b, self.oup * self.exp, 1, 1)
if self.exp == 4:
a1, b1, a2, b2 = torch.split(y, self.oup, dim=1)
a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0] # 1.0
a2 = (a2 - 0.5) * self.lambda_a + self.init_a[1]
b1 = b1 - 0.5 + self.init_b[0]
b2 = b2 - 0.5 + self.init_b[1]
out = torch.max(x_out * a1 + b1, x_out * a2 + b2)
elif self.exp == 2:
if self.use_bias: # bias but not PL
a1, b1 = torch.split(y, self.oup, dim=1)
a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0] # 1.0
b1 = b1 - 0.5 + self.init_b[0]
out = x_out * a1 + b1
else:
a1, a2 = torch.split(y, self.oup, dim=1)
a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0] # 1.0
a2 = (a2 - 0.5) * self.lambda_a + self.init_a[1]
out = torch.max(x_out * a1, x_out * a2)
elif self.exp == 1:
a1 = y
a1 = (a1 - 0.5) * self.lambda_a + self.init_a[0] # 1.0
out = x_out * a1
if self.spa:
ys = self.spa(x_in).view(b, -1)
ys = F.softmax(ys, dim=1).view(b, 1, h, w) * h * w
ys = F.hardtanh(ys, 0, 3, inplace=True) / 3
out = out * ys
return out
class Conv3x3Norm(torch.nn.Module):
def __init__(self, in_channels, out_channels, stride):
super(Conv3x3Norm, self).__init__()
self.conv = ModulatedDeformConv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.bn = nn.GroupNorm(num_groups=16, num_channels=out_channels)
def forward(self, input, **kwargs):
x = self.conv(input.contiguous(), **kwargs)
x = self.bn(x)
return x
class DyConv(nn.Module):
def __init__(self, in_channels=256, out_channels=256, conv_func=Conv3x3Norm):
super(DyConv, self).__init__()
self.DyConv = nn.ModuleList()
self.DyConv.append(conv_func(in_channels, out_channels, 1))
self.DyConv.append(conv_func(in_channels, out_channels, 1))
self.DyConv.append(conv_func(in_channels, out_channels, 2))
self.AttnConv = nn.Sequential(
nn.AdaptiveAvgPool2d(1),
nn.Conv2d(in_channels, 1, kernel_size=1),
nn.ReLU(inplace=True))
self.h_sigmoid = h_sigmoid()
self.relu = DYReLU(in_channels, out_channels)
self.offset = nn.Conv2d(in_channels, 27, kernel_size=3, stride=1, padding=1)
self.init_weights()
def init_weights(self):
for m in self.DyConv.modules():
if isinstance(m, nn.Conv2d):
nn.init.normal_(m.weight.data, 0, 0.01)
if m.bias is not None:
m.bias.data.zero_()
for m in self.AttnConv.modules():
if isinstance(m, nn.Conv2d):
nn.init.normal_(m.weight.data, 0, 0.01)
if m.bias is not None:
m.bias.data.zero_()
def forward(self, x):
next_x = {}
feature_names = list(x.keys())
for level, name in enumerate(feature_names):
feature = x[name]
offset_mask = self.offset(feature)
offset = offset_mask[:, :18, :, :]
mask = offset_mask[:, 18:, :, :].sigmoid()
conv_args = dict(offset=offset, mask=mask)
temp_fea = [self.DyConv[1](feature, **conv_args)]
if level > 0:
temp_fea.append(self.DyConv[2](x[feature_names[level - 1]], **conv_args))
if level < len(x) - 1:
input = x[feature_names[level + 1]]
temp_fea.append(F.interpolate(self.DyConv[0](input, **conv_args),
size=[feature.size(2), feature.size(3)]))
attn_fea = []
res_fea = []
for fea in temp_fea:
res_fea.append(fea)
attn_fea.append(self.AttnConv(fea))
res_fea = torch.stack(res_fea)
spa_pyr_attn = self.h_sigmoid(torch.stack(attn_fea))
mean_fea = torch.mean(res_fea * spa_pyr_attn, dim=0, keepdim=False)
next_x[name] = self.relu(mean_fea)
return next_x
def autopad(k, p=None, d=1): # kernel, padding, dilation
"""Pad to 'same' shape outputs."""
if d > 1:
k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k] # actual kernel-size
if p is None:
p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad
return p
class Conv(nn.Module):
"""Standard convolution with args(ch_in, ch_out, kernel, stride, padding, groups, dilation, activation)."""
default_act = nn.SiLU() # default activation
def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True):
"""Initialize Conv layer with given arguments including activation."""
super().__init__()
self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False)
self.bn = nn.BatchNorm2d(c2)
self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
def forward(self, x):
"""Apply convolution, batch normalization and activation to input tensor."""
return self.act(self.bn(self.conv(x)))
def forward_fuse(self, x):
"""Perform transposed convolution of 2D data."""
return self.act(self.conv(x))
class DFL(nn.Module):
"""
Integral module of Distribution Focal Loss (DFL).
Proposed in Generalized Focal Loss https://ieeexplore.ieee.org/document/9792391
"""
def __init__(self, c1=16):
"""Initialize a convolutional layer with a given number of input channels."""
super().__init__()
self.conv = nn.Conv2d(c1, 1, 1, bias=False).requires_grad_(False)
x = torch.arange(c1, dtype=torch.float)
self.conv.weight.data[:] = nn.Parameter(x.view(1, c1, 1, 1))
self.c1 = c1
def forward(self, x):
"""Applies a transformer layer on input tensor 'x' and returns a tensor."""
b, c, a = x.shape # batch, channels, anchors
return self.conv(x.view(b, 4, self.c1, a).transpose(2, 1).softmax(1)).view(b, 4, a)
# return self.conv(x.view(b, self.c1, 4, a).softmax(1)).view(b, 4, a)
class Detect_DynamicHead(nn.Module):
"""YOLOv8 Detect head for detection models."""
dynamic = False # force grid reconstruction
export = False # export mode
shape = None
anchors = torch.empty(0) # init
strides = torch.empty(0) # init
def __init__(self, nc=80, ch=()):
"""Initializes the YOLOv8 detection layer with specified number of classes and channels."""
super().__init__()
self.nc = nc # number of classes
self.nl = len(ch) # number of detection layers
self.reg_max = 16 # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
self.no = nc + self.reg_max * 4 # number of outputs per anchor
self.stride = torch.zeros(self.nl) # strides computed during build
c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], min(self.nc, 100)) # channels
self.cv2 = nn.ModuleList(
nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch)
self.cv3 = nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch)
self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()
dyhead_tower = []
for i in range(self.nl):
channel = ch[i]
dyhead_tower.append(
DyConv(
channel,
channel,
conv_func=Conv3x3Norm,
)
)
self.add_module('dyhead_tower', nn.Sequential(*dyhead_tower))
def forward(self, x):
tensor_dict = {i: tensor for i, tensor in enumerate(x)}
x = self.dyhead_tower(tensor_dict)
x = list(x.values())
"""Concatenates and returns predicted bounding boxes and class probabilities."""
shape = x[0].shape # BCHW
for i in range(self.nl):
x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
if self.training:
return x
elif self.dynamic or self.shape != shape:
self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
self.shape = shape
x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2)
if self.export and self.format in ('saved_model', 'pb', 'tflite', 'edgetpu', 'tfjs'): # avoid TF FlexSplitV ops
box = x_cat[:, :self.reg_max * 4]
cls = x_cat[:, self.reg_max * 4:]
else:
box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
dbox = dist2bbox(self.dfl(box), self.anchors.unsqueeze(0), xywh=True, dim=1) * self.strides
if self.export and self.format in ('tflite', 'edgetpu'):
# Normalize xywh with image size to mitigate quantization error of TFLite integer models as done in YOLOv5:
# https://github.com/ultralytics/yolov5/blob/0c8de3fca4a702f8ff5c435e67f378d1fce70243/models/tf.py#L307-L309
# See this PR for details: https://github.com/ultralytics/ultralytics/pull/1695
img_h = shape[2] * self.stride[0]
img_w = shape[3] * self.stride[0]
img_size = torch.tensor([img_w, img_h, img_w, img_h], device=dbox.device).reshape(1, 4, 1)
dbox /= img_size
y = torch.cat((dbox, cls.sigmoid()), 1)
return y if self.export else (y, x)
def bias_init(self):
"""Initialize Detect() biases, WARNING: requires stride availability."""
m = self # self.model[-1] # Detect() module
# cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
# ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum()) # nominal class frequency
for a, b, s in zip(m.cv2, m.cv3, m.stride): # from
a[-1].bias.data[:] = 1.0 # box
b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2) # cls (.01 objects, 80 classes, 640 img)
② 修改ultralytics/nn/tasks.py文件
具体的修改内容如下图所示:
可以Ctrl+f然后搜索 “detect”方便后续锁定需要修改的位置。
2 配置文件
yolov8_DynamicHead.yaml的内容如下所示:
# Ultralytics YOLO 🚀, AGPL-3.0 license
# YOLOv8 object detection model with P3-P5 outputs. For Usage examples see https://docs.ultralytics.com/tasks/detect
# Parameters
nc: 80 # number of classes
scales: # model compound scaling constants, i.e. 'model=yolov8n.yaml' will call yolov8.yaml with scale 'n'
# [depth, width, max_channels]
n: [0.33, 0.25, 1024] # YOLOv8n summary: 225 layers, 3157200 parameters, 3157184 gradients, 8.9 GFLOPs
s: [0.33, 0.50, 1024] # YOLOv8s summary: 225 layers, 11166560 parameters, 11166544 gradients, 28.8 GFLOPs
m: [0.67, 0.75, 768] # YOLOv8m summary: 295 layers, 25902640 parameters, 25902624 gradients, 79.3 GFLOPs
l: [1.00, 1.00, 512] # YOLOv8l summary: 365 layers, 43691520 parameters, 43691504 gradients, 165.7 GFLOPs
x: [1.00, 1.25, 512] # YOLOv8x summary: 365 layers, 68229648 parameters, 68229632 gradients, 258.5 GFLOPs
# YOLOv8.0n backbone
backbone:
# [from, repeats, module, args]
- [-1, 1, Conv, [64, 3, 2]] # 0-P1/2
- [-1, 1, Conv, [128, 3, 2]] # 1-P2/4
- [-1, 3, C2f, [128, True]]
- [-1, 1, Conv, [256, 3, 2]] # 3-P3/8
- [-1, 6, C2f, [256, True]]
- [-1, 1, Conv, [512, 3, 2]] # 5-P4/16
- [-1, 6, C2f, [512, True]]
- [-1, 1, Conv, [1024, 3, 2]] # 7-P5/32
- [-1, 3, C2f, [1024, True]]
- [-1, 1, SPPF, [1024, 5]] # 9
# YOLOv8.0n head
head:
- [-1, 1, nn.Upsample, [None, 2, "nearest"]]
- [[-1, 6], 1, Concat, [1]] # cat backbone P4
- [-1, 3, C2f, [512]] # 12
- [-1, 1, nn.Upsample, [None, 2, "nearest"]]
- [[-1, 4], 1, Concat, [1]] # cat backbone P3
- [-1, 3, C2f, [512]] # 15 (P3/8-small)
- [-1, 1, Conv, [256, 3, 2]]
- [[-1, 12], 1, Concat, [1]] # cat head P4
- [-1, 3, C2f, [512]] # 18 (P4/16-medium)
- [-1, 1, Conv, [512, 3, 2]]
- [[-1, 9], 1, Concat, [1]] # cat head P5
- [-1, 3, C2f, [512]] # 21 (P5/32-large)
- [[15, 18, 21], 1, Detect_DynamicHead, [nc]] # Detect(P3, P4, P5)
3 训练
上述修改完毕后,开始训练吧!🌺🌺🌺
训练示例如下:
yolo task=detect mode=train model=cfg/models/v8/yolov8_DynamicHead.yaml data=cfg/datasets/coco128.yaml epochs=100 batch=16 device=cpu project=yolov8
其他
1 task.py
如果觉得替换部分内容不方便的话,可以直接复制下述文件对应替换原始py文件的内容:
- 修改后的task.py
# Ultralytics YOLO 🚀, AGPL-3.0 license
import contextlib
from copy import deepcopy
from pathlib import Path
import torch
import torch.nn as nn
from ultralytics.nn.modules import (
AIFI,
C1,
C2,
C3,
C3TR,
OBB,
SPP,
SPPELAN,
SPPF,
ADown,
Bottleneck,
BottleneckCSP,
C2f,
C2fAttn,
C3Ghost,
C3x,
CBFuse,
CBLinear,
Classify,
Concat,
Conv,
Conv2,
ConvTranspose,
Detect,
DWConv,
DWConvTranspose2d,
Focus,
GhostBottleneck,
GhostConv,
HGBlock,
HGStem,
ImagePoolingAttn,
Pose,
RepC3,
RepConv,
RepNCSPELAN4,
ResNetLayer,
RTDETRDecoder,
Segment,
Silence,
WorldDetect,
)
from ultralytics.utils import DEFAULT_CFG_DICT, DEFAULT_CFG_KEYS, LOGGER, colorstr, emojis, yaml_load
from ultralytics.utils.checks import check_requirements, check_suffix, check_yaml
from ultralytics.utils.loss import v8ClassificationLoss, v8DetectionLoss, v8OBBLoss, v8PoseLoss, v8SegmentationLoss
from ultralytics.utils.plotting import feature_visualization
from ultralytics.utils.torch_utils import (
fuse_conv_and_bn,
fuse_deconv_and_bn,
initialize_weights,
intersect_dicts,
make_divisible,
model_info,
scale_img,
time_sync,
)
from .modules.DynamicHead import Detect_DynamicHead #导入DynamicHead.py中的检测头
try:
import thop
except ImportError:
thop = None
class BaseModel(nn.Module):
"""The BaseModel class serves as a base class for all the models in the Ultralytics YOLO family."""
def forward(self, x, *args, **kwargs):
"""
Forward pass of the model on a single scale. Wrapper for `_forward_once` method.
Args:
x (torch.Tensor | dict): The input image tensor or a dict including image tensor and gt labels.
Returns:
(torch.Tensor): The output of the network.
"""
if isinstance(x, dict): # for cases of training and validating while training.
return self.loss(x, *args, **kwargs)
return self.predict(x, *args, **kwargs)
def predict(self, x, profile=False, visualize=False, augment=False, embed=None):
"""
Perform a forward pass through the network.
Args:
x (torch.Tensor): The input tensor to the model.
profile (bool): Print the computation time of each layer if True, defaults to False.
visualize (bool): Save the feature maps of the model if True, defaults to False.
augment (bool): Augment image during prediction, defaults to False.
embed (list, optional): A list of feature vectors/embeddings to return.
Returns:
(torch.Tensor): The last output of the model.
"""
if augment:
return self._predict_augment(x)
return self._predict_once(x, profile, visualize, embed)
def _predict_once(self, x, profile=False, visualize=False, embed=None):
"""
Perform a forward pass through the network.
Args:
x (torch.Tensor): The input tensor to the model.
profile (bool): Print the computation time of each layer if True, defaults to False.
visualize (bool): Save the feature maps of the model if True, defaults to False.
embed (list, optional): A list of feature vectors/embeddings to return.
Returns:
(torch.Tensor): The last output of the model.
"""
y, dt, embeddings = [], [], [] # outputs
for m in self.model:
if m.f != -1: # if not from previous layer
x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers
if profile:
self._profile_one_layer(m, x, dt)
x = m(x) # run
y.append(x if m.i in self.save else None) # save output
if visualize:
feature_visualization(x, m.type, m.i, save_dir=visualize)
if embed and m.i in embed:
embeddings.append(nn.functional.adaptive_avg_pool2d(x, (1, 1)).squeeze(-1).squeeze(-1)) # flatten
if m.i == max(embed):
return torch.unbind(torch.cat(embeddings, 1), dim=0)
return x
def _predict_augment(self, x):
"""Perform augmentations on input image x and return augmented inference."""
LOGGER.warning(
f"WARNING ⚠️ {self.__class__.__name__} does not support augmented inference yet. "
f"Reverting to single-scale inference instead."
)
return self._predict_once(x)
def _profile_one_layer(self, m, x, dt):
"""
Profile the computation time and FLOPs of a single layer of the model on a given input. Appends the results to
the provided list.
Args:
m (nn.Module): The layer to be profiled.
x (torch.Tensor): The input data to the layer.
dt (list): A list to store the computation time of the layer.
Returns:
None
"""
c = m == self.model[-1] and isinstance(x, list) # is final layer list, copy input as inplace fix
flops = thop.profile(m, inputs=[x.copy() if c else x], verbose=False)[0] / 1e9 * 2 if thop else 0 # FLOPs
t = time_sync()
for _ in range(10):
m(x.copy() if c else x)
dt.append((time_sync() - t) * 100)
if m == self.model[0]:
LOGGER.info(f"{'time (ms)':>10s} {'GFLOPs':>10s} {'params':>10s} module")
LOGGER.info(f"{dt[-1]:10.2f} {flops:10.2f} {m.np:10.0f} {m.type}")
if c:
LOGGER.info(f"{sum(dt):10.2f} {'-':>10s} {'-':>10s} Total")
def fuse(self, verbose=True):
"""
Fuse the `Conv2d()` and `BatchNorm2d()` layers of the model into a single layer, in order to improve the
computation efficiency.
Returns:
(nn.Module): The fused model is returned.
"""
if not self.is_fused():
for m in self.model.modules():
if isinstance(m, (Conv, Conv2, DWConv)) and hasattr(m, "bn"):
if isinstance(m, Conv2):
m.fuse_convs()
m.conv = fuse_conv_and_bn(m.conv, m.bn) # update conv
delattr(m, "bn") # remove batchnorm
m.forward = m.forward_fuse # update forward
if isinstance(m, ConvTranspose) and hasattr(m, "bn"):
m.conv_transpose = fuse_deconv_and_bn(m.conv_transpose, m.bn)
delattr(m, "bn") # remove batchnorm
m.forward = m.forward_fuse # update forward
if isinstance(m, RepConv):
m.fuse_convs()
m.forward = m.forward_fuse # update forward
self.info(verbose=verbose)
return self
def is_fused(self, thresh=10):
"""
Check if the model has less than a certain threshold of BatchNorm layers.
Args:
thresh (int, optional): The threshold number of BatchNorm layers. Default is 10.
Returns:
(bool): True if the number of BatchNorm layers in the model is less than the threshold, False otherwise.
"""
bn = tuple(v for k, v in nn.__dict__.items() if "Norm" in k) # normalization layers, i.e. BatchNorm2d()
return sum(isinstance(v, bn) for v in self.modules()) < thresh # True if < 'thresh' BatchNorm layers in model
def info(self, detailed=False, verbose=True, imgsz=640):
"""
Prints model information.
Args:
detailed (bool): if True, prints out detailed information about the model. Defaults to False
verbose (bool): if True, prints out the model information. Defaults to False
imgsz (int): the size of the image that the model will be trained on. Defaults to 640
"""
return model_info(self, detailed=detailed, verbose=verbose, imgsz=imgsz)
def _apply(self, fn):
"""
Applies a function to all the tensors in the model that are not parameters or registered buffers.
Args:
fn (function): the function to apply to the model
Returns:
(BaseModel): An updated BaseModel object.
"""
self = super()._apply(fn)
m = self.model[-1] # Detect()
if isinstance(m, (Detect, Detect_DynamicHead)): # includes all Detect subclasses like Segment, Pose, OBB, WorldDetect
m.stride = fn(m.stride)
m.anchors = fn(m.anchors)
m.strides = fn(m.strides)
return self
def load(self, weights, verbose=True):
"""
Load the weights into the model.
Args:
weights (dict | torch.nn.Module): The pre-trained weights to be loaded.
verbose (bool, optional): Whether to log the transfer progress. Defaults to True.
"""
model = weights["model"] if isinstance(weights, dict) else weights # torchvision models are not dicts
csd = model.float().state_dict() # checkpoint state_dict as FP32
csd = intersect_dicts(csd, self.state_dict()) # intersect
self.load_state_dict(csd, strict=False) # load
if verbose:
LOGGER.info(f"Transferred {len(csd)}/{len(self.model.state_dict())} items from pretrained weights")
def loss(self, batch, preds=None):
"""
Compute loss.
Args:
batch (dict): Batch to compute loss on
preds (torch.Tensor | List[torch.Tensor]): Predictions.
"""
if not hasattr(self, "criterion"):
self.criterion = self.init_criterion()
preds = self.forward(batch["img"]) if preds is None else preds
return self.criterion(preds, batch)
def init_criterion(self):
"""Initialize the loss criterion for the BaseModel."""
raise NotImplementedError("compute_loss() needs to be implemented by task heads")
class DetectionModel(BaseModel):
"""YOLOv8 detection model."""
def __init__(self, cfg="yolov8n.yaml", ch=3, nc=None, verbose=True): # model, input channels, number of classes
"""Initialize the YOLOv8 detection model with the given config and parameters."""
super().__init__()
self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg) # cfg dict
# Define model
ch = self.yaml["ch"] = self.yaml.get("ch", ch) # input channels
if nc and nc != self.yaml["nc"]:
LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
self.yaml["nc"] = nc # override YAML value
self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose) # model, savelist
self.names = {i: f"{i}" for i in range(self.yaml["nc"])} # default names dict
self.inplace = self.yaml.get("inplace", True)
# Build strides
m = self.model[-1] # Detect()
if isinstance(m, (Detect, Detect_DynamicHead)): # includes all Detect subclasses like Segment, Pose, OBB, WorldDetect
s = 256 # 2x min stride
m.inplace = self.inplace
forward = lambda x: self.forward(x)[0] if isinstance(m, (Segment, Pose, OBB)) else self.forward(x)
m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))]) # forward
self.stride = m.stride
m.bias_init() # only run once
else:
self.stride = torch.Tensor([32]) # default stride for i.e. RTDETR
# Init weights, biases
initialize_weights(self)
if verbose:
self.info()
LOGGER.info("")
def _predict_augment(self, x):
"""Perform augmentations on input image x and return augmented inference and train outputs."""
img_size = x.shape[-2:] # height, width
s = [1, 0.83, 0.67] # scales
f = [None, 3, None] # flips (2-ud, 3-lr)
y = [] # outputs
for si, fi in zip(s, f):
xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max()))
yi = super().predict(xi)[0] # forward
yi = self._descale_pred(yi, fi, si, img_size)
y.append(yi)
y = self._clip_augmented(y) # clip augmented tails
return torch.cat(y, -1), None # augmented inference, train
@staticmethod
def _descale_pred(p, flips, scale, img_size, dim=1):
"""De-scale predictions following augmented inference (inverse operation)."""
p[:, :4] /= scale # de-scale
x, y, wh, cls = p.split((1, 1, 2, p.shape[dim] - 4), dim)
if flips == 2:
y = img_size[0] - y # de-flip ud
elif flips == 3:
x = img_size[1] - x # de-flip lr
return torch.cat((x, y, wh, cls), dim)
def _clip_augmented(self, y):
"""Clip YOLO augmented inference tails."""
nl = self.model[-1].nl # number of detection layers (P3-P5)
g = sum(4**x for x in range(nl)) # grid points
e = 1 # exclude layer count
i = (y[0].shape[-1] // g) * sum(4**x for x in range(e)) # indices
y[0] = y[0][..., :-i] # large
i = (y[-1].shape[-1] // g) * sum(4 ** (nl - 1 - x) for x in range(e)) # indices
y[-1] = y[-1][..., i:] # small
return y
def init_criterion(self):
"""Initialize the loss criterion for the DetectionModel."""
return v8DetectionLoss(self)
class OBBModel(DetectionModel):
"""YOLOv8 Oriented Bounding Box (OBB) model."""
def __init__(self, cfg="yolov8n-obb.yaml", ch=3, nc=None, verbose=True):
"""Initialize YOLOv8 OBB model with given config and parameters."""
super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)
def init_criterion(self):
"""Initialize the loss criterion for the model."""
return v8OBBLoss(self)
class SegmentationModel(DetectionModel):
"""YOLOv8 segmentation model."""
def __init__(self, cfg="yolov8n-seg.yaml", ch=3, nc=None, verbose=True):
"""Initialize YOLOv8 segmentation model with given config and parameters."""
super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)
def init_criterion(self):
"""Initialize the loss criterion for the SegmentationModel."""
return v8SegmentationLoss(self)
class PoseModel(DetectionModel):
"""YOLOv8 pose model."""
def __init__(self, cfg="yolov8n-pose.yaml", ch=3, nc=None, data_kpt_shape=(None, None), verbose=True):
"""Initialize YOLOv8 Pose model."""
if not isinstance(cfg, dict):
cfg = yaml_model_load(cfg) # load model YAML
if any(data_kpt_shape) and list(data_kpt_shape) != list(cfg["kpt_shape"]):
LOGGER.info(f"Overriding model.yaml kpt_shape={cfg['kpt_shape']} with kpt_shape={data_kpt_shape}")
cfg["kpt_shape"] = data_kpt_shape
super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)
def init_criterion(self):
"""Initialize the loss criterion for the PoseModel."""
return v8PoseLoss(self)
class ClassificationModel(BaseModel):
"""YOLOv8 classification model."""
def __init__(self, cfg="yolov8n-cls.yaml", ch=3, nc=None, verbose=True):
"""Init ClassificationModel with YAML, channels, number of classes, verbose flag."""
super().__init__()
self._from_yaml(cfg, ch, nc, verbose)
def _from_yaml(self, cfg, ch, nc, verbose):
"""Set YOLOv8 model configurations and define the model architecture."""
self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg) # cfg dict
# Define model
ch = self.yaml["ch"] = self.yaml.get("ch", ch) # input channels
if nc and nc != self.yaml["nc"]:
LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
self.yaml["nc"] = nc # override YAML value
elif not nc and not self.yaml.get("nc", None):
raise ValueError("nc not specified. Must specify nc in model.yaml or function arguments.")
self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose) # model, savelist
self.stride = torch.Tensor([1]) # no stride constraints
self.names = {i: f"{i}" for i in range(self.yaml["nc"])} # default names dict
self.info()
@staticmethod
def reshape_outputs(model, nc):
"""Update a TorchVision classification model to class count 'n' if required."""
name, m = list((model.model if hasattr(model, "model") else model).named_children())[-1] # last module
if isinstance(m, Classify): # YOLO Classify() head
if m.linear.out_features != nc:
m.linear = nn.Linear(m.linear.in_features, nc)
elif isinstance(m, nn.Linear): # ResNet, EfficientNet
if m.out_features != nc:
setattr(model, name, nn.Linear(m.in_features, nc))
elif isinstance(m, nn.Sequential):
types = [type(x) for x in m]
if nn.Linear in types:
i = types.index(nn.Linear) # nn.Linear index
if m[i].out_features != nc:
m[i] = nn.Linear(m[i].in_features, nc)
elif nn.Conv2d in types:
i = types.index(nn.Conv2d) # nn.Conv2d index
if m[i].out_channels != nc:
m[i] = nn.Conv2d(m[i].in_channels, nc, m[i].kernel_size, m[i].stride, bias=m[i].bias is not None)
def init_criterion(self):
"""Initialize the loss criterion for the ClassificationModel."""
return v8ClassificationLoss()
class RTDETRDetectionModel(DetectionModel):
"""
RTDETR (Real-time DEtection and Tracking using Transformers) Detection Model class.
This class is responsible for constructing the RTDETR architecture, defining loss functions, and facilitating both
the training and inference processes. RTDETR is an object detection and tracking model that extends from the
DetectionModel base class.
Attributes:
cfg (str): The configuration file path or preset string. Default is 'rtdetr-l.yaml'.
ch (int): Number of input channels. Default is 3 (RGB).
nc (int, optional): Number of classes for object detection. Default is None.
verbose (bool): Specifies if summary statistics are shown during initialization. Default is True.
Methods:
init_criterion: Initializes the criterion used for loss calculation.
loss: Computes and returns the loss during training.
predict: Performs a forward pass through the network and returns the output.
"""
def __init__(self, cfg="rtdetr-l.yaml", ch=3, nc=None, verbose=True):
"""
Initialize the RTDETRDetectionModel.
Args:
cfg (str): Configuration file name or path.
ch (int): Number of input channels.
nc (int, optional): Number of classes. Defaults to None.
verbose (bool, optional): Print additional information during initialization. Defaults to True.
"""
super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)
def init_criterion(self):
"""Initialize the loss criterion for the RTDETRDetectionModel."""
from ultralytics.models.utils.loss import RTDETRDetectionLoss
return RTDETRDetectionLoss(nc=self.nc, use_vfl=True)
def loss(self, batch, preds=None):
"""
Compute the loss for the given batch of data.
Args:
batch (dict): Dictionary containing image and label data.
preds (torch.Tensor, optional): Precomputed model predictions. Defaults to None.
Returns:
(tuple): A tuple containing the total loss and main three losses in a tensor.
"""
if not hasattr(self, "criterion"):
self.criterion = self.init_criterion()
img = batch["img"]
# NOTE: preprocess gt_bbox and gt_labels to list.
bs = len(img)
batch_idx = batch["batch_idx"]
gt_groups = [(batch_idx == i).sum().item() for i in range(bs)]
targets = {
"cls": batch["cls"].to(img.device, dtype=torch.long).view(-1),
"bboxes": batch["bboxes"].to(device=img.device),
"batch_idx": batch_idx.to(img.device, dtype=torch.long).view(-1),
"gt_groups": gt_groups,
}
preds = self.predict(img, batch=targets) if preds is None else preds
dec_bboxes, dec_scores, enc_bboxes, enc_scores, dn_meta = preds if self.training else preds[1]
if dn_meta is None:
dn_bboxes, dn_scores = None, None
else:
dn_bboxes, dec_bboxes = torch.split(dec_bboxes, dn_meta["dn_num_split"], dim=2)
dn_scores, dec_scores = torch.split(dec_scores, dn_meta["dn_num_split"], dim=2)
dec_bboxes = torch.cat([enc_bboxes.unsqueeze(0), dec_bboxes]) # (7, bs, 300, 4)
dec_scores = torch.cat([enc_scores.unsqueeze(0), dec_scores])
loss = self.criterion(
(dec_bboxes, dec_scores), targets, dn_bboxes=dn_bboxes, dn_scores=dn_scores, dn_meta=dn_meta
)
# NOTE: There are like 12 losses in RTDETR, backward with all losses but only show the main three losses.
return sum(loss.values()), torch.as_tensor(
[loss[k].detach() for k in ["loss_giou", "loss_class", "loss_bbox"]], device=img.device
)
def predict(self, x, profile=False, visualize=False, batch=None, augment=False, embed=None):
"""
Perform a forward pass through the model.
Args:
x (torch.Tensor): The input tensor.
profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.
visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.
batch (dict, optional): Ground truth data for evaluation. Defaults to None.
augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.
embed (list, optional): A list of feature vectors/embeddings to return.
Returns:
(torch.Tensor): Model's output tensor.
"""
y, dt, embeddings = [], [], [] # outputs
for m in self.model[:-1]: # except the head part
if m.f != -1: # if not from previous layer
x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers
if profile:
self._profile_one_layer(m, x, dt)
x = m(x) # run
y.append(x if m.i in self.save else None) # save output
if visualize:
feature_visualization(x, m.type, m.i, save_dir=visualize)
if embed and m.i in embed:
embeddings.append(nn.functional.adaptive_avg_pool2d(x, (1, 1)).squeeze(-1).squeeze(-1)) # flatten
if m.i == max(embed):
return torch.unbind(torch.cat(embeddings, 1), dim=0)
head = self.model[-1]
x = head([y[j] for j in head.f], batch) # head inference
return x
class WorldModel(DetectionModel):
"""YOLOv8 World Model."""
def __init__(self, cfg="yolov8s-world.yaml", ch=3, nc=None, verbose=True):
"""Initialize YOLOv8 world model with given config and parameters."""
self.txt_feats = torch.randn(1, nc or 80, 512) # features placeholder
self.clip_model = None # CLIP model placeholder
super().__init__(cfg=cfg, ch=ch, nc=nc, verbose=verbose)
def set_classes(self, text, batch=80, cache_clip_model=True):
"""Set classes in advance so that model could do offline-inference without clip model."""
try:
import clip
except ImportError:
check_requirements("git+https://github.com/ultralytics/CLIP.git")
import clip
if (
not getattr(self, "clip_model", None) and cache_clip_model
): # for backwards compatibility of models lacking clip_model attribute
self.clip_model = clip.load("ViT-B/32")[0]
model = self.clip_model if cache_clip_model else clip.load("ViT-B/32")[0]
device = next(model.parameters()).device
text_token = clip.tokenize(text).to(device)
txt_feats = [model.encode_text(token).detach() for token in text_token.split(batch)]
txt_feats = txt_feats[0] if len(txt_feats) == 1 else torch.cat(txt_feats, dim=0)
txt_feats = txt_feats / txt_feats.norm(p=2, dim=-1, keepdim=True)
self.txt_feats = txt_feats.reshape(-1, len(text), txt_feats.shape[-1])
self.model[-1].nc = len(text)
def predict(self, x, profile=False, visualize=False, txt_feats=None, augment=False, embed=None):
"""
Perform a forward pass through the model.
Args:
x (torch.Tensor): The input tensor.
profile (bool, optional): If True, profile the computation time for each layer. Defaults to False.
visualize (bool, optional): If True, save feature maps for visualization. Defaults to False.
txt_feats (torch.Tensor): The text features, use it if it's given. Defaults to None.
augment (bool, optional): If True, perform data augmentation during inference. Defaults to False.
embed (list, optional): A list of feature vectors/embeddings to return.
Returns:
(torch.Tensor): Model's output tensor.
"""
txt_feats = (self.txt_feats if txt_feats is None else txt_feats).to(device=x.device, dtype=x.dtype)
if len(txt_feats) != len(x):
txt_feats = txt_feats.repeat(len(x), 1, 1)
ori_txt_feats = txt_feats.clone()
y, dt, embeddings = [], [], [] # outputs
for m in self.model: # except the head part
if m.f != -1: # if not from previous layer
x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f] # from earlier layers
if profile:
self._profile_one_layer(m, x, dt)
if isinstance(m, C2fAttn):
x = m(x, txt_feats)
elif isinstance(m, WorldDetect):
x = m(x, ori_txt_feats)
elif isinstance(m, ImagePoolingAttn):
txt_feats = m(x, txt_feats)
else:
x = m(x) # run
y.append(x if m.i in self.save else None) # save output
if visualize:
feature_visualization(x, m.type, m.i, save_dir=visualize)
if embed and m.i in embed:
embeddings.append(nn.functional.adaptive_avg_pool2d(x, (1, 1)).squeeze(-1).squeeze(-1)) # flatten
if m.i == max(embed):
return torch.unbind(torch.cat(embeddings, 1), dim=0)
return x
def loss(self, batch, preds=None):
"""
Compute loss.
Args:
batch (dict): Batch to compute loss on.
preds (torch.Tensor | List[torch.Tensor]): Predictions.
"""
if not hasattr(self, "criterion"):
self.criterion = self.init_criterion()
if preds is None:
preds = self.forward(batch["img"], txt_feats=batch["txt_feats"])
return self.criterion(preds, batch)
class Ensemble(nn.ModuleList):
"""Ensemble of models."""
def __init__(self):
"""Initialize an ensemble of models."""
super().__init__()
def forward(self, x, augment=False, profile=False, visualize=False):
"""Function generates the YOLO network's final layer."""
y = [module(x, augment, profile, visualize)[0] for module in self]
# y = torch.stack(y).max(0)[0] # max ensemble
# y = torch.stack(y).mean(0) # mean ensemble
y = torch.cat(y, 2) # nms ensemble, y shape(B, HW, C)
return y, None # inference, train output
# Functions ------------------------------------------------------------------------------------------------------------
@contextlib.contextmanager
def temporary_modules(modules=None):
"""
Context manager for temporarily adding or modifying modules in Python's module cache (`sys.modules`).
This function can be used to change the module paths during runtime. It's useful when refactoring code,
where you've moved a module from one location to another, but you still want to support the old import
paths for backwards compatibility.
Args:
modules (dict, optional): A dictionary mapping old module paths to new module paths.
Example:
```python
with temporary_modules({'old.module.path': 'new.module.path'}):
import old.module.path # this will now import new.module.path
```
Note:
The changes are only in effect inside the context manager and are undone once the context manager exits.
Be aware that directly manipulating `sys.modules` can lead to unpredictable results, especially in larger
applications or libraries. Use this function with caution.
"""
if not modules:
modules = {}
import importlib
import sys
try:
# Set modules in sys.modules under their old name
for old, new in modules.items():
sys.modules[old] = importlib.import_module(new)
yield
finally:
# Remove the temporary module paths
for old in modules:
if old in sys.modules:
del sys.modules[old]
def torch_safe_load(weight):
"""
This function attempts to load a PyTorch model with the torch.load() function. If a ModuleNotFoundError is raised,
it catches the error, logs a warning message, and attempts to install the missing module via the
check_requirements() function. After installation, the function again attempts to load the model using torch.load().
Args:
weight (str): The file path of the PyTorch model.
Returns:
(dict): The loaded PyTorch model.
"""
from ultralytics.utils.downloads import attempt_download_asset
check_suffix(file=weight, suffix=".pt")
file = attempt_download_asset(weight) # search online if missing locally
try:
with temporary_modules(
{
"ultralytics.yolo.utils": "ultralytics.utils",
"ultralytics.yolo.v8": "ultralytics.models.yolo",
"ultralytics.yolo.data": "ultralytics.data",
}
): # for legacy 8.0 Classify and Pose models
ckpt = torch.load(file, map_location="cpu")
except ModuleNotFoundError as e: # e.name is missing module name
if e.name == "models":
raise TypeError(
emojis(
f"ERROR ❌️ {weight} appears to be an Ultralytics YOLOv5 model originally trained "
f"with https://github.com/ultralytics/yolov5.\nThis model is NOT forwards compatible with "
f"YOLOv8 at https://github.com/ultralytics/ultralytics."
f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "
f"run a command with an official YOLOv8 model, i.e. 'yolo predict model=yolov8n.pt'"
)
) from e
LOGGER.warning(
f"WARNING ⚠️ {weight} appears to require '{e.name}', which is not in ultralytics requirements."
f"\nAutoInstall will run now for '{e.name}' but this feature will be removed in the future."
f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "
f"run a command with an official YOLOv8 model, i.e. 'yolo predict model=yolov8n.pt'"
)
check_requirements(e.name) # install missing module
ckpt = torch.load(file, map_location="cpu")
if not isinstance(ckpt, dict):
# File is likely a YOLO instance saved with i.e. torch.save(model, "saved_model.pt")
LOGGER.warning(
f"WARNING ⚠️ The file '{weight}' appears to be improperly saved or formatted. "
f"For optimal results, use model.save('filename.pt') to correctly save YOLO models."
)
ckpt = {"model": ckpt.model}
return ckpt, file # load
def attempt_load_weights(weights, device=None, inplace=True, fuse=False):
"""Loads an ensemble of models weights=[a,b,c] or a single model weights=[a] or weights=a."""
ensemble = Ensemble()
for w in weights if isinstance(weights, list) else [weights]:
ckpt, w = torch_safe_load(w) # load ckpt
args = {**DEFAULT_CFG_DICT, **ckpt["train_args"]} if "train_args" in ckpt else None # combined args
model = (ckpt.get("ema") or ckpt["model"]).to(device).float() # FP32 model
# Model compatibility updates
model.args = args # attach args to model
model.pt_path = w # attach *.pt file path to model
model.task = guess_model_task(model)
if not hasattr(model, "stride"):
model.stride = torch.tensor([32.0])
# Append
ensemble.append(model.fuse().eval() if fuse and hasattr(model, "fuse") else model.eval()) # model in eval mode
# Module updates
for m in ensemble.modules():
if hasattr(m, "inplace"):
m.inplace = inplace
elif isinstance(m, nn.Upsample) and not hasattr(m, "recompute_scale_factor"):
m.recompute_scale_factor = None # torch 1.11.0 compatibility
# Return model
if len(ensemble) == 1:
return ensemble[-1]
# Return ensemble
LOGGER.info(f"Ensemble created with {weights}\n")
for k in "names", "nc", "yaml":
setattr(ensemble, k, getattr(ensemble[0], k))
ensemble.stride = ensemble[int(torch.argmax(torch.tensor([m.stride.max() for m in ensemble])))].stride
assert all(ensemble[0].nc == m.nc for m in ensemble), f"Models differ in class counts {[m.nc for m in ensemble]}"
return ensemble
def attempt_load_one_weight(weight, device=None, inplace=True, fuse=False):
"""Loads a single model weights."""
ckpt, weight = torch_safe_load(weight) # load ckpt
args = {**DEFAULT_CFG_DICT, **(ckpt.get("train_args", {}))} # combine model and default args, preferring model args
model = (ckpt.get("ema") or ckpt["model"]).to(device).float() # FP32 model
# Model compatibility updates
model.args = {k: v for k, v in args.items() if k in DEFAULT_CFG_KEYS} # attach args to model
model.pt_path = weight # attach *.pt file path to model
model.task = guess_model_task(model)
if not hasattr(model, "stride"):
model.stride = torch.tensor([32.0])
model = model.fuse().eval() if fuse and hasattr(model, "fuse") else model.eval() # model in eval mode
# Module updates
for m in model.modules():
if hasattr(m, "inplace"):
m.inplace = inplace
elif isinstance(m, nn.Upsample) and not hasattr(m, "recompute_scale_factor"):
m.recompute_scale_factor = None # torch 1.11.0 compatibility
# Return model and ckpt
return model, ckpt
def parse_model(d, ch, verbose=True): # model_dict, input_channels(3)
"""Parse a YOLO model.yaml dictionary into a PyTorch model."""
import ast
# Args
max_channels = float("inf")
nc, act, scales = (d.get(x) for x in ("nc", "activation", "scales"))
depth, width, kpt_shape = (d.get(x, 1.0) for x in ("depth_multiple", "width_multiple", "kpt_shape"))
if scales:
scale = d.get("scale")
if not scale:
scale = tuple(scales.keys())[0]
LOGGER.warning(f"WARNING ⚠️ no model scale passed. Assuming scale='{scale}'.")
depth, width, max_channels = scales[scale]
if act:
Conv.default_act = eval(act) # redefine default activation, i.e. Conv.default_act = nn.SiLU()
if verbose:
LOGGER.info(f"{colorstr('activation:')} {act}") # print
if verbose:
LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10} {'module':<45}{'arguments':<30}")
ch = [ch]
layers, save, c2 = [], [], ch[-1] # layers, savelist, ch out
for i, (f, n, m, args) in enumerate(d["backbone"] + d["head"]): # from, number, module, args
m = getattr(torch.nn, m[3:]) if "nn." in m else globals()[m] # get module
for j, a in enumerate(args):
if isinstance(a, str):
with contextlib.suppress(ValueError):
args[j] = locals()[a] if a in locals() else ast.literal_eval(a)
n = n_ = max(round(n * depth), 1) if n > 1 else n # depth gain
if m in {
Classify,
Conv,
ConvTranspose,
GhostConv,
Bottleneck,
GhostBottleneck,
SPP,
SPPF,
DWConv,
Focus,
BottleneckCSP,
C1,
C2,
C2f,
RepNCSPELAN4,
ADown,
SPPELAN,
C2fAttn,
C3,
C3TR,
C3Ghost,
nn.ConvTranspose2d,
DWConvTranspose2d,
C3x,
RepC3,
}:
c1, c2 = ch[f], args[0]
if c2 != nc: # if c2 not equal to number of classes (i.e. for Classify() output)
c2 = make_divisible(min(c2, max_channels) * width, 8)
if m is C2fAttn:
args[1] = make_divisible(min(args[1], max_channels // 2) * width, 8) # embed channels
args[2] = int(
max(round(min(args[2], max_channels // 2 // 32)) * width, 1) if args[2] > 1 else args[2]
) # num heads
args = [c1, c2, *args[1:]]
if m in {BottleneckCSP, C1, C2, C2f, C2fAttn, C3, C3TR, C3Ghost, C3x, RepC3}:
args.insert(2, n) # number of repeats
n = 1
elif m is AIFI:
args = [ch[f], *args]
elif m in {HGStem, HGBlock}:
c1, cm, c2 = ch[f], args[0], args[1]
args = [c1, cm, c2, *args[2:]]
if m is HGBlock:
args.insert(4, n) # number of repeats
n = 1
elif m is ResNetLayer:
c2 = args[1] if args[3] else args[1] * 4
elif m is nn.BatchNorm2d:
args = [ch[f]]
elif m is Concat:
c2 = sum(ch[x] for x in f)
elif m in {Detect, WorldDetect, Segment, Pose, OBB, ImagePoolingAttn, Detect_DynamicHead}:
args.append([ch[x] for x in f])
if m is Segment:
args[2] = make_divisible(min(args[2], max_channels) * width, 8)
elif m is RTDETRDecoder: # special case, channels arg must be passed in index 1
args.insert(1, [ch[x] for x in f])
elif m is CBLinear:
c2 = args[0]
c1 = ch[f]
args = [c1, c2, *args[1:]]
elif m is CBFuse:
c2 = ch[f[-1]]
else:
c2 = ch[f]
m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args) # module
t = str(m)[8:-2].replace("__main__.", "") # module type
m.np = sum(x.numel() for x in m_.parameters()) # number params
m_.i, m_.f, m_.type = i, f, t # attach index, 'from' index, type
if verbose:
LOGGER.info(f"{i:>3}{str(f):>20}{n_:>3}{m.np:10.0f} {t:<45}{str(args):<30}") # print
save.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1) # append to savelist
layers.append(m_)
if i == 0:
ch = []
ch.append(c2)
return nn.Sequential(*layers), sorted(save)
def yaml_model_load(path):
"""Load a YOLOv8 model from a YAML file."""
import re
path = Path(path)
if path.stem in (f"yolov{d}{x}6" for x in "nsmlx" for d in (5, 8)):
new_stem = re.sub(r"(\d+)([nslmx])6(.+)?$", r"\1\2-p6\3", path.stem)
LOGGER.warning(f"WARNING ⚠️ Ultralytics YOLO P6 models now use -p6 suffix. Renaming {path.stem} to {new_stem}.")
path = path.with_name(new_stem + path.suffix)
unified_path = re.sub(r"(\d+)([nslmx])(.+)?$", r"\1\3", str(path)) # i.e. yolov8x.yaml -> yolov8.yaml
yaml_file = check_yaml(unified_path, hard=False) or check_yaml(path)
d = yaml_load(yaml_file) # model dict
d["scale"] = guess_model_scale(path)
d["yaml_file"] = str(path)
return d
def guess_model_scale(model_path):
"""
Takes a path to a YOLO model's YAML file as input and extracts the size character of the model's scale. The function
uses regular expression matching to find the pattern of the model scale in the YAML file name, which is denoted by
n, s, m, l, or x. The function returns the size character of the model scale as a string.
Args:
model_path (str | Path): The path to the YOLO model's YAML file.
Returns:
(str): The size character of the model's scale, which can be n, s, m, l, or x.
"""
with contextlib.suppress(AttributeError):
import re
return re.search(r"yolov\d+([nslmx])", Path(model_path).stem).group(1) # n, s, m, l, or x
return ""
def guess_model_task(model):
"""
Guess the task of a PyTorch model from its architecture or configuration.
Args:
model (nn.Module | dict): PyTorch model or model configuration in YAML format.
Returns:
(str): Task of the model ('detect', 'segment', 'classify', 'pose').
Raises:
SyntaxError: If the task of the model could not be determined.
"""
def cfg2task(cfg):
"""Guess from YAML dictionary."""
m = cfg["head"][-1][-2].lower() # output module name
if m in {"classify", "classifier", "cls", "fc"}:
return "classify"
if m == "detect":
return "detect"
if m == "segment":
return "segment"
if m == "pose":
return "pose"
if m == "obb":
return "obb"
else:
return "detect"
# Guess from model cfg
if isinstance(model, dict):
with contextlib.suppress(Exception):
return cfg2task(model)
# Guess from PyTorch model
if isinstance(model, nn.Module): # PyTorch model
for x in "model.args", "model.model.args", "model.model.model.args":
with contextlib.suppress(Exception):
return eval(x)["task"]
for x in "model.yaml", "model.model.yaml", "model.model.model.yaml":
with contextlib.suppress(Exception):
return cfg2task(eval(x))
for m in model.modules():
if isinstance(m, Segment):
return "segment"
elif isinstance(m, Classify):
return "classify"
elif isinstance(m, Pose):
return "pose"
elif isinstance(m, OBB):
return "obb"
elif isinstance(m, (Detect, WorldDetect, Detect_DynamicHead)):
return "detect"
# Guess from model filename
if isinstance(model, (str, Path)):
model = Path(model)
if "-seg" in model.stem or "segment" in model.parts:
return "segment"
elif "-cls" in model.stem or "classify" in model.parts:
return "classify"
elif "-pose" in model.stem or "pose" in model.parts:
return "pose"
elif "-obb" in model.stem or "obb" in model.parts:
return "obb"
elif "detect" in model.parts:
return "detect"
# Unable to determine task from model
LOGGER.warning(
"WARNING ⚠️ Unable to automatically guess model task, assuming 'task=detect'. "
"Explicitly define task for your model, i.e. 'task=detect', 'segment', 'classify','pose' or 'obb'."
)
return "detect" # assume detect
2 报错处理
【报错】💔💔💔
ModuleNotFoundError: No module named 'mmcv'
【解决方法】💜💜💜
pip3 install mmcv -i https://pypi.tuna.tsinghua.edu.cn/simple
【测试】💚💚💚
python3 -c "from mmcv.ops import ModulatedDeformConv2d"
无报错说明安装成功。💕💕💕💕💕💕
到此,本文分享的内容就结束啦!遇见便是缘,感恩遇见!!!💛 💙 💜 ❤️ 💚