一、系统概述
该系统基于计算机视觉技术,实现对视频或摄像头画面中的人员进行检测、跟踪,并生成轨迹数据。支持透视变换校准(鸟瞰图显示)、多目标跟踪、轨迹存储及视频录制功能,适用于安防监控、行为分析等场景。
二、依赖库
python
运行
import cv2 # 计算机视觉处理(OpenCV库)
import numpy as np # 数值计算
import time # 时间处理
import os # 文件与目录操作
from datetime import datetime # 日期时间处理
三、类定义:PersonTracker
3.1 构造函数 __init__
功能
初始化人员跟踪器,配置视频源、输出参数、背景减除器及跟踪参数。
参数说明
参数名 | 类型 | 默认值 | 描述 |
---|---|---|---|
video_source | int/str | 0 | 视频源(0 为默认摄像头,或指定视频文件路径) |
save_video | bool | False | 是否保存处理后的视频 |
show_warped | bool | True | 是否显示透视变换后的鸟瞰图 |
内部属性
- 视频源与基础参数:
cap
:视频捕获对象(cv2.VideoCapture
实例)frame_width
/frame_height
:视频帧宽高fps
:帧率
- 输出配置:
output_folder
:输出文件夹(默认output
)out
:视频写入对象(cv2.VideoWriter
实例,仅当save_video=True
时创建)
- 背景减除:
fgbg
:使用MOG2
算法的背景减除器,支持阴影检测
- 跟踪参数:
min_contour_area
/max_contour_area
:过滤轮廓的面积阈值(单位像素)trajectories
:存储轨迹的字典(键为人员 ID,值为轨迹信息)max_disappeared_frames
:允许目标消失的最大帧数(超过则删除轨迹)max_distance
:轨迹匹配的最大距离(像素)
- 透视变换:
perspective_transform
:透视变换矩阵(校准后生成)warped_width
/warped_height
:鸟瞰图尺寸(宽度固定 500,高度与原始帧一致)
3.2 方法列表
3.2.1 calibrate_perspective()
- 功能:通过鼠标点击选择 4 个点,校准透视变换矩阵,生成鸟瞰图。
- 操作说明:
- 显示视频第一帧,按顺序点击左上、右上、右下、左下四个点,形成矩形区域。
- 按
q
键退出校准。
- 返回值:
bool
(True
为校准成功,False
为取消或失败)
3.2.2 detect_persons(frame)
- 功能:在输入帧中检测人员,返回检测结果和二值化掩码。
- 输入:
frame
(BGR 格式图像) - 处理流程:
- 应用背景减除,生成前景掩码。
- 形态学操作(开运算 + 闭运算)去除噪声。
- 查找轮廓,过滤面积不符合阈值的轮廓。
- 计算每个轮廓的中心点和边界框。
- 返回值:
(persons, thresh)
,其中:persons
:检测到的人员列表(每个元素为字典,包含bbox
、center
、contour
、area
)thresh
:二值化掩码图像
3.2.3 track_persons(detected_persons)
- 功能:根据检测结果更新人员轨迹。
- 输入:
detected_persons
(detect_persons
返回的人员列表) - 算法逻辑:
- 计算现有轨迹与新检测的匹配距离(欧氏距离),优先匹配近距离目标。
- 未匹配的轨迹:若连续消失超过
max_disappeared_frames
,则删除。 - 未匹配的检测:创建新轨迹,分配唯一 ID。
3.2.4 draw_results(frame, persons, thresh)
- 功能:在图像上绘制检测框、轨迹、ID 及统计信息,支持鸟瞰图显示。
- 输入:
frame
:原始帧persons
:检测到的人员列表thresh
:二值化掩码(未使用,仅保留接口)
- 输出:绘制后的结果图像(若
show_warped=True
,则为原始帧与鸟瞰图的横向拼接图)
3.2.5 save_trajectories()
- 功能:将当前所有轨迹数据保存到文本文件,包含 ID、起始时间、轨迹点坐标等。
- 存储路径:
output_folder/trajectories_时间戳.txt
3.2.6 run()
- 功能:运行跟踪主循环,处理视频流并实时显示结果。
- 操作说明:
- 按
q
键退出程序。 - 按
s
键保存当前轨迹数据。
- 按
- 流程:
- 调用
calibrate_perspective()
进行透视校准(可选)。 - 逐帧读取视频,检测、跟踪人员,绘制结果。
- 释放资源并关闭窗口。
- 调用
四、主程序入口
python
运行
if __name__ == "__main__":
tracker = PersonTracker(
video_source=0, # 0为摄像头,或指定视频文件路径(如"video.mp4")
save_video=True, # 启用视频录制
show_warped=True # 显示鸟瞰图
)
tracker.run()
五、使用说明
5.1 环境配置
- 安装依赖库:
bash
pip install opencv-python numpy
- 确保摄像头或视频文件可用。
5.2 透视校准操作
- 运行程序后,会弹出窗口提示选择 4 个点。
- 按顺序点击视频中的矩形区域四角(如地面区域),生成鸟瞰图。
- 校准完成后,右侧会显示鸟瞰图中的轨迹。
5.3 输出文件
- 视频文件:若
save_video=True
,生成output/tracking_时间戳.avi
。 - 轨迹文件:按
s
键生成output/trajectories_时间戳.txt
,包含各 ID 的坐标序列。
六、参数调整建议
参数名 | 作用 | 调整场景 |
---|---|---|
min_contour_area | 过滤小目标(如噪声) | 目标较小时调小,反之调大 |
max_contour_area | 过滤大目标(如多人重叠) | 目标较大时调大,反之调小 |
max_disappeared_frames | 目标消失后保留轨迹的帧数 | 目标运动间隔较长时调大 |
max_distance | 轨迹匹配的最大允许距离 | 目标运动速度快时调大 |
warped_width | 鸟瞰图宽度 | 显示区域宽窄调整 |
七、注意事项
- 背景减除器
MOG2
需要一定时间学习背景(前几秒可能检测不稳定)。 - 透视校准的四点应选择实际场景中的矩形区域(如地面边框),以确保鸟瞰图坐标准确。
- 若视频帧率较低,可尝试降低
warped_width
或关闭show_warped
以减少计算量。
完成代码
import cv2
import numpy as np
import time
import os
from datetime import datetime
class PersonTracker:
def __init__(self, video_source=0, save_video=False, show_warped=True):
"""初始化人员跟踪器"""
# 视频源设置
self.video_source = video_source
self.cap = cv2.VideoCapture(video_source)
if not self.cap.isOpened():
raise ValueError("无法打开视频源", video_source)
# 获取视频的宽度、高度和帧率
self.frame_width = int(self.cap.get(3))
self.frame_height = int(self.cap.get(4))
self.fps = self.cap.get(cv2.CAP_PROP_FPS)
# 输出设置
self.save_video = save_video
self.output_folder = "output"
self.show_warped = show_warped
# 创建输出文件夹
if not os.path.exists(self.output_folder):
os.makedirs(self.output_folder)
# 背景减除器
self.fgbg = cv2.createBackgroundSubtractorMOG2(
history=500, varThreshold=100, detectShadows=True)
# 人员检测参数
self.min_contour_area = 1000 # 最小轮廓面积
self.max_contour_area = 50000 # 最大轮廓面积
# 轨迹存储
self.trajectories = {} # 存储每个人的轨迹
self.next_person_id = 1 # 下一个可用的人员ID
self.max_disappeared_frames = 10 # 最大消失帧数
self.max_distance = 100 # 最大匹配距离
# 透视变换参数
self.perspective_transform = None
self.warped_width = 500
self.warped_height = self.frame_height # 与原始帧高度一致
# 录制设置
self.out = None
if save_video:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = os.path.join(self.output_folder, f"tracking_{timestamp}.avi")
fourcc = cv2.VideoWriter_fourcc(*'XVID')
self.out = cv2.VideoWriter(output_path, fourcc, self.fps,
(self.frame_width, self.frame_height))
def calibrate_perspective(self):
"""校准透视变换,创建鸟瞰图"""
print("请在图像中选择4个点,形成一个矩形区域,用于透视变换")
print("按顺序点击:左上、右上、右下、左下")
# 读取一帧用于选择点
ret, frame = self.cap.read()
if not ret:
print("无法读取视频帧")
return False
# 创建窗口并设置鼠标回调
cv2.namedWindow("选择透视变换点 (按 'q' 退出)")
points = []
def click_event(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
points.append((x, y))
cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)
cv2.imshow("选择透视变换点 (按 'q' 退出)", frame)
cv2.setMouseCallback("选择透视变换点 (按 'q' 退出)", click_event)
# 显示图像并等待点击
cv2.imshow("选择透视变换点 (按 'q' 退出)", frame)
while len(points) < 4:
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
cv2.destroyAllWindows()
return False
cv2.destroyAllWindows()
# 定义目标矩形
src = np.float32(points)
dst = np.float32([
[0, 0],
[self.warped_width, 0],
[self.warped_width, self.warped_height],
[0, self.warped_height]
])
# 计算透视变换矩阵
self.perspective_transform = cv2.getPerspectiveTransform(src, dst)
return True
def detect_persons(self, frame):
"""检测图像中的人物"""
# 应用背景减除
fgmask = self.fgbg.apply(frame)
# 图像预处理
_, thresh = cv2.threshold(fgmask, 127, 255, cv2.THRESH_BINARY)
kernel = np.ones((5, 5), np.uint8)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=2)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel, iterations=3)
# 查找轮廓
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
persons = []
for contour in contours:
area = cv2.contourArea(contour)
if area < self.min_contour_area or area > self.max_contour_area:
continue
# 计算边界框
x, y, w, h = cv2.boundingRect(contour)
center = (int(x + w/2), int(y + h/2))
# 计算轮廓的中心点
M = cv2.moments(contour)
if M["m00"] != 0:
cX = int(M["m10"] / M["m00"])
cY = int(M["m01"] / M["m00"])
center = (cX, cY)
persons.append({
'bbox': (x, y, w, h),
'center': center,
'contour': contour,
'area': area
})
return persons, thresh
def track_persons(self, detected_persons):
"""跟踪检测到的人员"""
# 计算当前检测点与现有轨迹的距离
unmatched_tracks = list(self.trajectories.keys())
unmatched_detections = list(range(len(detected_persons)))
matches = []
# 计算所有可能的匹配
for track_id in self.trajectories:
trajectory = self.trajectories[track_id]
last_position = trajectory['positions'][-1]
min_distance = float('inf')
min_index = -1
for i, person in enumerate(detected_persons):
if i in unmatched_detections:
distance = np.sqrt((last_position[0] - person['center'][0])**2 +
(last_position[1] - person['center'][1])**2)
if distance < min_distance and distance < self.max_distance:
min_distance = distance
min_index = i
# 如果找到匹配
if min_index != -1:
matches.append((track_id, min_index, min_distance))
# 按距离排序,优先处理距离近的匹配
matches.sort(key=lambda x: x[2])
# 应用匹配
for match in matches:
track_id, detection_index, _ = match
if track_id in unmatched_tracks and detection_index in unmatched_detections:
# 更新轨迹
self.trajectories[track_id]['positions'].append(detected_persons[detection_index]['center'])
self.trajectories[track_id]['last_seen'] = 0
self.trajectories[track_id]['bbox'] = detected_persons[detection_index]['bbox']
# 从待匹配列表中移除
unmatched_tracks.remove(track_id)
unmatched_detections.remove(detection_index)
# 处理未匹配的轨迹
for track_id in unmatched_tracks:
self.trajectories[track_id]['last_seen'] += 1
if self.trajectories[track_id]['last_seen'] > self.max_disappeared_frames:
del self.trajectories[track_id]
# 处理未匹配的检测结果
for detection_index in unmatched_detections:
# 创建新轨迹
self.trajectories[self.next_person_id] = {
'positions': [detected_persons[detection_index]['center']],
'last_seen': 0,
'bbox': detected_persons[detection_index]['bbox'],
'start_time': time.time()
}
self.next_person_id += 1
def draw_results(self, frame, persons, thresh):
"""在图像上绘制检测和跟踪结果"""
output = frame.copy()
# 绘制检测到的人物
for person in persons:
x, y, w, h = person['bbox']
cv2.rectangle(output, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.circle(output, person['center'], 5, (0, 0, 255), -1)
# 绘制轨迹
for track_id, trajectory in self.trajectories.items():
positions = trajectory['positions']
# 绘制轨迹线
for i in range(1, len(positions)):
cv2.line(output, positions[i-1], positions[i], (255, 0, 0), 2)
# 绘制轨迹点
for pos in positions:
cv2.circle(output, pos, 3, (255, 0, 0), -1)
# 绘制ID和轨迹长度
if len(positions) > 0:
last_pos = positions[-1]
cv2.putText(output, f"ID: {track_id}",
(last_pos[0] + 10, last_pos[1] - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
cv2.putText(output, f"Points: {len(positions)}",
(last_pos[0] + 10, last_pos[1]),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
# 显示统计信息
cv2.putText(output, f"Persons: {len(self.trajectories)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
cv2.putText(output, f"FPS: {int(self.fps)}",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# 创建结果显示窗口
if self.show_warped and self.perspective_transform is not None:
# 创建鸟瞰图
warped = cv2.warpPerspective(output, self.perspective_transform,
(self.warped_width, self.warped_height))
# 在鸟瞰图上绘制轨迹
for track_id, trajectory in self.trajectories.items():
positions = trajectory['positions']
for i in range(1, len(positions)):
# 将原始坐标转换为鸟瞰图坐标
pos1 = np.array([[positions[i-1][0], positions[i-1][1]]], dtype=np.float32).reshape(-1, 1, 2)
pos2 = np.array([[positions[i][0], positions[i][1]]], dtype=np.float32).reshape(-1, 1, 2)
warped_pos1 = cv2.perspectiveTransform(pos1, self.perspective_transform)[0][0]
warped_pos2 = cv2.perspectiveTransform(pos2, self.perspective_transform)[0][0]
cv2.line(warped, (int(warped_pos1[0]), int(warped_pos1[1])),
(int(warped_pos2[0]), int(warped_pos2[1])), (255, 0, 0), 2)
# 合并显示
combined = np.hstack((output, warped))
return combined
return output
def save_trajectories(self):
"""保存轨迹数据到文件"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = os.path.join(self.output_folder, f"trajectories_{timestamp}.txt")
with open(output_path, 'w') as f:
f.write("Person Trajectories\n")
f.write(f"Recorded on: {datetime.now()}\n\n")
for track_id, trajectory in self.trajectories.items():
f.write(f"Person ID: {track_id}\n")
f.write(f"Start Time: {time.ctime(trajectory['start_time'])}\n")
f.write(f"Duration: {time.time() - trajectory['start_time']:.2f} seconds\n")
f.write(f"Trajectory Points: {len(trajectory['positions'])}\n")
f.write("Positions:\n")
for pos in trajectory['positions']:
f.write(f" ({pos[0]}, {pos[1]})\n")
f.write("\n")
print(f"轨迹数据已保存到: {output_path}")
def run(self):
"""运行人员跟踪系统"""
# 首先进行透视校准
if not self.calibrate_perspective():
print("透视校准失败,使用原始视角")
print("开始人员跟踪...")
print("按 'q' 退出,按 's' 保存轨迹数据")
frame_count = 0
start_time = time.time()
while True:
ret, frame = self.cap.read()
if not ret:
break
# 计算实际帧率
frame_count += 1
if frame_count % 10 == 0:
elapsed_time = time.time() - start_time
self.fps = frame_count / elapsed_time
# 检测人员
persons, thresh = self.detect_persons(frame)
# 跟踪人员
self.track_persons(persons)
# 绘制结果
result = self.draw_results(frame, persons, thresh)
# 保存视频
if self.save_video:
self.out.write(result)
# 显示结果
cv2.imshow("人员轨迹跟踪系统 (按 'q' 退出,按 's' 保存轨迹)", result)
# 按键处理
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('s'):
self.save_trajectories()
# 释放资源
self.cap.release()
if self.out:
self.out.release()
cv2.destroyAllWindows()
print("人员跟踪系统已关闭")
# 主程序入口
if __name__ == "__main__":
# 创建人员跟踪器实例
tracker = PersonTracker(
video_source=0, # 0表示默认摄像头,也可以指定视频文件路径
save_video=True, # 是否保存视频
show_warped=True # 是否显示鸟瞰图
)
# 运行跟踪器
tracker.run()