# Обучение нейросетей: Продвинутые приемы

In [None]:
import os

import albumentations as A
import albumentations.pytorch.transforms
import lightning as L
import matplotlib.pyplot as plt
import numpy as np
import nvidia.dali as dali
import nvidia.dali.fn as fn
import nvidia.dali.plugin.pytorch as dali_pytorch
import nvidia.dali.types as types
import timm
import torch
import torch.nn.functional as F
import torchmetrics
import torchvision
from nvidia.dali import pipeline_def
from torch import nn
from torch.utils import data
from tqdm.auto import tqdm

# Increase these if figures appear small
plt.rcParams["figure.figsize"] = fx, fy = (14.08, 6.40)

# Not using `bfloat16` matrix multiplication for consistency
# You might get better performance without much precision loss
# by setting this to "medium" on some devices
torch.set_float32_matmul_precision("high")

In [None]:
def show_images(images, titles=[]):
    num = len(images)
    fig, axs = plt.subplots(nrows=1, ncols=num, squeeze=True, layout="constrained")
    axs = [axs] if num <= 1 else axs
    for i in range(num):
        ax = axs[i]
        ax.imshow(images[i])
        ax.axis("off")
        if titles != []:
            ax.set_title(titles[i])

    plt.show(fig)
    plt.close(fig)

In [None]:
MEAN = (0.4914, 0.4822, 0.4465)
STD = (0.2023, 0.1994, 0.2010)

BATCH_SIZE = 32
MAX_EPOCHS = 10
BASE_LR = 0.05

NUM_WORKERS = os.cpu_count()

## Данные

В качестве датасета будем использовать CIFAR10

In [None]:
def CIFAR10(**kwargs):
    kwargs.setdefault("root", "cifar10")
    return torchvision.datasets.CIFAR10(**kwargs)


# Download the dataset
CIFAR10(download=True);

In [None]:
dataset = CIFAR10(transform=None)

images = []
titles = []
rows, cols = 4, 8
for idx in range(rows * cols):
    image, label = dataset[idx]
    images.append(image)
    titles.append(f"Class {label} ({dataset.classes[label]})")

for _ in range(rows):
    show_images(
        [images.pop() for _ in range(cols)],
        [titles.pop() for _ in range(cols)],
    )

In [None]:
augmentations = [
    A.Affine(translate_percent=0.05, scale=(1.0, 1.05), rotate=15, p=0.5),
    A.RGBShift(r_shift_limit=15, g_shift_limit=15, b_shift_limit=15, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
]
common_transforms = [
    A.Normalize(mean=MEAN, std=STD),
    A.pytorch.transforms.ToTensorV2(),
]

MyTrainTransform = A.Compose(augmentations + common_transforms)
MyValidTransform = A.Compose(common_transforms)


def my_train_transform(image):
    return MyTrainTransform(image=np.array(image))["image"]


def my_valid_transform(image):
    return MyValidTransform(image=np.array(image))["image"]

## Модель

В качестве базовой модели возьмем ResNet-18. Есть несколько способов загрузить готовую архитектуру и веса.

### Torchvision Models

Список моделей можно посмотреть в [документации](https://pytorch.org/vision/stable/models.html).

In [None]:
def get_resnet_torchvision(num_classes, transfer=True):
    weights = torchvision.models.ResNet18_Weights.DEFAULT if transfer else None
    model = torchvision.models.resnet18(weights=weights)

    linear_size = model.fc.in_features
    model.fc = nn.Linear(linear_size, num_classes)

    return model

### Pytorch Image Models (aka `timm`)

Официальный репозиторий: [pytorch-image-models](https://github.com/huggingface/pytorch-image-models).

Выведем список моделей `resnet` из `timm`:

In [None]:
timm.list_models(filter="*resnet*18*", pretrained=True)

В списке одна и та же модель может появляться несколько раз с разными суффиксами. Эти суффиксы соответствуют разным процедурам обучения — например, `.a1_in1k` означает, что модель обучалась на датасете ImageNet-1k по схеме A1 из статьи [ResNet strikes back](https://arxiv.org/abs/2110.00476).

Замеры метрик, скоростей работы, потребления памяти и т.п. для различных моделей на различных девайсах можно посмотреть [в папке `results` в репозитории `timm`](https://github.com/huggingface/pytorch-image-models/tree/main/results).

In [None]:
def get_resnet_timm(num_classes, transfer=True):
    return timm.create_model(
        "resnet18.a1_in1k",
        pretrained=transfer,
        num_classes=num_classes,
    )

### Lightning-модуль

Напишем lightning-модуль для обучения/тестирования

In [None]:
class LightningCIFARClassifier(L.LightningModule):
    num_classes = 10

    def __init__(self, *, transfer=True, lr=BASE_LR, **kwargs):
        super().__init__(**kwargs)
        self.lr = lr
        self.transfer = transfer
        self.model = self.get_model()
        self.loss_fn = nn.CrossEntropyLoss()
        self.accuracy = torchmetrics.classification.Accuracy(
            task="multiclass",
            num_classes=self.num_classes,
        )

    def get_model(self):
        return get_resnet_torchvision(self.num_classes, self.transfer)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def training_step(self, batch):
        return self._step(batch, "train")

    def validation_step(self, batch):
        return self._step(batch, "valid")

    def _step(self, batch, kind):
        x, y = batch
        p = self.model(x)

        loss = self.loss_fn(p, y)
        accs = self.accuracy(p.argmax(axis=-1), y)

        return self._log_metrics(loss, accs, kind)

    def _log_metrics(self, loss, accs, kind):
        metrics = {}
        if loss is not None:
            metrics[f"{kind}_loss"] = loss
        if accs is not None:
            metrics[f"{kind}_accs"] = accs
        self.log_dict(
            metrics,
            prog_bar=True,
            logger=True,
            on_step=kind == "train",
            on_epoch=True,
        )
        return loss

### Базовое обучение

In [None]:
def train_model(
    model,
    experiment_path,
    dl_train,
    dl_valid,
    max_epochs=MAX_EPOCHS,
    **trainer_kwargs,
):
    callbacks = [
        L.pytorch.callbacks.TQDMProgressBar(leave=True),
        L.pytorch.callbacks.LearningRateMonitor(),
        L.pytorch.callbacks.ModelCheckpoint(
            filename="{epoch}-{valid_accs:.3f}",
            monitor="valid_accs",
            mode="max",
            save_top_k=1,
            save_last=True,
        ),
    ]
    trainer = L.Trainer(
        callbacks=callbacks,
        max_epochs=max_epochs,
        default_root_dir=experiment_path,
        **trainer_kwargs,
    )
    trainer.fit(model, dl_train, dl_valid)

In [None]:
# NOTE: technically, CIFAR10(train=False) is the TEST set,
# not the validation set, but we use it here for simplicity


ds_train = CIFAR10(transform=my_train_transform, train=True)
ds_valid = CIFAR10(transform=my_valid_transform, train=False)

dl_train = data.DataLoader(
    ds_train,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
)
dl_valid = data.DataLoader(
    ds_valid,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
)

In [None]:
train_model(
    LightningCIFARClassifier(),
    "runs/basic",
    dl_train,
    dl_valid,
)

## Разморозка весов

Обучение нейросетей требует большого объема данных и много времени. Часто удобно использовать веса, заранее полученные при обучении на некотором большом датасете (ImageNet, OpenImages). В зависимости от размера нашего набора данных, можно тренировать разное количество слоев (начиная с выхода нейросети), а остальные "заморозить".
Использовать можно следующую схему:

![](https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/freeze_methods.png)

Добавим возможность замораживать часть весов:

In [None]:
def get_frozen_resnet(num_classes, transfer=True, unfreeze="most"):
    resnet_model = get_resnet_torchvision(num_classes, transfer)

    unfreeze_params = {
        "last": -1,
        "most": -4,
        "full": 0,
    }

    assert unfreeze in unfreeze_params.keys()
    first_unfrozen = unfreeze_params[unfreeze]

    for child in list(resnet_model.children())[:first_unfrozen]:
        for param in child.parameters():
            param.requires_grad = False

    return resnet_model

In [None]:
class LightningCIFARClassifierFrozen(LightningCIFARClassifier):
    def __init__(self, *, unfreeze="most", **kwargs):
        self.unfreeze = unfreeze
        super().__init__(**kwargs)

    def get_model(self):
        return get_frozen_resnet(
            self.num_classes,
            self.transfer,
            self.unfreeze,
        )

Попробуем обучить 3 нейросети с разным числом "размороженных" слоев: только последний слой (last), часть слоев (most), все слои (full)

In [None]:
for unfreeze in ["last", "most", "full"]:
    train_model(
        LightningCIFARClassifierFrozen(unfreeze=unfreeze),
        f"runs/unfreeze_{unfreeze}",
        dl_train,
        dl_valid,
    )

## Обучение адапторов

В NLP крайне популярно семейство методов дообучения, основанных на добавлении в нейросеть адапторов. Базовым методом из данной категории является метод LoRA (Low-Rank Adaptation), предложенный в [данной статье](https://arxiv.org/pdf/2106.09685).

Идея подобных методов заключается в том, что вместо прямого обучения параметров нейросети $P$, можно обучать **добавки** к этим параметрам $P + \Delta$. Дальше, делается предположение, что эти добавки будут имет вид матриц низкого ранга и соответственно будут допускать разложение $\Delta = A \cdot B$, где $size(A) + size(B) \ll size(P)$.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/lora.jpg" width="30%"/>

Данное семейство методов в CV тоже используются, но меньше чем в NLP (в основном - при дообучении супер больших трансформерных сетей).

## Warmup & Cosine Annealing LR

### Cosine Annealing Learning Rate

Метод из [оригинальной статьи](https://arxiv.org/abs/1608.03983v5) использует warm restart — циклическое уменьшение и возвращение к исходному значению шага градиентного спуска. Часто его используют только с одной итерацией уменьшения шага.

PyTorch предлагает соответственно `CosineAnnealingWarmRestarts` и `CosineAnnealingLR`:

In [None]:
def plot_lrs(scheduler_cls, show_num_steps, /, **kwargs):
    fig, ax = plt.subplots()

    p = torch.zeros(1, requires_grad=True)
    dummy_optimizer = torch.optim.Adam([p], lr=0.001)
    scheduler = scheduler_cls(dummy_optimizer, **kwargs)
    lrs = []
    for _ in range(show_num_steps):
        lrs.append(scheduler.get_last_lr())
        dummy_optimizer.step(), scheduler.step()

    ax.plot(lrs)

    ax.grid()
    ax.set_xlabel("Step")
    ax.set_ylabel("Learning rate")
    kwargs_str = ", ".join(f"{k}={v!r}" for k, v in kwargs.items())
    ax.set_title(f"{scheduler_cls.__name__}({kwargs_str})")

    plt.tight_layout()
    plt.show()


plot_lrs(torch.optim.lr_scheduler.CosineAnnealingLR, 1600, T_max=1600)
plot_lrs(torch.optim.lr_scheduler.CosineAnnealingWarmRestarts, 1600, T_0=400, T_mult=1)
plot_lrs(torch.optim.lr_scheduler.CosineAnnealingWarmRestarts, 1600, T_0=229, T_mult=2)

Также, идею warm restarts можно развить, сохраняя параметры в конце каждого из периодов обучения. Тогда, полученный набор моделей можно использовать как ансамбль или даже усреднять их веса для получения одной мета-модели. Данная техника называется Stochastic Weight Averaging и была предложена [в данной статье](https://arxiv.org/abs/1803.05407).

### Warm-up

Процесс дообучения может быть нестабилен в самом начале из-за больших значений функции потерь (из-за специфики самой задачи или из-за случайной инициализации последних слоев в нейросети). В подобных ситуациях попытки сразу начать процесс дообучения, используя большой стартовый learning rate, могут привести к тому, что сеть "забывает" часть предобученной информации или даже расходится.

Warm-up позволяет снизить влияние такого эффекта на начальных этапах. Если при обычном обучении мы бы использовали стартовый learning rate $\lambda_{start}$, то при использовании warm-up предлагается начинать обучение, используя learning rate $k \cdot \lambda_{start}\ \ (0 \lt k \ll 1)$, и затем в течение $n$ эпох линейно увеличивать его обратно до $\lambda_{start}$, после чего обучение продолжается так же, как обычно.

Период warm-up как правило длится одну эпоху.

Скомбинируем Warm-up и Cosine Annealing LR:

In [None]:
def warmup_then_cosine_annealing_lr(
    optimizer,
    start_factor,
    total_steps,
    warmup_duration,
):
    warmup = torch.optim.lr_scheduler.LinearLR(
        optimizer,
        start_factor=start_factor,
        end_factor=1.0,
        total_iters=warmup_duration,
    )
    cos_annealing = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer,
        T_max=total_steps - warmup_duration,
    )
    warmup_then_cos_anneal = torch.optim.lr_scheduler.SequentialLR(
        optimizer,
        [warmup, cos_annealing],
        milestones=[warmup_duration],
    )
    return warmup_then_cos_anneal


plot_lrs(
    warmup_then_cosine_annealing_lr,
    1600,
    start_factor=0.1,
    total_steps=1600,
    warmup_duration=200,
)

In [None]:
class LightningCIFARClassifierWarmupCos(LightningCIFARClassifierFrozen):
    def configure_optimizers(self):
        optimizer = super().configure_optimizers()

        steps_per_epoch = len(dl_train)
        warmup_duration = 1 * steps_per_epoch
        total_steps = MAX_EPOCHS * steps_per_epoch

        scheduler = warmup_then_cosine_annealing_lr(
            optimizer,
            start_factor=0.0001,
            total_steps=total_steps,
            warmup_duration=warmup_duration,
        )

        lr_scheduler = {
            "scheduler": scheduler,
            "interval": "step",
            "frequency": 1,
        }
        return [optimizer], [lr_scheduler]

In [None]:
train_model(
    LightningCIFARClassifierWarmupCos(),
    "runs/warmup_cosine",
    dl_train,
    dl_valid,
)

## Регуляризующие аугментации

Попробуем использовать более сложную аугментацию из статьи [MixUp: Beyond Empirical Risk Minimization](https://arxiv.org/abs/1710.09412).

MixUp - взвешенное усреднение двух изображений с соответствующим изменением one-hot таргета. Брать пары изображений можно внутри одного батча.

![](https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/mixup.png)

In [None]:
def mixup_data(x, y, alpha):
    lam = np.random.beta(alpha, alpha)

    batch_size = x.size()[0]
    index = torch.randperm(batch_size, device=x.device)

    mixed_x = lam * x + (1 - lam) * x[index, ...]
    y_a, y_b = y, y[index]

    return mixed_x, y_a, y_b, lam


def mixup_loss_fn(loss_fn, p, y_a, y_b, lam):
    loss_a = loss_fn(p, y_a)
    loss_b = loss_fn(p, y_b)

    loss = lam * loss_a + (1 - lam) * loss_b

    # Or alternatively for smooth data,
    # you can keep the loss function and
    # mix the labels in mixup_data instead
    #
    # mixed_y = lam * y_a + (1-lam) * y_b
    return loss

In [None]:
class LightningCIFARClassifierMixUp(LightningCIFARClassifierFrozen):
    def __init__(self, *, alpha=1.0, **kwargs):
        super().__init__(**kwargs)
        self.alpha = alpha

    # Mix-up in TRAIN ONLY
    def training_step(self, batch):
        x, y = batch

        x_mixed, y_a, y_b, lam = mixup_data(x, y, self.alpha)

        p = self.model(x_mixed)

        loss = mixup_loss_fn(self.loss_fn, p, y_a, y_b, lam)

        return self._log_metrics(loss, accs=None, kind="train")

In [None]:
train_model(
    LightningCIFARClassifierMixUp(),
    "runs/mixup",
    dl_train,
    dl_valid,
)

Другие похожие способы аугментаций:
* [CutMix](https://arxiv.org/abs/1905.04899)
* [FMix](https://arxiv.org/abs/2002.12047)

## Label smoothing

Модель машинного обучения называется "откалиброванной" (calibrated), если выдаваемая ею вероятность отражает ее качество. Пусть у нас есть 100 примеров, на каждый модель выдает вероятность 0.9. Тогда, если модель "откалибрована", то ровно 90 примеров должны быть классифицированы корректно.

Предсказания "откалиброванной" модели легче интерпретируются, к такой модели проще подбирать пороги и легче добавлять ее в ансамбль.

Чересчур "уверенная" модель (overconfident) склонна выдавать очень высокие скоры. Побороть этот эффект помогает label smoothing. Он может быть использован в связке с softmax+cross-enropy loss. Метод заключается в том, чтобы преобразовать таргет по следующей формуле:

$$y_{ls} = (1 - α) \cdot y_{hot}\ +\ α \cdot\frac{1}{K}$$

$y_{hot}$ - one-hot представление таргета, $K$ - число классов, $α$ - гиперпараметр, задающий силу сглаживания (обычному представлению таргета соответствует $α$ = 0)

In [None]:
class LabelSmoothingLoss(nn.Module):
    def __init__(self, num_classes, smoothing=0.0):
        assert 0 <= smoothing < 1

        super().__init__()
        self.smoothing = smoothing
        self.num_classes = num_classes

    def forward(self, p, y):

        with torch.no_grad():
            K = self.num_classes
            alpha = self.smoothing
            y_one_hot = F.one_hot(y, num_classes=K)

            smooth_y = (1 - alpha) * y_one_hot + (alpha / K)

        loss = F.cross_entropy(p, smooth_y)
        return loss

In [None]:
class LightningCIFARClassifierSmooth(LightningCIFARClassifierFrozen):
    def __init__(self, *, smoothing=0.5, **kwargs):
        super().__init__(**kwargs)
        self.loss_fn = LabelSmoothingLoss(
            num_classes=self.num_classes,
            smoothing=smoothing,
        )

In [None]:
train_model(
    LightningCIFARClassifierSmooth(),
    "runs/smoothing",
    dl_train,
    dl_valid,
)

## Способы увеличения размера батча

### (Automatic) Mixed Precision

Данный метод был изначально предложен в статье [Mixed Precision Training](https://arxiv.org/abs/1710.03740) от NVIDIA.

По умолчанию для хранения весов и выполнения операций используется тип `float32`. Использование типа `float16` может потенциально ускорить вычисления и снизить требования к объему и скорости памяти. Однако просто конвертировать все веса и входы в `float16` и проводить все вычисления в этом типе скорее всего не получится, т.к. точность и диапазон значений у типа `float16` значительно меньше, чем у `float32`.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/fp_ranges.jpeg" width="50%"/>

Из-за ограниченного диапазона и точности типа `float16` всплывают **три** проблемы, исправлением которых и занимается метод Automatic Mixed Precision.

**Первая проблема**: При попытке использования обычных градиентных оптимизаторов для обновления `float16` параметров может получиться, что шаг градиентного спуска $\lambda\nabla L$ будет настолько мал по сравнению c $P$, что с учетом округления $P - \lambda\nabla L = P$ (то есть параметры не обновляются или обновляются плохо).

Основная идея Mixed Precision Training, исправляющая данную проблему, заключается в том, что при обучении можно хранить "истинные" (master) значения весов в `float32`, при вычислении прямого и обратного прохода временно конвертировать их в `float16`, а затем полученные `float16` градиенты обратно конвертировать в `float32` для обновления параметров.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/amp_naive.png" style="width: 50%"/>

**Вторая проблема**: Даже если выполнять обновление параметров, используя тип `float32`, в прямом и обратном проходе нейросети все равно останутся некоторые операции, для которых потеря точности `float16` будет значительной.

В первую очередь это операции, которые в ходе своего вычисления выполняют редукцию (например, матричное умножение и свертка). В подобных операциях во время многократного суммирования будет накапливаться ошибка. К счастью, в Tensor Cores современных графических ускорителей реализуются специальные операции, позволяющие проводить основную часть вычислений в типе `float16`, а затем аккумулировать результат уже в типе `float32`.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/tensor_core.png" style="width: 40%"/>

Кроме того, потери точности или переполнения могут возникать в обычных агрегирующих операциях (например, `.sum()` и `.mean()`), а также при использовании примитивных операций, которые сами по себе менее стабильны (например, `exp`, `log` и `1/x`). Подобные операции обычно встречаются в некоторых функциях активаций, в функциях потерь и в различных видах нормализации.

Для таких операций предлагается конвертировать `float16` входы/параметры в `float32`, проводить вычисления в расширенном типе, а затем возможно конвертировать их обратно в `float16`. В `PyTorch` уже реализован функционал, автоматически определяющий, в каком типе лучше всего проводить вычисления для большинства распространенных нейросетевых операций. Для этого вместо ручного приведения к нужным типам необходимо просто обернуть интересующие нас вызовы в контекстный менеджер `torch.autocast`.

В ситуациях, когда вы хотите самостоятельно реализовать какие-то сложные операции и более явно контролировать для них типы промежуточных вычислений, вы можете использовать декораторы `torch.amp.custom_fwd`/`custom_bwd` в классах `torch.autograd.Function` и контекстный менеджер `torch.autocast(enable=True/False)` для обычных вычислений. См. раздел [Autocast and Custom Autograd Functions](https://pytorch.org/docs/stable/notes/amp_examples.html#autocast-and-custom-autograd-functions) в документации Automatic Mixed Precision.

In [None]:
class LightningCIFARClassifierAutocast(LightningCIFARClassifierFrozen):
    def __init__(self, *, autocast_dtype=torch.float32, **kwargs):
        super().__init__(**kwargs)
        self.autocast_dtype = autocast_dtype

    def training_step(self, batch):
        x, y = batch

        with torch.autocast(
            device_type=self.device.type,
            dtype=self.autocast_dtype,
        ):
            # Dynamically mix fp32 and fp16
            # during the forward computation
            p = self.model(x)
            loss = self.loss_fn(p, y)

        accs = self.accuracy(p.argmax(axis=-1), y)
        return self._log_metrics(loss, accs, "train")

**Третья проблема**: Кроме проблем с погрешностями при вычислении градиентов может оказаться, что даже точно вычисленные градиенты имеют распределение, выходящее за границы диапазона чисел, представимых в типе `float16`.

 <img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/fp16_range_clipping_small.png" width="45%"/>

Обнуления/переполнения градиентов в `float16` можно избежать, если во время обратного распространения ошибки вычислять не истинные значения градиентов, а умноженные на какую-то константу $S$, а затем во время шага оптимизатора умножать уже сконвертированные в `float32` градиенты на $1/S$. Таким образом, `float16` градиенты вычисляются с использованием смещенного диапазона значений.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/amp_scaled.png" width="68%"/>

При правильном выборе множителя $S$ подобное обучение может сходиться так же хорошо, как с использованием `float32`, но работая быстрее и используя меньше памяти.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/amp_training_history.png" width="55%"/>

Тем не менее, не очень удобно подбирать правильное значение $S$ вручную. Для автоматизации процедуры поиска данного множителя было бы удобно уметь эффективно определять факт обнуления или переполнения в ходе вычислений с плавающей точкой. На практике числа с плавающей точкой IEEE 754 не предусматривают функционал для эффективного определения обнуления.

С другой стороны, для автоматического определения переполнения значений достаточно после обратного прохода проверить, есть ли среди вычисленных градиентов $\pm\infty$ или $\texttt{NaN}$. Тогда, используя данную информацию, можно реализовать следующий алгоритм:

1. Выполняем прямой проход используя описанные выше mixed precision вычисления.

2. Умножаем `loss` на множитель $S$ и выполняем обратный проход. В итоге получаем `float16` градиенты.

3. Если среди вычисленных градиентов есть $\pm\infty$ или $\texttt{NaN}$
    - Cчитаем, что значение $S$ было слишком большим — делим $S$ на 2, выкидываем посчитанные градиенты, пропускаем текущий батч и пробуем ещё раз.
   <br/><br/>

4. Если в градиентах не было $\pm\infty$ и $\texttt{NaN}$
    - Конвертируем градиенты в `float32`, умножаем их на $1/S$ и выполняем шаг оптимизации как при обычном обучении.
   <br/><br/>

5. Если $\pm\infty$ и $\texttt{NaN}$ не было уже много шагов подряд
    - Значит значение $S$ может быть слишком маленьким — пробуем умножить $S$ на 2 и смотрим, получатся ли $\pm\infty$ или $\texttt{NaN}$.

<img src="https://courses.cv-gml.ru/storage/seminars/nn-training-advanced/loss_scale.jpeg" width="55%"/>

В `PyTorch` данный функционал реализован в классе `torch.amp.GradScaler`:

```python
scaler = torch.amp.GradScaler()

for epoch in epochs:
    for x, y in data:
        optimizer.zero_grad()
        with torch.autocast(...):
            p = model(x)
        loss = loss_fn(p, y)

        # Scales loss.  Calls backward() on scaled loss to create scaled gradients.
        scaler.scale(loss).backward()

        # scaler.step() first unscales gradients of the optimizer's params.
        # If gradients don't contain infs/NaNs, optimizer.step() is then called,
        # otherwise, optimizer.step() is skipped.
        scaler.step(optimizer)

        # Updates the scale for next iteration.
        scaler.update()
```

В PyTorch Lightning типы для вычислений можно задать параметром `precision` в `Trainer`:

In [None]:
train_model(
    LightningCIFARClassifierAutocast(autocast_dtype=torch.float16),
    "runs/amp_lightning_fp16",
    dl_train,
    dl_valid,
    precision="16-mixed",
)

#### Тип bfloat16

Некоторые современные ускорители аппаратно поддерживают тип `bfloat16` (изначально [разработанный](https://cloud.google.com/blog/products/ai-machine-learning/bfloat16-the-secret-to-high-performance-on-cloud-tpus) Google). В `bfloat16` на экспоненту отводится больше бит, чем в `float16`, расширяя диапазон значений (ценой точности). Таким образом, при использовании `bfloat16` можно не делать масштабирование градиентов (не использовать `GradScaler`).

Однако стоит понимать, что эффективные операции для работы с данным типом доступны не на всех девайсах. В случае отсутствия аппаратной поддержки, вычисления в данном типе могут работать даже медленнее чем `float32`.

In [None]:
train_model(
    LightningCIFARClassifierAutocast(autocast_dtype=torch.bfloat16),
    "runs/amp_lightning_bf16",
    dl_train,
    dl_valid,
    precision="bf16-mixed",
)

Больше про Automatic Mixed Precision можно почитать
- NVIDIA
    - [Video Series: Mixed-Precision Training Techniques Using Tensor Cores for Deep Learning](https://developer.nvidia.com/blog/video-mixed-precision-techniques-tensor-cores-deep-learning/)
    - [Train With Mixed Precision](https://docs.nvidia.com/deeplearning/performance/mixed-precision-training/index.html)
    <br/><br/>
- PyTorch
    - [`amp` Package Documentation](https://pytorch.org/docs/stable/amp.html)
    - [Recipe](https://pytorch.org/tutorials/recipes/recipes/amp_recipe.html)
    - [AMP Examples](https://pytorch.org/docs/stable/notes/amp_examples.html)
    <br/><br/>
- Lightning
    - [N-Bit Precision](https://lightning.ai/docs/pytorch/stable/common/precision_intermediate.html)

### Gradient Accumulation

Пусть в GPU помещается батч размера $N$. Так как выполнение `backward()` не перезаписывает значения градиентов, а аккумулирует их (`+=`), можно выполнять шаг оптимизатора (и последующее обнуление градиентов) после каждого $k$-го батча размера $N$, эмулируя "аккумулированный" батч размера $k \cdot N$.

При этом обновление градиентного спуска с Gradient Accumulation **не будет** полностью эквивалентно обновлению при размере батча в $k \cdot N$ — например, из-за слоев типа `BatchNorm`, которые собирают статистики по исходному батчу размера $N$.

В PyTorch Lightning за Gradient Accumulation отвечает параметр `accumulate_grad_batches` в `Trainer`:

In [None]:
train_model(
    LightningCIFARClassifierFrozen(),
    "runs/accumulate_grad",
    dl_train,
    dl_valid,
    accumulate_grad_batches=4,
)

При использовании вместе с Automatic Mixed Precision, градиенты накапливаются в scaled-режиме (**до** приведения к исходному масштабу). [Пример в PyTorch](https://pytorch.org/docs/stable/notes/amp_examples.html#gradient-accumulation).

Также в Lightning можно использовать callback [`L.pytorch.callbacks.GradientAccumulationScheduler`](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.callbacks.GradientAccumulationScheduler.html) для автоматического изменения количества аккумулированных батчей в ходе обучения.

### Multi-device обучение

В случае, если у вас есть доступ к нескольким графическим ускорителям (или даже нескольким машинам с ускорителями), то достаточно логичной является идея использовать все эти девайсы. Этот подход может позволить вам ускорить сам процесс обучения или даже увеличить размера батча (за счет того что каждый ускоритель/машина имеет свою память).

Самый простой способ использования нескольких девайсов - это просто одновременный запуск нескольких экспериментов (по одному эксперименту на девайс). К сожалению, данный подход "ускоряет" процесс проведения экспериментов, только амортизировано при проведении нескольких экспериментов и вообще не позволяет увеличить размер батча.

Для того чтобы использовать несколько девайсов или машин в ходе **одного** эксперимента, в PyTorch есть модуль [`torch.distributed`](https://docs.pytorch.org/docs/stable/distributed.html) с реализацией низкоуровневых операций для коммуникаций между девайсами и машинами, а также высокоуровневый класс [`torch.nn.parallel.DistributedDataParallel`](https://docs.pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) (или "`DDP`").

Идея `DDP` является в "разбиении" каждого батча между несколькими девайсами. При этом на каждом девайсе будет храниться полная копия всех параметров модели. Следовательно, при обновлении параметров в ходе обучения требуется синхронизировать градиенты или параметры между всеми девайсами.

Кроме того, добавление `DDP` в обучение требует особой осторожности. В частности - при загрузке данных необходимо учитывать для какого девайса эти данные загружаются. Слои вроде `BatchNorm` или сложные функции потерь, сравнивающие разные примеры внутри каждого батча, могут требовать дополнительной логики для обмена данными между девайсами.

#### Sharded методы

Как было упомянуто выше, `DDP` предполагает разбиение батчей / данных между девайсами. При этом сами параметры модели дублируются между девайсами, а вычисления в `forward`/`backward` проводятся одновременно (для разных данных).

Естественно, это не единственный способ параллелизации вычислений между несколькими девайсами. В NLP крайне популярны методы параллелизации, делающие разбиение самой модели (и соответственно вычислений `forward`/`backward`). Подобные методы называются "Model Parallel" или "Sharded Parallel".

Самые актуальные в данный момент методы из этой категории:

- FSDP (Fully-Sharded Data Parallel)
    - [`torch.distributed.fsdp`](https://docs.pytorch.org/docs/stable/fsdp.html)
    - [`torch.distributed.fsdp.fully_shard`](https://docs.pytorch.org/docs/stable/distributed.fsdp.fully_shard.html)

<!-- -->

- Tensor Parallel
    - [`torch.distributed.tensor.parallel`](https://docs.pytorch.org/docs/stable/distributed.tensor.parallel.html)
    - [`torch.distributed.tensor.DTensor`](https://docs.pytorch.org/docs/stable/distributed.tensor.html#torch.distributed.tensor.DTensor)

В CV обычно используется простой `DDP`, а "sharded" методы используются намного реже. В отличие от NLP моделей, количество параметров в CV моделях обычно увеличивается медленнее чем вычислительная сложность `forward`/`backward` и соответственно "sharded" паралеллизм имеет меньше смысла.

#### В Lightning

В `lightning`, выше упомянутые методы (и многие другие) называются "стратегиями" и могут быть автоматически включены просто правильно оформив обучение и сконфигурировав `Trainer`.

### Gradient Checkpointing

Во время обучения бóльшую часть выделенной на GPU памяти занимает граф вычислений, который хранит выходные значения всех слоев нейросети. С помощью модуля `torch.utils.checkpoint` можно сохранять выходы только части слоев (например, каждого $k$-го слоя). Тогда во время работы `.backward()` от $i$-го чекпоинта до $(i-1)$-го потребуется выполнить `forward()` и (временно) сохранить выходы от $(i - 1)$-го чекпоинта до $i$-го.

Как итог, чекпоинтинг градиентов может уменьшить потребление памяти ценой выполнения дополнительных вычислений.

В отличие от NLP, данный прием не так часто используется на практике в CV.

Больше про этот прием можно узнать в [документации PyTorch](https://pytorch.org/docs/stable/checkpoint.html).

## Эффективная загрузка и аугментация данных на GPU

### NVIDIA DALI

[NVIDIA DALI](https://developer.nvidia.com/dali) — библиотека, которая позволяет проводить декодирование, аугментирование и прочую обработку данных на GPU. DALI может считывать изображения с диска из большого количества различных [форматов датасетов](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.readers.html). Если вашего формата нет в этом списке, то вы можете также использовать загрузчик [`external_source`](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.external_source.html) для определения собственной логики загрузки на Python.

CIFAR10 не очень большой датасет, так что `torchvision.datasets.CIFAR10` на самом деле загружает его целиком в память. В демонстративных целях давайте сохраним изображения CIFAR10 на диск в формате `.jpg` и запишем имена файлов и метки классов в `labels.txt`.

In [None]:
def save_CIFAR10_images(ds, root):
    os.makedirs(root, exist_ok=True)

    with open(f"{root}/labels.txt", "w") as f:
        for idx, (img, label) in tqdm(enumerate(ds), total=len(ds)):
            filename = f"{idx}.jpg"
            plt.imsave(f"{root}/{filename}", img)
            f.write(f"{filename} {label}\n")


if not os.path.exists("cifar10_images"):
    ds_train = CIFAR10(transform=None, train=True)
    ds_valid = CIFAR10(transform=None, train=False)

    save_CIFAR10_images(ds_train, "cifar10_images/train")
    save_CIFAR10_images(ds_valid, "cifar10_images/valid")

Далее, нам необходимо реализовать описание пайплайна для загрузки и предобработки данных.

Сохраненные таким образом файлы подходят для загрузки с помощью [`fn.readers.file`](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/operations/nvidia.dali.fn.readers.file.html#nvidia.dali.fn.readers.file). Обратите внимание, что DALI позволяет загружать сырые байты PNG/JPEG изображений прямо на GPU и затем декодировать их уже в памяти графического ускорителя.

В NVIDIA DALI довольно мало (относительно Albumentations) уже реализованных аугментаций, потому многие операторы нужно собирать из базовых функций.

In [None]:
@pipeline_def(enable_conditionals=True)
def get_pipeline(file_root, is_training):
    jpgs, labels = fn.readers.file(
        name="Reader",
        file_root=file_root,
        random_shuffle=is_training,
        file_list=f"{file_root}/labels.txt",
    )
    images = fn.decoders.image(jpgs, device="mixed")
    labels = labels.gpu()
    labels = fn.cast(labels, dtype=types.INT64)

    if is_training:
        rotate_limit, p = 15, 0.5
        if fn.random.uniform(range=(0, 1)) < p:
            angle = fn.random.uniform(range=(-rotate_limit, rotate_limit))
            images = fn.rotate(images, angle=angle, keep_size=True)

        r_shift_limit, g_shift_limit, b_shift_limit, p = 15, 15, 15, 0.5
        if fn.random.uniform(range=(0, 1)) < p:
            shifts = fn.random.uniform(range=(-1, 1), shape=(3,))
            shifts = shifts * (r_shift_limit, g_shift_limit, b_shift_limit)
            images = images + shifts
            images = dali.math.clamp(images, 0, 255)
            images = fn.cast(images, dtype=types.UINT8)

        brightness_limit, contrast_limit, p = 0.05, 0.05, 0.5
        if fn.random.uniform(range=(0, 1)) < p:
            brightness = fn.random.uniform(
                range=(1 - brightness_limit, 1 + brightness_limit),
            )
            contrast = fn.random.uniform(
                range=(1 - contrast_limit, 1 + contrast_limit),
            )
            images = fn.brightness_contrast(
                images,
                brightness=brightness,
                contrast=contrast,
            )

    images = (images / 255 - MEAN) / STD

    images = fn.transpose(images, perm=[2, 0, 1])

    return images, labels

In [None]:
def get_dali_loader(file_root, is_training, batch_size, num_workers):
    pipeline = get_pipeline(
        file_root=file_root,
        is_training=is_training,
        batch_size=batch_size,
        num_threads=num_workers,
        device_id=0,
    )
    policy = "DROP" if is_training else "PARTIAL"
    policy = getattr(dali_pytorch.LastBatchPolicy, policy)
    loader = dali_pytorch.DALIClassificationIterator(
        pipeline,
        reader_name="Reader",
        last_batch_policy=policy,
    )
    return loader


dl_train_dali = get_dali_loader("cifar10_images/train", True, BATCH_SIZE, NUM_WORKERS)
dl_valid_dali = get_dali_loader("cifar10_images/valid", False, BATCH_SIZE, NUM_WORKERS)

`DALIClassificationIterator` выдает батчи формата `[{"data": ..., "label": ...}]`. Для корректной обработки батчей обновим Lightning-модуль:

In [None]:
class LightningCIFARClassifierDALI(LightningCIFARClassifierFrozen):
    def _step(self, batch, kind):
        batch = self.prepare_dali_batch(batch)
        return super()._step(batch, kind)

    def prepare_dali_batch(self, batch):
        (batch,) = batch
        x = batch["data"]
        y = batch["label"].squeeze(-1)
        return (x, y)

In [None]:
train_model(
    LightningCIFARClassifierDALI(),
    "runs/dali",
    dl_train_dali,
    dl_valid_dali,
    num_sanity_val_steps=0,
)

Другие библиотеки для ускорения загрузки / обработки данных:

* [`torchvision.io.decode_jpeg`](https://docs.pytorch.org/vision/main/generated/torchvision.io.decode_jpeg.html) - декодирование `jpeg` файлов на GPU без NVIDIA DALI

<!-- -->

* [`GPUDirect`](https://developer.nvidia.com/gpudirect) и [`cuFile`](https://docs.nvidia.com/gpudirect-storage/api-reference-guide/index.html) - более эффективное чтение файлов с диска прямо на GPU (потенциально - вообще в обход CPU)

<!-- -->

* [`kornia`](https://kornia.github.io/) - реализация аугментаций как PyTorch операций (дифференцируемые и с поддержкой GPU)

---