Preparation
In order to understand what is happening, and eventually to solve our problem, we first need to improve the script a little.
I've added logging of the important steps of the algorithm, refactored the code a bit, added saving of the mask and processed images, added the ability to run the script using individual frame images, along with some other modifications.
This is what the script looks like at this point:
import logging
import logging.handlers
import os
import time
import sys

import cv2
import numpy as np

from vehicle_counter import VehicleCounter

# ============================================================================

IMAGE_DIR = "images"
IMAGE_FILENAME_FORMAT = IMAGE_DIR + "/frame_%04d.png"

# Support either video file or individual frames
CAPTURE_FROM_VIDEO = False
if CAPTURE_FROM_VIDEO:
    IMAGE_SOURCE = "traffic.avi" # Video file
else:
    IMAGE_SOURCE = IMAGE_FILENAME_FORMAT # Image sequence

# Time to wait between frames, 0=forever
WAIT_TIME = 1 # 250 # ms

LOG_TO_FILE = True

# Colours for drawing on processed frames
DIVIDER_COLOUR = (255, 255, 0)
BOUNDING_BOX_COLOUR = (255, 0, 0)
CENTROID_COLOUR = (0, 0, 255)

# ============================================================================

def init_logging():
    main_logger = logging.getLogger()

    formatter = logging.Formatter(
        fmt='%(asctime)s.%(msecs)03d %(levelname)-8s [%(name)s] %(message)s'
        , datefmt='%Y-%m-%d %H:%M:%S')

    handler_stream = logging.StreamHandler(sys.stdout)
    handler_stream.setFormatter(formatter)
    main_logger.addHandler(handler_stream)

    if LOG_TO_FILE:
        handler_file = logging.handlers.RotatingFileHandler("debug.log"
            , maxBytes = 2**24
            , backupCount = 10)
        handler_file.setFormatter(formatter)
        main_logger.addHandler(handler_file)

    main_logger.setLevel(logging.DEBUG)

    return main_logger

# ============================================================================

def save_frame(file_name_format, frame_number, frame, label_format):
    file_name = file_name_format % frame_number
    label = label_format % frame_number

    log.debug("Saving %s as '%s'", label, file_name)
    cv2.imwrite(file_name, frame)

# ============================================================================

def get_centroid(x, y, w, h):
    x1 = int(w / 2)
    y1 = int(h / 2)

    cx = x + x1
    cy = y + y1

    return (cx, cy)

# ============================================================================

def detect_vehicles(fg_mask):
    log = logging.getLogger("detect_vehicles")

    MIN_CONTOUR_WIDTH = 21
    MIN_CONTOUR_HEIGHT = 21

    # Find the contours of any vehicles in the image
    contours, hierarchy = cv2.findContours(fg_mask
        , cv2.RETR_EXTERNAL
        , cv2.CHAIN_APPROX_SIMPLE)

    log.debug("Found %d vehicle contours.", len(contours))

    matches = []
    for (i, contour) in enumerate(contours):
        (x, y, w, h) = cv2.boundingRect(contour)
        contour_valid = (w >= MIN_CONTOUR_WIDTH) and (h >= MIN_CONTOUR_HEIGHT)

        log.debug("Contour #%d: pos=(x=%d, y=%d) size=(w=%d, h=%d) valid=%s"
            , i, x, y, w, h, contour_valid)

        if not contour_valid:
            continue

        centroid = get_centroid(x, y, w, h)

        matches.append(((x, y, w, h), centroid))

    return matches

# ============================================================================

def filter_mask(fg_mask):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

    # Fill any small holes
    closing = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations = 2)

    return dilation

# ============================================================================

def process_frame(frame_number, frame, bg_subtractor, car_counter):
    log = logging.getLogger("process_frame")

    # Create a copy of source frame to draw into
    processed = frame.copy()

    # Draw dividing line -- we count cars as they cross this line.
    cv2.line(processed, (0, car_counter.divider), (frame.shape[1], car_counter.divider), DIVIDER_COLOUR, 1)

    # Remove the background
    fg_mask = bg_subtractor.apply(frame, None, 0.01)
    fg_mask = filter_mask(fg_mask)

    save_frame(IMAGE_DIR + "/mask_%04d.png"
        , frame_number, fg_mask, "foreground mask for frame #%d")

    matches = detect_vehicles(fg_mask)

    log.debug("Found %d valid vehicle contours.", len(matches))
    for (i, match) in enumerate(matches):
        contour, centroid = match

        log.debug("Valid vehicle contour #%d: centroid=%s, bounding_box=%s", i, centroid, contour)

        x, y, w, h = contour

        # Mark the bounding box and the centroid on the processed frame
        # NB: Fixed the off-by one in the bottom right corner
        cv2.rectangle(processed, (x, y), (x + w - 1, y + h - 1), BOUNDING_BOX_COLOUR, 1)
        cv2.circle(processed, centroid, 2, CENTROID_COLOUR, -1)

    log.debug("Updating vehicle count...")
    car_counter.update_count(matches, processed)

    return processed

# ============================================================================

def main():
    log = logging.getLogger("main")

    log.debug("Creating background subtractor...")
    bg_subtractor = cv2.BackgroundSubtractorMOG()

    log.debug("Pre-training the background subtractor...")
    default_bg = cv2.imread(IMAGE_FILENAME_FORMAT % 119)
    bg_subtractor.apply(default_bg, None, 1.0)

    car_counter = None # Will be created after first frame is captured

    # Set up image source
    log.debug("Initializing video capture device #%s...", IMAGE_SOURCE)
    cap = cv2.VideoCapture(IMAGE_SOURCE)

    frame_width = cap.get(cv2.cv.CV_CAP_PROP_FRAME_WIDTH)
    frame_height = cap.get(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT)
    log.debug("Video capture frame size=(w=%d, h=%d)", frame_width, frame_height)

    log.debug("Starting capture loop...")
    frame_number = -1
    while True:
        frame_number += 1
        log.debug("Capturing frame #%d...", frame_number)
        ret, frame = cap.read()
        if not ret:
            log.error("Frame capture failed, stopping...")
            break

        log.debug("Got frame #%d: shape=%s", frame_number, frame.shape)

        if car_counter is None:
            # We do this here, so that we can initialize with actual frame size
            log.debug("Creating vehicle counter...")
            car_counter = VehicleCounter(frame.shape[:2], frame.shape[0] / 2)

        # Archive raw frames from video to disk for later inspection/testing
        if CAPTURE_FROM_VIDEO:
            save_frame(IMAGE_FILENAME_FORMAT
                , frame_number, frame, "source frame #%d")

        log.debug("Processing frame #%d...", frame_number)
        processed = process_frame(frame_number, frame, bg_subtractor, car_counter)

        save_frame(IMAGE_DIR + "/processed_%04d.png"
            , frame_number, processed, "processed frame #%d")

        cv2.imshow('Source Image', frame)
        cv2.imshow('Processed Image', processed)

        log.debug("Frame #%d processed.", frame_number)

        c = cv2.waitKey(WAIT_TIME)
        if c == 27:
            log.debug("ESC detected, stopping...")
            break

    log.debug("Closing video capture device...")
    cap.release()
    cv2.destroyAllWindows()
    log.debug("Done.")

# ============================================================================

if __name__ == "__main__":
    log = init_logging()

    if not os.path.exists(IMAGE_DIR):
        log.debug("Creating image directory `%s`...", IMAGE_DIR)
        os.makedirs(IMAGE_DIR)

    main()
The script is responsible for processing the stream of images, and identifying all the vehicles in each frame -- I refer to them as matches in the code.
The task of counting the detected vehicles is delegated to the VehicleCounter class. The reason why I chose to make this a class will become evident as we progress. I did not implement your vehicle counting algorithm, because it will not work, for reasons that will again become evident as we dig deeper into this.
The file vehicle_counter.py contains the following code:
import logging

# ============================================================================

class VehicleCounter(object):
    def __init__(self, shape, divider):
        self.log = logging.getLogger("vehicle_counter")

        self.height, self.width = shape
        self.divider = divider

        self.vehicle_count = 0

    def update_count(self, matches, output_image = None):
        self.log.debug("Updating count using %d matches...", len(matches))

# ============================================================================
Finally, I've written a script that will stitch all the generated images together, so that it's easier to inspect them:
import cv2
import numpy as np

# ============================================================================

INPUT_WIDTH = 160
INPUT_HEIGHT = 120

OUTPUT_TILE_WIDTH = 10
OUTPUT_TILE_HEIGHT = 12

TILE_COUNT = OUTPUT_TILE_WIDTH * OUTPUT_TILE_HEIGHT

# ============================================================================

def stitch_images(input_format, output_filename):
    output_shape = (INPUT_HEIGHT * OUTPUT_TILE_HEIGHT
        , INPUT_WIDTH * OUTPUT_TILE_WIDTH
        , 3)
    output = np.zeros(output_shape, np.uint8)

    for i in range(TILE_COUNT):
        img = cv2.imread(input_format % i)
        cv2.rectangle(img, (0, 0), (INPUT_WIDTH - 1, INPUT_HEIGHT - 1), (0, 0, 255), 1)
        # Draw the frame number
        cv2.putText(img, str(i), (2, 10)
            , cv2.FONT_HERSHEY_PLAIN, 0.7, (255, 255, 255), 1)
        x = i % OUTPUT_TILE_WIDTH * INPUT_WIDTH
        y = i / OUTPUT_TILE_WIDTH * INPUT_HEIGHT
        output[y:y+INPUT_HEIGHT, x:x+INPUT_WIDTH,:] = img

    cv2.imwrite(output_filename, output)

# ============================================================================

stitch_images("images/frame_%04d.png", "stitched_frames.png")
stitch_images("images/mask_%04d.png", "stitched_masks.png")
stitch_images("images/processed_%04d.png", "stitched_processed.png")
Analysis
In order to solve this problem, we should have some idea about what results we expect to get. We should also label all the distinct cars in the video, so that it's easier to talk about them.
If we run the script, and stitch the images together, we get a number of useful files to help us analyze the problem:
- The debug log of the run.
Upon inspecting these, a number of issues become evident:
- The foreground masks tend to be noisy. We should do some filtering (erode/dilate?) to get rid of the noise and narrow gaps.
- Sometimes we miss vehicles (the grey ones).
- Some vehicles get detected twice in a single frame.
- Vehicles are rarely detected in the upper portion of the frame.
- The same vehicle is often detected in consecutive frames. We need to figure out a way of tracking the same vehicle across consecutive frames, and counting it only once.
Solution
1. Pre-Seeding the Background Subtractor
Our video is quite short, only 120 frames. With a learning rate of 0.01, it would take a substantial part of the video for the background detector to stabilize.
Fortunately, the last frame of the video (frame number 119) is completely devoid of vehicles, and therefore we can use it as our initial background image. (Other options for obtaining a suitable image are mentioned in the notes and comments.)
To use this initial background image, we simply load it, and apply it on the background subtractor with a learning factor of 1.0:
bg_subtractor = cv2.BackgroundSubtractorMOG()
default_bg = cv2.imread(IMAGE_FILENAME_FORMAT % 119)
bg_subtractor.apply(default_bg, None, 1.0)
When we look at the new mosaic of masks, we can see that there is less noise, and that the vehicle detection works better in the early frames.
2. Cleaning Up the Foreground Mask
A simple approach to improving our foreground mask is to apply a few morphological transformations:
def filter_mask(fg_mask):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

    # Fill any small holes
    closing = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations = 2)

    return dilation
Inspecting the masks, the processed frames, and the log file generated with filtering enabled, we can see that we now detect vehicles more reliably, and have mitigated the issue of different parts of one vehicle being detected as separate objects.
3. Tracking Vehicles Across Frames
At this point, we need to go through the log file, and collect all the centroid coordinates for each vehicle. This will allow us to plot and inspect the path each vehicle traces across the image, and to develop an algorithm to do this automatically. To make this process easier, we can create a reduced log by removing the irrelevant entries.
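As a rough sketch of how the collection step could be automated (a hypothetical helper, not part of the scripts above; it relies on the exact "centroid=(x, y)" wording produced by the log.debug call in process_frame, and grouping the points per vehicle was still done by hand):

import re

# Hypothetical helper: pull all centroid coordinates out of the debug log.
# Relies on the "Valid vehicle contour #N: centroid=(x, y), ..." lines
# written by process_frame().
CENTROID_RE = re.compile(r"centroid=\((\d+), (\d+)\)")

def collect_centroids(log_file_name):
    centroids = []
    with open(log_file_name) as f:
        for line in f:
            m = CENTROID_RE.search(line)
            if m:
                centroids.append((int(m.group(1)), int(m.group(2))))
    return centroids

print(collect_centroids("debug.log"))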
The list of centroid coordinates:
traces = {
    'A': [(112, 36), (112, 45), (112, 52), (112, 54), (112, 63), (111, 73), (111, 86), (111, 91), (111, 97), (110, 105)]
    , 'B': [(119, 37), (120, 42), (121, 54), (121, 55), (123, 64), (124, 74), (125, 87), (127, 94), (125, 100), (126, 108)]
    , 'C': [(93, 23), (91, 27), (89, 31), (87, 36), (85, 42), (82, 49), (79, 59), (74, 71), (70, 82), (62, 86), (61, 92), (55, 101)]
    , 'D': [(118, 30), (124, 83), (125, 90), (116, 101), (122, 100)]
    , 'E': [(77, 27), (75, 30), (73, 33), (70, 37), (67, 42), (63, 47), (59, 53), (55, 59), (49, 67), (43, 75), (36, 85), (27, 92), (24, 97), (20, 102)]
    , 'F': [(119, 30), (120, 34), (120, 39), (122, 59), (123, 60), (124, 70), (125, 82), (127, 91), (126, 97), (128, 104)]
    , 'G': [(88, 37), (87, 41), (85, 48), (82, 55), (79, 63), (76, 74), (72, 87), (67, 92), (65, 98), (60, 106)]
    , 'H': [(124, 35), (123, 40), (125, 45), (127, 59), (126, 59), (128, 67), (130, 78), (132, 88), (134, 93), (135, 99), (135, 107)]
    , 'I': [(98, 26), (97, 30), (96, 34), (94, 40), (92, 47), (90, 55), (87, 64), (84, 77), (79, 87), (74, 93), (73, 102)]
    , 'J': [(123, 60), (125, 63), (125, 81), (127, 93), (126, 98), (125, 100)]
    }
Individual vehicle traces drawn on the background:
Combined enlarged image of all the vehicle traces:
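A rough sketch of how such a trace image can be recreated (assuming the traces dict above, and the vehicle-free background frame images/frame_0119.png; the per-vehicle colours here are arbitrary):

import cv2
import numpy as np

# Sketch: draw every vehicle's trace onto the vehicle-free background frame.
background = cv2.imread("images/frame_0119.png")
for i, (label, trace) in enumerate(sorted(traces.items())):
    colour = ((i * 97) % 256, (i * 59) % 256, 255) # arbitrary per-vehicle colour
    cv2.polylines(background, [np.int32(trace)], False, colour, 1)
    # Label the start of each trace with the vehicle's letter
    cv2.putText(background, label, trace[0]
        , cv2.FONT_HERSHEY_PLAIN, 0.7, colour, 1)
cv2.imwrite("traces.png", background)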
Vectors
In order to analyze the movement, we need to work with vectors (i.e. the distance and direction moved). The following diagram shows how the angles correspond to the movement of vehicles in the image.
We can use the following function to calculate the vector between two points:
import math

def get_vector(a, b):
    """Calculate vector (distance, angle in degrees) from point a to point b.

    Angle ranges from -180 to 180 degrees.
    Vector with angle 0 points straight down on the image.
    Values increase in clockwise direction.
    """
    dx = float(b[0] - a[0])
    dy = float(b[1] - a[1])

    distance = math.sqrt(dx**2 + dy**2)

    if dy > 0:
        angle = math.degrees(math.atan(-dx/dy))
    elif dy == 0:
        if dx < 0:
            angle = 90.0
        elif dx > 0:
            angle = -90.0
        else:
            angle = 0.0
    else:
        if dx < 0:
            angle = 180 - math.degrees(math.atan(dx/dy))
        elif dx > 0:
            angle = -180 - math.degrees(math.atan(dx/dy))
        else:
            angle = 180.0

    return distance, angle
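A few quick sanity checks (hypothetical points, chosen only to illustrate the angle convention from the docstring):

print(get_vector((100, 100), (100, 110))) # straight down         -> (10.0, 0.0)
print(get_vector((100, 100), (90, 110)))  # down and to the left  -> (~14.14, 45.0)
print(get_vector((100, 100), (110, 110))) # down and to the right -> (~14.14, -45.0)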
Categorization
One way we can look for patterns that could be used to categorize the movements as valid/invalid is to make a scatter plot (angle vs. distance):
- Green points represent valid movement, which we determined using the lists of points for each vehicle.
- Red points represent invalid movement -- vectors between points in adjacent traffic lanes.
- I plotted two blue curves that we can use to separate the two types of movement. Any point that lies below either curve can be considered valid. The curves are:
  - distance = -0.008 * angle**2 + 0.4 * angle + 25.0
  - distance = 10.0
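The green points in such a plot can be reproduced from the vectors between consecutive points of each trace. A minimal sketch of how this might be generated (assuming matplotlib is available, and using the traces dict and get_vector function from above; the red cross-lane vectors were picked by hand and are omitted here):

import matplotlib.pyplot as plt

angles = []
distances = []
for trace in traces.values():
    # Vectors between consecutive positions of the same vehicle are valid
    for a, b in zip(trace[:-1], trace[1:]):
        distance, angle = get_vector(a, b)
        angles.append(angle)
        distances.append(distance)

plt.scatter(angles, distances, c='g')
# The combined threshold curve used by is_valid_vector() below
xs = range(-180, 181)
plt.plot(xs, [max(10.0, -0.008 * x**2 + 0.4 * x + 25.0) for x in xs], 'b-')
plt.xlabel('angle (degrees)')
plt.ylabel('distance (pixels)')
plt.savefig('scatter.png')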
We can use the following function to categorize the movement vectors:
def is_valid_vector(a):
    distance, angle = a
    threshold_distance = max(10.0, -0.008 * angle**2 + 0.4 * angle + 25.0)
    return (distance <= threshold_distance)
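For example, applying it to the first two points of vehicle A's trace above:

vector = get_vector((112, 36), (112, 45)) # first two positions of vehicle A
print(vector)                  # (9.0, 0.0) -- straight down, 9 pixels
print(is_valid_vector(vector)) # True, since 9.0 <= max(10.0, 25.0)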
Note: There is one outlier, which occurs due to us losing track of vehicle D in frames 43..48.
Algorithm
We will use a class Vehicle to store information about each tracked vehicle:
- Some kind of identifier
- A list of positions, with the most recent one at the end
- A last-seen counter -- the number of frames since we last saw this vehicle
- A flag to mark whether the vehicle was counted or not
The class VehicleCounter will store a list of the currently tracked vehicles, and keep track of the total count. On each frame, we will call it with the list of bounding boxes and positions of the identified vehicles (the candidate list), and update the state of the VehicleCounter as follows:
- Update the currently tracked Vehicles:
  - For each vehicle:
    - If there is any valid match for the given vehicle, update the vehicle's position and reset its last-seen counter. Remove the match from the candidate list.
    - Otherwise, increase the last-seen counter for that vehicle.
- Create new Vehicles for all the remaining matches.
- Update the vehicle count:
  - For each vehicle:
    - If the vehicle is past the divider and has not been counted yet, update the total count and mark the vehicle as counted.
- Remove vehicles that are no longer visible:
  - For each vehicle:
    - If its last-seen counter exceeds the threshold, remove the vehicle.
4. Solution
We can use the main script together with the final version of vehicle_counter.py, which contains the implementation of our counting algorithm:
import logging
import math

import cv2
import numpy as np

# ============================================================================

CAR_COLOURS = [ (0,0,255), (0,106,255), (0,216,255), (0,255,182), (0,255,76)
    , (144,255,0), (255,255,0), (255,148,0), (255,0,178), (220,0,255) ]

# ============================================================================

class Vehicle(object):
    def __init__(self, id, position):
        self.id = id
        self.positions = [position]
        self.frames_since_seen = 0
        self.counted = False

    @property
    def last_position(self):
        return self.positions[-1]

    def add_position(self, new_position):
        self.positions.append(new_position)
        self.frames_since_seen = 0

    def draw(self, output_image):
        car_colour = CAR_COLOURS[self.id % len(CAR_COLOURS)]
        for point in self.positions:
            cv2.circle(output_image, point, 2, car_colour, -1)
        cv2.polylines(output_image, [np.int32(self.positions)]
            , False, car_colour, 1)

# ============================================================================

class VehicleCounter(object):
    def __init__(self, shape, divider):
        self.log = logging.getLogger("vehicle_counter")

        self.height, self.width = shape
        self.divider = divider

        self.vehicles = []
        self.next_vehicle_id = 0
        self.vehicle_count = 0
        self.max_unseen_frames = 7

    @staticmethod
    def get_vector(a, b):
        """Calculate vector (distance, angle in degrees) from point a to point b.

        Angle ranges from -180 to 180 degrees.
        Vector with angle 0 points straight down on the image.
        Values increase in clockwise direction.
        """
        dx = float(b[0] - a[0])
        dy = float(b[1] - a[1])

        distance = math.sqrt(dx**2 + dy**2)

        if dy > 0:
            angle = math.degrees(math.atan(-dx/dy))
        elif dy == 0:
            if dx < 0:
                angle = 90.0
            elif dx > 0:
                angle = -90.0
            else:
                angle = 0.0
        else:
            if dx < 0:
                angle = 180 - math.degrees(math.atan(dx/dy))
            elif dx > 0:
                angle = -180 - math.degrees(math.atan(dx/dy))
            else:
                angle = 180.0

        return distance, angle

    @staticmethod
    def is_valid_vector(a):
        distance, angle = a
        threshold_distance = max(10.0, -0.008 * angle**2 + 0.4 * angle + 25.0)
        return (distance <= threshold_distance)

    def update_vehicle(self, vehicle, matches):
        # Find if any of the matches fits this vehicle
        for i, match in enumerate(matches):
            contour, centroid = match

            vector = self.get_vector(vehicle.last_position, centroid)
            if self.is_valid_vector(vector):
                vehicle.add_position(centroid)
                self.log.debug("Added match (%d, %d) to vehicle #%d. vector=(%0.2f,%0.2f)"
                    , centroid[0], centroid[1], vehicle.id, vector[0], vector[1])
                return i

        # No matches fit...
        vehicle.frames_since_seen += 1
        self.log.debug("No match for vehicle #%d. frames_since_seen=%d"
            , vehicle.id, vehicle.frames_since_seen)

        return None

    def update_count(self, matches, output_image = None):
        self.log.debug("Updating count using %d matches...", len(matches))

        # First update all the existing vehicles
        for vehicle in self.vehicles:
            i = self.update_vehicle(vehicle, matches)
            if i is not None:
                del matches[i]

        # Add new vehicles based on the remaining matches
        for match in matches:
            contour, centroid = match
            new_vehicle = Vehicle(self.next_vehicle_id, centroid)
            self.next_vehicle_id += 1
            self.vehicles.append(new_vehicle)
            self.log.debug("Created new vehicle #%d from match (%d, %d)."
                , new_vehicle.id, centroid[0], centroid[1])

        # Count any uncounted vehicles that are past the divider
        for vehicle in self.vehicles:
            if not vehicle.counted and (vehicle.last_position[1] > self.divider):
                self.vehicle_count += 1
                vehicle.counted = True
                self.log.debug("Counted vehicle #%d (total count=%d)."
                    , vehicle.id, self.vehicle_count)

        # Optionally draw the vehicles on an image
        if output_image is not None:
            for vehicle in self.vehicles:
                vehicle.draw(output_image)

            cv2.putText(output_image, ("%02d" % self.vehicle_count), (142, 10)
                , cv2.FONT_HERSHEY_PLAIN, 0.7, (127, 255, 255), 1)

        # Remove vehicles that have not been seen long enough
        removed = [ v.id for v in self.vehicles
            if v.frames_since_seen >= self.max_unseen_frames ]
        self.vehicles[:] = [ v for v in self.vehicles
            if not v.frames_since_seen >= self.max_unseen_frames ]
        for id in removed:
            self.log.debug("Removed vehicle #%d.", id)

        self.log.debug("Count updated, tracking %d vehicles.", len(self.vehicles))

# ============================================================================
The program now draws the historical paths of all the currently tracked vehicles into the output image, along with the vehicle count. Each vehicle is assigned one of 10 colours.
Notice that vehicle D ends up being tracked twice, however it is counted only once, since we lose track of it before it crosses the divider. Ideas on how to resolve this are mentioned in the appendix.
Based on the last processed frame generated by the script, the total vehicle count is 10. This is a correct result.
More details can be found in the output the script generated:
A. Potential Improvements
- Refactor, add unit tests.
- Improve the filtering/preprocessing of the foreground mask:
  - Multiple iterations of filtering, filling holes using cv2.drawContours with CV_FILLED? (See the sketch after this list.)
  - Watershed algorithm?
- Improve the categorization of the movement vectors:
  - Create a predictor to estimate the initial movement angle when vehicles are created (and only one position is known)... in order to be able to use direction, rather than change of direction alone (I think this would cluster the angles of valid motion vectors close to zero).
- Improve the vehicle tracking:
  - Predict the position for frames where the vehicle is not seen.
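As an illustration of the hole-filling idea from the first bullet, a sketch only (untested on this data; in the cv2 API, thickness -1 is the equivalent of CV_FILLED):

import cv2

def fill_holes(fg_mask):
    # Redraw each external contour filled, so that any holes inside
    # the blobs disappear. findContours modifies its input in
    # OpenCV 2.x, hence the copy().
    contours, hierarchy = cv2.findContours(fg_mask.copy()
        , cv2.RETR_EXTERNAL
        , cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(fg_mask, contours, -1, 255, -1)
    return fg_mask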
I have been trying to count cars as they cross a line, and it works, but the problem is that it counts one car multiple times, which is ridiculous, because a car should only be counted once.
Here is the code I am using:
The video is on my github page @ https://github.com/Tes3awy/MATLAB-Tutorials/blob/f24b680f2215c1b1bb96c76f5ba81df533552983/traffic.avi (it is also a built-in video in the Matlab library).
Any help to make each car count only once?
EDIT: The individual frames of the video look as follows: