mm3dtest / tools /dataset_converters /kitti_converter.py
giantmonkeyTC
2344
34d1f8b
# Copyright (c) OpenMMLab. All rights reserved.
from collections import OrderedDict
from pathlib import Path
import mmcv
import mmengine
import numpy as np
from nuscenes.utils.geometry_utils import view_points
from mmdet3d.structures import points_cam2img
from mmdet3d.structures.ops import box_np_ops
from .kitti_data_utils import WaymoInfoGatherer, get_kitti_image_info
from .nuscenes_converter import post_process_coords
kitti_categories = ('Pedestrian', 'Cyclist', 'Car')
def convert_to_kitti_info_version2(info):
"""convert kitti info v1 to v2 if possible.
Args:
info (dict): Info of the input kitti data.
- image (dict): image info
- calib (dict): calibration info
- point_cloud (dict): point cloud info
"""
if 'image' not in info or 'calib' not in info or 'point_cloud' not in info:
info['image'] = {
'image_shape': info['img_shape'],
'image_idx': info['image_idx'],
'image_path': info['img_path'],
}
info['calib'] = {
'R0_rect': info['calib/R0_rect'],
'Tr_velo_to_cam': info['calib/Tr_velo_to_cam'],
'P2': info['calib/P2'],
}
info['point_cloud'] = {
'velodyne_path': info['velodyne_path'],
}
def _read_imageset_file(path):
with open(path, 'r') as f:
lines = f.readlines()
return [int(line) for line in lines]
class _NumPointsInGTCalculater:
"""Calculate the number of points inside the ground truth box. This is the
parallel version. For the serialized version, please refer to
`_calculate_num_points_in_gt`.
Args:
data_path (str): Path of the data.
relative_path (bool): Whether to use relative path.
remove_outside (bool, optional): Whether to remove points which are
outside of image. Default: True.
num_features (int, optional): Number of features per point.
Default: False.
num_worker (int, optional): the number of parallel workers to use.
Default: 8.
"""
def __init__(self,
data_path,
relative_path,
remove_outside=True,
num_features=4,
num_worker=8) -> None:
self.data_path = data_path
self.relative_path = relative_path
self.remove_outside = remove_outside
self.num_features = num_features
self.num_worker = num_worker
def calculate_single(self, info):
pc_info = info['point_cloud']
image_info = info['image']
calib = info['calib']
if self.relative_path:
v_path = str(Path(self.data_path) / pc_info['velodyne_path'])
else:
v_path = pc_info['velodyne_path']
points_v = np.fromfile(
v_path, dtype=np.float32,
count=-1).reshape([-1, self.num_features])
rect = calib['R0_rect']
Trv2c = calib['Tr_velo_to_cam']
P2 = calib['P2']
if self.remove_outside:
points_v = box_np_ops.remove_outside_points(
points_v, rect, Trv2c, P2, image_info['image_shape'])
annos = info['annos']
num_obj = len([n for n in annos['name'] if n != 'DontCare'])
dims = annos['dimensions'][:num_obj]
loc = annos['location'][:num_obj]
rots = annos['rotation_y'][:num_obj]
gt_boxes_camera = np.concatenate([loc, dims, rots[..., np.newaxis]],
axis=1)
gt_boxes_lidar = box_np_ops.box_camera_to_lidar(
gt_boxes_camera, rect, Trv2c)
indices = box_np_ops.points_in_rbbox(points_v[:, :3], gt_boxes_lidar)
num_points_in_gt = indices.sum(0)
num_ignored = len(annos['dimensions']) - num_obj
num_points_in_gt = np.concatenate(
[num_points_in_gt, -np.ones([num_ignored])])
annos['num_points_in_gt'] = num_points_in_gt.astype(np.int32)
return info
def calculate(self, infos):
ret_infos = mmengine.track_parallel_progress(self.calculate_single,
infos, self.num_worker)
for i, ret_info in enumerate(ret_infos):
infos[i] = ret_info
def _calculate_num_points_in_gt(data_path,
infos,
relative_path,
remove_outside=True,
num_features=4):
for info in mmengine.track_iter_progress(infos):
pc_info = info['point_cloud']
image_info = info['image']
calib = info['calib']
if relative_path:
v_path = str(Path(data_path) / pc_info['velodyne_path'])
else:
v_path = pc_info['velodyne_path']
points_v = np.fromfile(
v_path, dtype=np.float32, count=-1).reshape([-1, num_features])
rect = calib['R0_rect']
Trv2c = calib['Tr_velo_to_cam']
P2 = calib['P2']
if remove_outside:
points_v = box_np_ops.remove_outside_points(
points_v, rect, Trv2c, P2, image_info['image_shape'])
# points_v = points_v[points_v[:, 0] > 0]
annos = info['annos']
num_obj = len([n for n in annos['name'] if n != 'DontCare'])
# annos = kitti.filter_kitti_anno(annos, ['DontCare'])
dims = annos['dimensions'][:num_obj]
loc = annos['location'][:num_obj]
rots = annos['rotation_y'][:num_obj]
gt_boxes_camera = np.concatenate([loc, dims, rots[..., np.newaxis]],
axis=1)
gt_boxes_lidar = box_np_ops.box_camera_to_lidar(
gt_boxes_camera, rect, Trv2c)
indices = box_np_ops.points_in_rbbox(points_v[:, :3], gt_boxes_lidar)
num_points_in_gt = indices.sum(0)
num_ignored = len(annos['dimensions']) - num_obj
num_points_in_gt = np.concatenate(
[num_points_in_gt, -np.ones([num_ignored])])
annos['num_points_in_gt'] = num_points_in_gt.astype(np.int32)
def create_kitti_info_file(data_path,
pkl_prefix='kitti',
with_plane=False,
save_path=None,
relative_path=True):
"""Create info file of KITTI dataset.
Given the raw data, generate its related info file in pkl format.
Args:
data_path (str): Path of the data root.
pkl_prefix (str, optional): Prefix of the info file to be generated.
Default: 'kitti'.
with_plane (bool, optional): Whether to use plane information.
Default: False.
save_path (str, optional): Path to save the info file.
Default: None.
relative_path (bool, optional): Whether to use relative path.
Default: True.
"""
imageset_folder = Path(data_path) / 'ImageSets'
train_img_ids = _read_imageset_file(str(imageset_folder / 'train.txt'))
val_img_ids = _read_imageset_file(str(imageset_folder / 'val.txt'))
test_img_ids = _read_imageset_file(str(imageset_folder / 'test.txt'))
print('Generate info. this may take several minutes.')
if save_path is None:
save_path = Path(data_path)
else:
save_path = Path(save_path)
kitti_infos_train = get_kitti_image_info(
data_path,
training=True,
velodyne=True,
calib=True,
with_plane=with_plane,
image_ids=train_img_ids,
relative_path=relative_path)
_calculate_num_points_in_gt(data_path, kitti_infos_train, relative_path)
filename = save_path / f'{pkl_prefix}_infos_train.pkl'
print(f'Kitti info train file is saved to {filename}')
mmengine.dump(kitti_infos_train, filename)
kitti_infos_val = get_kitti_image_info(
data_path,
training=True,
velodyne=True,
calib=True,
with_plane=with_plane,
image_ids=val_img_ids,
relative_path=relative_path)
_calculate_num_points_in_gt(data_path, kitti_infos_val, relative_path)
filename = save_path / f'{pkl_prefix}_infos_val.pkl'
print(f'Kitti info val file is saved to {filename}')
mmengine.dump(kitti_infos_val, filename)
filename = save_path / f'{pkl_prefix}_infos_trainval.pkl'
print(f'Kitti info trainval file is saved to {filename}')
mmengine.dump(kitti_infos_train + kitti_infos_val, filename)
kitti_infos_test = get_kitti_image_info(
data_path,
training=False,
label_info=False,
velodyne=True,
calib=True,
with_plane=False,
image_ids=test_img_ids,
relative_path=relative_path)
filename = save_path / f'{pkl_prefix}_infos_test.pkl'
print(f'Kitti info test file is saved to {filename}')
mmengine.dump(kitti_infos_test, filename)
def create_waymo_info_file(data_path,
pkl_prefix='waymo',
save_path=None,
relative_path=True,
max_sweeps=5,
workers=8):
"""Create info file of waymo dataset.
Given the raw data, generate its related info file in pkl format.
Args:
data_path (str): Path of the data root.
pkl_prefix (str, optional): Prefix of the info file to be generated.
Default: 'waymo'.
save_path (str, optional): Path to save the info file.
Default: None.
relative_path (bool, optional): Whether to use relative path.
Default: True.
max_sweeps (int, optional): Max sweeps before the detection frame
to be used. Default: 5.
"""
imageset_folder = Path(data_path) / 'ImageSets'
train_img_ids = _read_imageset_file(str(imageset_folder / 'train.txt'))
val_img_ids = _read_imageset_file(str(imageset_folder / 'val.txt'))
test_img_ids = _read_imageset_file(str(imageset_folder / 'test.txt'))
print('Generate info. this may take several minutes.')
if save_path is None:
save_path = Path(data_path)
else:
save_path = Path(save_path)
waymo_infos_gatherer_trainval = WaymoInfoGatherer(
data_path,
training=True,
velodyne=True,
calib=True,
pose=True,
relative_path=relative_path,
max_sweeps=max_sweeps,
num_worker=workers)
waymo_infos_gatherer_test = WaymoInfoGatherer(
data_path,
training=False,
label_info=False,
velodyne=True,
calib=True,
pose=True,
relative_path=relative_path,
max_sweeps=max_sweeps,
num_worker=workers)
num_points_in_gt_calculater = _NumPointsInGTCalculater(
data_path,
relative_path,
num_features=6,
remove_outside=False,
num_worker=workers)
waymo_infos_train = waymo_infos_gatherer_trainval.gather(train_img_ids)
num_points_in_gt_calculater.calculate(waymo_infos_train)
filename = save_path / f'{pkl_prefix}_infos_train.pkl'
print(f'Waymo info train file is saved to {filename}')
mmengine.dump(waymo_infos_train, filename)
waymo_infos_val = waymo_infos_gatherer_trainval.gather(val_img_ids)
num_points_in_gt_calculater.calculate(waymo_infos_val)
filename = save_path / f'{pkl_prefix}_infos_val.pkl'
print(f'Waymo info val file is saved to {filename}')
mmengine.dump(waymo_infos_val, filename)
filename = save_path / f'{pkl_prefix}_infos_trainval.pkl'
print(f'Waymo info trainval file is saved to {filename}')
mmengine.dump(waymo_infos_train + waymo_infos_val, filename)
waymo_infos_test = waymo_infos_gatherer_test.gather(test_img_ids)
filename = save_path / f'{pkl_prefix}_infos_test.pkl'
print(f'Waymo info test file is saved to {filename}')
mmengine.dump(waymo_infos_test, filename)
def _create_reduced_point_cloud(data_path,
info_path,
save_path=None,
back=False,
num_features=4,
front_camera_id=2):
"""Create reduced point clouds for given info.
Args:
data_path (str): Path of original data.
info_path (str): Path of data info.
save_path (str, optional): Path to save reduced point cloud
data. Default: None.
back (bool, optional): Whether to flip the points to back.
Default: False.
num_features (int, optional): Number of point features. Default: 4.
front_camera_id (int, optional): The referenced/front camera ID.
Default: 2.
"""
kitti_infos = mmengine.load(info_path)
for info in mmengine.track_iter_progress(kitti_infos):
pc_info = info['point_cloud']
image_info = info['image']
calib = info['calib']
v_path = pc_info['velodyne_path']
v_path = Path(data_path) / v_path
points_v = np.fromfile(
str(v_path), dtype=np.float32,
count=-1).reshape([-1, num_features])
rect = calib['R0_rect']
if front_camera_id == 2:
P2 = calib['P2']
else:
P2 = calib[f'P{str(front_camera_id)}']
Trv2c = calib['Tr_velo_to_cam']
# first remove z < 0 points
# keep = points_v[:, -1] > 0
# points_v = points_v[keep]
# then remove outside.
if back:
points_v[:, 0] = -points_v[:, 0]
points_v = box_np_ops.remove_outside_points(points_v, rect, Trv2c, P2,
image_info['image_shape'])
if save_path is None:
save_dir = v_path.parent.parent / (v_path.parent.stem + '_reduced')
if not save_dir.exists():
save_dir.mkdir()
save_filename = save_dir / v_path.name
# save_filename = str(v_path) + '_reduced'
if back:
save_filename += '_back'
else:
save_filename = str(Path(save_path) / v_path.name)
if back:
save_filename += '_back'
with open(save_filename, 'w') as f:
points_v.tofile(f)
def create_reduced_point_cloud(data_path,
pkl_prefix,
train_info_path=None,
val_info_path=None,
test_info_path=None,
save_path=None,
with_back=False):
"""Create reduced point clouds for training/validation/testing.
Args:
data_path (str): Path of original data.
pkl_prefix (str): Prefix of info files.
train_info_path (str, optional): Path of training set info.
Default: None.
val_info_path (str, optional): Path of validation set info.
Default: None.
test_info_path (str, optional): Path of test set info.
Default: None.
save_path (str, optional): Path to save reduced point cloud data.
Default: None.
with_back (bool, optional): Whether to flip the points to back.
Default: False.
"""
if train_info_path is None:
train_info_path = Path(data_path) / f'{pkl_prefix}_infos_train.pkl'
if val_info_path is None:
val_info_path = Path(data_path) / f'{pkl_prefix}_infos_val.pkl'
if test_info_path is None:
test_info_path = Path(data_path) / f'{pkl_prefix}_infos_test.pkl'
print('create reduced point cloud for training set')
_create_reduced_point_cloud(data_path, train_info_path, save_path)
print('create reduced point cloud for validation set')
_create_reduced_point_cloud(data_path, val_info_path, save_path)
print('create reduced point cloud for testing set')
_create_reduced_point_cloud(data_path, test_info_path, save_path)
if with_back:
_create_reduced_point_cloud(
data_path, train_info_path, save_path, back=True)
_create_reduced_point_cloud(
data_path, val_info_path, save_path, back=True)
_create_reduced_point_cloud(
data_path, test_info_path, save_path, back=True)
def export_2d_annotation(root_path, info_path, mono3d=True):
"""Export 2d annotation from the info file and raw data.
Args:
root_path (str): Root path of the raw data.
info_path (str): Path of the info file.
mono3d (bool, optional): Whether to export mono3d annotation.
Default: True.
"""
# get bbox annotations for camera
kitti_infos = mmengine.load(info_path)
cat2Ids = [
dict(id=kitti_categories.index(cat_name), name=cat_name)
for cat_name in kitti_categories
]
coco_ann_id = 0
coco_2d_dict = dict(annotations=[], images=[], categories=cat2Ids)
from os import path as osp
for info in mmengine.track_iter_progress(kitti_infos):
coco_infos = get_2d_boxes(info, occluded=[0, 1, 2, 3], mono3d=mono3d)
(height, width,
_) = mmcv.imread(osp.join(root_path,
info['image']['image_path'])).shape
coco_2d_dict['images'].append(
dict(
file_name=info['image']['image_path'],
id=info['image']['image_idx'],
Tri2v=info['calib']['Tr_imu_to_velo'],
Trv2c=info['calib']['Tr_velo_to_cam'],
rect=info['calib']['R0_rect'],
cam_intrinsic=info['calib']['P2'],
width=width,
height=height))
for coco_info in coco_infos:
if coco_info is None:
continue
# add an empty key for coco format
coco_info['segmentation'] = []
coco_info['id'] = coco_ann_id
coco_2d_dict['annotations'].append(coco_info)
coco_ann_id += 1
if mono3d:
json_prefix = f'{info_path[:-4]}_mono3d'
else:
json_prefix = f'{info_path[:-4]}'
mmengine.dump(coco_2d_dict, f'{json_prefix}.coco.json')
def get_2d_boxes(info, occluded, mono3d=True):
"""Get the 2D annotation records for a given info.
Args:
info: Information of the given sample data.
occluded: Integer (0, 1, 2, 3) indicating occlusion state:
0 = fully visible, 1 = partly occluded, 2 = largely occluded,
3 = unknown, -1 = DontCare
mono3d (bool): Whether to get boxes with mono3d annotation.
Return:
list[dict]: List of 2D annotation record that belongs to the input
`sample_data_token`.
"""
# Get calibration information
P2 = info['calib']['P2']
repro_recs = []
# if no annotations in info (test dataset), then return
if 'annos' not in info:
return repro_recs
# Get all the annotation with the specified visibilties.
ann_dicts = info['annos']
mask = [(ocld in occluded) for ocld in ann_dicts['occluded']]
for k in ann_dicts.keys():
ann_dicts[k] = ann_dicts[k][mask]
# convert dict of list to list of dict
ann_recs = []
for i in range(len(ann_dicts['occluded'])):
ann_rec = {}
for k in ann_dicts.keys():
ann_rec[k] = ann_dicts[k][i]
ann_recs.append(ann_rec)
for ann_idx, ann_rec in enumerate(ann_recs):
# Augment sample_annotation with token information.
ann_rec['sample_annotation_token'] = \
f"{info['image']['image_idx']}.{ann_idx}"
ann_rec['sample_data_token'] = info['image']['image_idx']
sample_data_token = info['image']['image_idx']
loc = ann_rec['location'][np.newaxis, :]
dim = ann_rec['dimensions'][np.newaxis, :]
rot = ann_rec['rotation_y'][np.newaxis, np.newaxis]
# transform the center from [0.5, 1.0, 0.5] to [0.5, 0.5, 0.5]
dst = np.array([0.5, 0.5, 0.5])
src = np.array([0.5, 1.0, 0.5])
loc = loc + dim * (dst - src)
offset = (info['calib']['P2'][0, 3] - info['calib']['P0'][0, 3]) \
/ info['calib']['P2'][0, 0]
loc_3d = np.copy(loc)
loc_3d[0, 0] += offset
gt_bbox_3d = np.concatenate([loc, dim, rot], axis=1).astype(np.float32)
# Filter out the corners that are not in front of the calibrated
# sensor.
corners_3d = box_np_ops.center_to_corner_box3d(
gt_bbox_3d[:, :3],
gt_bbox_3d[:, 3:6],
gt_bbox_3d[:, 6], [0.5, 0.5, 0.5],
axis=1)
corners_3d = corners_3d[0].T # (1, 8, 3) -> (3, 8)
in_front = np.argwhere(corners_3d[2, :] > 0).flatten()
corners_3d = corners_3d[:, in_front]
# Project 3d box to 2d.
camera_intrinsic = P2
corner_coords = view_points(corners_3d, camera_intrinsic,
True).T[:, :2].tolist()
# Keep only corners that fall within the image.
final_coords = post_process_coords(corner_coords)
# Skip if the convex hull of the re-projected corners
# does not intersect the image canvas.
if final_coords is None:
continue
else:
min_x, min_y, max_x, max_y = final_coords
# Generate dictionary record to be included in the .json file.
repro_rec = generate_record(ann_rec, min_x, min_y, max_x, max_y,
sample_data_token,
info['image']['image_path'])
# If mono3d=True, add 3D annotations in camera coordinates
if mono3d and (repro_rec is not None):
repro_rec['bbox_cam3d'] = np.concatenate(
[loc_3d, dim, rot],
axis=1).astype(np.float32).squeeze().tolist()
repro_rec['velo_cam3d'] = -1 # no velocity in KITTI
center3d = np.array(loc).reshape([1, 3])
center2d = points_cam2img(
center3d, camera_intrinsic, with_depth=True)
repro_rec['center2d'] = center2d.squeeze().tolist()
# normalized center2D + depth
# samples with depth < 0 will be removed
if repro_rec['center2d'][2] <= 0:
continue
repro_rec['attribute_name'] = -1 # no attribute in KITTI
repro_rec['attribute_id'] = -1
repro_recs.append(repro_rec)
return repro_recs
def generate_record(ann_rec, x1, y1, x2, y2, sample_data_token, filename):
"""Generate one 2D annotation record given various information on top of
the 2D bounding box coordinates.
Args:
ann_rec (dict): Original 3d annotation record.
x1 (float): Minimum value of the x coordinate.
y1 (float): Minimum value of the y coordinate.
x2 (float): Maximum value of the x coordinate.
y2 (float): Maximum value of the y coordinate.
sample_data_token (str): Sample data token.
filename (str):The corresponding image file where the annotation
is present.
Returns:
dict: A sample 2D annotation record.
- file_name (str): file name
- image_id (str): sample data token
- area (float): 2d box area
- category_name (str): category name
- category_id (int): category id
- bbox (list[float]): left x, top y, x_size, y_size of 2d box
- iscrowd (int): whether the area is crowd
"""
repro_rec = OrderedDict()
repro_rec['sample_data_token'] = sample_data_token
coco_rec = dict()
key_mapping = {
'name': 'category_name',
'num_points_in_gt': 'num_lidar_pts',
'sample_annotation_token': 'sample_annotation_token',
'sample_data_token': 'sample_data_token',
}
for key, value in ann_rec.items():
if key in key_mapping.keys():
repro_rec[key_mapping[key]] = value
repro_rec['bbox_corners'] = [x1, y1, x2, y2]
repro_rec['filename'] = filename
coco_rec['file_name'] = filename
coco_rec['image_id'] = sample_data_token
coco_rec['area'] = (y2 - y1) * (x2 - x1)
if repro_rec['category_name'] not in kitti_categories:
return None
cat_name = repro_rec['category_name']
coco_rec['category_name'] = cat_name
coco_rec['category_id'] = kitti_categories.index(cat_name)
coco_rec['bbox'] = [x1, y1, x2 - x1, y2 - y1]
coco_rec['iscrowd'] = 0
return coco_rec