Source code for cosense3d.modules.plugin.target_assigners

import math
from abc import ABCMeta, abstractmethod
from functools import partial
from typing import List, Dict, Optional, Tuple

import torch
from torch import nn
import torch_scatter
from scipy.optimize import linear_sum_assignment

from cosense3d.utils.box_utils import (bbox_xyxy_to_cxcywh,
                                       bbox_cxcywh_to_xyxy,
                                       normalize_bbox,
                                       boxes3d_to_standup_bboxes,
                                       rotate_points_batch)
from cosense3d.utils.pclib import rotate_points_along_z_torch
from cosense3d.utils.iou2d_calculator import bbox_overlaps
from cosense3d.modules.utils.gaussian_utils import gaussian_2d
from cosense3d.modules.utils.gevbev_utils import draw_sample_evis, weighted_mahalanobis_dists
from cosense3d.modules.utils.me_utils import metric2indices, update_me_essentials
from cosense3d.modules.utils.box_coder import build_box_coder
from cosense3d.ops.iou3d_nms_utils import boxes_iou3d_gpu
from cosense3d.dataset.const import CoSenseBenchmarks as csb
from cosense3d.modules.utils.common import pad_r, pad_l, meshgrid
from cosense3d.ops.utils import points_in_boxes_gpu
from cosense3d.modules.losses import pred_to_conf_unc
from cosense3d.utils.misc import PI


[docs]def sample_mining(scores: torch.Tensor, labels: torch.Tensor, dists=None, sample_mining_thr=0.5, max_sample_ratio=5, max_num_sample=None): """ When only limited numbers of negative targets are sampled for training, and the majority of the negative samples are ignored, then there is a high probability that hard negative targets are also ignored. This will weaken the model to learn from these hard negative targets and generate a lot of false positives. Therefore, this function mines the samples that have high predictive scores as training targets. This function should be used after 'pos_neg_sampling'. :param scores: (N1, ...Nk) classification scores/confidences that the sample belong to foreground. :param labels: (N1..., Nk) class labels, -1 indicates ignore, 0 indicates negative, positive numbers indicates classes. :param dists: distances. :param sample_mining_thr: score threshold for sampling :param max_sample_ratio: `n_sample` / `n_pos_sample` :param max_num_sample: maximum number of samples. :return: """ assert scores.ndim == labels.ndim assert scores.shape == labels.shape pred_pos = scores > sample_mining_thr if dists is not None: # only mine points that are not too close to the real positive samples pred_pos[dists < 3] = False not_cared = labels == -1 sample_inds = torch.where(torch.logical_and(pred_pos, not_cared))[0] n_pos = (labels > 0).sum() max_num_sample = int(n_pos * max_sample_ratio) if max_num_sample is None else max_num_sample if len(sample_inds) > max_num_sample: sample_inds = sample_inds[torch.randperm(len(sample_inds))[:max_num_sample]] labels[sample_inds] = 0 return labels
[docs]def pos_neg_sampling(labels: torch.Tensor, pos_neg_ratio: float) -> torch.Tensor: """ Downsample negative targets. :param labels: class labels. :param pos_neg_ratio: ratio = num_neg_samples / num_pos_samples. :return: class labels with -1 labels to be ignored during training. """ pos = labels > 0 neg = labels == 0 n_neg_sample = pos.sum(dim=0) * pos_neg_ratio if neg.sum() > n_neg_sample: neg_inds = torch.where(neg)[0] perm = torch.randperm(len(neg_inds))[n_neg_sample:] labels[neg_inds[perm]] = -1 return labels
[docs]class BaseAssigner(metaclass=ABCMeta): """Base assigner."""
[docs] @abstractmethod def assign(self, *args, **kwargs): """Assign preds to targets."""
[docs]class MatchCost: """This class is modified from mmdet."""
[docs] @staticmethod def classification(cls_pred: torch.Tensor, gt_labels: torch.Tensor, weight: float=1.0) -> torch.Tensor: """ :param cls_pred: Predicted classification logits, shape (num_query, num_class). :param gt_labels: Label of `gt_bboxes`, shape (num_gt,). :param weight: loss_weight. :return: cls_cost value with weight """ # Following the official DETR repo, contrary to the loss that # NLL is used, we approximate it in 1 - cls_score[gt_label]. # The 1 is a constant that doesn't change the matching, # so it can be omitted. cls_score = cls_pred.softmax(-1) cls_cost = -cls_score[:, gt_labels] return cls_cost * weight
[docs] @staticmethod def bboxl1(bbox_pred: torch.Tensor, gt_bboxes: torch.Tensor, weight: float=1., box_format: str='xyxy') -> torch.Tensor: """ :param bbox_pred: Predicted boxes with normalized coordinates (cx, cy, w, h), which are all in range [0, 1]. Shape (num_query, 4). :param gt_bboxes: Ground truth boxes with normalized coordinates (x1, y1, x2, y2). Shape (num_gt, 4). :param weight: loss_weight. :param box_format: 'xyxy' for DETR, 'xywh' for Sparse_RCNN. :return: bbox_cost value with weight """ if box_format == 'xywh': gt_bboxes = bbox_xyxy_to_cxcywh(gt_bboxes) elif box_format == 'xyxy': bbox_pred = bbox_cxcywh_to_xyxy(bbox_pred) else: raise NotImplementedError bbox_cost = torch.cdist(bbox_pred, gt_bboxes, p=1) return bbox_cost * weight
[docs] @staticmethod def giou(bboxes: torch.Tensor, gt_bboxes: torch.Tensor, weight: float=1.0): """ :param bboxes: Predicted boxes with unnormalized coordinates (x1, y1, x2, y2). Shape (num_query, 4). :param gt_bboxes: Ground truth boxes with unnormalized coordinates (x1, y1, x2, y2). Shape (num_gt, 4). :param weight: loss weight. :return: giou_cost value with weight """ # overlaps: [num_bboxes, num_gt] overlaps = bbox_overlaps( bboxes, gt_bboxes, mode="giou", is_aligned=False) # The 1 is a constant that doesn't change the matching, so omitted. iou_cost = -overlaps return iou_cost * weight
[docs] @staticmethod def iou(bboxes, gt_bboxes, weight=1.0): """See giou""" # overlaps: [num_bboxes, num_gt] overlaps = bbox_overlaps( bboxes, gt_bboxes, mode="iou", is_aligned=False) # The 1 is a constant that doesn't change the matching, so omitted. iou_cost = -overlaps return iou_cost * weight
[docs] @staticmethod def l1(pred, gt, weight=1.0): """L1 distance between pred and gt Tensors""" cost = torch.cdist(pred, gt, p=1) return cost * weight
[docs] @staticmethod def binary_focal_loss(cls_pred, gt_labels, weight=1., alpha=0.25, gamma=2, eps=1e-12,): cls_pred = cls_pred.flatten(1) gt_labels = gt_labels.flatten(1).float() n = cls_pred.shape[1] cls_pred = cls_pred.sigmoid() neg_cost = -(1 - cls_pred + eps).log() * ( 1 - alpha) * cls_pred.pow(gamma) pos_cost = -(cls_pred + eps).log() * alpha * ( 1 - cls_pred).pow(gamma) cls_cost = torch.einsum('nc,mc->nm', pos_cost, gt_labels) + \ torch.einsum('nc,mc->nm', neg_cost, (1 - gt_labels)) return cls_cost / n * weight
[docs] @staticmethod def focal_loss(cls_pred, gt_labels, weight=1., alpha=0.25, gamma=2, eps=1e-12,): cls_pred = cls_pred.sigmoid() neg_cost = -(1 - cls_pred + eps).log() * ( 1 - alpha) * cls_pred.pow(gamma) pos_cost = -(cls_pred + eps).log() * alpha * ( 1 - cls_pred).pow(gamma) cls_cost = pos_cost[:, gt_labels] - neg_cost[:, gt_labels] return cls_cost * weight
[docs] def build(self, type, **kwargs): return partial(getattr(self, type), **kwargs)
[docs]class HungarianAssigner2D(BaseAssigner): """Computes one-to-one matching between predictions and ground truth. This class computes an assignment between the targets and the predictions based on the costs. The costs are weighted sum of three components: classification cost, regression L1 cost, regression iou cost and center2d l1 cost. The assignment is done in the following steps, the order matters. 1. assign every prediction to -1 2. compute the weighted costs 3. do Hungarian matching on CPU based on the costs 4. assign all to 0 (background) first, then for each matched pair between predictions and gts, treat this prediction as foreground and assign the corresponding gt index (plus 1) to it. """ def __init__(self, cls_cost=dict(type='classification', weight=1.), reg_cost=dict(type='bboxl1', weight=1.0), iou_cost=dict(type='giou', weight=1.0), centers2d_cost=dict(type='l1', weight=1.0)): cost_builder = MatchCost() self.cls_cost = cost_builder.build(**cls_cost) self.reg_cost = cost_builder.build(**reg_cost) self.iou_cost = cost_builder.build(**iou_cost) self.centers2d_cost = cost_builder.build(**centers2d_cost)
[docs] def assign(self, bbox_pred, cls_pred, pred_centers2d, gt_bboxes, gt_labels, centers2d, img_size, eps: float = 1e-7 ): """Computes one-to-one matching based on the weighted costs. This method assign each query prediction to a ground truth or background. The `assigned_gt_inds` with -1 means don't care, 0 means negative sample, and positive number is the index (1-based) of assigned gt. The assignment is done in the following steps, the order matters. 1. assign every prediction to -1 2. compute the weighted costs 3. do Hungarian matching on CPU based on the costs 4. assign all to 0 (background) first, then for each matched pair between predictions and gts, treat this prediction as foreground and assign the corresponding gt index (plus 1) to it. :param bbox_pred: Predicted boxes with normalized coordinates (cx, cy, w, h), which are all in range [0, 1]. Shape [num_query, 4]. :param cls_pred: Predicted classification logits, shape [num_query, num_class]. :param pred_centers2d: prediction 2d center points. :param gt_bboxes: ground truth bboxes. :param gt_labels: Label of `gt_bboxes`, shape (num_gt,). img_size: input image size. :param centers2d: 2d center points. :param img_size: input image size. :param eps: A value added to the denominator for numerical stability. Default 1e-7. :return: """ num_gts, num_bboxes = gt_bboxes.size(0), bbox_pred.size(0) # 1. assign -1 by default assigned_gt_inds = bbox_pred.new_full((num_bboxes, ), -1, dtype=torch.long) assigned_labels = bbox_pred.new_full((num_bboxes, ), -1, dtype=torch.long) if num_gts == 0 or num_bboxes == 0: # No ground truth or boxes, return empty assignment if num_gts == 0: # No ground truth, assign all to background assigned_gt_inds[:] = 0 return num_gts, assigned_gt_inds, assigned_labels img_h, img_w = img_size factor = gt_bboxes.new_tensor([img_w, img_h, img_w, img_h]).unsqueeze(0) # 2. compute the weighted costs # classification and bboxcost. cls_cost = self.cls_cost(cls_pred, gt_labels) # regression L1 cost normalize_gt_bboxes = gt_bboxes / factor reg_cost = self.reg_cost(bbox_pred, normalize_gt_bboxes) # regression iou cost, defaultly giou is used in official DETR. bboxes = bbox_cxcywh_to_xyxy(bbox_pred) * factor iou_cost = self.iou_cost(bboxes, gt_bboxes) # center2d L1 cost normalize_centers2d = centers2d / factor[:, 0:2] centers2d_cost = self.centers2d_cost(pred_centers2d, normalize_centers2d) # weighted sum of above four costs cost = cls_cost + reg_cost + iou_cost + centers2d_cost cost = torch.nan_to_num(cost, nan=100.0, posinf=100.0, neginf=-100.0) # 3. do Hungarian matching on CPU using linear_sum_assignment cost = cost.detach().cpu() matched_row_inds, matched_col_inds = linear_sum_assignment(cost) matched_row_inds = torch.from_numpy(matched_row_inds).to( bbox_pred.device) matched_col_inds = torch.from_numpy(matched_col_inds).to( bbox_pred.device) # 4. assign backgrounds and foregrounds # assign all indices to backgrounds first assigned_gt_inds[:] = 0 # assign foregrounds based on matching results assigned_gt_inds[matched_row_inds] = matched_col_inds + 1 assigned_labels[matched_row_inds] = gt_labels[matched_col_inds] return num_gts, assigned_gt_inds, assigned_labels
[docs]class HungarianAssigner3D(BaseAssigner): def __init__(self, cls_cost=dict(type='focal_loss', weight=1.0), reg_cost=dict(type='l1', weight=1.0), iou_cost=dict(type='iou', weight=1.0)): cost_builder = MatchCost() self.cls_cost = cost_builder.build(**cls_cost) self.reg_cost = cost_builder.build(**reg_cost) self.iou_cost = cost_builder.build(**iou_cost)
[docs] def assign(self, bbox_pred, cls_pred, gt_bboxes, gt_labels, code_weights=None, eps=1e-7): num_gts, num_bboxes = gt_bboxes.size(0), bbox_pred.size(0) # 1. assign -1 by default assigned_gt_inds = bbox_pred.new_full((num_bboxes,), -1, dtype=torch.long) assigned_labels = bbox_pred.new_full((num_bboxes,), -1, dtype=torch.long) if num_gts == 0 or num_bboxes == 0: # No ground truth or boxes, return empty assignment if num_gts == 0: # No ground truth, assign all to background assigned_gt_inds[:] = 0 return num_gts, assigned_gt_inds, assigned_labels # 2. compute the weighted costs # classification and bboxcost. cls_cost = self.cls_cost(cls_pred, gt_labels) # regression L1 cost normalized_gt_bboxes = normalize_bbox(gt_bboxes) if code_weights is not None: bbox_pred = bbox_pred * code_weights normalized_gt_bboxes = normalized_gt_bboxes * code_weights reg_cost = self.reg_cost(bbox_pred[:, :8], normalized_gt_bboxes[:, :8]) # weighted sum of above two costs cost = cls_cost + reg_cost # 3. do Hungarian matching on CPU using linear_sum_assignment cost = cost.detach().cpu() cost = torch.nan_to_num(cost, nan=100.0, posinf=100.0, neginf=-100.0) matched_row_inds, matched_col_inds = linear_sum_assignment(cost) matched_row_inds = torch.from_numpy(matched_row_inds).to( bbox_pred.device) matched_col_inds = torch.from_numpy(matched_col_inds).to( bbox_pred.device) # 4. assign backgrounds and foregrounds # assign all indices to backgrounds first assigned_gt_inds[:] = 0 # assign foregrounds based on matching results assigned_gt_inds[matched_row_inds] = matched_col_inds + 1 assigned_labels[matched_row_inds] = gt_labels[matched_col_inds] # # 5. align matched pred and gt # aligned_tgt_boxes = torch.zeros_like(bbox_pred) # assign_mask = assigned_gt_inds > 0 # aligned_tgt_boxes[assign_mask] = normalized_gt_bboxes[assigned_gt_inds[assign_mask] - 1] # from projects.utils.vislib import draw_points_boxes_plt # vis_boxes_pred = denormalize_bbox(bbox_pred[assign_mask], self.pc_range)[:, :-2] # vis_boxes_pred[:, :2] /= code_weights[:2] # vis_boxes_gt = denormalize_bbox(aligned_tgt_boxes[assign_mask], self.pc_range)[:, :-2] # vis_boxes_gt[:, :2] /= code_weights[:2] # draw_points_boxes_plt( # pc_range=51.2, # boxes_pred=vis_boxes_pred.detach().cpu().numpy(), # bbox_pred_label=[str(i) for i in range(vis_boxes_pred.shape[0])], # boxes_gt=vis_boxes_gt.detach().cpu().numpy(), # bbox_gt_label=[str(i) for i in range(vis_boxes_gt.shape[0])], # filename='/home/yuan/Downloads/tmp.png' # ) return num_gts, assigned_gt_inds, assigned_labels
[docs]class HeatmapAssigner(BaseAssigner):
[docs] @staticmethod def draw_heatmap_gaussian(heatmap, center, radius, k=1): """Get gaussian masked heatmap. Args: heatmap (torch.Tensor): Heatmap to be masked. center (torch.Tensor): Center coord of the heatmap. radius (int): Radius of gaussian. k (int, optional): Multiple of masked_gaussian. Defaults to 1. Returns: torch.Tensor: Masked heatmap. """ diameter = 2 * radius + 1 gaussian = gaussian_2d((diameter, diameter), sigma=diameter / 6) x, y = int(center[0]), int(center[1]) height, width = heatmap.shape[0:2] left, right = min(x, radius), min(width - x, radius + 1) top, bottom = min(y, radius), min(height - y, radius + 1) masked_heatmap = heatmap[y - top:y + bottom, x - left:x + right] masked_gaussian = torch.from_numpy( gaussian[radius - top:radius + bottom, radius - left:radius + right]).to(heatmap.device, torch.float32) if min(masked_gaussian.shape) > 0 and min(masked_heatmap.shape) > 0: torch.max(masked_heatmap, masked_gaussian * k, out=masked_heatmap) return heatmap
[docs] def assign(self, obj_centers2d, obj_bboxes, img_shape, stride): img_h, img_w = img_shape[:2] heatmap = torch.zeros(img_h // stride, img_w // stride, device=obj_centers2d.device) if len(obj_centers2d) != 0: l = obj_centers2d[..., 0:1] - obj_bboxes[..., 0:1] t = obj_centers2d[..., 1:2] - obj_bboxes[..., 1:2] r = obj_bboxes[..., 2:3] - obj_centers2d[..., 0:1] b = obj_bboxes[..., 3:4] - obj_centers2d[..., 1:2] bound = torch.cat([l, t, r, b], dim=-1) radius = torch.ceil(torch.min(bound, dim=-1)[0] / 16) radius = torch.clamp(radius, 1.0).cpu().numpy().tolist() for center, r in zip(obj_centers2d, radius): heatmap = self.draw_heatmap_gaussian(heatmap, center / 16, radius=int(r), k=1) return heatmap
[docs]class BoxAnchorAssigner(BaseAssigner, torch.nn.Module): def __init__(self, box_size, dirs, voxel_size, lidar_range, stride, box_coder, pos_threshold=0.6, neg_threshold=0.45, score_thrshold=0.25, ): super().__init__() self.voxel_size = voxel_size self.lidar_range = lidar_range self.num_anchors = len(dirs) self.stride = stride self.pos_threshold = pos_threshold self.neg_threshold = neg_threshold self.score_thrshold = score_thrshold self.box_coder = build_box_coder(**box_coder) anchors, standup_anchors = self.get_anchor_template(box_size, dirs) self.anchors = nn.Parameter(anchors, requires_grad=False) self.standup_anchors = nn.Parameter(standup_anchors, requires_grad=False)
[docs] def get_anchor_template(self, box_size, dirs): pix_x = self.voxel_size[0] * self.stride pix_y = self.voxel_size[1] * self.stride x = torch.arange(self.lidar_range[0], self.lidar_range[3], pix_x) + pix_x * 0.5 y = torch.arange(self.lidar_range[1], self.lidar_range[4], pix_y) + pix_y * 0.5 xys = torch.stack(torch.meshgrid(x, y, indexing='ij'), dim=-1) xys = xys.unsqueeze(2).repeat(1, 1, self.num_anchors, 1) zs = - torch.ones_like(xys[..., :1]) h, w = xys.shape[:2] lwh = torch.tensor(box_size).reshape( 1, 1, 1, -1).repeat(h, w, self.num_anchors, 1) rs = torch.deg2rad(torch.tensor(dirs)).reshape( 1, 1, -1, 1).repeat(h, w, 1, 1) # (w, h, num_anchor, 7) --> (whn, 7) anchors = torch.cat([xys, zs, lwh, rs], dim=-1) self.anchor_shape = anchors.shape anchors = anchors.view(-1, 7) standup_anchors = boxes3d_to_standup_bboxes(anchors) return anchors, standup_anchors
[docs] def assign(self, gt_boxes): """ Parameters ---------- gt_boxes Tensor(N, 7): [x, y, z, l, w, h, r, ...] Returns ------- reg Tensor(H, W, num_anchors, code_size): box regression targets """ if len(gt_boxes) == 0: labels = gt_boxes.new_full((self.standup_anchors.shape[0],), -1) reg_tgt = gt_boxes.new_zeros((0, self.box_coder.code_size)) dir_scores = gt_boxes.new_zeros((0, 4)) # Todo dir_score, gt_boxes, correct shape return labels, reg_tgt, dir_scores standup_boxes = boxes3d_to_standup_bboxes(gt_boxes[:, :7]) ious = self.box_overlaps(self.standup_anchors, standup_boxes) iou_max, max_inds = ious.max(dim=1) top1_inds = torch.argmax(ious, dim=0) pos = iou_max > self.pos_threshold pos_inds = torch.cat([top1_inds, torch.where(pos)[0]]).unique() neg = iou_max < self.neg_threshold neg[pos_inds] = False labels = gt_boxes.new_full((ious.shape[0],), -1) labels[neg] = 0 labels[pos_inds] = 1 aligned_gt_boxes = gt_boxes[max_inds[pos_inds], :7] aligned_anchors = self.anchors[pos_inds] reg_tgt, dir_score = self.box_coder.encode(aligned_anchors, aligned_gt_boxes) return labels, reg_tgt, dir_score
[docs] def box_overlaps(self, boxes1, boxes2): areas1 = (boxes1[:, 2] - boxes1[:, 0] + 1) * \ (boxes1[:, 3] - boxes1[:, 1] + 1) areas2 = (boxes2[:, 2] - boxes2[:, 0] + 1) * \ (boxes2[:, 3] - boxes2[:, 1] + 1) boxes1_mat = boxes1.unsqueeze(1).repeat(1, boxes2.shape[0], 1) boxes2_mat = boxes2.unsqueeze(0).repeat(boxes1.shape[0], 1, 1) x_extend = torch.minimum(boxes1_mat[..., 2], boxes2_mat[..., 2]) - \ torch.maximum(boxes1_mat[..., 0], boxes2_mat[..., 0]) + 1 y_extend = torch.minimum(boxes1_mat[..., 3], boxes2_mat[..., 3]) - \ torch.maximum(boxes1_mat[..., 1], boxes2_mat[..., 1]) + 1 overlaps = torch.zeros_like(boxes1_mat[..., 0]) pos = torch.logical_and(x_extend > 0, y_extend > 0) intersection = x_extend[pos] * y_extend[pos] union = (areas1.unsqueeze(1) + areas2.unsqueeze(0))[pos] - intersection overlaps[pos] = intersection / union return overlaps
[docs] def get_predictions(self, preds): # roi = {'box': [], 'scr': [], 'lbl': [], 'idx': []} roi = {} B = len(preds['cls']) pred_cls = preds['cls'].sigmoid().permute(0, 3, 2, 1).reshape(B, -1) pred_reg = preds['reg'].permute(0, 3, 2, 1).reshape(B, -1, 7) indices = torch.stack([torch.ones_like(pred_cls[0]) * i for i in range(B)], dim=0) anchors = self.anchors.unsqueeze(0).repeat(B, 1, 1) pos = pred_cls > self.score_thrshold boxes_dec = self.box_coder.decode(anchors, pred_reg) # remove abnormal boxes mask = (boxes_dec[..., 3:6] > 0.1) & (boxes_dec[..., 3:6] < 10) pos = torch.logical_and(pos, mask.all(dim=-1)) pred_cls = pred_cls[pos] pred_box = boxes_dec[pos] roi['scr'] = pred_cls roi['box'] = pred_box # TODO currently only support class car roi['lbl'] = torch.zeros_like(pred_cls) roi['idx'] = indices[pos] return roi
[docs]class BoxSparseAnchorAssigner(BaseAssigner, torch.nn.Module): def __init__(self, box_size, dirs, voxel_size, lidar_range, stride, box_coder, me_coor=True, pos_threshold=0.6, neg_threshold=0.45, score_thrshold=0.25, ): super().__init__() self.voxel_size = voxel_size self.lidar_range = lidar_range self.num_anchors = len(dirs) self.stride = stride self.pos_threshold = pos_threshold self.neg_threshold = neg_threshold self.score_thrshold = score_thrshold self.box_coder = build_box_coder(**box_coder) anchors, standup_anchors = self.get_anchor_template(box_size, dirs) self.anchors = nn.Parameter(anchors, requires_grad=False) self.standup_anchors = nn.Parameter(standup_anchors, requires_grad=False) if me_coor: lr = lidar_range res_x, res_y = stride * voxel_size[0], stride * voxel_size[1] self.size_x = round((lr[3] - lr[0]) / res_x) self.size_y = round((lr[4] - lr[1]) / res_y) self.offset_sz_x = round(lr[0] / res_x) self.offset_sz_y = round(lr[1] / res_y) self.coor_to_inds = self.me_coor_to_grid_indices else: raise NotImplementedError
[docs] def me_coor_to_grid_indices(self, coor): inds = coor / self.stride inds[:, 0] -= self.offset_sz_x inds[:, 1] -= self.offset_sz_y in_range_mask = (inds >= 0).all(dim=-1) & (inds[:, 0] < self.size_x) & (inds[:, 1] < self.size_y) return inds[in_range_mask].long(), in_range_mask
[docs] def get_anchor_template(self, box_size, dirs): pix_x = self.voxel_size[0] * self.stride pix_y = self.voxel_size[1] * self.stride x = torch.arange(self.lidar_range[0], self.lidar_range[3], pix_x) + pix_x * 0.5 y = torch.arange(self.lidar_range[1], self.lidar_range[4], pix_y) + pix_y * 0.5 xys = torch.stack(torch.meshgrid(x, y, indexing='ij'), dim=-1) xys = xys.unsqueeze(2).repeat(1, 1, self.num_anchors, 1) zs = - torch.ones_like(xys[..., :1]) h, w = xys.shape[:2] lwh = torch.tensor(box_size).reshape( 1, 1, 1, -1).repeat(h, w, self.num_anchors, 1) rs = torch.deg2rad(torch.tensor(dirs)).reshape( 1, 1, -1, 1).repeat(h, w, 1, 1) # (w, h, num_anchor, 7) --> (whn, 7) anchors = torch.cat([xys, zs, lwh, rs], dim=-1) standup_anchors = boxes3d_to_standup_bboxes( anchors.view(-1, 7)).reshape(h, w, self.num_anchors, 4) return anchors, standup_anchors
[docs] def assign(self, coors: torch.Tensor, gt_boxes: torch.Tensor): """ :param coors: (N, 2) 2D mink coor [x, y] :param gt_boxes: (M, 7) [x, y, z, l, w, h, r] :return: - labels Tensor(N, num_anchors): box regression targets - reg_tgt Tensor(N, num_anchors, code_size): box regression targets - ir_score Tensor(N, num_anchors, 4) or None: direction score target """ gt_boxes = gt_boxes[:, :7] if len(gt_boxes) == 0: labels = gt_boxes.new_full((coors.shape[0] * self.num_anchors,), -1) reg_tgt = gt_boxes.new_zeros((0, self.box_coder.code_size)) dir_scores = gt_boxes.new_zeros((0, 4)) # Todo dir_score, gt_boxes, correct shape return labels, reg_tgt, dir_scores inds, in_range_mask = self.coor_to_inds(coors) gt_standup_boxes = boxes3d_to_standup_bboxes(gt_boxes) standup_anchors = self.standup_anchors[inds[:, 0], inds[:, 1]].view(-1, 4) ious = self.box_overlaps(standup_anchors, gt_standup_boxes) iou_max, max_inds = ious.max(dim=1) top1_inds = torch.argmax(ious, dim=0) pos = iou_max > self.pos_threshold pos_inds = torch.cat([top1_inds, torch.where(pos)[0]]).unique() neg = iou_max < self.neg_threshold neg[pos_inds] = False labels = gt_boxes.new_full((ious.shape[0],), -1) labels[neg] = 0 labels[pos_inds] = 1 aligned_gt_boxes = gt_boxes[max_inds[pos_inds]] aligned_anchors = self.anchors[inds[:, 0], inds[:, 1]].view(-1, self.box_coder.code_size)[pos_inds] reg_tgt, dir_score = self.box_coder.encode(aligned_anchors, aligned_gt_boxes) labels_final = gt_boxes.new_full((in_range_mask.shape[0], self.num_anchors), -1) labels_final[in_range_mask] = labels.view(-1, self.num_anchors) return labels_final.view(-1), reg_tgt, dir_score
[docs] def box_overlaps(self, boxes1, boxes2): areas1 = (boxes1[:, 2] - boxes1[:, 0] + 1) * \ (boxes1[:, 3] - boxes1[:, 1] + 1) areas2 = (boxes2[:, 2] - boxes2[:, 0] + 1) * \ (boxes2[:, 3] - boxes2[:, 1] + 1) boxes1_mat = boxes1.unsqueeze(1).repeat(1, boxes2.shape[0], 1) boxes2_mat = boxes2.unsqueeze(0).repeat(boxes1.shape[0], 1, 1) x_extend = torch.minimum(boxes1_mat[..., 2], boxes2_mat[..., 2]) - \ torch.maximum(boxes1_mat[..., 0], boxes2_mat[..., 0]) + 1 y_extend = torch.minimum(boxes1_mat[..., 3], boxes2_mat[..., 3]) - \ torch.maximum(boxes1_mat[..., 1], boxes2_mat[..., 1]) + 1 overlaps = torch.zeros_like(boxes1_mat[..., 0]) pos = torch.logical_and(x_extend > 0, y_extend > 0) intersection = x_extend[pos] * y_extend[pos] union = (areas1.unsqueeze(1) + areas2.unsqueeze(0))[pos] - intersection overlaps[pos] = intersection / union return overlaps
[docs] def get_predictions(self, coors, preds): """ :param coors: Tensor(N, 3) mink coor [batch_idx, x, y] :param preds: :return: """ # roi = {'box': [], 'scr': [], 'lbl': [], 'idx': []} roi = {} inds, in_range_mask = self.coor_to_inds(coors[:, 1:]) pred_cls = preds['cls'][in_range_mask].sigmoid().reshape(-1) pred_reg = preds['reg'][in_range_mask].reshape(-1, 7) indices = coors[:, 0:1][in_range_mask].repeat(1, self.num_anchors).reshape(-1) anchors = self.anchors[inds[:, 0], inds[:, 1]].view(-1, self.box_coder.code_size) pos = pred_cls > self.score_thrshold anchors = anchors[pos] pred_cls = pred_cls[pos] pred_reg = pred_reg[pos] indices = indices[pos] boxes_dec = self.box_coder.decode(anchors, pred_reg) # remove abnormal boxes mask = (boxes_dec[..., 3:6] > 0.1) & (boxes_dec[..., 3:6] < 10) mask = mask.all(dim=-1) pred_cls = pred_cls[mask] pred_box = boxes_dec[mask] indices = indices[mask] roi['scr'] = pred_cls roi['box'] = pred_box # TODO currently only support class car roi['lbl'] = torch.zeros_like(pred_cls) roi['idx'] = indices return roi
[docs]class BoxCenterAssigner(BaseAssigner, torch.nn.Module): def __init__(self, voxel_size, lidar_range, stride, detection_benchmark, class_names_each_head, center_threshold, box_coder, activation='relu', edl=True, ): super().__init__() self.voxel_size = voxel_size self.lidar_range = lidar_range self.meter_per_pixel = (voxel_size[0] * stride, voxel_size[1] * stride) self.csb = csb.get(detection_benchmark) self.class_names_each_head = class_names_each_head self.activation = activation self.center_threshold = center_threshold self.box_coder = build_box_coder(**box_coder) self.edl = edl
[docs] def pts_to_indices(self, bev_pts: torch.Tensor): """ :param bev_pts: (N, 3+), BEV points, 1st column should be batch index. :return: """ x = (bev_pts[:, 1] - self.meter_per_pixel[0] * 0.5 - self.lidar_range[0]) \ / self.meter_per_pixel[0] y = (bev_pts[:, 2] - self.meter_per_pixel[1] * 0.5 - self.lidar_range[1]) \ / self.meter_per_pixel[1] indices = torch.stack([bev_pts[:, 0].long(), x.long(), y.long()], dim=1) return indices
[docs] @torch.no_grad() def assign(self, centers, gt_boxes, gt_labels, gt_preds=None, **kwargs): box_names = [self.csb[c.item()][0] for c in gt_labels] # cal regression targets reg_tgt = {'box': [], 'dir': [], 'scr': [], 'idx': [], 'valid_mask': [], 'vel': [], 'pred': []} for h, cur_cls_names in enumerate(self.class_names_each_head): center_indices = self.pts_to_indices(centers).T box_mask = [n in cur_cls_names for n in box_names] cur_boxes = gt_boxes[box_mask] res = self.box_coder.encode(centers, cur_boxes, self.meter_per_pixel, gt_preds) reg_box, reg_dir, dir_score, valid = res[:4] reg_tgt['idx'].append(center_indices[:, valid]) reg_tgt['valid_mask'].append(valid) reg_tgt['box'].append(reg_box) reg_tgt['dir'].append(reg_dir) reg_tgt['scr'].append(dir_score) if getattr(self.box_coder, 'with_velo', False): reg_tgt['vel'].append(res[4]) if getattr(self.box_coder, 'with_pred', False): reg_tgt['pred'].append(res[5]) return reg_tgt
[docs] def get_predictions(self, preds): """Decode the center and regression maps into BBoxes. :param preds: - cls: list[Tensor], each tensor is the result from a cls head with shape (B or N, Ncls, ...). - reg: * box: list[Tensor], one tensor per reg head with shape (B or N, 6, ...). * dir: list[Tensor], one tensor per reg head with shape (B or N, 8, ...). * scr: list[Tensor], one tensor per reg head with shape (B or N, 4, ...). :return: roi: * box: list[Tensor], one tensor per head with shape (N, 8). * scr: list[Tensor], one tensor per head with shape (N,). * lbl: list[Tensor], one tensor per head with shape (N,). * idx: list[Tensor], one tensor per head with shape (3, N), center map indices of the boxes. """ roi = {'box': [], 'scr': [], 'lbl': [], 'idx': []} lbl_cnt = torch.cumsum(torch.Tensor([0] + [m.shape[1] for m in preds['cls']]), dim=0) confs = [] for h, center_cls in enumerate(preds['cls']): if center_cls.ndim == 4: conf, _ = pred_to_conf_unc(center_cls.permute(0, 2, 3, 1), self.activation) center_mask = conf[..., 1:].max(dim=-1).values > self.center_threshold # b, h, w center_indices = torch.stack(torch.where(center_mask), dim=0) centers = self.indices_to_pts(center_indices[1:]).T cur_centers = torch.cat([center_indices[0].unsqueeze(-1), centers], dim=-1) cur_reg = {k: preds['reg'][k][h].permute(0, 2, 3, 1)[center_mask] for k in ['box', 'dir', 'scr']} else: conf, _ = pred_to_conf_unc(center_cls, self.activation, self.edl) centers = preds['ctr'] if self.edl: center_mask = conf[..., 1:].max(dim=-1).values > self.center_threshold # b, h, w else: center_mask = conf.max(dim=-1).values > self.center_threshold # b, h, w if center_cls.ndim == 3: indices = torch.stack([torch.zeros_like(centers[i, :, :1]) + i for i in range(centers.shape[0])], dim=0) centers = torch.cat([indices, centers], dim=-1) cur_centers = centers[center_mask] center_indices = self.pts_to_indices(cur_centers) cur_reg = {k: preds['reg'][k][h][center_mask] for k in preds['reg'].keys()} # from cosense3d.utils import vislib # mask = cur_centers[:, 0].int() == 0 # confs = conf[center_mask][mask, 1].detach().cpu().numpy() # points = cur_centers[mask, 1:].detach().cpu().numpy() # fig = vislib.plt.figure(figsize=(6, 6)) # vislib.plt.scatter(points[:, 0], points[:, 1], c=confs, s=1) # vislib.plt.show() # vislib.plt.close() cur_box = self.box_coder.decode(cur_centers, cur_reg) cur_scr, cur_lbl = conf[center_mask].max(dim=-1) cur_lbl = cur_lbl + lbl_cnt[h] roi['box'].append(cur_box) roi['scr'].append(cur_scr) roi['lbl'].append(cur_lbl) roi['idx'].append(center_indices) confs.append(conf) # from cosense3d.utils.vislib import draw_points_boxes_plt # points = centers[:, 1:].detach().cpu().numpy() # boxes = cur_box[:, 1:].detach().cpu().numpy() # draw_points_boxes_plt( # pc_range=self.lidar_range, # boxes_pred=boxes, # points=points, # filename="/home/yuan/Pictures/tmp.png" # ) # merge detections from all heads roi['box'] = torch.cat(roi['box'], dim=0) roi['scr'] = torch.cat(roi['scr'], dim=0) roi['lbl'] = torch.cat(roi['lbl'], dim=0) roi['idx'] = torch.cat(roi['idx'], dim=0) confs = torch.stack(confs, dim=1) return roi, confs
[docs]class BEVCenternessAssigner(BaseAssigner): """ Assign center points in the BEV maps to positve if the point is in the range 'min_radius' of any gt box center. """ def __init__(self, n_cls, min_radius=1.0, pos_neg_ratio=5, mining_thr=0, max_mining_ratio=3, mining_start_epoch=5, merge_all_classes=False, use_gaussian=False, sigma=1.0 ): super().__init__() self.n_cls = n_cls self.min_radius = min_radius self.pos_neg_ratio = pos_neg_ratio self.sample_mining_thr = mining_thr self.max_mining_ratio = max_mining_ratio self.mining_start_epoch = mining_start_epoch self.merge_all_classes = merge_all_classes self.use_gaussian = use_gaussian self.sigma = sigma
[docs] def get_labels_single_head(self, centers, gt_boxes, pred_scores=None, **kwargs): diff = centers[:, :2].unsqueeze(1) - gt_boxes[:, :2].unsqueeze(0) dists = torch.norm(diff, dim=-1) dists_min, dists_min_arg = dists.min(dim=1) if self.use_gaussian: labels = torch.exp(-0.5 * torch.sqrt(dists_min) / self.sigma ** 2) # sigmas = gt_boxes[:, 3:5][dists_min_arg] / 4 * self.sigma # labels = weighted_mahalanobis_dists( # sigmas ** 2, diff[torch.arange(len(diff)), dists_min_arg].abs().unsqueeze(1)) labels[labels < 1e-4] = 0 else: labels = (dists_min < self.min_radius).float() if self.pos_neg_ratio: labels = pos_neg_sampling(labels, self.pos_neg_ratio) if self.sample_mining_thr > 0 and kwargs.get('epoch', 0) > self.mining_start_epoch: assert pred_scores is not None labels = sample_mining(pred_scores, labels, dists_min, self.sample_mining_thr, self.max_mining_ratio) return labels
[docs] @torch.no_grad() def assign(self, centers, gt_boxes, gt_labels, pred_scores=None, **kwargs): if len(gt_boxes) == 0: labels = torch.zeros_like(centers[:, :1]) return labels if self.merge_all_classes: labels = self.get_labels_single_head(centers, gt_boxes).unsqueeze(-1) else: labels = [] for n in range(self.n_cls): cur_boxes = gt_boxes[gt_labels == n] cur_scores = None if pred_scores is None else pred_scores[n] labels.append(self.get_labels_single_head(centers, cur_boxes, cur_scores, **kwargs)) labels = torch.stack(labels, dim=-1) # import matplotlib.pyplot as plt # # from cosense3d.utils import vislib # pc_range = [-100, -41.6, -3.0, 100, 41.6, 3.0] # label = labels.detach().cpu().numpy() # label = label[:, 0] # points = centers.detach().cpu().numpy() # boxes = gt_boxes.cpu().numpy() # ax = vislib.draw_points_boxes_plt( # pc_range=pc_range, # boxes_gt=boxes, # return_ax=True # ) # ax.scatter(points[:, 0], points[:, 1], cmap='jet', c=label, s=1) # plt.savefig("/home/yuan/Downloads/tmp.png") # plt.close() return labels
[docs]class BEVBoxAssigner(BaseAssigner): """ Assign center points in the BEV maps to positve if the point is in the range 'min_radius' of any gt box center. """ def __init__(self, n_cls, pos_neg_ratio=5, mining_thr=0, max_mining_ratio=3, mining_start_epoch=5, merge_all_classes=False, ): super().__init__() self.n_cls = n_cls self.pos_neg_ratio = pos_neg_ratio self.sample_mining_thr = mining_thr self.max_mining_ratio = max_mining_ratio self.mining_start_epoch = mining_start_epoch self.merge_all_classes = merge_all_classes
[docs] def get_labels_single_head(self, centers, gt_boxes, pred_scores=None, **kwargs): boxes = pad_l(gt_boxes[:, :7]).clone() boxes[:, 3] = 0 pts = pad_r(pad_l(centers[:, :2])) _, box_idx_of_pts = points_in_boxes_gpu( pts, boxes, batch_size=1 ) labels = (box_idx_of_pts >= 0).float() if self.pos_neg_ratio: labels = pos_neg_sampling(labels, self.pos_neg_ratio) return labels
[docs] @torch.no_grad() def assign(self, centers, gt_boxes, gt_labels, pred_scores=None, **kwargs): if len(gt_boxes) == 0: labels = torch.zeros_like(centers[:, :1]) return labels if self.merge_all_classes: labels = self.get_labels_single_head(centers, gt_boxes).unsqueeze(-1) else: labels = [] for n in range(self.n_cls): cur_boxes = gt_boxes[gt_labels == n] cur_scores = None if pred_scores is None else pred_scores[n] labels.append(self.get_labels_single_head(centers, cur_boxes, cur_scores, **kwargs)) labels = torch.stack(labels, dim=-1) # import matplotlib.pyplot as plt # # from cosense3d.utils import vislib # pc_range = [-100, -41.6, -3.0, 100, 41.6, 3.0] # label = labels.detach().cpu().numpy() # label = label[:, 0] # points = centers.detach().cpu().numpy() # boxes = gt_boxes.cpu().numpy() # ax = vislib.draw_points_boxes_plt( # pc_range=pc_range, # boxes_gt=boxes, # return_ax=True # ) # ax.scatter(points[:, 0], points[:, 1], cmap='jet', c=label, s=1) # plt.savefig("/home/yuan/Downloads/tmp.png") # plt.close() return labels
[docs]class BEVPointAssigner(BaseAssigner): """ Assign target points to BEV boxes and down-sample the target points with buffered-based method. """ def __init__(self, down_sample=True, sample_mining_thr=0., max_mining_ratio=3, annealing_step=None, topk_sampling=False, annealing_sampling=False, ): super().__init__() self.down_sample = down_sample self.sample_mining_thr = sample_mining_thr self.max_mining_ratio = max_mining_ratio self.annealing_step = annealing_step self.topk_sampling = topk_sampling self.annealing_sampling = annealing_sampling
[docs] def downsample_tgt_pts(self, tgt_label, max_sam): selected = torch.ones_like(tgt_label.bool()) pos = tgt_label == 1 if pos.sum() > max_sam: mask = torch.rand_like(tgt_label[pos].float()) < max_sam / pos.sum() selected[pos] = mask buffer = tgt_label == 0 if buffer.sum() > max_sam: mask = torch.rand_like(tgt_label[buffer].float()) < max_sam / buffer.sum() selected[buffer] = mask neg = tgt_label == -1 if neg.sum() > max_sam: mask = torch.rand_like(tgt_label[neg].float()) < max_sam / neg.sum() selected[neg] = mask labels = - torch.ones_like(mask).long() labels[mask] = 0 tgt_label[neg] = labels return selected, tgt_label
[docs] def assign(self, tgt_pts, gt_boxes, B, conf=None, down_sample=True, **kwargs): boxes = gt_boxes.clone() boxes[:, 3] = 0 pts = pad_r(tgt_pts) if not down_sample or not self.down_sample: _, box_idx_of_pts = points_in_boxes_gpu( pts, boxes, batch_size=B ) tgt_label = torch.zeros_like(box_idx_of_pts) tgt_label[box_idx_of_pts >= 0] = 1 return tgt_pts, tgt_label, None _, box_idx_of_pts = points_in_boxes_gpu( pts, boxes, batch_size=B ) boxes[:, 4:6] *= 2 _, enlarged_box_idx_of_pts = points_in_boxes_gpu( pts, boxes, batch_size=B ) pos_mask = box_idx_of_pts >= 0 buffer_mask = (box_idx_of_pts < 0) & (enlarged_box_idx_of_pts >= 0) tgt_label = - torch.ones_like(box_idx_of_pts) tgt_label[pos_mask] = 1 tgt_label[buffer_mask] = 0 n_sam = len(boxes) * 50 # add points that have high pred scores if self.sample_mining_thr > 0: scores = conf[..., 1:].sum(dim=-1) tgt_label = sample_mining(scores, tgt_label, self.sample_mining_thr, max_num_sample=n_sam) mask, tgt_label = self.downsample_tgt_pts(tgt_label, max_sam=n_sam) # get final tgt tgt_pts = tgt_pts[mask] tgt_label = tgt_label[mask] return tgt_pts, tgt_label, mask
[docs] def get_predictions(self, x, edl=True, activation='none'): conf, unc = pred_to_conf_unc(x, activation, edl) return conf, unc
[docs]class BEVSemsegAssigner(BaseAssigner): def __init__(self, data_info, stride, tgt_range=None, down_sample=False, annealing_step=None, ): super().__init__() update_me_essentials(self, data_info, stride) self.tgt_range = tgt_range self.downsample = down_sample self.annealing_step = annealing_step
[docs] def pts_to_inds(self, pts): """Calculate indices of samples in the bev map""" ixy = metric2indices(pts[:, :3], self.res).long() ixy[:, 1] -= self.offset_sz_x ixy[:, 2] -= self.offset_sz_y maskx = torch.logical_and(ixy[:, 1] >= 0, ixy[:, 1] < self.size_x) masky = torch.logical_and(ixy[:, 2] >= 0, ixy[:, 2] < self.size_y) mask = torch.logical_and(maskx, masky) indices = ixy[mask] return indices.T, mask
[docs] def get_obs_mask(self, inds, B): obs_mask = torch.zeros((B, self.size_x, self.size_y), device=inds.device) inds = inds.clone().long().T inds[1] -= self.offset_sz_x inds[2] -= self.offset_sz_y obs_mask[inds[0], inds[1], inds[2]] = 1 return obs_mask.bool()
[docs] @staticmethod def down_sample_pred_pts(ctr_pts): keep = torch.rand_like(ctr_pts['ctr'][:, 0]) > 0.5 for k in ctr_pts.keys(): ctr_pts[k] = ctr_pts[k][keep] return ctr_pts
[docs] @torch.no_grad() def downsample_tgt_pts(self, tgt_label, max_sam): selected = torch.ones_like(tgt_label.bool()) pos = tgt_label == 1 if pos.sum() > max_sam: mask = torch.rand_like(tgt_label[pos].float()) < max_sam / pos.sum() selected[pos] = mask neg = tgt_label == 0 if neg.sum() > max_sam: mask = torch.rand_like(tgt_label[neg].float()) < max_sam / neg.sum() selected[neg] = mask return selected
[docs] def filter_range(self, ctr_pts, samples): mask = (ctr_pts['ctr'].abs() < self.tgt_range).all(1) for k in ctr_pts.keys(): ctr_pts[k] = ctr_pts[k][mask] mask = (samples[:, 1:3].abs() < self.tgt_range).all(1) samples = samples[mask] return ctr_pts, samples
[docs] def assign(self, ctr_pts, samples, B, gt_boxes=None, **kwargs): raise NotImplementedError
[docs] def get_predictions(self, data_dict, B, edl=True, activation='none', **kwargs): raise NotImplementedError
[docs]class ContiBEVAssigner(BEVSemsegAssigner): def __init__(self, distr_r=2.0, var0=0.1, **kwargs): super().__init__(**kwargs) self.distr_r = distr_r self.var0 = var0 steps = int(self.distr_r / self.res[0]) * 2 + 1 offset = meshgrid(-self.distr_r, self.distr_r, 2, n_steps=steps).cuda().view(-1, 2) self.nbrs = offset[torch.norm(offset, dim=1) < 2].view(1, -1, 2)
[docs] def sample_dynamic_tgt_pts(self, ctr_pts: dict, gt_boxes: torch.Tensor, B: int) \ -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: """ Given the input coordinates of the center points and the ground truth BBoxes, sample the BEV target points for BEV semantic segmentation following the buffer-based sampling as illustrated in the following image: .. image:: _static/imgs/buffer_based_sampling.png :width: 400 :alt: Buffer-based sampling of the BEV target :param ctr_pts: center points of bev maps, including indices, metric centers and regression results. :param gt_boxes: ground truth BBoxes. :param B: batch size. :return: - tgt_pts: sampled target points. - tgt_lbl: labels of the sampled target points. - inds: map indices of the sampled target points. """ tgt_pts = ctr_pts['ctr'].clone() tgt_pts[:, :2] = tgt_pts[:, :2] + torch.randn_like(tgt_pts[:, :2]) * 3 tgt_pts = torch.cat([ctr_pts['coor'][:, :1], tgt_pts], dim=-1) obs_mask = self.get_obs_mask(ctr_pts['coor'], B) inds, mask = self.pts_to_inds(tgt_pts) tgt_pts = tgt_pts[mask] mask = obs_mask[inds[0], inds[1], inds[2]] tgt_pts = tgt_pts[mask] inds = inds.T[mask] if len(gt_boxes) == 0 or len(tgt_pts) == 0: tgt_label = torch.zeros_like(tgt_pts[:, 0]).int() else: boxes = gt_boxes.clone() boxes[:, 3] = 0 pts = pad_r(tgt_pts) _, box_idx_of_pts = points_in_boxes_gpu( pts, boxes, batch_size=B ) boxes[:, 4:6] *= 4 _, box_idx_of_pts2 = points_in_boxes_gpu( pts, boxes, batch_size=B ) tgt_label = - (box_idx_of_pts2 >= 0).int() tgt_label[box_idx_of_pts >= 0] = 1 n_sam = len(gt_boxes) * 50 mask = self.downsample_tgt_pts(tgt_label, max_sam=n_sam) tgt_label = tgt_label > 0 return tgt_pts[mask], tgt_label[mask], inds[mask].T
[docs] def assign(self, ctr_pts, samples, B, gt_boxes=None, **kwargs) -> dict: """ Assign target. :param ctr_pts: center points of bev maps, including indices, metric centers and regression results. :param samples: BEV target point samples. :param B: batch size. :param gt_boxes: ground truth BBoxes. :param kwargs: keyword arguments. :return: target_dict that contains the static or/and dynamic target points and their corresponding labels. """ lr = self.lidar_range if self.tgt_range is not None: ctr_pts, samples = self.filter_range(ctr_pts, samples) lr = [-self.tgt_range, -self.tgt_range, -3, self.tgt_range, self.tgt_range, 1] if self.downsample: ctr_pts = self.down_sample_pred_pts(ctr_pts) tgt = {} if 'reg_static' in ctr_pts: tgt['evi_static'] = draw_sample_evis( ctr_pts, samples, 'static', self.res[0], self.distr_r, lr, B, self.var0) tgt['lbl_static'] = samples[:, -1] if 'reg_dynamic' in ctr_pts: assert gt_boxes is not None tgt_pts, tgt_label, inds = self.sample_dynamic_tgt_pts(ctr_pts, gt_boxes, B) tgt['evi_dynamic'] = draw_sample_evis( ctr_pts, tgt_pts, 'dynamic', self.res[0], self.distr_r, lr, B, self.var0) tgt['lbl_dynamic'] = tgt_label # # import matplotlib.pyplot as plt # from cosense3d.modules.utils.edl_utils import logit_to_edl # fig = plt.figure(figsize=(10, 10)) # coor = ctr_pts['coor'] # ctr = ctr_pts['ctr'] # sams = samples[samples[:, 0]==0][:, 1:].cpu().numpy() # mask = coor[:, 0] == 0 # xy = ctr[mask].cpu().numpy() # conf, unc = logit_to_edl(ctr_pts['reg_static'][mask, :2]) # colors = conf[:, 1].detach().cpu().numpy() # plt.scatter(xy[:, 0], xy[:, 1], cmap='jet', c=colors, edgecolors=None, marker='.', s=2, vmin=0, vmax=1) # plt.show() # plt.close() # # fig = plt.figure(figsize=(10, 10)) # pos = sams[:, -1] == 1 # plt.scatter(sams[:, 0], sams[:, 1], c='k', facecolors=None, marker='o', s=5) # plt.scatter(sams[pos, 0], sams[pos, 1], c='r', facecolors=None, marker='o', s=5) # plt.show() # plt.close() # # fig = plt.figure(figsize=(10, 10)) # mask = tgt_pts[:, 0] == 0 # sams = tgt_pts[mask][:, 1:].cpu().numpy() # pos = tgt_label[mask].cpu().numpy() == 1 # mask = coor[:, 0] == 0 # xy = ctr[mask].cpu().numpy() # conf, unc = logit_to_edl(ctr_pts['reg_dynamic'][mask, :2]) # colors = conf[:, 1].detach().cpu().numpy() # plt.scatter(xy[:, 0], xy[:, 1], cmap='jet', c=colors, edgecolors=None, marker='.', s=2, vmin=0, vmax=1) # plt.show() # plt.close() # # fig = plt.figure(figsize=(10, 10)) # plt.scatter(sams[:, 0], sams[:, 1], c='k', facecolors=None, marker='o', s=5) # plt.scatter(sams[pos, 0], sams[pos, 1], c='r', facecolors=None, marker='o', s=5) # plt.show() # plt.close() return tgt
[docs] def get_predictions(self, ctr_pts, B, tag, **kwargs): """ Given center points and its corresponding regressions, generate the dense bev semseg maps and its uncertainty and observation mask. :param ctr_pts: center points of bev maps, including indices, metric centers and regression results. :param B: batch size. :param tag: tag for regression key "static | dynamic". :param kwargs: keyword arguments :return: - conf: confidence bev map. - unc: uncertainty bev map. - obs_mask: observation mask of the bev map. """ reg = ctr_pts[f'reg_{tag}'].relu() reg_evi = reg[:, :2] reg_var = reg[:, 2:].view(-1, 2, 2) ctr = ctr_pts['ctr'] coor = ctr_pts['coor'] nbrs = self.nbrs.to(reg_evi.device) dists = torch.zeros_like(ctr.view(-1, 1, 2)) + nbrs vars0 = [self.var0, self.var0] probs_weighted = weighted_mahalanobis_dists(reg_evi, reg_var, dists, vars0) voxel_new = ctr.view(-1, 1, 2) + nbrs # convert metric voxel points to map indices x = (torch.floor(voxel_new[..., 0] / self.res[0]) - self.offset_sz_x).long() y = (torch.floor(voxel_new[..., 1] / self.res[1]) - self.offset_sz_y).long() batch_indices = (torch.ones_like(probs_weighted[:, :, 0]) * coor[:, :1]).long() mask = (x >= 0) & (x < self.size_x) & (y >= 0) & (y < self.size_y) x, y = x[mask], y[mask] batch_indices = batch_indices[mask] # copy sparse probs to the dense evidence map indices = batch_indices * self.size_x * self.size_y + x * self.size_y + y batch_size = coor[:, 0].max().int().item() + 1 probs_weighted = probs_weighted[mask].view(-1, 2) evidence = torch.zeros((batch_size, self.size_x, self.size_y, 2), device=probs_weighted.device).view(-1, 2) torch_scatter.scatter(probs_weighted, indices, dim=0, out=evidence, reduce='sum') evidence = evidence.view(batch_size, self.size_x, self.size_y, 2) # create observation mask obs_mask = torch.zeros_like(evidence[..., 0]).view(-1) obs = indices.unique().long() obs_mask[obs] = 1 obs_mask = obs_mask.view(batch_size, self.size_x, self.size_y).bool() conf, unc = pred_to_conf_unc(evidence) # import matplotlib.pyplot as plt # plt.imshow(conf[0, :, :, 1].T.detach().cpu().numpy()) # plt.show() # plt.close() return {f'conf_map_{tag}': conf, f'unc_map_{tag}': unc, f'obs_mask_{tag}': obs_mask}
[docs]class DiscreteBEVAssigner(BaseAssigner): def __init__(self, data_info, stride, down_sample=False, annealing_step=None, ): super().__init__() update_me_essentials(self, data_info, stride) self.down_sample = down_sample self.annealing_step = annealing_step
[docs] def pts_to_inds(self, samples): """Calculate indices of samples in the bev map""" ixy = metric2indices(samples[:, :3], self.res).long() ixy[:, 1] -= self.offset_sz_x ixy[:, 2] -= self.offset_sz_y maskx = torch.logical_and(ixy[:, 1] >= 0, ixy[:, 1] < self.size_x) masky = torch.logical_and(ixy[:, 2] >= 0, ixy[:, 2] < self.size_y) mask = torch.logical_and(maskx, masky) indices = ixy[mask] return indices.T, mask
[docs] def get_obs_mask(self, inds, B): obs_mask = torch.zeros((B, self.size_x, self.size_y), device=inds.device) inds = inds.T inds[1] -= self.offset_sz_x inds[2] -= self.offset_sz_y obs_mask[inds[0], inds[1], inds[2]] = 1 return obs_mask.bool()
[docs] def assign(self, ctr_pts, samples, B, gt_boxes=None, **kwargs): bevmap = self.get_predictions(ctr_pts, B) inds, mask = self.pts_to_inds(samples) labels = samples[mask][:, -1] preds = bevmap[inds[0], inds[1], inds[2]] # import matplotlib.pyplot as plt # img = pred_to_conf_unc(bevmap)[0][..., 1].detach().cpu().numpy() # plt.imshow(img[0].T) # plt.show() # plt.close() return preds, labels
[docs] def get_predictions(self, data_dict, B, edl=True, activation='none', **kwargs): reg = data_dict['reg'] inds = data_dict['coor'] reg_evi = reg.relu() bevmap = torch.zeros((B, self.size_x, self.size_y, reg_evi.shape[-1]), device=reg_evi.device) inds = inds.T inds[1] -= self.offset_sz_x inds[2] -= self.offset_sz_y # obs_mask = evidence[..., 0].bool() # obs_mask[inds[0], inds[1], inds[2]] = True bevmap[inds[0], inds[1], inds[2]] = reg_evi return bevmap
[docs]class RoIBox3DAssigner(BaseAssigner): def __init__(self, box_coder, ): self.box_coder = build_box_coder(**box_coder) self.code_size = self.box_coder.code_size
[docs] def assign(self, pred_boxes, gt_boxes, **kwargs): tgt_dict = { 'rois': [], 'gt_of_rois': [], 'gt_of_rois_src': [], 'cls_tgt': [], 'reg_tgt': [], 'iou_tgt': [], 'rois_anchor': [], 'record_len': [] } for rois, gts in zip(pred_boxes, gt_boxes): gts[:, -1] *= 1 ious = boxes_iou3d_gpu(rois, gts) max_ious, gt_inds = ious.max(dim=1) gt_of_rois = gts[gt_inds] rcnn_labels = (max_ious > 0.3).float() mask = torch.logical_not(rcnn_labels.bool()) # set negative samples back to rois, no correction in stage2 for them gt_of_rois[mask] = rois[mask] gt_of_rois_src = gt_of_rois.clone().detach() # canoical transformation roi_center = rois[:, 0:3] # TODO: roi_ry > 0 in pcdet roi_ry = rois[:, 6] % (2 * PI) gt_of_rois[:, 0:3] = gt_of_rois[:, 0:3] - roi_center gt_of_rois[:, 6] = gt_of_rois[:, 6] - roi_ry # transfer LiDAR coords to local coords gt_of_rois = rotate_points_along_z_torch( points=gt_of_rois.view(-1, 1, gt_of_rois.shape[-1]), angle=-roi_ry.view(-1) ).view(-1, gt_of_rois.shape[-1]) # flip orientation if rois have opposite orientation heading_label = (gt_of_rois[:, 6] + ( torch.div(torch.abs(gt_of_rois[:, 6].min()), (2 * PI), rounding_mode='trunc') + 1) * 2 * PI) % (2 * PI) # 0 ~ 2pi opposite_flag = (heading_label > PI * 0.5) & ( heading_label < PI * 1.5) # (0 ~ pi/2, 3pi/2 ~ 2pi) heading_label[opposite_flag] = (heading_label[ opposite_flag] + PI) % ( 2 * PI) flag = heading_label > PI heading_label[flag] = heading_label[ flag] - PI * 2 # (-pi/2, pi/2) heading_label = torch.clamp(heading_label, min=-PI / 2, max=PI / 2) gt_of_rois[:, 6] = heading_label # generate regression target rois_anchor = rois.clone().detach().view(-1, self.code_size) rois_anchor[:, 0:3] = 0 rois_anchor[:, 6] = 0 reg_targets, _ = self.box_coder.encode( rois_anchor, gt_of_rois.view(-1, self.code_size) ) tgt_dict['rois'].append(rois) tgt_dict['gt_of_rois'].append(gt_of_rois) tgt_dict['gt_of_rois_src'].append(gt_of_rois_src) tgt_dict['cls_tgt'].append(rcnn_labels) tgt_dict['reg_tgt'].append(reg_targets) tgt_dict['iou_tgt'].append(max_ious) tgt_dict['rois_anchor'].append(rois_anchor) tgt_dict['record_len'].append(rois.shape[0]) # cat list to tensor for k, v in tgt_dict.items(): if k == 'record_len': continue tgt_dict[k] = torch.cat(v, dim=0) return tgt_dict
[docs] def get_predictions(self, rcnn_cls, rcnn_iou, rcnn_reg, rois): rcnn_cls = rcnn_cls.sigmoid().view(-1) rcnn_iou = rcnn_iou.view(-1) rcnn_score = rcnn_cls * rcnn_iou**4 rcnn_reg = rcnn_reg.view(-1, 7) rois_anchor = rois.clone().detach().view(-1, self.code_size) rois_anchor[:, 0:3] = 0 rois_anchor[:, 6] = 0 roi_center = rois[:, 0:3] roi_ry = rois[:, 6] % (2 * PI) boxes_local = self.box_coder.decode(rois_anchor, rcnn_reg) # boxes_local = rcnn_reg + rois_anchor detections = rotate_points_along_z_torch( points=boxes_local.view(-1, 1, boxes_local.shape[-1]), angle=roi_ry.view(-1) ).view(-1, boxes_local.shape[-1]) detections[:, :3] = detections[:, :3] + roi_center detections[:, 6] = detections[:, 6] + roi_ry mask = rcnn_score >= 0.01 detections = detections[mask] scores = rcnn_score[mask] return { 'box': detections, 'scr': scores, # Todo currently only support cars 'lbl': torch.zeros_like(scores), # map indices to be aligned with sparse detection head format 'idx': torch.zeros_like(scores), }
[docs]class RoadLineAssigner(BaseAssigner): def __init__(self, res, range, pos_neg_ratio=2): super().__init__() self.res = res self.range = range self.size = int(round(range / res * 2)) self.pos_neg_ratio = pos_neg_ratio
[docs] def assign(self, coor, tgt_pts, B, **kwargs): ctr_coor = coor.clone() ctr_coor[:, 1:] = ctr_coor[:, 1:] + self.size / 2 ctr_coor = ctr_coor.long() roadline_maps = torch.zeros((B, self.size, self.size), device=tgt_pts.device) mask = (tgt_pts[:, 1:3].abs() < self.range).all(dim=-1) tgt_pts = tgt_pts[mask] tgt_coor = torch.floor((tgt_pts[:, 1:3] + self.range) / self.res).long() mask = torch.logical_and((tgt_coor >= 0).all(dim=-1), (tgt_coor < self.size).all(dim=-1)) roadline_maps[tgt_pts[mask, 0].long(), tgt_coor[mask, 0], tgt_coor[mask, 1]] = tgt_pts[mask, -1] valid = torch.logical_and((ctr_coor[:, 1:3] >= 0).all(dim=-1), (ctr_coor[:, 1:3] < self.size).all(dim=-1)) labels = roadline_maps[ctr_coor[valid, 0], ctr_coor[valid, 1], ctr_coor[valid, 2]] if self.pos_neg_ratio: labels = pos_neg_sampling(labels, self.pos_neg_ratio) # import matplotlib.pyplot as plt # pts_vis = ctr_coor[ctr_coor[:, 0] == 0, 1:].detach().cpu().numpy() # lbl_vis = labels.detach().cpu().numpy() # fig = plt.figure(figsize=(8, 8)) # ax = fig.add_subplot() # ax.scatter(pts_vis[:, 0], pts_vis[:, 1], c=lbl_vis, marker='.') # plt.show() # plt.close() return labels, valid