# Copyright (c) OpenMMLab. All rights reserved.
from collections import OrderedDict
from concurrent import futures as futures
from os import path as osp
from pathlib import Path

import mmengine
import numpy as np
from PIL import Image
from skimage import io


def get_image_index_str(img_idx, use_prefix_id=False):
    if use_prefix_id:
        return '{:07d}'.format(img_idx)
    else:
        return '{:06d}'.format(img_idx)


def get_kitti_info_path(idx,
                        prefix,
                        info_type='image_2',
                        file_tail='.png',
                        training=True,
                        relative_path=True,
                        exist_check=True,
                        use_prefix_id=False):
    img_idx_str = get_image_index_str(idx, use_prefix_id)
    img_idx_str += file_tail
    prefix = Path(prefix)
    if training:
        file_path = Path('training') / info_type / img_idx_str
    else:
        file_path = Path('testing') / info_type / img_idx_str
    if exist_check and not (prefix / file_path).exists():
        raise ValueError('file does not exist: {}'.format(file_path))
    if relative_path:
        return str(file_path)
    else:
        return str(prefix / file_path)


def get_image_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   info_type='image_2',
                   file_tail='.png',
                   use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, info_type, file_tail, training,
                               relative_path, exist_check, use_prefix_id)


def get_label_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   info_type='label_2',
                   use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, info_type, '.txt', training,
                               relative_path, exist_check, use_prefix_id)


def get_plane_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   info_type='planes',
                   use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, info_type, '.txt', training,
                               relative_path, exist_check, use_prefix_id)


def get_velodyne_path(idx,
                      prefix,
                      training=True,
                      relative_path=True,
                      exist_check=True,
                      use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, 'velodyne', '.bin', training,
                               relative_path, exist_check, use_prefix_id)


def get_calib_path(idx,
                   prefix,
                   training=True,
                   relative_path=True,
                   exist_check=True,
                   use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, 'calib', '.txt', training,
                               relative_path, exist_check, use_prefix_id)


def get_pose_path(idx,
                  prefix,
                  training=True,
                  relative_path=True,
                  exist_check=True,
                  use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, 'pose', '.txt', training,
                               relative_path, exist_check, use_prefix_id)


def get_timestamp_path(idx,
                       prefix,
                       training=True,
                       relative_path=True,
                       exist_check=True,
                       use_prefix_id=False):
    return get_kitti_info_path(idx, prefix, 'timestamp', '.txt', training,
                               relative_path, exist_check, use_prefix_id)


def get_label_anno(label_path):
    annotations = {}
    annotations.update({
        'name': [],
        'truncated': [],
        'occluded': [],
        'alpha': [],
        'bbox': [],
        'dimensions': [],
        'location': [],
        'rotation_y': []
    })
    with open(label_path, 'r') as f:
        lines = f.readlines()
    # if len(lines) == 0 or len(lines[0]) < 15:
    #     content = []
    # else:
    content = [line.strip().split(' ') for line in lines]
    num_objects = len([x[0] for x in content if x[0] != 'DontCare'])
    annotations['name'] = np.array([x[0] for x in content])
    num_gt = len(annotations['name'])
    annotations['truncated'] = np.array([float(x[1]) for x in content])
    annotations['occluded'] = np.array([int(x[2]) for x in content])
    annotations['alpha'] = np.array([float(x[3]) for x in content])
    annotations['bbox'] = np.array([[float(info) for info in x[4:8]]
                                    for x in content]).reshape(-1, 4)
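    # KITTI label files contain one object per line with space-separated
    # fields: 0 name, 1 truncated, 2 occluded, 3 alpha, 4-7 2D bbox
    # (left, top, right, bottom), 8-10 dimensions (h, w, l), 11-13 location
    # (x, y, z) in camera coordinates, 14 rotation_y and, for detection
    # results only, an optional 15th score field.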
    # dimensions will convert hwl format to standard lhw(camera) format.
    annotations['dimensions'] = np.array([[float(info) for info in x[8:11]]
                                          for x in content
                                          ]).reshape(-1, 3)[:, [2, 0, 1]]
    annotations['location'] = np.array([[float(info) for info in x[11:14]]
                                        for x in content]).reshape(-1, 3)
    annotations['rotation_y'] = np.array([float(x[14])
                                          for x in content]).reshape(-1)
    if len(content) != 0 and len(content[0]) == 16:  # have score
        annotations['score'] = np.array([float(x[15]) for x in content])
    else:
        annotations['score'] = np.zeros((annotations['bbox'].shape[0], ))
    index = list(range(num_objects)) + [-1] * (num_gt - num_objects)
    annotations['index'] = np.array(index, dtype=np.int32)
    annotations['group_ids'] = np.arange(num_gt, dtype=np.int32)
    return annotations


def _extend_matrix(mat):
    mat = np.concatenate([mat, np.array([[0., 0., 0., 1.]])], axis=0)
    return mat


def get_kitti_image_info(path,
                         training=True,
                         label_info=True,
                         velodyne=False,
                         calib=False,
                         with_plane=False,
                         image_ids=7481,
                         extend_matrix=True,
                         num_worker=8,
                         relative_path=True,
                         with_imageshape=True):
    """
    KITTI annotation format version 2:
    {
        [optional]points: [N, 3+] point cloud
        [optional, for kitti]image: {
            image_idx: ...
            image_path: ...
            image_shape: ...
        }
        point_cloud: {
            num_features: 4
            velodyne_path: ...
        }
        [optional, for kitti]calib: {
            R0_rect: ...
            Tr_velo_to_cam: ...
            P2: ...
        }
        annos: {
            location: [num_gt, 3] array
            dimensions: [num_gt, 3] array
            rotation_y: [num_gt] angle array
            name: [num_gt] ground truth name array
            [optional]difficulty: kitti difficulty
            [optional]group_ids: used for multi-part object
        }
    }
    """
    root_path = Path(path)
    if not isinstance(image_ids, list):
        image_ids = list(range(image_ids))

    def map_func(idx):
        info = {}
        pc_info = {'num_features': 4}
        calib_info = {}

        image_info = {'image_idx': idx}
        annotations = None
        if velodyne:
            pc_info['velodyne_path'] = get_velodyne_path(
                idx, path, training, relative_path)
        image_info['image_path'] = get_image_path(idx, path, training,
                                                  relative_path)
        if with_imageshape:
            img_path = image_info['image_path']
            if relative_path:
                img_path = str(root_path / img_path)
            image_info['image_shape'] = np.array(
                io.imread(img_path).shape[:2], dtype=np.int32)
        if label_info:
            label_path = get_label_path(idx, path, training, relative_path)
            if relative_path:
                label_path = str(root_path / label_path)
            annotations = get_label_anno(label_path)
        info['image'] = image_info
        info['point_cloud'] = pc_info
        if calib:
            calib_path = get_calib_path(
                idx, path, training, relative_path=False)
            with open(calib_path, 'r') as f:
                lines = f.readlines()
            P0 = np.array([float(info) for info in lines[0].split(' ')[1:13]
                           ]).reshape([3, 4])
            P1 = np.array([float(info) for info in lines[1].split(' ')[1:13]
                           ]).reshape([3, 4])
            P2 = np.array([float(info) for info in lines[2].split(' ')[1:13]
                           ]).reshape([3, 4])
            P3 = np.array([float(info) for info in lines[3].split(' ')[1:13]
                           ]).reshape([3, 4])
            if extend_matrix:
                P0 = _extend_matrix(P0)
                P1 = _extend_matrix(P1)
                P2 = _extend_matrix(P2)
                P3 = _extend_matrix(P3)
            R0_rect = np.array([
                float(info) for info in lines[4].split(' ')[1:10]
            ]).reshape([3, 3])
            if extend_matrix:
                rect_4x4 = np.zeros([4, 4], dtype=R0_rect.dtype)
                rect_4x4[3, 3] = 1.
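                # embed the 3x3 rectification rotation in the upper-left
                # block so it composes with the other extended 4x4 transforms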
                rect_4x4[:3, :3] = R0_rect
            else:
                rect_4x4 = R0_rect

            Tr_velo_to_cam = np.array([
                float(info) for info in lines[5].split(' ')[1:13]
            ]).reshape([3, 4])
            Tr_imu_to_velo = np.array([
                float(info) for info in lines[6].split(' ')[1:13]
            ]).reshape([3, 4])
            if extend_matrix:
                Tr_velo_to_cam = _extend_matrix(Tr_velo_to_cam)
                Tr_imu_to_velo = _extend_matrix(Tr_imu_to_velo)
            calib_info['P0'] = P0
            calib_info['P1'] = P1
            calib_info['P2'] = P2
            calib_info['P3'] = P3
            calib_info['R0_rect'] = rect_4x4
            calib_info['Tr_velo_to_cam'] = Tr_velo_to_cam
            calib_info['Tr_imu_to_velo'] = Tr_imu_to_velo
            info['calib'] = calib_info

        if with_plane:
            plane_path = get_plane_path(idx, path, training, relative_path)
            if relative_path:
                plane_path = str(root_path / plane_path)
            lines = mmengine.list_from_file(plane_path)
            info['plane'] = np.array([float(i) for i in lines[3].split()])

        if annotations is not None:
            info['annos'] = annotations
            add_difficulty_to_annos(info)
        return info

    with futures.ThreadPoolExecutor(num_worker) as executor:
        image_infos = executor.map(map_func, image_ids)

    return list(image_infos)


class WaymoInfoGatherer:
    """
    Parallel version of Waymo dataset information gathering.
    The Waymo annotation format is organized like KITTI:
    {
        [optional]points: [N, 3+] point cloud
        [optional, for kitti]image: {
            image_idx: ...
            image_path: ...
            image_shape: ...
        }
        point_cloud: {
            num_features: 6
            velodyne_path: ...
        }
        [optional, for kitti]calib: {
            R0_rect: ...
            Tr_velo_to_cam0: ...
            P0: ...
        }
        annos: {
            location: [num_gt, 3] array
            dimensions: [num_gt, 3] array
            rotation_y: [num_gt] angle array
            name: [num_gt] ground truth name array
            [optional]difficulty: kitti difficulty
            [optional]group_ids: used for multi-part object
        }
    }
    """

    def __init__(self,
                 path,
                 training=True,
                 label_info=True,
                 velodyne=False,
                 calib=False,
                 pose=False,
                 extend_matrix=True,
                 num_worker=8,
                 relative_path=True,
                 with_imageshape=True,
                 max_sweeps=5) -> None:
        self.path = path
        self.training = training
        self.label_info = label_info
        self.velodyne = velodyne
        self.calib = calib
        self.pose = pose
        self.extend_matrix = extend_matrix
        self.num_worker = num_worker
        self.relative_path = relative_path
        self.with_imageshape = with_imageshape
        self.max_sweeps = max_sweeps

    def gather_single(self, idx):
        root_path = Path(self.path)
        info = {}
        pc_info = {'num_features': 6}
        calib_info = {}

        image_info = {'image_idx': idx}
        annotations = None
        if self.velodyne:
            pc_info['velodyne_path'] = get_velodyne_path(
                idx,
                self.path,
                self.training,
                self.relative_path,
                use_prefix_id=True)
            with open(
                    get_timestamp_path(
                        idx,
                        self.path,
                        self.training,
                        relative_path=False,
                        use_prefix_id=True)) as f:
                info['timestamp'] = np.int64(f.read())
        image_info['image_path'] = get_image_path(
            idx,
            self.path,
            self.training,
            self.relative_path,
            info_type='image_0',
            file_tail='.jpg',
            use_prefix_id=True)
        if self.with_imageshape:
            img_path = image_info['image_path']
            if self.relative_path:
                img_path = str(root_path / img_path)
            # io using PIL is significantly faster than skimage
            w, h = Image.open(img_path).size
            image_info['image_shape'] = np.array((h, w), dtype=np.int32)
        if self.label_info:
            label_path = get_label_path(
                idx,
                self.path,
                self.training,
                self.relative_path,
                info_type='label_all',
                use_prefix_id=True)
            cam_sync_label_path = get_label_path(
                idx,
                self.path,
                self.training,
                self.relative_path,
                info_type='cam_sync_label_all',
                use_prefix_id=True)
            if self.relative_path:
                label_path = str(root_path / label_path)
                cam_sync_label_path = str(root_path / cam_sync_label_path)
            annotations = get_label_anno(label_path)
            cam_sync_annotations = get_label_anno(cam_sync_label_path)
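            # Both label files use the KITTI text format parsed above; in
            # the converted Waymo data the trailing "score" column actually
            # carries a camera id and is renamed to 'camera_id' below.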
        info['image'] = image_info
        info['point_cloud'] = pc_info
        if self.calib:
            calib_path = get_calib_path(
                idx,
                self.path,
                self.training,
                relative_path=False,
                use_prefix_id=True)
            with open(calib_path, 'r') as f:
                lines = f.readlines()
            P0 = np.array([float(info) for info in lines[0].split(' ')[1:13]
                           ]).reshape([3, 4])
            P1 = np.array([float(info) for info in lines[1].split(' ')[1:13]
                           ]).reshape([3, 4])
            P2 = np.array([float(info) for info in lines[2].split(' ')[1:13]
                           ]).reshape([3, 4])
            P3 = np.array([float(info) for info in lines[3].split(' ')[1:13]
                           ]).reshape([3, 4])
            P4 = np.array([float(info) for info in lines[4].split(' ')[1:13]
                           ]).reshape([3, 4])
            if self.extend_matrix:
                P0 = _extend_matrix(P0)
                P1 = _extend_matrix(P1)
                P2 = _extend_matrix(P2)
                P3 = _extend_matrix(P3)
                P4 = _extend_matrix(P4)
            R0_rect = np.array([
                float(info) for info in lines[5].split(' ')[1:10]
            ]).reshape([3, 3])
            if self.extend_matrix:
                rect_4x4 = np.zeros([4, 4], dtype=R0_rect.dtype)
                rect_4x4[3, 3] = 1.
                rect_4x4[:3, :3] = R0_rect
            else:
                rect_4x4 = R0_rect

            # TODO: naming Tr_velo_to_cam or Tr_velo_to_cam0
            Tr_velo_to_cam = np.array([
                float(info) for info in lines[6].split(' ')[1:13]
            ]).reshape([3, 4])
            Tr_velo_to_cam1 = np.array([
                float(info) for info in lines[7].split(' ')[1:13]
            ]).reshape([3, 4])
            Tr_velo_to_cam2 = np.array([
                float(info) for info in lines[8].split(' ')[1:13]
            ]).reshape([3, 4])
            Tr_velo_to_cam3 = np.array([
                float(info) for info in lines[9].split(' ')[1:13]
            ]).reshape([3, 4])
            Tr_velo_to_cam4 = np.array([
                float(info) for info in lines[10].split(' ')[1:13]
            ]).reshape([3, 4])
            if self.extend_matrix:
                Tr_velo_to_cam = _extend_matrix(Tr_velo_to_cam)
                Tr_velo_to_cam1 = _extend_matrix(Tr_velo_to_cam1)
                Tr_velo_to_cam2 = _extend_matrix(Tr_velo_to_cam2)
                Tr_velo_to_cam3 = _extend_matrix(Tr_velo_to_cam3)
                Tr_velo_to_cam4 = _extend_matrix(Tr_velo_to_cam4)
            calib_info['P0'] = P0
            calib_info['P1'] = P1
            calib_info['P2'] = P2
            calib_info['P3'] = P3
            calib_info['P4'] = P4
            calib_info['R0_rect'] = rect_4x4
            calib_info['Tr_velo_to_cam'] = Tr_velo_to_cam
            calib_info['Tr_velo_to_cam1'] = Tr_velo_to_cam1
            calib_info['Tr_velo_to_cam2'] = Tr_velo_to_cam2
            calib_info['Tr_velo_to_cam3'] = Tr_velo_to_cam3
            calib_info['Tr_velo_to_cam4'] = Tr_velo_to_cam4
            info['calib'] = calib_info
        if self.pose:
            pose_path = get_pose_path(
                idx,
                self.path,
                self.training,
                relative_path=False,
                use_prefix_id=True)
            info['pose'] = np.loadtxt(pose_path)

        if annotations is not None:
            info['annos'] = annotations
            info['annos']['camera_id'] = info['annos'].pop('score')
            add_difficulty_to_annos(info)
            info['cam_sync_annos'] = cam_sync_annotations
            # NOTE: the 2D labels do not have strict correspondence with
            # the projected 2D lidar labels
            # e.g.: the projected 2D labels can be in camera 2
            # while the most_visible_camera can have id 4
            info['cam_sync_annos']['camera_id'] = info['cam_sync_annos'].pop(
                'score')
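
        # Collect metadata for up to `max_sweeps` preceding frames: each
        # sweep stores its own point cloud path, timestamp, image path and
        # ego pose, which downstream loaders can use to aggregate earlier
        # scans into the current frame.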
        sweeps = []
        prev_idx = idx
        while len(sweeps) < self.max_sweeps:
            prev_info = {}
            prev_idx -= 1
            prev_info['velodyne_path'] = get_velodyne_path(
                prev_idx,
                self.path,
                self.training,
                self.relative_path,
                exist_check=False,
                use_prefix_id=True)
            if_prev_exists = osp.exists(
                Path(self.path) / prev_info['velodyne_path'])
            if if_prev_exists:
                with open(
                        get_timestamp_path(
                            prev_idx,
                            self.path,
                            self.training,
                            relative_path=False,
                            use_prefix_id=True)) as f:
                    prev_info['timestamp'] = np.int64(f.read())
                prev_info['image_path'] = get_image_path(
                    prev_idx,
                    self.path,
                    self.training,
                    self.relative_path,
                    info_type='image_0',
                    file_tail='.jpg',
                    use_prefix_id=True)
                prev_pose_path = get_pose_path(
                    prev_idx,
                    self.path,
                    self.training,
                    relative_path=False,
                    use_prefix_id=True)
                prev_info['pose'] = np.loadtxt(prev_pose_path)
                sweeps.append(prev_info)
            else:
                break
        info['sweeps'] = sweeps

        return info

    def gather(self, image_ids):
        if not isinstance(image_ids, list):
            image_ids = list(range(image_ids))
        image_infos = mmengine.track_parallel_progress(self.gather_single,
                                                       image_ids,
                                                       self.num_worker)
        return list(image_infos)
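

# A minimal usage sketch for WaymoInfoGatherer; the root path and frame
# indices below are illustrative and assume Waymo data already converted to
# the KITTI-style layout with 7-digit prefixed indices:
#
#     gatherer = WaymoInfoGatherer(
#         'data/waymo/kitti_format',
#         training=True,
#         velodyne=True,
#         calib=True,
#         pose=True,
#         max_sweeps=5)
#     waymo_infos = gatherer.gather([1000000, 1000001])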


def kitti_anno_to_label_file(annos, folder):
    folder = Path(folder)
    for anno in annos:
        image_idx = anno['metadata']['image_idx']
        label_lines = []
        for j in range(anno['bbox'].shape[0]):
            label_dict = {
                'name': anno['name'][j],
                'alpha': anno['alpha'][j],
                'bbox': anno['bbox'][j],
                'location': anno['location'][j],
                'dimensions': anno['dimensions'][j],
                'rotation_y': anno['rotation_y'][j],
                'score': anno['score'][j],
            }
            label_line = kitti_result_line(label_dict)
            label_lines.append(label_line)
        label_file = folder / f'{get_image_index_str(image_idx)}.txt'
        label_str = '\n'.join(label_lines)
        with open(label_file, 'w') as f:
            f.write(label_str)


def add_difficulty_to_annos(info):
    # minimum height for evaluated groundtruth/detections
    min_height = [40, 25, 25]
    # maximum occlusion level of the groundtruth used for evaluation
    max_occlusion = [0, 1, 2]
    # maximum truncation level of the groundtruth used for evaluation
    max_trunc = [0.15, 0.3, 0.5]
    annos = info['annos']
    dims = annos['dimensions']  # lhw format
    bbox = annos['bbox']
    height = bbox[:, 3] - bbox[:, 1]
    occlusion = annos['occluded']
    truncation = annos['truncated']
    diff = []
    easy_mask = np.ones((len(dims), ), dtype=bool)
    moderate_mask = np.ones((len(dims), ), dtype=bool)
    hard_mask = np.ones((len(dims), ), dtype=bool)
    i = 0
    for h, o, t in zip(height, occlusion, truncation):
        if o > max_occlusion[0] or h <= min_height[0] or t > max_trunc[0]:
            easy_mask[i] = False
        if o > max_occlusion[1] or h <= min_height[1] or t > max_trunc[1]:
            moderate_mask[i] = False
        if o > max_occlusion[2] or h <= min_height[2] or t > max_trunc[2]:
            hard_mask[i] = False
        i += 1
    is_easy = easy_mask
    is_moderate = np.logical_xor(easy_mask, moderate_mask)
    is_hard = np.logical_xor(hard_mask, moderate_mask)

    for i in range(len(dims)):
        if is_easy[i]:
            diff.append(0)
        elif is_moderate[i]:
            diff.append(1)
        elif is_hard[i]:
            diff.append(2)
        else:
            diff.append(-1)
    annos['difficulty'] = np.array(diff, np.int32)
    return diff


def kitti_result_line(result_dict, precision=4):
    prec_float = '{' + ':.{}f'.format(precision) + '}'
    res_line = []
    all_field_default = OrderedDict([
        ('name', None),
        ('truncated', -1),
        ('occluded', -1),
        ('alpha', -10),
        ('bbox', None),
        ('dimensions', [-1, -1, -1]),
        ('location', [-1000, -1000, -1000]),
        ('rotation_y', -10),
        ('score', 0.0),
    ])
    res_dict = [(key, None) for key, val in all_field_default.items()]
    res_dict = OrderedDict(res_dict)
    for key, val in result_dict.items():
        if all_field_default[key] is None and val is None:
            raise ValueError('you must specify a value for {}'.format(key))
        res_dict[key] = val

    for key, val in res_dict.items():
        if key == 'name':
            res_line.append(val)
        elif key in ['truncated', 'alpha', 'rotation_y', 'score']:
            if val is None:
                res_line.append(str(all_field_default[key]))
            else:
                res_line.append(prec_float.format(val))
        elif key == 'occluded':
            if val is None:
                res_line.append(str(all_field_default[key]))
            else:
                res_line.append('{}'.format(val))
        elif key in ['bbox', 'dimensions', 'location']:
            if val is None:
                res_line += [str(v) for v in all_field_default[key]]
            else:
                res_line += [prec_float.format(v) for v in val]
        else:
            raise ValueError('unknown key. supported key:{}'.format(
                res_dict.keys()))
    return ' '.join(res_line)
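

# Minimal usage sketches for the KITTI helpers above. The root path
# 'data/kitti' and the numbers are hypothetical; the directory must follow
# the standard KITTI layout (training/testing splits containing image_2,
# label_2, calib and velodyne folders).
#
#     kitti_infos = get_kitti_image_info(
#         'data/kitti',
#         training=True,
#         velodyne=True,
#         calib=True,
#         image_ids=[0, 1, 2])
#
# Serializing a single (made-up) detection with `kitti_result_line`:
#
#     line = kitti_result_line({
#         'name': 'Car',
#         'alpha': -1.57,
#         'bbox': [100.0, 120.0, 200.0, 260.0],
#         'dimensions': [1.5, 1.6, 3.9],
#         'location': [1.0, 1.5, 20.0],
#         'rotation_y': -1.52,
#         'score': 0.9,
#     })
#     # -> 'Car -1 -1 -1.5700 100.0000 120.0000 200.0000 260.0000 ...'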