mm3dtest / tools /dataset_converters /scannet_data_utils.py
giantmonkeyTC
2344
34d1f8b
# Copyright (c) OpenMMLab. All rights reserved.
import os
from concurrent import futures as futures
from os import path as osp
import mmengine
import numpy as np
class ScanNetData(object):
"""ScanNet data.
Generate scannet infos for scannet_converter.
Args:
root_path (str): Root path of the raw data.
split (str, optional): Set split type of the data. Default: 'train'.
"""
def __init__(self, root_path, split='train'):
self.root_dir = root_path
self.split = split
self.split_dir = osp.join(root_path)
self.classes = [
'cabinet', 'bed', 'chair', 'sofa', 'table', 'door', 'window',
'bookshelf', 'picture', 'counter', 'desk', 'curtain',
'refrigerator', 'showercurtrain', 'toilet', 'sink', 'bathtub',
'garbagebin'
]
self.cat2label = {cat: self.classes.index(cat) for cat in self.classes}
self.label2cat = {self.cat2label[t]: t for t in self.cat2label}
self.cat_ids = np.array(
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 16, 24, 28, 33, 34, 36, 39])
self.cat_ids2class = {
nyu40id: i
for i, nyu40id in enumerate(list(self.cat_ids))
}
assert split in ['train', 'val', 'test']
split_file = osp.join(self.root_dir, 'meta_data',
f'scannetv2_{split}.txt')
mmengine.check_file_exist(split_file)
self.sample_id_list = mmengine.list_from_file(split_file)
self.test_mode = (split == 'test')
def __len__(self):
return len(self.sample_id_list)
def get_aligned_box_label(self, idx):
box_file = osp.join(self.root_dir, 'scannet_instance_data',
f'{idx}_aligned_bbox.npy')
mmengine.check_file_exist(box_file)
return np.load(box_file)
def get_unaligned_box_label(self, idx):
box_file = osp.join(self.root_dir, 'scannet_instance_data',
f'{idx}_unaligned_bbox.npy')
mmengine.check_file_exist(box_file)
return np.load(box_file)
def get_axis_align_matrix(self, idx):
matrix_file = osp.join(self.root_dir, 'scannet_instance_data',
f'{idx}_axis_align_matrix.npy')
mmengine.check_file_exist(matrix_file)
return np.load(matrix_file)
def get_images(self, idx):
paths = []
path = osp.join(self.root_dir, 'posed_images', idx)
for file in sorted(os.listdir(path)):
if file.endswith('.jpg'):
paths.append(osp.join('posed_images', idx, file))
return paths
def get_extrinsics(self, idx):
extrinsics = []
path = osp.join(self.root_dir, 'posed_images', idx)
for file in sorted(os.listdir(path)):
if file.endswith('.txt') and not file == 'intrinsic.txt':
extrinsics.append(np.loadtxt(osp.join(path, file)))
return extrinsics
def get_intrinsics(self, idx):
matrix_file = osp.join(self.root_dir, 'posed_images', idx,
'intrinsic.txt')
mmengine.check_file_exist(matrix_file)
return np.loadtxt(matrix_file)
def get_infos(self, num_workers=4, has_label=True, sample_id_list=None):
"""Get data infos.
This method gets information from the raw data.
Args:
num_workers (int, optional): Number of threads to be used.
Default: 4.
has_label (bool, optional): Whether the data has label.
Default: True.
sample_id_list (list[int], optional): Index list of the sample.
Default: None.
Returns:
infos (list[dict]): Information of the raw data.
"""
def process_single_scene(sample_idx):
print(f'{self.split} sample_idx: {sample_idx}')
info = dict()
pc_info = {'num_features': 6, 'lidar_idx': sample_idx}
info['point_cloud'] = pc_info
pts_filename = osp.join(self.root_dir, 'scannet_instance_data',
f'{sample_idx}_vert.npy')
points = np.load(pts_filename)
mmengine.mkdir_or_exist(osp.join(self.root_dir, 'points'))
points.tofile(
osp.join(self.root_dir, 'points', f'{sample_idx}.bin'))
info['pts_path'] = osp.join('points', f'{sample_idx}.bin')
# update with RGB image paths if exist
if os.path.exists(osp.join(self.root_dir, 'posed_images')):
info['intrinsics'] = self.get_intrinsics(sample_idx)
all_extrinsics = self.get_extrinsics(sample_idx)
all_img_paths = self.get_images(sample_idx)
# some poses in ScanNet are invalid
extrinsics, img_paths = [], []
for extrinsic, img_path in zip(all_extrinsics, all_img_paths):
if np.all(np.isfinite(extrinsic)):
img_paths.append(img_path)
extrinsics.append(extrinsic)
info['extrinsics'] = extrinsics
info['img_paths'] = img_paths
if not self.test_mode:
pts_instance_mask_path = osp.join(
self.root_dir, 'scannet_instance_data',
f'{sample_idx}_ins_label.npy')
pts_semantic_mask_path = osp.join(
self.root_dir, 'scannet_instance_data',
f'{sample_idx}_sem_label.npy')
pts_instance_mask = np.load(pts_instance_mask_path).astype(
np.int64)
pts_semantic_mask = np.load(pts_semantic_mask_path).astype(
np.int64)
mmengine.mkdir_or_exist(
osp.join(self.root_dir, 'instance_mask'))
mmengine.mkdir_or_exist(
osp.join(self.root_dir, 'semantic_mask'))
pts_instance_mask.tofile(
osp.join(self.root_dir, 'instance_mask',
f'{sample_idx}.bin'))
pts_semantic_mask.tofile(
osp.join(self.root_dir, 'semantic_mask',
f'{sample_idx}.bin'))
info['pts_instance_mask_path'] = osp.join(
'instance_mask', f'{sample_idx}.bin')
info['pts_semantic_mask_path'] = osp.join(
'semantic_mask', f'{sample_idx}.bin')
if has_label:
annotations = {}
# box is of shape [k, 6 + class]
aligned_box_label = self.get_aligned_box_label(sample_idx)
unaligned_box_label = self.get_unaligned_box_label(sample_idx)
annotations['gt_num'] = aligned_box_label.shape[0]
if annotations['gt_num'] != 0:
aligned_box = aligned_box_label[:, :-1] # k, 6
unaligned_box = unaligned_box_label[:, :-1]
classes = aligned_box_label[:, -1] # k
annotations['name'] = np.array([
self.label2cat[self.cat_ids2class[classes[i]]]
for i in range(annotations['gt_num'])
])
# default names are given to aligned bbox for compatibility
# we also save unaligned bbox info with marked names
annotations['location'] = aligned_box[:, :3]
annotations['dimensions'] = aligned_box[:, 3:6]
annotations['gt_boxes_upright_depth'] = aligned_box
annotations['unaligned_location'] = unaligned_box[:, :3]
annotations['unaligned_dimensions'] = unaligned_box[:, 3:6]
annotations[
'unaligned_gt_boxes_upright_depth'] = unaligned_box
annotations['index'] = np.arange(
annotations['gt_num'], dtype=np.int32)
annotations['class'] = np.array([
self.cat_ids2class[classes[i]]
for i in range(annotations['gt_num'])
])
axis_align_matrix = self.get_axis_align_matrix(sample_idx)
annotations['axis_align_matrix'] = axis_align_matrix # 4x4
info['annos'] = annotations
return info
sample_id_list = sample_id_list if sample_id_list is not None \
else self.sample_id_list
with futures.ThreadPoolExecutor(num_workers) as executor:
infos = executor.map(process_single_scene, sample_id_list)
return list(infos)
class ScanNetSegData(object):
"""ScanNet dataset used to generate infos for semantic segmentation task.
Args:
data_root (str): Root path of the raw data.
ann_file (str): The generated scannet infos.
split (str, optional): Set split type of the data. Default: 'train'.
num_points (int, optional): Number of points in each data input.
Default: 8192.
label_weight_func (function, optional): Function to compute the
label weight. Default: None.
"""
def __init__(self,
data_root,
ann_file,
split='train',
num_points=8192,
label_weight_func=None):
self.data_root = data_root
self.data_infos = mmengine.load(ann_file)
self.split = split
assert split in ['train', 'val', 'test']
self.num_points = num_points
self.all_ids = np.arange(41) # all possible ids
self.cat_ids = np.array([
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 16, 24, 28, 33, 34, 36,
39
]) # used for seg task
self.ignore_index = len(self.cat_ids)
self.cat_id2class = np.ones(
(self.all_ids.shape[0], ), dtype=np.int64) * self.ignore_index
for i, cat_id in enumerate(self.cat_ids):
self.cat_id2class[cat_id] = i
# label weighting function is taken from
# https://github.com/charlesq34/pointnet2/blob/master/scannet/scannet_dataset.py#L24
self.label_weight_func = (lambda x: 1.0 / np.log(1.2 + x)) if \
label_weight_func is None else label_weight_func
def get_seg_infos(self):
if self.split == 'test':
return
scene_idxs, label_weight = self.get_scene_idxs_and_label_weight()
save_folder = osp.join(self.data_root, 'seg_info')
mmengine.mkdir_or_exist(save_folder)
np.save(
osp.join(save_folder, f'{self.split}_resampled_scene_idxs.npy'),
scene_idxs)
np.save(
osp.join(save_folder, f'{self.split}_label_weight.npy'),
label_weight)
print(f'{self.split} resampled scene index and label weight saved')
def _convert_to_label(self, mask):
"""Convert class_id in loaded segmentation mask to label."""
if isinstance(mask, str):
if mask.endswith('npy'):
mask = np.load(mask)
else:
mask = np.fromfile(mask, dtype=np.int64)
label = self.cat_id2class[mask]
return label
def get_scene_idxs_and_label_weight(self):
"""Compute scene_idxs for data sampling and label weight for loss
calculation.
We sample more times for scenes with more points. Label_weight is
inversely proportional to number of class points.
"""
num_classes = len(self.cat_ids)
num_point_all = []
label_weight = np.zeros((num_classes + 1, )) # ignore_index
for data_info in self.data_infos:
label = self._convert_to_label(
osp.join(self.data_root, data_info['pts_semantic_mask_path']))
num_point_all.append(label.shape[0])
class_count, _ = np.histogram(label, range(num_classes + 2))
label_weight += class_count
# repeat scene_idx for num_scene_point // num_sample_point times
sample_prob = np.array(num_point_all) / float(np.sum(num_point_all))
num_iter = int(np.sum(num_point_all) / float(self.num_points))
scene_idxs = []
for idx in range(len(self.data_infos)):
scene_idxs.extend([idx] * int(round(sample_prob[idx] * num_iter)))
scene_idxs = np.array(scene_idxs).astype(np.int32)
# calculate label weight, adopted from PointNet++
label_weight = label_weight[:-1].astype(np.float32)
label_weight = label_weight / label_weight.sum()
label_weight = self.label_weight_func(label_weight).astype(np.float32)
return scene_idxs, label_weight