代码来源
论文链接
DeepSort
1. MOT(Multi-Object Tracking)简介2. DeepSort前身:Sort(Simple Online And Realtime Tracking)3. DeepSort总体框架(流程)3.1 框架概要3.2 流程分析 4. DeepSort各模块讲解4.1 目标检测模块4.1.1 目标检测模型概述4.1.2 Detection类解析 4.2 轨迹跟踪模块4.2.1 Track类4.2.2 Tracker类4.2.3 卡尔曼滤波 4.3 数据匹配模块4.3.1 ReID模块4.3.2 级联匹配和IOU匹配4.3.3 匈牙利算法
1. MOT(Multi-Object Tracking)简介
多目标跟踪是检测视频影像中出现的多个目标(如行人,汽车等)并对每个目标进行轨迹跟踪及ID分配。其中视频前后出现的同一目标应分配同一ID,不同目标分配不同ID。
目前,MOT问题的处理框架主要有(1)先检测后跟踪(Tracking by detection),如Sort/DeepSort;(2)检测和跟踪联合,如JDE,CenterTrack等;(3)基于注意力机制,如TransTrack,TrackFormer等
其中处理过程主要可分为四个步骤:
1.按帧数解析输入视频
2.通过目标检测网络获取视频原始帧的目标检测框
3.对检测出的目标框进行特征提取(运动或语义特征)并对前后视频帧进行相似度计算
4.数据关联,匹配目标框和对应轨迹及ID
2. DeepSort前身:Sort(Simple Online And Realtime Tracking)
Sort主要思路:首先通过检测器Detections(如Faster RCNN)检测出每一帧的目标;其次通过卡尔曼滤波预测目标在下一帧的位置,将预测的位置与检测器实际检测的位置做相似度计算(IOU);最后通过匈牙利算法匹配对应轨迹和目标框,匹配结果分为三种:(1)未匹配的轨迹,直接删除,(2)未匹配的目标框,初始化为新的轨迹,(3)匹配成功的轨迹与目标框,通过卡尔曼滤波更新检测框位置,获取最优估计。
Sort存在问题:由于只考虑检测框与预测框的重叠面积(IOU),当两个目标发生遮挡后导致ID-Switch增大(同一目标的ID发生变化);同时未考虑对于遮挡后重新出现目标,进一步导致ID-Switch过大。
3. DeepSort总体框架(流程)
3.1 框架概要
对比Sort的主要改进:利用ReID模型提取外观语义特征,加入外观信息;增加了级联匹配(Matching Cascade)和轨迹确认(Confirmed、Tentative、Deleted)
DeepSort主要模块:
(1)目标检测模块:通过目标检测网络,获取输入每一帧图片中的目标框
(2)轨迹跟踪模块:通过卡尔曼滤波进行轨迹预测和更新,获取新的轨迹集合
(3)数据匹配模块:通过级联匹配和IOU匹配将轨迹和目标框关联
DeepSort主要流程:检测器获取视频当前帧中目标框 → \rightarrow → 卡尔曼滤波根据当前帧的轨迹集合预测下一帧轨迹集合 → \rightarrow → 预测轨迹与下一帧检测目标框进行匹配 → \rightarrow → 卡尔曼滤波更新匹配成功的轨迹
3.2 流程分析
整个流程概括如下:
(1)将第一帧检测目标框初始化对应轨迹进行卡尔曼滤波预测下一时刻轨迹,其中初始化轨迹状态为不确定态
(2)将上一时刻确认态轨迹与当前时刻检测目标框进行级联匹配,级联匹配结果中匹配失败轨迹和匹配失败目标框用于后续IOU匹配,匹配成功轨迹和目标框进行卡尔曼滤波预测和更新
(3)将级联匹配结果中匹配失败轨迹和匹配失败目标框以及上一帧不确定态轨迹进行IOU匹配,匹配结果中匹配失败轨迹若仍为不确定态或为确定态但连续匹配失败次数超标则删除该轨迹;匹配失败轨迹为确定态且连续匹配失败次数未超标进行卡尔曼滤波预测;匹配失败目标框则初始化对应轨迹进行卡尔曼滤波预测;匹配成功轨迹和目标框进行卡尔曼滤波预测和更新
(4)重复(2)和(3),直到结束
4. DeepSort各模块讲解
4.1 目标检测模块
目标检测模块通过对应模型检测视频每一帧图片中的目标,其精度直接影响多目标跟踪的最终效果。
4.1.1 目标检测模型概述
目前整理了有关基于深度学习的目标检测模型的发展路线:
首先以Anchor-based为基础的Two-stage模型因其检测速度慢的问题逐渐发展为One-stage模型。然而由于Anchor的设计存在超参数,难以覆盖所有尺寸目标;同时为了保证一定的精度,选择了大量的Anchor用来检测占用计算量和内存消耗。
后来以Anchor-free为基础的模型提出以检测关键点(中心点)的方式解决Anchor存在的问题,大大优化了模型超参数的数量。此外大多数目标检测模型需要对预测结果进行非极大值抑制(NMS)后处理,对此提出了NMS-free模型皆在取消NMS后处理,提高检测速度,实现真正意义上的端到端检测。
目前Transformer架构在视觉领域(ViT,Swin Transformer)取得了一系列成功。自此以Transformer为基础的目标检测模型展现出了优越的性能并且基于Transformer的模型能够天然的消除人工设置的Anchor-based以及NMS后处理是一个优良的端到端检测器。
有关目标检测模型解析,后续整理发布
4.1.2 Detection类解析
通过相应目标检测模型将视频中每一帧图片的检测结果(边框坐标及置信度得分)封装于Detection类,此外该类包含了ReID模块提取的外观语义特征以及不同边框坐标形式转换方法,具体代码如下:
class Detection(object): def __init__(self, tlwh, confidence, feature): ''' tlwh:目标框左上角横纵坐标x, y; 宽w; 高h confindnce:目标类别置信度得分 feature:ReID模块提取目标框的外观语义特征 ''' self.tlwh = np.asarray(tlwh, dtype=np.float) self.confidence = float(confidence) self.feature = np.asarray(feature, dtype=np.float32) def to_tlbr(self): '''将目标框坐标tlwh转换为左上角横纵坐标x,y; 右下角横纵坐标x, y''' ret = self.tlwh.copy() ret[2:] += ret[:2] return ret def to_xyah(self): '''将目标框坐标tlwh转换为中心点横纵坐标x,y; 宽高比a; 高h''' ret = self.tlwh.copy() ret[:2] += ret[2:] / 2 ret[2] /= ret[3] return ret
4.2 轨迹跟踪模块
轨迹跟踪模块主要通过Track类和Tracker类完成,具体功能如下:
Track类:存储单个轨迹的状态和信息以及负责单个轨迹的预测和更新
Tracker类:存储所有轨迹(集合)的状态和信息,负责轨迹的初始化以及所有轨迹(集合)的预测和更新同时封装了数据匹配模块
4.2.1 Track类
Track类的核心代码及注释如下:
class TrackState:'''单个轨迹的三种状态''' Tentative = 1 #不确定态 Confirmed = 2 #确定态 Deleted = 3 #删除态class Track: def __init__(self, mean, covariance, track_id, class_id, conf, n_init, max_age, feature=None): ''' mean:位置、速度状态分布均值向量,维度(8×1) convariance:位置、速度状态分布方差矩阵,维度(8×8) track_id:轨迹ID class_id:轨迹所属类别 hits:轨迹更新次数(初始化为1),即轨迹与目标连续匹配成功次数 age:轨迹连续存在的帧数(初始化为1),即轨迹出现到被删除的连续总帧数 time_since_update:轨迹距离上次更新后的连续帧数(初始化为0),即轨迹与目标连续匹配失败次数 state:轨迹状态 features:轨迹所属目标的外观语义特征,轨迹匹配成功时添加当前帧的新外观语义特征 conf:轨迹所属目标的置信度得分 _n_init:轨迹状态由不确定态到确定态所需连续匹配成功的次数 _max_age:轨迹状态由不确定态到删除态所需连续匹配失败的次数 ''' self.mean = mean self.covariance = covariance self.track_id = track_id self.class_id = int(class_id) self.hits = 1 self.age = 1 self.time_since_update = 0 self.state = TrackState.Tentative self.features = [] if feature is not None: self.features.append(feature) #若不为None,初始化外观语义特征 self.conf = conf self._n_init = n_init self._max_age = max_age def increment_age(self): ''' 预测下一帧轨迹时调用 ''' self.age += 1 #轨迹连续存在帧数+1 self.time_since_update += 1 #轨迹连续匹配失败次数+1 def predict(self, kf): ''' 预测下一帧轨迹信息 ''' self.mean, self.covariance = kf.predict(self.mean, self.covariance) #卡尔曼滤波预测下一帧轨迹的状态均值和方差 self.increment_age() #调用函数,age+1,time_since_update+1 def update(self, kf, detection, class_id, conf): ''' 更新匹配成功的轨迹信息 ''' self.conf = conf #更新置信度得分 self.mean, self.covariance = kf.update( self.mean, self.covariance, detection.to_xyah()) #卡尔曼滤波更新轨迹的状态均值和方差 self.features.append(detection.feature) #添加轨迹对应目标框的外观语义特征 self.class_id = class_id.int() #更新轨迹所属类别 self.hits += 1 #轨迹匹配成功次数+1 self.time_since_update = 0 #匹配成功时,轨迹连续匹配失败次数归0 if self.state == TrackState.Tentative and self.hits >= self._n_init: self.state = TrackState.Confirmed #当连续匹配成功次数达标时轨迹由不确定态转为确定态 def mark_missed(self): ''' 将轨迹状态转为删除态 ''' if self.state == TrackState.Tentative: self.state = TrackState.Deleted #当级联匹配和IOU匹配后仍为不确定态 elif self.time_since_update > self._max_age: self.state = TrackState.Deleted #当连续匹配失败次数超标'''该部分还存在一些轨迹坐标转化及状态判定函数,具体可参考代码来源'''
Track类是轨迹的一个基本单位,通过mean、covariance存储轨迹的位置和速度信息,利用卡尔曼滤波对轨迹信息进行预测和更新;同时定义了轨迹的三种状态(Tentaitve、Confirmed、Deleted)及三种状态之间的转换关系。其中三种状态如下:
Tentaitive态:代表轨迹初始(默认)状态,该状态轨迹更新时连续匹配成功次数(hits)+1,达到指定匹配成功次数(_n_init默认为3)转换为Confirmed态;经过级联匹配和IOU匹配后该轨迹状态仍为Tentaiva,则认为轨迹没有匹配上任何目标,转换为Deleted态。
Confirmed态:代表轨迹匹配成功状态。该状态轨迹预测而不更新时连续匹配失败次数(time_since_update)+1,达到连续匹配失败次数(_max_age默认为70)转换为Deleted态。
Deleted态:代表轨迹失效状态。该轨迹从所有轨迹(集合)中删除。
4.2.2 Tracker类
Tracker类的核心代码及注释如下:
其中只列出了用于轨迹预测、更新及初始化部分,对于其中的数据匹配模块在4.3.2中讲解
class Tracker: GATING_THRESHOLD = np.sqrt(kalman_filter.chi2inv95[4]) #门控距离的阈值 def __init__(self, metric, max_iou_distance=0.9, max_age=30, n_init=3, _lambda=0): ''' metric:用于关联轨迹与目标框的距离度量函数,是NearestNeighborDistanceMetric类 max_iou_distance:IOU匹配中代价矩阵的阈值 max_age:轨迹状态由不确定态到删除所需连续匹配失败的次数 n_init:轨迹状态由不确定态到确定态所需连续匹配成功的次数 _lambda:门控距离矩阵和外观语义特征相似度矩阵之间的权重 kf:卡尔曼滤波模块 tracks:所有轨迹状态和信息集合 _next_id:下一个轨迹ID ''' self.metric = metric self.max_iou_distance = max_iou_distance self.max_age = max_age self.n_init = n_init self._lambda = _lambda self.kf = kalman_filter.KalmanFilter() self.tracks = [] self._next_id = 1 def predict(self): ''' 预测轨迹集合中所有轨迹的下一帧信息 ''' for track in self.tracks: track.predict(self.kf) def increment_ages(self): ''' 当视频帧中未检测到目标时调用 因无法进行卡尔曼滤波更新,故不再额外进行卡尔曼滤波预测,但需要更新相应轨迹信息 ''' for track in self.tracks: track.increment_age() #增加轨迹存在帧数和匹配失败次数 track.mark_missed() #判断轨迹是否需要转为删除态 def update(self, detections, classes, confidences): ''' 更新轨迹集合中所有轨迹状态和信息 ''' matches, unmatched_tracks, unmatched_detections = \ self._match(detections) #进行级联匹配和IOU匹配,得到匹配成功轨迹和目标框、匹配失败轨迹以及匹配失败目标框#更新轨迹集合的状态和信息 for track_idx, detection_idx in matches: self.tracks[track_idx].update(self.kf, detections[detection_idx], classes[detection_idx], confidences[detection_idx]) #更新匹配成功的轨迹信息 for track_idx in unmatched_tracks: self.tracks[track_idx].mark_missed() #判断匹配失败轨迹是否需要转为删除态 for detection_idx in unmatched_detections: self._initiate_track(detections[detection_idx], classes[detection_idx].item(), confidences[detection_idx].item()) #对匹配失败的目标框初始化轨迹 self.tracks = [t for t in self.tracks if not t.is_deleted()] #从轨迹集合中剔除删除态轨迹#更新度量轨迹和目标框距离中的外观语义特征信息 active_targets = [t.track_id for t in self.tracks if t.is_confirmed()] #确定态轨迹ID集合 features, targets = [], [] for track in self.tracks: if not track.is_confirmed(): continue #选择确认态轨迹 features += track.features #确认态轨迹的外观语义特征矩阵 targets += [track.track_id for _ in track.features] #外观语义特征对应的确认态轨迹ID集合 track.features = [] #清空确定态轨迹的外观语义特征 self.metric.partial_fit(np.asarray(features), np.asarray(targets), active_targets) #更新确认态轨迹(同一ID)以往所有时刻的外观语义特征字典 def _initiate_track(self, detection, class_id, conf): ''' 初始化轨迹状态和信息 ''' mean, covariance = self.kf.initiate(detection.to_xyah()) #卡尔曼滤波初始化轨迹位置和速度信息 self.tracks.append(Track( mean, covariance, self._next_id, class_id, conf, self.n_init, self.max_age, detection.feature)) #初始化轨迹加入轨迹集合 self._next_id += 1 #轨迹数量增加,下一个轨迹ID+1'''该部分还存在_full_cost_metric以及_match函数代码用于轨迹和目标框的匹配,在4.3.2中讲解'''
Tracker类是整个DeepSort框架的核心部分,整合了所有轨迹的信息和状态并逐一通过卡尔曼滤波对轨迹信息进行预测。由于在轨迹集合更新过程中涉及数据匹配模块,Tracker类的update函数封装了级联匹配和IOU匹配,根据对应匹配结果结合卡尔曼滤波更新轨迹集合中所有轨迹的状态和信息并且针对未匹配的目标(以及第一帧目标框)初始化轨迹状态和信息。该部分结合4.3.2级联匹配和IOU匹配以及3.1DeepSort框架图阅读。
4.2.3 卡尔曼滤波
卡尔曼滤波主要包含预测和更新。其中预测是根据系统当前状态对系统的下一步状态做出有根据的预测;更新是通过测量值与预测值综合计算出系统状态的最优估计值。具体可参考详解卡尔曼滤波原理博客,其中引用表达式:
预测:
x ⃗ k = F k x ⃗ k − 1 + B k u ⃗ k \vec x_k = F_k \vec x_{k - 1} + B_k\vec u_k x k=Fkx k−1+Bku k
P k = F k P k − 1 F k T + Q k P_k = F_kP_{k - 1}F_k^T + Q_k Pk=FkPk−1FkT+Qk
x ⃗ \vec x x : 状态向量(均值)
u ⃗ \vec u u : 外部控制向量
P P P:状态矩阵(协方差)
F F F:状态转移矩阵
B B B: 外部控制矩阵
Q Q Q: 外部干扰矩阵(噪声)
更新:
x ⃗ k ′ = x ⃗ k + K ( z ⃗ k − H k x ⃗ k ) \vec x'_k = \vec x_k + K(\vec z_k - H_k\vec x_k) x k′=x k+K(z k−Hkx k)
P k ′ = P k − K H k P k P'_k = P_k - KH_kP_k Pk′=Pk−KHkPk
K = P k H k T ( H k P k H k T + R k ) − 1 K = P_kH_k^T(H_kP_kH_k^T + R_k)^{-1} K=PkHkT(HkPkHkT+Rk)−1
x ⃗ ′ \vec x' x ′:最优估计向量(均值)
z ⃗ \vec z z : 测量向量(均值)
H H H:状态空间向测量空间转移矩阵
P ′ P' P′:最优估计矩阵(协方差)
R R R: 测量矩阵(协方差)
K K K:增益矩阵
其中 k − 1 k-1 k−1代表当前状态( k − 1 k-1 k−1时刻); k k k代表下一状态( k k k时刻)
卡尔曼滤波类实现代码及注释如下:
chi2inv95 = { 1: 3.8415, 2: 5.9915, 3: 7.8147, 4: 9.4877, 5: 11.070, 6: 12.592, 7: 14.067, 8: 15.507, 9: 16.919} #按自由度排布的卡方分布0.95分位数,用于门控距离阈值的选择class KalmanFilter(object): def __init__(self): ''' _motion_mat:状态转移矩阵F,维度(8×8) _update_mat:状态空间向测量空间转移矩阵H,(维度4×8) _std_weight_position:控制位置方差权重 _std_weight_velocity:控制速度方差权重 ''' ndim, dt = 4, 1. self._motion_mat = np.eye(2 * ndim, 2 * ndim) for i in range(ndim): self._motion_mat[i, ndim + i] = dt self._update_mat = np.eye(ndim, 2 * ndim) self._std_weight_position = 1. / 20 self._std_weight_velocity = 1. / 160 def initiate(self, measurement): ''' 根据目标框检测值初始化轨迹 measurement:目标框测量向量(x, y, a, h),中心点横纵坐标x, y,宽高比a,高h ''' mean_pos = measurement #位置状态分布向量(均值),维度(4, ) mean_vel = np.zeros_like(mean_pos) #速度状态分布向量(均值),维度(4, ) mean = np.r_[mean_pos, mean_vel] #位置、速度状态分布向量(均值),维度(8×1) std = [ 2 * self._std_weight_position * measurement[0], 2 * self._std_weight_position * measurement[1], 1 * measurement[2], 2 * self._std_weight_position * measurement[3], 10 * self._std_weight_velocity * measurement[0], 10 * self._std_weight_velocity * measurement[1], 0.1 * measurement[2], 10 * self._std_weight_velocity * measurement[3]] #位置、速度状态分布值(标准差),维度(8, ) covariance = np.diag(np.square(std)) #位置、速度状态分布矩阵(方差),维度(8×8) return mean, covariance def predict(self, mean, covariance): ''' 卡尔曼滤波根据当前时刻状态预测下一时刻状态 mean:当前时刻位置、速度状态分布向量(均值),维度(8×1) covariance:当前时刻位置、速度状态分布矩阵(方差),维度(8×8) ''' std_pos = [ self._std_weight_position * mean[0], self._std_weight_position * mean[1], 1 * mean[2], self._std_weight_position * mean[3]] #位置干扰值(标准差),维度(4, ) std_vel = [ self._std_weight_velocity * mean[0], self._std_weight_velocity * mean[1], 0.1 * mean[2], self._std_weight_velocity * mean[3]] #速度干扰值(标注差),维度(4, ) motion_cov = np.diag(np.square(np.r_[std_pos, std_vel])) #外部干扰矩阵(噪声)Q,维度(8×8) mean = np.dot(self._motion_mat, mean) #下一时刻位置、速度状态分布向量(均值),维度(8×1) covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov #下一时刻位置、速度状态分布向量(方差),维度(8×8) return mean, covariance def project(self, mean, covariance): ''' 卡尔曼滤波将状态分布映射到测量分布上 mean:下一时刻位置、速度状态分布向量(均值),维度(8×1) covariance:下一时刻位置、速度状态分布矩阵(方差),维度(8×8) ''' std = [ self._std_weight_position * mean[0], self._std_weight_position * mean[1], 0.1 * mean[2], self._std_weight_position * mean[3]] #测量值(标注差),维度(4, ) innovation_cov = np.diag(np.square(std)) #测量矩阵R,维度(4×4) mean = np.dot(self._update_mat, mean) #状态分布向量映射到测量分布向量(均值),即更新表达式中的Hx covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T)) #状态分布向量映射到测量分布向量(方差),即更新表达式中的HPH.T return mean, covariance + innovation_cov def update(self, mean, covariance, measurement): ''' 卡尔曼滤波根据测量值和预测值更新最优估计值 mean:下一时刻位置、速度状态分布向量(均值),维度(8×1) covariance:下一时刻位置、速度状态分布矩阵(方差),维度(8×8) measurement:下一时刻目标框测量向量(x, y, a, h),中心点横纵坐标x, y,宽高比a,高h ''' projected_mean, projected_cov = self.project(mean, covariance) #状态分布向量(均值)和矩阵(方差)映射到测量分布,即Hxx和HPH.T+R chol_factor, lower = scipy.linalg.cho_factor( projected_cov, lower=True, check_finite=False) #计算HPH.T+R的Cholesky分解,得到上三角或下三角矩阵,用于cho_solve kalman_gain = scipy.linalg.cho_solve( (chol_factor, lower), np.dot(covariance, self._update_mat.T).T, check_finite=False).T #计算卡尔曼增益K,维度(8×4);即通过求解线性方程(HPH.T+R)K = (PH.T).T,其中的K维度4×8,故最后需要转置T操作 innovation = measurement - projected_mean #测量向量和状态映射向量(均值)偏差,即z-Hx new_mean = mean + np.dot(innovation, kalman_gain.T) #最优估计值向量(均值),即x'=x+K(z-Hx) new_covariance = covariance - np.linalg.multi_dot(( kalman_gain, projected_cov, kalman_gain.T)) #最优估计矩阵(方差),即P'=P-KHP=K(PH.T).T=K(HPH.T+R)K.T return new_mean, new_covariance def gating_distance(self, mean, covariance, measurements, only_position=False): ''' 计算状态分布和测量值之间的门控距离 mean:位置、速度状态分布向量(均值),维度(8×1) covariance:位置、速度状态分布矩阵(方差),维度(8×8) measurement:目标框测量向量(x, y, a, h),中心点横纵坐标x, y,宽高比a,高h only_position:是否只考虑位置分量 ''' mean, covariance = self.project(mean, covariance) #将状态分布映射到测量分布上 if only_position: #判断是否只考虑中心位置分量 mean, covariance = mean[:2], covariance[:2, :2] measurements = measurements[:, :2] #取测量向量(均值)和矩阵(方差)中的位置分量,维度变为2×1和2×2 cholesky_factor = np.linalg.cholesky(covariance) #对映射到测量分布上的状态矩阵(方差)进行Cholesky分解得到下三角矩阵 d = measurements - mean #测量向量与状态映射向量(均值)偏差,即z-Hx g = scipy.linalg.solve_triangular( cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True) #求状态映射向量和测量向量中各分量的门控距离,即求线性方程(HPH.T+R)g=z-Hx squared_maha = np.sum(g * g, agxis=0) #平方求和得门控距离 return squared_maha
KalmanFilter类对轨迹的状态分布信息进行预测和更新,以及对目标框初始化轨迹信息;同时包含了计算测量分布和状态分布之间的门控距离。需要注意:
(1)代码中考虑的是线性系统,故不包含外部控制向量,即 u ⃗ \vec u u =0
(2)代码中的状态分布向量(均值) x ⃗ = ( x , y , a , h , v x , v y , v a , v h ) T \vec x=(x, y, a, h, v_x, v_y, v_a, v_h)^T x =(x,y,a,h,vx,vy,va,vh)T,包含位置状态 ( x , y , a , h ) (x, y, a, h) (x,y,a,h)即检测框中心点横纵坐标 x , y x, y x,y,宽高比 a a a,高 h h h及其对应速度状态 ( v x , v y , v a , v h ) (v_x, v_y, v_a, v_h) (vx,vy,va,vh),其余变量的设置详见代码中注释
(3)代码中出现的有关numpy和scipy的函数可查阅官方文档numpy;scipy
(4)代码中注明了有关计算过程的矩阵关系和维度变化,结合表达式阅读,其中涉及有关线性代数和数值分析知识感兴趣可查阅有关文档
4.3 数据匹配模块
数据匹配模块主要通过级联匹配和IOU匹配以及匈牙利算法来完成对轨迹和目标框的关联匹配,具体功能如下:
级联匹配:使用(ReID模块提取)外观特征结合运动特征计算相似度,通过匈牙利算法进行关联匹配
IOU匹配:使用轨迹和目标框面积计算IOU,通过匈牙利算法进行关联匹配
4.3.1 ReID模块
重识别简称为ReID,是利用对应模型判断不同时间段出现的目标是否属于同一对象。多目标跟踪问题为减少ID-Switch引入ReID模块提取对应目标框的外观语义特征,供后续级联匹配中计算相似度使用。其中ReID模块是独立于目标检测和轨迹跟踪模块,源代码中为满足多目标跟踪的实时性采用轻量化网络模型(OsNet等)。
源代码中通过torchreid下的utils包中FeatureExtractor类构建相应模型提取目标框的外观语义特征,其代码如下:
class FeatureExtractor(object): ''' 构建ReID模型提取外观语义特征 ''' def __init__( self, model_name='', model_path='', image_size=(256, 128), pixel_mean=[0.485, 0.456, 0.406], pixel_std=[0.229, 0.224, 0.225], pixel_norm=True, device='cuda', verbose=True ): ''' mode_name:模型名称 model:ReID模型 preprocess:输入数据前处理 mode_path:模型权重路径 image_size:输入图片的尺寸 pixel_mean:图片正则化均值 pixel_std:图片正则化标准差 pixel_norm:图片是否正则化 device:模型推理设备 verbose:是否展示模型信息 ''' model = build_model( model_name, num_classes=1, pretrained=not (model_path and check_isfile(model_path)), use_gpu=device.startswith('cuda') ) #构建对应模型 model.eval() #模型推理 if verbose: num_params, flops = compute_model_complexity( model, (1, 3, image_size[0], image_size[1]) ) print('Model: {}'.format(model_name)) print('- params: {:,}'.format(num_params)) print('- flops: {:,}'.format(flops)) #输出模型信息 if model_path and check_isfile(model_path): load_pretrained_weights(model, model_path) #加载模型权重 transforms = [] transforms += [T.Resize(image_size)] transforms += [T.ToTensor()] if pixel_norm: transforms += [T.Normalize(mean=pixel_mean, std=pixel_std)] preprocess = T.Compose(transforms) #对输入图片的前处理 to_pil = T.ToPILImage() #转换到PIL图片 device = torch.device(device) model.to(device) self.model = model self.preprocess = preprocess self.to_pil = to_pil self.device = device def __call__(self, input): ''' 内置方法,赋予FeatureExtractor实例对象函数调用特性,即允许实例对象像函数一样被调用 ''' if isinstance(input, list): #如果输入属于list images = [] for element in input: if isinstance(element, str): image = Image.open(element).convert('RGB') elif isinstance(element, np.ndarray): image = self.to_pil(element) else: raise TypeError( 'Type of each element must belong to [str | numpy.ndarray]' ) image = self.preprocess(image) images.append(image) images = torch.stack(images, dim=0) images = images.to(self.device) elif isinstance(input, str): #如果输入属于str类型 image = Image.open(input).convert('RGB') image = self.preprocess(image) images = image.unsqueeze(0).to(self.device) elif isinstance(input, np.ndarray): #如果输入属于numpy数组 image = self.to_pil(input) image = self.preprocess(image) images = image.unsqueeze(0).to(self.device) elif isinstance(input, torch.Tensor): #如果输入属于tensor if input.dim() == 3: input = input.unsqueeze(0) images = input.to(self.device) else: raise NotImplementedError #否则报错 with torch.no_grad(): features = self.model(images) #提取对应输入图像外观语义特征 return features
其中ReID模块为保证多目标跟踪的实时性,多采用轻量化网络模型。轻量化网络模型的核心是在保持精度的前提下,从模型大小和推理速度两方面对网络进行轻量化改造。目前整理了有关轻量化网络模型有:SqueezeNet、Xception、MobileNet、ShuffleNet、OsNet以及GHostNet。
有关轻量化模型解析,点击查看:
《轻量化网络总结[1]–SqueezeNet,Xception,MobileNetv1~v3》
《轻量化网络总结[2]–ShuffleNetv1/v2,OSNet,GhostNet》
4.3.2 级联匹配和IOU匹配
级联匹配和IOU匹配用于轨迹和目标框的关联,封装于Tracker类中的_match函数中,具体代码如下:
def _full_cost_metric(self, tracks, dets, track_indices, detection_indices): ''' 使用外观特征和运动特征(门控距离)并结合相关阈值计算相似度矩阵 tracks:轨迹集合 dets:目标框集合 track_indices:轨迹索引集合 detction_indeces:目标框索引集合 ''' pos_cost = np.empty([len(track_indices), len(detection_indices)]) #目标框和轨迹的门控距离矩阵 msrs = np.asarray([dets[i].to_xyah() for i in detection_indices]) #目标框的测量矩阵 for row, track_idx in enumerate(track_indices): pos_cost[row, :] = np.sqrt( self.kf.gating_distance(tracks[track_idx].mean, tracks[track_idx].covariance, msrs, False)) / self.GATING_THRESHOLD #计算状态分布和测量值之间的门控距离矩阵,并除以对应阈值确保其有效范围为0-1 pos_gate = pos_cost > 1.0 #有效值为0-1,故阈值为1;判断门控距离矩阵超过阈值的位置 app_cost = self.metric.distance( np.array([dets[i].feature for i in detection_indices]), np.array([tracks[i].track_id for i in track_indices]), ) #计算当前目标框的外观语义特征与对应确认态轨迹(同一ID)以往所有的外观语义特征(欧式或余弦)距离矩阵 app_gate = app_cost > self.metric.matching_threshold #判断外观特征距离矩阵超过阈值的位置 cost_matrix = self._lambda * pos_cost + (1 - self._lambda) * app_cost #根据门控矩阵和外观特征距离矩阵加权计算相似度矩阵 cost_matrix[np.logical_or(pos_gate, app_gate)] = linear_assignment.INFTY_COST #将超过阈值的位置设定为无效值 return cost_matrix def _match(self, detections): ''' 进行级联匹配和IOU匹配,获取匹配成功轨迹和目标框、匹配失败轨迹以及匹配失败目标框 ''' confirmed_tracks = [i for i, t in enumerate(self.tracks) if t.is_confirmed()] #确定态轨迹集合 unconfirmed_tracks = [i for i, t in enumerate(self.tracks) if not t.is_confirmed()] #不确定态轨迹集合 matches_a, unmatched_tracks_a, unmatched_detections = linear_assignment.matching_cascade( self._full_cost_metric, #级联匹配中相似度矩阵计算函数 linear_assignment.INFTY_COST - 1, #相似度矩阵阈值,因计算函数已将相似度矩阵用阈值处理,故此处阈值设置为无效值 self.max_age, self.tracks, detections, confirmed_tracks, ) #确定态轨迹与检测目标框进行级联匹配 iou_track_candidates = unconfirmed_tracks + [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update == 1 ] #选择用于IOU匹配的轨迹集合,即不确定态轨迹集合+级联匹配后匹配失败且失败次数为1的轨迹 unmatched_tracks_a = [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update != 1 ] #剔除级联匹配后匹配失败轨迹集合中失败次数为1的轨迹 matches_b, unmatched_tracks_b, unmatched_detections = linear_assignment.min_cost_matching( iou_matching.iou_cost, #IOU匹配中IOU矩阵计算函数 self.max_iou_distance, #IOU矩阵阈值 self.tracks, detections, iou_track_candidates, unmatched_detections, ) #级联匹配后对不确定态轨迹+匹配失败且失败次数为1的轨迹集合与匹配失败目标框进行IOU匹配 matches = matches_a + matches_b #叠加级联匹配和IOU匹配后匹配成功的轨迹和目标框 unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b)) #剔除匹配失败轨迹集合中的重复项 return matches, unmatched_tracks, unmatched_detections
级联匹配和IOU匹配封装于Tracker类中用于轨迹和目标框的关联,结果为匹配成功轨迹和目标框、匹配失败轨迹以及匹配失败目标框,依据对应关联结果进行相应的后处理,结合4.2.2Tracker类核心代码一起阅读。
其中级联匹配和IOU匹配的具体实现代码如下:
INFTY_COST = 1e+5 #设置的无效值def min_cost_matching(distance_metric, max_distance, tracks, detections, track_indices=None, detection_indices=None): ''' 进行轨迹与目标框的匹配关联 distance_metric:计算用于(级联/IOU)匹配代价矩阵的函数 max_distance:阈值,大于该阈值被认为是无效的 tracks:当前时刻(卡尔曼滤波预测后)的轨迹集合 detections:当前时刻的目标框集合 track_indices:用于匹配的轨迹索引集合 detection_indices:用于匹配的目标框索引集合 ''' if track_indices is None: track_indices = np.arange(len(tracks)) #若轨迹索引集合为None:设置对应轨迹索引集合 if detection_indices is None: detection_indices = np.arange(len(detections)) #若目标框索引集合为None,设置对应目标框索引集合 if len(detection_indices) == 0 or len(track_indices) == 0: return [], track_indices, detection_indices #没有需要匹配的轨迹或目标框 cost_matrix = distance_metric( tracks, detections, track_indices, detection_indices) #计算用于匹配关联的代价矩阵 cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5 #将大于阈值部分设置为对应无效值 row_indices, col_indices = linear_sum_assignment(cost_matrix) #进行匈牙利算法,根据对应矩阵获取最小匹配代价#根据匈牙利算法和代价矩阵,获取匹配成功轨迹和目标框、匹配失败轨迹以及匹配失败目标框 matches, unmatched_tracks, unmatched_detections = [], [], [] for col, detection_idx in enumerate(detection_indices): if col not in col_indices: unmatched_detections.append(detection_idx) #匹配失败目标框 for row, track_idx in enumerate(track_indices): if row not in row_indices: unmatched_tracks.append(track_idx) #匹配失败轨迹 for row, col in zip(row_indices, col_indices): track_idx = track_indices[row] detection_idx = detection_indices[col] if cost_matrix[row, col] > max_distance: #代价矩阵中大于设定阈值被认定为无效 unmatched_tracks.append(track_idx) #匹配失败轨迹 unmatched_detections.append(detection_idx) #匹配失败目标框 else: matches.append((track_idx, detection_idx)) #匹配成功轨迹和目标框 return matches, unmatched_tracks, unmatched_detectionsdef matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections, track_indices=None, detection_indices=None): ''' 进行级联匹配 distance_metric:计算用于级联匹配相似度矩阵的函数 max_distance:阈值,大于该阈值被认为是无效的 cascade_depth:级联匹配的深度,与轨迹状态由不确定态到删除态所需连续匹配失败的次数(_max_age)相同 tracks:当前时刻(卡尔曼滤波预测后)的轨迹集合 detections:当前时刻的目标框集合 track_indices:用于匹配的轨迹索引集合 detection_indices:用于匹配的目标框索引集合 ''' if track_indices is None: track_indices = list(range(len(tracks))) #若轨迹索引集合为None:设置对应轨迹索引集合 if detection_indices is None: detection_indices = list(range(len(detections))) #若目标框索引集合为None,设置对应目标框索引集合 unmatched_detections = detection_indices #初始化未匹配目标 matches = [] for level in range(cascade_depth): #按深度进行级联匹配 if len(unmatched_detections) == 0: #没有需要匹配的目标 break track_indices_l = [ k for k in track_indices if tracks[k].time_since_update == 1 + level ] #连续匹配失败次数与级联匹配深度对应的轨迹集合 if len(track_indices_l) == 0: #该深度没有需要匹配的轨迹 continue matches_l, _, unmatched_detections = \ min_cost_matching( distance_metric, max_distance, tracks, detections, track_indices_l, unmatched_detections) #对轨迹和目标框进行匹配关联 matches += matches_l #匹配成功的轨迹和目标框 unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches)) #匹配失败的轨迹,即用于匹配的轨迹减去匹配成功轨迹 return matches, unmatched_tracks, unmatched_detectionsdef gate_cost_matrix( kf, cost_matrix, tracks, detections, track_indices, detection_indices, gated_cost=INFTY_COST, only_position=False): ''' 通过门控距离对外观语义特征相似度矩阵进行限制,代码中将该部分转换并整合至_full_cost_metric函数中,故没用到该函数 kf:卡尔曼滤波 cost_matrix:外观语义特征相似度矩阵 tracks:当前时刻(卡尔曼滤波预测后)的轨迹集合 detections:当前时刻的目标框集合 track_indices:用于匹配的轨迹索引集合 detection_indices:用于匹配的目标框索引集合 gated_cost:无效值,大于阈值则设定为无效值 only_position:是否只考虑位置分量 ''' gating_dim = 2 if only_position else 4 #根据是否只考虑位置分量选择维度 gating_threshold = kalman_filter.chi2inv95[gating_dim] #根据维度(自由度)选择门控距离阈值 measurements = np.asarray( [detections[i].to_xyah() for i in detection_indices]) #测量值矩阵 #计算轨迹状态分布和测量分布之间的门控距离对外观语义特征相似度矩阵进行限制 for row, track_idx in enumerate(track_indices): track = tracks[track_idx] gating_distance = kf.gating_distance( track.mean, track.covariance, measurements, only_position) #计算门控距离 cost_matrix[row, gating_distance > gating_threshold] = gated_cost #门控距离大于阈值则被设置为无效值 return cost_matrix
级联匹配中关联轨迹和目标框的距离度量函数(metric)封装在NearestNeighborDistanceMetric类,具体代码如下:
def _pdist(a, b): ''' 计算对应矩阵的成对平方距离(欧式距离) a:维度N×M,N代表样本数量,M代表特征维数 b:维度L×M,L代表样本数量,M代表特征维数 ''' a, b = np.asarray(a), np.asarray(b) #拷贝数据,防止原数据被更改 if len(a) == 0 or len(b) == 0: return np.zeros((len(a), len(b))) #存在空矩阵,则返回对应维度0矩阵 a2, b2 = np.square(a).sum(axis=1), np.square(b).sum(axis=1) #计算各个特征的平方和,维度变为N×1和L×1 r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :] #计算距离矩阵,即a2+b2.T-2ab.T对应维度[N×1]+[1×L]-2[N×M][M×L], #由于numpy存在传播机制(broadcast),[N×1]+[1×L]会传播为[N×L]+[N×L] r2 = np.clip(r2, 0., float(np.inf)) #限制距离范围为0-inf值 return r2 #维度为N×Ldef _cosine_distance(a, b, data_is_normalized=False): ''' 计算对应矩阵的成对余弦距离 a:维度N×M,N代表样本数量,M代表特征维数 b:维度L×M,L代表样本数量,M代表特征维数 data_is_normalized:a、b矩阵的特征是否归一化 ''' if not data_is_normalized: #进行归一化操作 a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True) #对a矩阵归一化 b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True) #对b矩阵归一化 #其中归一化因子为单个样本特征向量的2范数,即按特征维度计算特征的平方和(维度为N×1和L×1),keedims将维度保持与原矩阵相同 return 1. - np.dot(a, b.T) #余弦距离=1-余弦相似度,维度为N×Ldef _nn_euclidean_distance(x, y): ''' 用于NearestNeighborDistanceMetric类中的欧式距离函数 x:维度N×M,N代表样本数量,M代表特征维数 y:维度L×M,L代表样本数量,M代表特征维数 ''' #这里运用torchreid下的distance包中的计算距离函数,原理与_pdist函数相同,实现方式略有不同 x_ = torch.from_numpy(np.asarray(x) / np.linalg.norm(x, axis=1, keepdims=True)) #归一化操作 y_ = torch.from_numpy(np.asarray(y) / np.linalg.norm(y, axis=1, keepdims=True)) #归一化操作 distances = compute_distance_matrix(x_, y_, metric='euclidean') #计算欧式距离矩阵 return np.maximum(0.0, torch.min(distances, axis=0)[0].numpy()) #按维度N取对应最小值,返回维度L×1def _nn_cosine_distance(x, y): ''' 用于NearestNeighborDistanceMetric类中的余弦距离函数 x:维度N×M,N代表样本数量,M代表特征维数 y:维度L×M,L代表样本数量,M代表特征维数 ''' #这里运用torchreid下的distance包中的计算距离函数,原理与_cosine_distance函数相同,实现方式略有不同 x_ = torch.from_numpy(np.asarray(x)) #将numpy转换为torch y_ = torch.from_numpy(np.asarray(y)) distances = compute_distance_matrix(x_, y_, metric='cosine') #计算余弦距离矩阵 distances = distances.cpu().detach().numpy() #转换为numpy类型 return distances.min(axis=0) #按维度N取对应最小值,返回维度L×1class NearestNeighborDistanceMetric(object): ''' 度量轨迹和目标框最近距离,即根据对应确认态轨迹(同一ID)以往所有时刻的外观语义特征与当前目标框的外观语义特征, 计算轨迹和目标框的关联距离(相似度) ''' def __init__(self, metric, matching_threshold, budget=None): ''' metric:选择度量距离的方式,欧式距离或余弦距离 matching_threshold:阈值,大于该阈值被认为是无效值 budget:存储确认态轨迹(同一ID)以往所有时刻的外观语义特征的容量,如果为None,则认为不限制容量 sample:存储确认态轨迹(同一ID)以往所有时刻的外观语义特征字典 '''#选择距离度量方式 if metric == "euclidean": self._metric = _nn_euclidean_distance #欧式距离 elif metric == "cosine": self._metric = _nn_cosine_distance #余弦距离 else: raise ValueError( "Invalid metric; must be either 'euclidean' or 'cosine'") #报错:无效距离方式 self.matching_threshold = matching_threshold self.budget = budget self.samples = {} def partial_fit(self, features, targets, active_targets): ''' 更新确认态轨迹(同一ID)以往所有时刻的外观语义特征字典 feautures:确认态轨迹的外观语义特征矩阵 targets:外观语义特征对应的确认态轨迹ID集合 active_targets:确定态轨迹ID集合 ''' for feature, target in zip(features, targets): self.samples.setdefault(target, []).append(feature) #添加或新增最新的外观语义特征 if self.budget is not None: self.samples[target] = self.samples[target][-self.budget:] #根据容量设置,剔除最早的外观语义特征 self.samples = {k: self.samples[k] for k in active_targets} #更新确认态轨迹(同一ID)以往所有时刻的外观语义特征字典er def distance(self, features, targets): ''' 计算目标框的外观语义特征与对应确认态轨迹(同一ID)以往所有的外观语义特征(欧式或余弦)距离矩阵 feautures:目标框的外观语义特征矩阵 targets:确认态轨迹ID集合 ''' cost_matrix = np.zeros((len(targets), len(features))) #距离矩阵初始化 for i, target in enumerate(targets): #逐一计算轨迹与外观语义特征矩阵距离 cost_matrix[i, :] = self._metric(self.samples[target], features) #计算距离,返回对应维度向量 return cost_matrix
级联匹配使用门控距离矩阵(运动特征)和外观语义特征距离矩阵(外观特征)加权计算代价矩阵,其中门控距离和外观语义特征距离都通过对应的阈值限制过大的值。匹配过程根据最大级联匹配深度(跟_max_age相同)逐层进行目标框与轨迹的关联,即根据连续匹配失败次数(time_since_update)与匹配深度对应,实现匹配失败次数少的轨迹优先匹配,失败次数多的轨迹靠后匹配。通过级联匹配,可以重新将被遮挡后重现的目标找回,降低ID-Switch。
IOU匹配使用IOU矩阵当作代价矩阵,其中的IOU计算就是通过轨迹和目标框的重叠面积除以总面积来完成,具体实现在iou_matching模块中,可参考源码。匹配过程根据对应轨迹和目标框直接进行关联。
4.3.3 匈牙利算法
匈牙利算法基于代价矩阵找到最小代价的分配方法,是解决分配问题中最优匹配(最小代价)的算法。
其中依据定理:
代价矩阵的行或列同时加或减一个数,得到新的代价矩阵的最优匹配与原代价矩阵相同。
算法步骤:
(1)若代价矩阵不为方阵,则在相应位置补0转换为方阵
(2)代价矩阵每一行减去此行最小值后每一列减去此列最小值
(3)用最少的横线或竖线覆盖代价矩阵中的所有0元素
(4)找出(3)中未被覆盖元素的最小值,且未被覆盖元素减去该最小值;对覆盖直线交叉点元素加上该最小值
(5)重复(3)和(4),直到覆盖线的数量等于对应方阵的维度数
过程演示(假设代价矩阵维度4×3):
匈牙利算法将确认态轨迹tracks与当前时刻的检测目标框detections进行关联,其中存在对应代价矩阵不为方阵的情况需进行填充操作(如det’列),对应的最小代价匹配结果需忽略填充项(如图中虚线部分)。针对算法步骤(3)(4)的具体代码实现有多种版本,源代码通过scipy库中的linear_sum_assignment函数实现匈牙利算法。