動(dòng)態(tài) ROI 區(qū)域監(jiān)控與實(shí)例分割
區(qū)域監(jiān)控是計(jì)算機(jī)視覺中的一個(gè)熱門應(yīng)用,它通過監(jiān)控一段時(shí)間內(nèi)進(jìn)入和離開特定區(qū)域的實(shí)體數(shù)量來實(shí)現(xiàn)。常見的區(qū)域監(jiān)控應(yīng)用場(chǎng)景包括建筑工地安全、病人監(jiān)控和禁區(qū)控制。
這個(gè)區(qū)域,通常被稱為感興趣區(qū)域(ROI),通常由用戶設(shè)置為一個(gè)靜態(tài)的多邊形形狀,覆蓋攝像機(jī)捕捉到的場(chǎng)景的一部分。雖然這種方法在固定位置的攝像機(jī)上效果良好,但在感興趣區(qū)域或攝像機(jī)本身移動(dòng)的情況下,如自動(dòng)駕駛車輛或機(jī)器人應(yīng)用中,可能就不太適用。
在本文中,我們考慮專用車道監(jiān)控的問題。城市中通常有為自行車、公交車和拼車車輛等類型車輛指定的車道。然而,執(zhí)法限制常常導(dǎo)致錯(cuò)誤車輛使用或阻塞這些車道。為了解決這個(gè)問題,我們將開發(fā)一個(gè)可以在車輛儀表盤攝像頭上使用的專用車道監(jiān)控應(yīng)用。
訓(xùn)練模型
為了檢測(cè)專用車道,我們使用Roboflow訓(xùn)練一個(gè)自定義分割模型。Roboflow是一個(gè)在線平臺(tái),幫助用戶快速創(chuàng)建、訓(xùn)練和部署計(jì)算機(jī)視覺模型。他們的入門教程是學(xué)習(xí)平臺(tái)關(guān)鍵功能的好起點(diǎn)。
與生成邊界框預(yù)測(cè)的目標(biāo)檢測(cè)模型不同,分割模型預(yù)測(cè)目標(biāo)物體的精確像素邊界,提供關(guān)于其形狀和大小的更豐富信息。除了物理物體外,它們?cè)跈z測(cè)空間區(qū)域(如專用車道)方面也非常有效。
將我們的數(shù)據(jù)集上傳到新的Roboflow項(xiàng)目后,我們使用智能多邊形工具來標(biāo)注專用車道。
我們創(chuàng)建第二個(gè)Roboflow項(xiàng)目,這次選擇“目標(biāo)檢測(cè)”作為項(xiàng)目類型。這個(gè)模型將檢測(cè)車輛。使用邊界框工具上傳并標(biāo)注圖像數(shù)據(jù)集。
一旦兩個(gè)數(shù)據(jù)集都被標(biāo)注并分割為訓(xùn)練/驗(yàn)證/測(cè)試集,就可以使用Roboflow訓(xùn)練和托管模型。
過濾檢測(cè)結(jié)果
除了我們的計(jì)算機(jī)視覺模型外,我們還需要編寫程序邏輯來過濾檢測(cè)結(jié)果,只包括在專用車道內(nèi)行駛的車輛。讓我們用一個(gè)簡單的Python腳本來實(shí)現(xiàn)這一點(diǎn)。
下面的代碼將我們?cè)赗oboflow上托管的模型下載到本地機(jī)器,并使用它們對(duì)輸入圖像進(jìn)行推理。
from inference import get_model
import cv2
# load hosted models from Roboflow
vehicle_model = get_model("sg-vehicles/1")
lane_model = get_model("sg-bus-lanes/1")
# process an image
image = cv2.imread("/your/image.png")
vehicle_results = vehicle_model.infer(image)[0]
lane_results = lane_model.infer(image)[0]
我們使用Supervision庫來可視化我們的預(yù)測(cè)。
import supervision as sv
# convert predictions into supervision detection objects
vehicle_detections = sv.Detections.from_inference(vehicle_results)
lane_detections = sv.Detections.from_inference(lane_results)
# initialize annotators
bounding_box_annotator = sv.BoxAnnotator()
mask_annotator = sv.MaskAnnotator()
label_annotator = sv.LabelAnnotator()
# annotate image
image = bounding_box_annotator.annotate(scene=image, detections=vehicle_detections)
image = label_annotator.annotate(scene=image, detections=vehicle_detections)
image = mask_annotator.annotate(scene=image, detections=lane_detections)
image = label_annotator.annotate(scene=image, detections=lane_detections)
# show image
cv2.imshow("output", image)
如上圖所示,雖然總共檢測(cè)到四輛車,但實(shí)際上只有一輛車在專用車道內(nèi)行駛。過濾不需要的車輛的一種方法是只考慮其邊界框與專用車道分割掩碼相交的檢測(cè)結(jié)果。我們使用以下函數(shù)來實(shí)現(xiàn)這一點(diǎn),該函數(shù)簡單地檢查掩碼的任何像素是否位于給定窗口內(nèi):
import numpy as np
def filter_detection_by_mask(detection, mask):
num_objects = len(detection)
filter_mask = np.empty(num_objects, dtype='bool')
for idx in range(num_objects):
bbox = detection.xyxy[idx].astype(int)
filter_mask[idx] = np.any(mask[bbox[1]:bbox[3], bbox[0]:bbox[2]])
return detection[filter_mask]
# filter detections by mask
lane_mask = lane_detections.mask
lane_mask = np.any(lane_mask, axis=0)
vehicle_detections = filter_detection_by_mask(vehicle_detections, lane_mask)
可視化過濾后的檢測(cè)結(jié)果,我們觀察到只有白色貨車保留下來。
跟蹤區(qū)域入侵
為了測(cè)量每輛錯(cuò)誤車輛在專用車道內(nèi)行駛的時(shí)間長度,我們需要跟蹤其在各幀中的位置。我們使用ByteTrack來實(shí)現(xiàn)這一點(diǎn),ByteTrack是一種多目標(biāo)跟蹤算法,它在后續(xù)幀之間匹配目標(biāo)檢測(cè)并為每個(gè)檢測(cè)分配一個(gè)唯一的跟蹤ID。
我們定義以下函數(shù),該函數(shù)接受過濾后的檢測(cè)結(jié)果、一個(gè)ByteTrack實(shí)例和一個(gè)記錄唯一檢測(cè)結(jié)果出現(xiàn)幀數(shù)的Python字典。在一定幀數(shù)后未被檢測(cè)到的跟蹤對(duì)象將被移除。
import supervision as sv
# ByteTrack
tracker = sv.ByteTrack()
# dict of python tuples
# [0: number of detections, 1: frames since last detection]
tracked_objects = {}
def track_detections(detections, tracker, tracked_objects):
detections = tracker.update_with_detections(detections)
# remove old detections
max_misses = fps
expired_ids = []
for id in tracked_objects:
tracked_objects[id][1] += 1
if tracked_objects[id][1] > max_misses:
expired_ids.append(id)
for id in expired_ids:
tracked_objects.pop(id)
# add or modify based on current detections
for id in list(detections.tracker_id):
if id in tracked_objects:
tracked_objects[id][0] += 1
tracked_objects[id][1] = 0
else:
tracked_objects[id] = [1, 0]
return detections
vehicle_detections = track_detections(vehicle_detections)
我們修改我們的標(biāo)注腳本,以打印專用車道內(nèi)每輛車的跟蹤ID和持續(xù)時(shí)間??倳r(shí)間可以通過將車輛被跟蹤的幀數(shù)乘以視頻幀率(FPS)來計(jì)算。
使用Roboflow管道進(jìn)行視頻推理
現(xiàn)在我們的代碼可以在圖像上運(yùn)行了,讓我們修改它以處理視頻文件。我們首先將代碼轉(zhuǎn)換為一個(gè)單一的Python類。
class LaneIntrusionModel:
def __init__(self):
self.vehicle_model = get_model("sg-vehicles/1")
self.lane_model = get_model("sg-bus-lanes/1")
self.tracker = sv.ByteTrack()
self.tracker.reset()
self.tracked_objects = {}
self.fps = 30
self.bounding_box_annotator = sv.BoxAnnotator()
self.mask_annotator = sv.MaskAnnotator()
self.label_annotator = sv.LabelAnnotator()
# public methods
def infer(self, image):
vehicle_results = self.vehicle_model.infer(image)
lane_results = self.lane_model.infer(image)
return list(zip(vehicle_results, lane_results))
def process_inference(self, image, vehicle_results, lane_results):
vehicle_detections, lane_detections = self.get_detections(image, vehicle_results, lane_results)
vehicle_detections = self.track_detections(vehicle_detections)
annotated_image = self.annotate_frame(image, vehicle_detections, lane_detections)
return annotated_image
## private methods
def get_detections(self, image, vehicle_results, lane_results):
vehicle_detections = sv.Detections.from_inference(vehicle_results)
lane_detections = sv.Detections.from_inference(lane_results)
lane_mask = lane_detections.mask
if lane_mask is None or lane_mask.shape[0] == 0:
return vehicle_detections[vehicle_detections.class_id == -1], lane_detections
lane_mask = np.any(lane_mask, axis=0)
vehicle_detections = filter_detection_by_mask(vehicle_detections, lane_mask)
return vehicle_detections, lane_detections
def track_detections(self, detections):
detections = self.tracker.update_with_detections(detections)
# remove old detections
max_misses = self.fps
expired_ids = []
for id in self.tracked_objects:
self.tracked_objects[id][1] += 1
if self.tracked_objects[id][1] > max_misses:
expired_ids.append(id)
for id in expired_ids:
self.tracked_objects.pop(id)
# add or modify based on current detections
for id in list(detections.tracker_id):
if id in self.tracked_objects:
self.tracked_objects[id][0] += 1
self.tracked_objects[id][1] = 0
else:
self.tracked_objects[id] = [1, 0]
return detections
def annotate_frame(self, image, vehicle_detections, lane_detections):
if len(vehicle_detections):
labels = []
for id in list(vehicle_detections.tracker_id):
time_tracked = self.tracked_objects[id][0]/self.fps
labels.append(f"vehicle:#{id} time:{time_tracked:.2f}s")
image = self.bounding_box_annotator.annotate(scene=image, detections=vehicle_detections)
image = self.label_annotator.annotate(scene=image, detections=vehicle_detections, labels=labels)
if len(lane_detections):
image = self.mask_annotator.annotate(scene=image, detections=lane_detections)
image = self.label_annotator.annotate(scene=image, detections=lane_detections)
return image
由于深度學(xué)習(xí)模型的計(jì)算需求,實(shí)時(shí)處理可能具有挑戰(zhàn)性,通常需要專門的硬件(如GPU)。Roboflow提供了InferencePipeline工具包(https://inference.roboflow.com/using_inference/inference_pipeline/),通過在單獨(dú)的線程上運(yùn)行預(yù)處理、推理和后處理,并支持批量推理,來提高處理時(shí)間。我們首先定義以下回調(diào)函數(shù):
from inference.core.interfaces.camera.entities import VideoFrame
from inference import InferencePipeline
from typing import Union, List, Optional, Any
model = LaneIntrusionModel()
def on_video_frame(video_frames: List[VideoFrame]) -> List[Any]:
images = [v.image for v in video_frames]
return model.infer(images)
def on_prediction(
predictions: Union[dict, List[Optional[dict]]],
video_frame: Union[VideoFrame, List[Optional[VideoFrame]]]
) -> None:
if not issubclass(type(predictions), list):
# this is required to support both sequential and batch processing with single code
# if you use only one mode - you may create function that handles with only one type
# of input
predictions = [predictions]
video_frame = [video_frame]
for result, frame in zip(predictions, video_frame):
annotated_image = model.process_inference(frame.image, result[0], result[1])
cv2.imshow("output", annotated_image)
cv2.waitKey(1)
然后,我們使用以下腳本在視頻文件上運(yùn)行管道:
pipeline = InferencePipeline.init_with_custom_logic(
max_fps=out_fps,
video_reference="/your/video.mp4",
on_video_frame=on_video_frame,
on_prediction=on_prediction,
)
# start the pipeline
pipeline.start()
# wait for the pipeline to finish
pipeline.join()
得到以下結(jié)果:
結(jié)論
總之,本教程展示了如何使用Roboflow Inference實(shí)現(xiàn)實(shí)時(shí)區(qū)域監(jiān)控。分割模型允許動(dòng)態(tài)檢測(cè)圖像場(chǎng)景中的感興趣區(qū)域,從而在持續(xù)變化的環(huán)境中實(shí)現(xiàn)區(qū)域監(jiān)控。
除了專用車道監(jiān)控外,動(dòng)態(tài)區(qū)域監(jiān)控的其他應(yīng)用場(chǎng)景可能包括:
- 送貨無人機(jī)掃描著陸區(qū)的包裹
- 閉路電視攝像機(jī)監(jiān)控高低潮之間的海岸線游泳者
實(shí)時(shí)視頻處理的限制通常將這些應(yīng)用中的模型類型限制為快速、單階段的檢測(cè)器,如YOLO和SSD,這些模型需要強(qiáng)大的訓(xùn)練數(shù)據(jù)才能有效。未來的ML優(yōu)化和硬件工作可能需要將動(dòng)態(tài)區(qū)域監(jiān)控能力引入零樣本檢測(cè)模型。