# Практическое задание

## 1 Работа с видео

### 1.1 Получение кадров из видео
Получать кадры из видео можно с использованием  `opencv` и  `ffmpeg`. Для начала применим `cv2.VideoCapture` - необходимо осуществить проход по всем кадрам видео и сохранить только те, которые соответствуют частоте кадров равной 5. Для этого потребуется узнать частоту кадров исходного видео с помощью метода [`cv2.VideoCapture.get`](https://docs.opencv.org/4.x/d8/dfe/classcv_1_1VideoCapture.html) и `enum` [`VideoCaptureProperties`](https://docs.opencv.org/4.x/d4/d15/group__videoio__flags__base.html).

In [None]:
import os

import cv2
import ffmpeg
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
import torchcodec
import torchvision.transforms.functional as TVF
from matplotlib import animation
from PIL import Image, ImageDraw

plt.rcParams["animation.html"] = "jshtml"
plt.rcParams["animation.embed_limit"] = 2**27

In [None]:
if not os.path.exists("road.mp4"):
    !curl -O 'https://courses.cv-gml.ru/storage/seminars/video-optical-flow/road.mp4'

In [None]:
cv_output_dir = "Road_cv_5"
os.makedirs(cv_output_dir, exist_ok=True)

cap = cv2.VideoCapture("road.mp4")
# video frame frate
frame_rate = cap.get(cv2.CAP_PROP_FPS)
# video save frequency
save_rate = frame_rate // 5
index = 0
while cap.isOpened():
    # current frame number
    frame_id = cap.get(cv2.CAP_PROP_POS_FRAMES)
    # current frame
    ret, frame = cap.read()

    if not ret:
        break
    if not frame_id % save_rate:
        filename = f"{cv_output_dir}/image_{index:04d}.jpg"
        index += 1
        frame = TVF.to_pil_image(frame)
        w, h = frame.size
        frame = frame.resize((1080, h * 1080 // w))
        frame.save(filename)
cap.release()

Более удобным способом работы с видео является использование `ffmpeg-python`. Для этого вам потребуется установленный в системе `ffmpeg`. Используя метод `filter`, конвертируйте видео с параметрами [`fps=5`](https://ffmpeg.org/ffmpeg-filters.html#fps-1) и [`width=1080`](https://ffmpeg.org/ffmpeg-filters.html#scale-1) (сохранив исходное соотношение сторон). В случае возникновения ошибок рекомедуется отключить `loglevel='error'` и `quiet=True`.

In [None]:
ffmpeg_output_dir = "Road_ffmpeg_5"
os.makedirs(ffmpeg_output_dir, exist_ok=True)
(
    ffmpeg.input("road.mp4")
    .filter("fps", fps=5, round="up")  # fps
    .filter("scale", width=1080, height=-1)  # frame shape
    .output(f"{ffmpeg_output_dir}/image_%04d.jpg", start_number=0, loglevel="error")
    .run(overwrite_output=True, quiet=True)
)

### 1.2 Создание видео из кадров

Попробуем создать видео c `fps=5` из сохраненных кадров, полученных в предыдущем пункте. Его продолжительность должна совпасть с оригинальным видео и равняться 30 секундам. 

In [None]:
(
    ffmpeg.input(f"{ffmpeg_output_dir}/*.jpg", pattern_type="glob", framerate=5)  # input video
    .output("road_5.mp4", loglevel="error")
    .run(overwrite_output=True, quiet=True)
)

### 1.3 Визуализация кадров
Визуализируем первые 15 кадров каждого видео, для этого также воспользуемся методом `filter`, который позволяет накладывать ограничения на номера возвращаемых кадров. Обратите внимание на способ получения через `select` с отрицательным индексом и `pipe`.

In [None]:
def get_video_frames(video_name, num_frames=15):
    probe = ffmpeg.probe(video_name)
    video_info = next(s for s in probe["streams"] if s["codec_type"] == "video")
    width = int(video_info["width"])
    height = int(video_info["height"])
    video, _ = (
        ffmpeg.input(video_name)
        .filter("select", f"gt(-n, -{num_frames})")
        .output("pipe:", format="rawvideo", pix_fmt="rgb24", loglevel="error")
        .run(capture_stdout=True)
    )
    video = np.frombuffer(video, np.uint8).reshape((-1, height, width, 3))
    return video


def plot_sequence_images(image_array):
    dpi = 72
    height, width = image_array[0].shape[:2]
    fig = plt.figure(figsize=(width / dpi, height / dpi), dpi=dpi)
    image = plt.figimage(image_array[0])

    def animate(i):
        image.set_array(image_array[i])
        return (image,)

    plot = animation.FuncAnimation(
        fig,
        animate,
        frames=len(image_array),
        cache_frame_data=False,
    )
    return plot

In [None]:
plot_sequence_images(get_video_frames("road.mp4"))

In [None]:
plot_sequence_images(get_video_frames("road_5.mp4"))

### 1.4 TorchCodec

In [None]:
dec = torchcodec.decoders.VideoDecoder("road.mp4")
dec.metadata

In [None]:
torchcodec_output_dir = "Road_torchcodec_5"
os.makedirs(torchcodec_output_dir, exist_ok=True)

for index, (frame,) in enumerate(
    torchcodec.samplers.clips_at_regular_timestamps(
        dec,
        # 5 fps -> 1/5 sec/frame
        seconds_between_clip_starts=1/5,
    )
):
    filename = f"{torchcodec_output_dir}/image_{index:04d}.jpg"
    frame = TVF.to_pil_image(frame.data)
    w, h = frame.size
    frame = frame.resize((1080, h * 1080 // w))
    frame.save(filename)

In [None]:
output_dir = torchcodec_output_dir

## 2 Оптический поток
Оптический поток – оценка видимого движения, представляющая собой сдвиг каждой точки между двумя изображениями.

<br/>

<center>
<div style="display: inline-block; vertical-align : middle;">
    <img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/flow-arrows.jpg" width=480>
</div>
<div style="display: inline-block; vertical-align : middle;">

$$
\Huge
\quad \mathbf{I}_{a}\left(\mathbf{x}\right) \ \text{“}\!\!\!\!\!\!=\!\!\!\!\!\!\text{”}\  \mathbf{I}_{b}\left(\mathbf{x} + \mathbf{f}_{a \to b}\right)
$$

</div>
</center>

На практике оптический поток разделяется на 2 типа: __плотный__ (dense) и __разреженный__ (sparse). С помощью плотного оптического потока производится оценка смещения каждого пикселя изображения, в том время как с помощью разреженного оценивают только некоторые, заранее определенные точки, что значительно экономит ресурсы по сравнению с первым методом.

При визуализации оптический поток преобразуется в RGB по следующему правилу: направлению вектора соответствует Hue в цветовой модели HSV, а абсолютному значению Value.

<img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/optical-flow-dense.jpg" width=1920 align="center">
<center>Плотный оптичеcкий поток</center>

Разреженный оптический поток можно визуализировать с помощью треков каждой точки.

<img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/optical-flow-sparse.gif" width=960 align="center">
<center>Разреженный оптичеcкий поток</center>

### 2.1 Плотный оптический поток
Для оценки плотного оптического потока воспользуемся двумя подходами: классическим `cv2.calcOpticalFlowFarneback` и нейросетевым SOTA методом [RAFT: Recurrent All Pairs Field Transforms for Optical Flow](https://github.com/princeton-vl/RAFT). Попробуйте подобрать параметры метода `cv2.calcOpticalFlowFarneback` и визуализировать поток по аналогии с `RAFT`.

Обратите внимание, что при вызове `RAFT` на вход в качестве первого изображения подается второе и наоборот. Этот порядок необходимо применить и в `cv2` - оптической поток, получаемый таким способом, вместе с первым изображением может использоваться для генерации второго изображения.

In [None]:
if not os.path.exists("raft/"):
    !curl -O 'https://courses.cv-gml.ru/storage/seminars/video-optical-flow/raft.zip'
    !unzip -o raft.zip

In [None]:
from raft import get_flow_estimation, get_model, vis_flow

In [None]:
raft_network = get_model()
image_1 = Image.open(f"{output_dir}/image_0012.jpg")
image_2 = Image.open(f"{output_dir}/image_0013.jpg")

In [None]:
# estimate backward dense optical flow (2->1)
# using Recurrent All-Pairs Field Transforms (RAFT)
raft_flow = get_flow_estimation(
    raft_network,
    ...,
)

$$
\Huge
\quad \mathbf{f}_{a \to b} \neq -\mathbf{f}_{b \to a}
$$

<br/>

<center>
<div style="display: inline-block; vertical-align: middle;">
    <img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/inv-flow-wrong.jpg" width=320>
</div>
<div style="display: inline-block; width: 20px;"></div>
<div style="display: inline-block; vertical-align: middle;">
    <img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/flow-arrows.jpg" width=320>
</div>
<div style="display: inline-block; width: 20px;"></div>
<div style="display: inline-block; vertical-align: middle;">
    <img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/inv-flow-right.jpg" width=320>
</div>
</center>

In [None]:
raft_flow_rgb, raft_flow_image_rgb = vis_flow(image_2, raft_flow)
Image.fromarray(raft_flow_image_rgb)

In [None]:
# grayscale version of image_1
image_1_gray = TVF.rgb_to_grayscale(image_1)

# grayscale version of image_2
image_2_gray = TVF.rgb_to_grayscale(image_2)

# estimate backward dense optical flow (2->1)
# using the Gunnar Farneback's algorithm
cv_flow = cv2.calcOpticalFlowFarneback(
    # note the backwards frame order
    prev=np.array(image_2_gray),
    next=np.array(image_1_gray),
    flow=None,
    pyr_scale=0.5,
    levels=10,
    winsize=15,
    iterations=3,
    poly_n=5,
    poly_sigma=1.0,
    flags=0,
)

In [None]:
# cv_flow visualization
cv_flow_rgb, cv_flow_image_rgb = vis_flow(image_2, cv_flow)
Image.fromarray(cv_flow_image_rgb)

<img src="https://courses.cv-gml.ru/storage/seminars/video-optical-flow/images/flow-field-color-coding.png" width=480 align="center">

Напишите функцию `generate_frame_from_flow`, которая принимает на вход первое изображение, обратный оптический поток $\left(\mathbf{f}_{2 \to 1}\right)$ и генерирует второе изображение при помощи `torch.nn.functional.grid_sample`.

На полученной визуализации у вас должно получиться, что каждая машина частично дублируется, так как оптический поток верно оценивается для объекта, а для "перекрытого" куска фона предсказывается нулевой поток - в итоге машина одновременно остается на прежнем месте, так и смещается по направлению движения.

In [None]:
def generate_frame_from_flow(image, flow):
    image = TVF.to_tensor(image)

    grid = torch.from_numpy(flow).to(torch.float32).clone()
    h, w = grid.shape[:2]

    # Relative flow -> Absolute coordinates
    ...

    # ([0; H], [0; W]) -> ([0; 1], ([0; 1])
    ...

    # [0; 1] -> [-1; 1]
    ...

    # Sample pixel values using the supplied grid
    new_image = ...

    return TVF.to_pil_image(new_image)

In [None]:
generate_frame_from_flow(image_1, raft_flow)

In [None]:
generate_frame_from_flow(image_1, cv_flow)

### 2.2 Трекинг с помощью оптического потока
Оптический поток может использоваться для задачи трекинга объектов в видеопотоке с низкой частотой кадров. В этом случае классические методы сопоставления bounding box объектов по IoU работают плохо, так как bounding box почти не пересекаются. Визуализируем bbox для двух кадров, соответствующих `fps=2.5`.


In [None]:
def vis_bbox(image, bboxes, colors=(0, 0, 100)):
    if not isinstance(bboxes, list):
        bboxes = [bboxes]
    if not isinstance(colors, list):
        colors = [colors] * len(bboxes)

    image = image.copy()
    draw = ImageDraw.Draw(image)
    for bbox, color in zip(bboxes, colors):
        x, y, w, h = bbox
        draw.rectangle([(x, y), (x + w, y + h)], outline=color, width=5)

    return image

In [None]:
image_1 = Image.open(f"{output_dir}/image_0012.jpg")
image_2 = Image.open(f"{output_dir}/image_0014.jpg")

In [None]:
# estimate forward dense optical flow (1->2)
# using Recurrent All-Pairs Field Transforms (RAFT)
raft_flow = get_flow_estimation(
    raft_network,
    ...,
)

In [None]:
# bbox: (x, y, w, h)
bbox_1 = [(445, 335, 100, 100), (570, 480, 120, 130)]
color_1 = (100, 0, 0)
bbox_2 = [(475, 270, 90, 90), (590, 350, 100, 110)]
color_2 = (0, 0, 100)
Image.fromarray(
    np.hstack(
        (
            vis_bbox(image_1, bbox_1, color_1),
            vis_bbox(image_2, bbox_2, color_2),
            vis_bbox(image_2, bbox_1 + bbox_2, 2 * [color_1] + 2 * [color_2]),
        )
    )
)

Реализуйте следующий алгоритм: для каждого bbox первого кадра вычислите средний оптический поток внутри bbox по двум направлениям, затем сместите bbox на полученный поток. В результате, исходные bbox второго кадра и смещенные bbox первого кадра будут иметь высокий IoU.

In [None]:
def bbox_apply_flow(bbox, flow):
    ...
    # new bbox after applying optical flow
    return ...

In [None]:
color_1_flow = (0, 100, 0)
bbox_1_flow = [bbox_apply_flow(bbox, raft_flow) for bbox in bbox_1]
vis_bbox(
    image_2,
    bbox_1 + bbox_2 + bbox_1_flow,
    colors=2 * [color_1] + 2 * [color_2] + 2 * [color_1_flow],
)

### Часть 2.3 Разреженный оптический поток
В методах разреженного оптического потока выбирается набор пикселей для трекинга и для их сопоставления между кадрами используются вычисленные признаки. Применим детектор углов Ши-Томаси `cv2.goodFeaturesToTrack` для выбора точек и метод  Лукаса-Канаде `cv2.calcOpticalFlowPyrLK` для расчета оптического потока.

In [None]:
def vis_points(image, points, color=(0, 100, 0)):
    image = image.copy()
    draw = ImageDraw.Draw(image)
    for point in points:
        draw.circle(point, radius=5, fill=color)
    return image


def vis_line(image, points_1, points_2, color=(0, 0, 100)):
    image = image.copy()
    draw = ImageDraw.Draw(image)
    for point_1, point_2 in zip(points_1, points_2):
        line = np.concatenate([point_1, point_2])
        draw.line(line, color, 2)
    return image

In [None]:
image_1 = Image.open(f"{output_dir}/image_0012.jpg")
image_2 = Image.open(f"{output_dir}/image_0013.jpg")
image_1_gray = TVF.to_grayscale(image_1)
image_2_gray = TVF.to_grayscale(image_2)

Выберите набор пикселей для трекинга с первого изображения, используя `cv2.goodFeaturesToTrack`. Оставьте точки, которые содержатся в одном из двух bbox из предыдущего задания (переменная `bbox_1`). Подберите оптимальные параметры.

In [None]:
# select points with cv2.goodFeaturesToTrack
points_1 = cv2.goodFeaturesToTrack(
    image=np.array(image_1_gray),
    maxCorners=300,
    qualityLevel=0.2,
    minDistance=2,
    blockSize=7,
)

# keep points inside any bbox in bbox_1
points_1 = np.array(
    [
        ...
    ]
)

In [None]:
vis_points(image_1, points_1)

C помощью функции `cv2.calcOpticalFlowPyrLK`, которая принимает на вход два изображения, а также полученные в предыдущем пункте точки, вычислите новые точки для второго изображения. Оставьте точки из `points_1` и `point_2`, которые соответствуют маске `status == 1`. Подберите оптимальные параметры.

In [None]:
# estimate the sparse forward optical flow (1->2) at selected points
# using the pyramidal iterative Lucas-Kanade method
points_2, status, _ = cv2.calcOpticalFlowPyrLK(
    prevImg=np.array(image_1_gray),
    nextImg=np.array(image_2_gray),
    prevPts=points_1,
    nextPts=None,
    winSize=(50, 50),
    maxLevel=25,
    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
)

# filter points_1
points_1 = points_1[status[:, 0] == 1]

# filter points_2
points_2 = points_2[status[:, 0] == 1]

In [None]:
vis_line(image_2, points_1, points_2)

---