跳转至

第 9 章 视频分析与理解

📚 章节概述

本章介绍视频分析与理解的核心技术,包括光流估计、目标跟踪、动作识别等。视频分析是计算机视觉的重要方向,广泛应用于安防监控、体育分析、人机交互等领域。

学习时间: 5-7 天 难度等级:⭐⭐⭐⭐ 前置知识:第 1-8 章

🎯 学习目标

完成本章后,你将能够: - 理解视频分析的特殊性 - 掌握光流估计和目标跟踪 - 了解动作识别技术 - 能够实现视频分析应用 - 完成视频分析项目


9.1 光流估计

9.1.1 Lucas-Kanade 光流

Python
import cv2
import numpy as np

def lucas_kanade_optical_flow(prev_frame, curr_frame):
    """Track sparse feature points between two BGR frames with Lucas-Kanade.

    Detects Shi-Tomasi corners in the previous frame, then estimates their
    displacement in the current frame via pyramidal Lucas-Kanade flow.

    Args:
        prev_frame: previous video frame (BGR).
        curr_frame: current video frame (BGR).

    Returns:
        Tuple (good_prev, good_next) of point arrays that were tracked
        successfully (status flag == 1).

    Raises:
        RuntimeError: if no corners are found or the flow computation fails.
    """
    gray_a = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    gray_b = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)

    # Shi-Tomasi corner detection on the first frame.
    corners = cv2.goodFeaturesToTrack(gray_a, maxCorners=100,
                                      qualityLevel=0.3, minDistance=7)
    if corners is None:
        raise RuntimeError("上一帧没有检测到可跟踪角点,可尝试调低 qualityLevel 或更换帧")

    # Pyramidal Lucas-Kanade flow.
    term = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
    tracked, found, _err = cv2.calcOpticalFlowPyrLK(
        gray_a, gray_b, corners, None,
        winSize=(15, 15), maxLevel=2, criteria=term)
    if tracked is None or found is None:
        raise RuntimeError("Lucas-Kanade 光流计算失败")

    # Keep only the successfully tracked points.
    mask = found == 1
    return corners[mask], tracked[mask]

# 可视化
def visualize_optical_flow(image, prev_pts, next_pts, color=(0, 255, 0), thickness=2):
    """Draw motion arrows from each previous point to its tracked position.

    Args:
        image: BGR frame to draw on (not modified; a copy is returned).
        prev_pts: array of start points, one per tracked feature.
        next_pts: array of end points, paired with ``prev_pts`` by position.
        color: BGR arrow color (default green).
        thickness: arrow line thickness in pixels.

    Returns:
        A copy of ``image`` with one arrow per point pair.
    """
    result = image.copy()
    # zip pairs points by position; `p0`/`p1` avoid shadowing the builtin `next`.
    for p0, p1 in zip(prev_pts, next_pts):
        start = tuple(p0.ravel().astype(int))
        end = tuple(p1.ravel().astype(int))
        cv2.arrowedLine(result, start, end, color, thickness)
    return result

9.1.2 稠密光流( Farneback )

Python
def farneback_optical_flow(prev_frame, curr_frame):
    """Compute dense Farneback optical flow and render it as a BGR image.

    The flow field is visualized in HSV space: hue encodes motion direction,
    value encodes motion magnitude, and the result is converted back to BGR.

    Args:
        prev_frame: previous video frame (BGR).
        curr_frame: current video frame (BGR).

    Returns:
        BGR visualization of the dense flow field, same height/width as input.
    """
    gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    gray_curr = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)

    # Positional args: pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags.
    flow = cv2.calcOpticalFlowFarneback(gray_prev, gray_curr, None,
                                        0.5, 3, 15, 3, 5, 1.2, 0)

    height, width = flow.shape[:2]
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    hsv = np.zeros((height, width, 3), dtype=np.uint8)
    hsv[..., 0] = angle * 180 / np.pi / 2  # hue = direction (OpenCV hue range is 0-180)
    hsv[..., 1] = 255                      # full saturation everywhere
    hsv[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)  # value = speed

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

9.2 目标跟踪

9.2.1 KCF 跟踪器

Python
def create_kcf_tracker():
    """Instantiate a KCF tracker, handling the OpenCV 4.x `legacy` relocation.

    Returns:
        A KCF tracker object from whichever namespace provides it.

    Raises:
        RuntimeError: if no KCF factory is available in this OpenCV build.
    """
    legacy = getattr(cv2, 'legacy', None)
    if legacy is not None and hasattr(legacy, 'TrackerKCF_create'):
        return legacy.TrackerKCF_create()
    factory = getattr(cv2, 'TrackerKCF_create', None)
    if factory is not None:
        return factory()
    raise RuntimeError('当前 OpenCV 未包含 KCF Tracker,请安装 opencv-contrib-python')


def kcf_tracking(video_path):
    """Interactively track a user-selected ROI through a video with KCF.

    Lets the user draw a bounding box on the first frame, then follows that
    box frame by frame, drawing it in green. Press 'q' to stop early.

    Args:
        video_path: path to the input video file.

    Raises:
        FileNotFoundError: if the first frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    ok, first = cap.read()
    if not ok:
        raise FileNotFoundError(f'无法读取视频: {video_path}')

    # Manual ROI selection on the first frame.
    roi = cv2.selectROI('Select ROI', first, fromCenter=False, showCrosshair=True)
    cv2.destroyAllWindows()

    tracker = create_kcf_tracker()
    tracker.init(first, roi)

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        found, box = tracker.update(frame)
        if found:
            x, y, w, h = (int(v) for v in box)
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        cv2.imshow('Tracking', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

9.2.2 DeepSORT 跟踪

Python
from deep_sort_realtime.deepsort_tracker import DeepSort

def deepsort_tracking(video_path, detector):
    """Multi-object tracking with DeepSORT.

    Args:
        video_path: path to the input video file.
        detector: callable; detector(frame) must return detections as
            [([l, t, w, h], conf, cls_name), ...].

    Draws confirmed track IDs on each frame; press 'q' to stop.
    """
    cap = cv2.VideoCapture(video_path)
    tracker = DeepSort(max_age=30)

    while True:
        ok, frame = cap.read()
        if not ok:
            break

        tracks = tracker.update_tracks(detector(frame), frame=frame)

        # Only draw tracks DeepSORT has confirmed (enough consecutive hits).
        for trk in tracks:
            if not trk.is_confirmed():
                continue
            left, top, right, bottom = map(int, trk.to_ltrb())
            cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
            cv2.putText(frame, f'ID: {trk.track_id}', (left, top - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        cv2.imshow('DeepSORT', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

9.3 动作识别

9.3.1 3D CNN

Python
import torch
import torch.nn as nn

class C3D(nn.Module):
    """Compact C3D-style 3D CNN for clip-level action classification.

    Expects input of shape (batch, 3, frames, height, width) and returns
    logits of shape (batch, num_classes).
    """

    def __init__(self, num_classes=101):
        super().__init__()
        trunk = [
            nn.Conv3d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            # Spatial-only pooling first, preserving the temporal dimension.
            nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2)),
            nn.Conv3d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=2, stride=2),
            nn.Conv3d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv3d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            # Fixed-size output regardless of the input clip length.
            nn.AdaptiveAvgPool3d((1, 4, 4)),
        ]
        self.features = nn.Sequential(*trunk)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * 1 * 4 * 4, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Run the clip through the conv trunk, then the classifier head."""
        return self.classifier(self.features(x))

9.3.2 Two-Stream 网络

Python
from torchvision import models

class TwoStreamNetwork(nn.Module):
    """Two-stream action recognition: spatial RGB stream + temporal flow stream.

    Both streams are ImageNet-pretrained ResNet-50 backbones; the final class
    logits are fused by element-wise summation.

    Args:
        num_classes: number of action categories.
        flow_channels: input channels of the temporal stream (stacked optical
            flow maps, typically 2 channels per flow frame).
    """

    def __init__(self, num_classes=101, flow_channels=20):
        super().__init__()

        # Spatial stream: standard pretrained ResNet-50 on single RGB frames.
        self.spatial_stream = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        self.spatial_stream.fc = nn.Linear(2048, num_classes)

        # Temporal stream: same backbone, but the stem must accept stacked
        # flow channels instead of the 3 RGB channels.
        self.temporal_stream = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        pretrained_stem = self.temporal_stream.conv1.weight.data
        self.temporal_stream.conv1 = nn.Conv2d(
            flow_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # Cross-modality init: replicate the channel-mean of the pretrained
        # RGB filters across every flow channel.
        self.temporal_stream.conv1.weight.data = (
            pretrained_stem.mean(dim=1, keepdim=True).repeat(1, flow_channels, 1, 1))
        self.temporal_stream.fc = nn.Linear(2048, num_classes)

    def forward(self, rgb, flow):
        """Fuse the two streams' logits by element-wise sum."""
        return self.spatial_stream(rgb) + self.temporal_stream(flow)

9.4 实战案例:视频目标检测与跟踪

Python
import cv2
from ultralytics import YOLO


def create_csrt_tracker():
    """Instantiate a CSRT tracker, handling the OpenCV 4.x `legacy` relocation.

    Returns:
        A CSRT tracker object from whichever namespace provides it.

    Raises:
        RuntimeError: if no CSRT factory is available in this OpenCV build.
    """
    legacy = getattr(cv2, 'legacy', None)
    if legacy is not None and hasattr(legacy, 'TrackerCSRT_create'):
        return legacy.TrackerCSRT_create()
    factory = getattr(cv2, 'TrackerCSRT_create', None)
    if factory is not None:
        return factory()
    raise RuntimeError('当前 OpenCV 未包含 CSRT Tracker,请安装 opencv-contrib-python')


class VideoTracker:
    """Detect-then-track pipeline: periodic YOLO re-detection + CSRT tracking.

    A YOLO detector picks the highest-confidence object every 30 frames (or
    whenever tracking is lost); a CSRT tracker follows it in between.
    """

    def __init__(self, detector_path='yolov8n.pt'):
        self.detector = YOLO(detector_path)
        self.tracker = None
        self.tracking = False

    def _reset_tracker(self, frame, bbox):
        """Start a fresh CSRT tracker on ``bbox`` = (x, y, w, h) in ``frame``."""
        self.tracker = create_csrt_tracker()
        self.tracker.init(frame, bbox)
        self.tracking = True

    def process_video(self, video_path, output_path):
        """Track through ``video_path``, drawing the box, writing to ``output_path``.

        Press 'q' in the preview window to stop early.
        """
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS) or 25  # fall back to 25 if FPS is unknown
        size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, size)

        frame_idx = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            frame_idx += 1

            # Re-detect every 30 frames, or immediately after a tracking loss.
            if frame_idx % 30 == 1 or not self.tracking:
                boxes = self.detector.predict(frame, conf=0.3, verbose=False)[0].boxes
                if len(boxes) > 0:
                    top = boxes[boxes.conf.argmax().item()]  # highest-confidence box
                    x1, y1, x2, y2 = top.xyxy[0].cpu().numpy().astype(int)
                    self._reset_tracker(frame, (x1, y1, x2 - x1, y2 - y1))
                else:
                    self.tracking = False

            if self.tracking and self.tracker is not None:
                ok_track, box = self.tracker.update(frame)
                if ok_track:
                    x, y, w, h = (int(v) for v in box)
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                else:
                    self.tracking = False

            writer.write(frame)
            cv2.imshow('Video Tracking', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        writer.release()
        cv2.destroyAllWindows()

if __name__ == '__main__':
    # Demo entry point: guard keeps the module importable without side effects.
    tracker = VideoTracker()
    tracker.process_video('input.mp4', 'output.mp4')

9.5 练习题

基础题(简答题)

  1. 光流估计的原理是什么?

    光流估计基于亮度恒定假设(同一像素在相邻帧中亮度不变)和小运动假设,通过计算相邻帧间像素的位移向量场来表示运动。经典方法包括 Lucas-Kanade (稀疏光流)和 Farneback (稠密光流)。

  2. 目标跟踪有哪些方法?

    主要方法:①相关滤波类( KCF 、 DCF ),速度快,利用循环矩阵加速;②深度学习类( SiamFC 、 SiamRPN ),利用孪生网络模板匹配;③多目标跟踪( SORT 、 DeepSORT ),结合检测+关联,使用卡尔曼滤波预测和外观特征匹配。

进阶题(编程题)

  1. 实现一个简单的光流可视化。
  2. 使用 DeepSORT 进行多目标跟踪。

9.6 关键复盘

高频复盘题

Q1: 光流估计的原理是什么?

参考答案: - 基于亮度恒定假设 - 假设像素在短时间内移动很小 - Lucas-Kanade :稀疏光流 - Farneback :稠密光流

Q2: 目标跟踪和目标检测有什么区别?

参考答案: - 检测:每帧独立检测目标 - 跟踪:跨帧关联同一目标 - 跟踪更高效,但可能漂移 - 通常结合使用


9.7 本章小结

核心知识点

  1. 光流估计: Lucas-Kanade 、 Farneback
  2. 目标跟踪: KCF 、 DeepSORT
  3. 动作识别: 3D CNN 、 Two-Stream
  4. 视频分析:时序建模

下一步

下一章:《10-三维视觉.md》 —— 学习 3D 视觉


恭喜完成第 9 章! 🎉

⚠️ 核验说明(2026-04-03):本页已完成逐段人工复核,并为稀疏光流示例补充了角点为空和跟踪失败的边界处理。若文中涉及外部模型、API、版本号、价格、部署依赖或第三方产品名称,请以官方文档、论文原文和实际运行环境为准。


最后更新日期: 2026-04-03