# Copyright (c) OpenMMLab. All rights reserved.
import logging
import os.path as osp
from copy import deepcopy
from typing import Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import torch.nn as nn
from mmengine import dump, print_log
from mmengine.infer.infer import BaseInferencer, ModelType
from mmengine.model.utils import revert_sync_batchnorm
from mmengine.registry import init_default_scope
from mmengine.runner import load_checkpoint
from mmengine.structures import InstanceData
from mmengine.visualization import Visualizer
from rich.progress import track
from mmdet3d.registry import DATASETS, MODELS
from mmdet3d.structures import Box3DMode, Det3DDataSample
from mmdet3d.utils import ConfigType
InstanceList = List[InstanceData]
InputType = Union[str, np.ndarray]
InputsType = Union[InputType, Sequence[InputType]]
PredType = Union[InstanceData, InstanceList]
ImgType = Union[np.ndarray, Sequence[np.ndarray]]
ResType = Union[Dict, List[Dict], InstanceData, List[InstanceData]]
class Base3DInferencer(BaseInferencer):
"""Base 3D model inferencer.
Args:
model (str, optional): Path to the config file or the model name
defined in metafile. For example, it could be
"pgd-kitti" or
"configs/pgd/pgd_r101-caffe_fpn_head-gn_4xb3-4x_kitti-mono3d.py".
            If model is not specified, the user must provide the
            ``weights`` saved by MMEngine, which contains the config string.
            Defaults to None.
weights (str, optional): Path to the checkpoint. If it is not specified
and model is a model name of metafile, the weights will be loaded
from metafile. Defaults to None.
        device (str, optional): Device to run inference on. If None, an
            available device will be used automatically. Defaults to None.
scope (str): The scope of the model. Defaults to 'mmdet3d'.
palette (str): Color palette used for visualization. The order of
priority is palette -> config -> checkpoint. Defaults to 'none'.
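
    Example:
        A minimal usage sketch (illustrative, not part of this module): it
        assumes a concrete subclass such as ``LidarDet3DInferencer``, a model
        name that exists in the metafile and a point cloud file that exists
        locally.

        >>> from mmdet3d.apis import LidarDet3DInferencer
        >>> inferencer = LidarDet3DInferencer(
        ...     model='pointpillars_hv_secfpn_8xb6-160e_kitti-3d-car')
        >>> results = inferencer(
        ...     inputs=dict(points='demo/data/kitti/000008.bin'),
        ...     out_dir='outputs')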
"""
preprocess_kwargs: set = {'cam_type'}
forward_kwargs: set = set()
visualize_kwargs: set = {
'return_vis', 'show', 'wait_time', 'draw_pred', 'pred_score_thr',
'img_out_dir', 'no_save_vis', 'cam_type_dir'
}
postprocess_kwargs: set = {
'print_result', 'pred_out_dir', 'return_datasample', 'no_save_pred'
}
def __init__(self,
model: Union[ModelType, str, None] = None,
weights: Optional[str] = None,
device: Optional[str] = None,
scope: str = 'mmdet3d',
palette: str = 'none') -> None:
# A global counter tracking the number of frames processed, for
# naming of the output results
self.num_predicted_frames = 0
self.palette = palette
init_default_scope(scope)
super().__init__(
model=model, weights=weights, device=device, scope=scope)
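        # SyncBN cannot run on a single, non-distributed device, so convert
        # any SyncBN layers to plain BN for inference.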
self.model = revert_sync_batchnorm(self.model)
def _convert_syncbn(self, cfg: ConfigType):
"""Convert config's naiveSyncBN to BN.
Args:
config (str or :obj:`mmengine.Config`): Config file path
or the config object.
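        Examples:
            A minimal sketch of the in-place rewrite (the nested keys are
            illustrative only):

            >>> cfg = dict(model=dict(norm_cfg=dict(type='naiveSyncBN2d')))
            >>> self._convert_syncbn(cfg)
            >>> cfg['model']['norm_cfg']['type']
            'BN2d'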
"""
if isinstance(cfg, dict):
for item in cfg:
if item == 'norm_cfg':
cfg[item]['type'] = cfg[item]['type']. \
replace('naiveSyncBN', 'BN')
else:
self._convert_syncbn(cfg[item])
def _init_model(
self,
cfg: ConfigType,
weights: str,
device: str = 'cpu',
) -> nn.Module:
self._convert_syncbn(cfg.model)
cfg.model.train_cfg = None
model = MODELS.build(cfg.model)
checkpoint = load_checkpoint(model, weights, map_location='cpu')
if 'dataset_meta' in checkpoint.get('meta', {}):
# mmdet3d 1.x
model.dataset_meta = checkpoint['meta']['dataset_meta']
elif 'CLASSES' in checkpoint.get('meta', {}):
# < mmdet3d 1.x
classes = checkpoint['meta']['CLASSES']
model.dataset_meta = {'classes': classes}
if 'PALETTE' in checkpoint.get('meta', {}): # 3D Segmentor
model.dataset_meta['palette'] = checkpoint['meta']['PALETTE']
else:
# < mmdet3d 1.x
model.dataset_meta = {'classes': cfg.class_names}
if 'PALETTE' in checkpoint.get('meta', {}): # 3D Segmentor
model.dataset_meta['palette'] = checkpoint['meta']['PALETTE']
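        # The metainfo of the test dataset (built lazily below) may also carry
        # a palette; when present, it overrides the palette recovered from the
        # checkpoint above.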
test_dataset_cfg = deepcopy(cfg.test_dataloader.dataset)
# lazy init. We only need the metainfo.
test_dataset_cfg['lazy_init'] = True
metainfo = DATASETS.build(test_dataset_cfg).metainfo
cfg_palette = metainfo.get('palette', None)
if cfg_palette is not None:
model.dataset_meta['palette'] = cfg_palette
model.cfg = cfg # save the config in the model for convenience
model.to(device)
model.eval()
return model
def _get_transform_idx(self, pipeline_cfg: ConfigType, name: str) -> int:
"""Returns the index of the transform in a pipeline.
If the transform is not found, returns -1.
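        Examples:
            An illustrative pipeline (the transform names are examples only):

            >>> pipeline = [dict(type='LoadPointsFromFile'),
            ...             dict(type='Pack3DDetInputs')]
            >>> self._get_transform_idx(pipeline, 'Pack3DDetInputs')
            1
            >>> self._get_transform_idx(pipeline, 'RandomFlip3D')
            -1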
"""
for i, transform in enumerate(pipeline_cfg):
if transform['type'] == name:
return i
return -1
def _init_visualizer(self, cfg: ConfigType) -> Optional[Visualizer]:
visualizer = super()._init_visualizer(cfg)
visualizer.dataset_meta = self.model.dataset_meta
return visualizer
def _dispatch_kwargs(self,
out_dir: str = '',
cam_type: str = '',
**kwargs) -> Tuple[Dict, Dict, Dict, Dict]:
"""Dispatch kwargs to preprocess(), forward(), visualize() and
postprocess() according to the actual demands.
Args:
out_dir (str): Dir to save the inference results.
cam_type (str): Camera type. Defaults to ''.
**kwargs (dict): Key words arguments passed to :meth:`preprocess`,
:meth:`forward`, :meth:`visualize` and :meth:`postprocess`.
Each key in kwargs should be in the corresponding set of
``preprocess_kwargs``, ``forward_kwargs``, ``visualize_kwargs``
and ``postprocess_kwargs``.
Returns:
Tuple[Dict, Dict, Dict, Dict]: kwargs passed to preprocess,
forward, visualize and postprocess respectively.
"""
kwargs['img_out_dir'] = out_dir
kwargs['pred_out_dir'] = out_dir
if cam_type != '':
kwargs['cam_type_dir'] = cam_type
return super()._dispatch_kwargs(**kwargs)
def __call__(self,
inputs: InputsType,
batch_size: int = 1,
return_datasamples: bool = False,
**kwargs) -> Optional[dict]:
"""Call the inferencer.
Args:
inputs (InputsType): Inputs for the inferencer.
batch_size (int): Batch size. Defaults to 1.
return_datasamples (bool): Whether to return results as
:obj:`BaseDataElement`. Defaults to False.
**kwargs: Key words arguments passed to :meth:`preprocess`,
:meth:`forward`, :meth:`visualize` and :meth:`postprocess`.
Each key in kwargs should be in the corresponding set of
``preprocess_kwargs``, ``forward_kwargs``, ``visualize_kwargs``
and ``postprocess_kwargs``.
Returns:
dict: Inference and visualization results.
"""
(
preprocess_kwargs,
forward_kwargs,
visualize_kwargs,
postprocess_kwargs,
) = self._dispatch_kwargs(**kwargs)
cam_type = preprocess_kwargs.pop('cam_type', 'CAM2')
ori_inputs = self._inputs_to_list(inputs, cam_type=cam_type)
inputs = self.preprocess(
ori_inputs, batch_size=batch_size, **preprocess_kwargs)
preds = []
results_dict = {'predictions': [], 'visualization': []}
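        # ``forward`` runs batch by batch; visualization and post-processing
        # then run once over all accumulated predictions.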
for data in (track(inputs, description='Inference')
if self.show_progress else inputs):
preds.extend(self.forward(data, **forward_kwargs))
visualization = self.visualize(ori_inputs, preds,
**visualize_kwargs)
results = self.postprocess(preds, visualization,
return_datasamples,
**postprocess_kwargs)
results_dict['predictions'].extend(results['predictions'])
if results['visualization'] is not None:
results_dict['visualization'].extend(results['visualization'])
return results_dict
def postprocess(
self,
preds: PredType,
visualization: Optional[List[np.ndarray]] = None,
return_datasample: bool = False,
print_result: bool = False,
no_save_pred: bool = False,
pred_out_dir: str = '',
) -> Union[ResType, Tuple[ResType, np.ndarray]]:
"""Process the predictions and visualization results from ``forward``
and ``visualize``.
This method should be responsible for the following tasks:
1. Convert datasamples into a json-serializable dict if needed.
2. Pack the predictions and visualization results and return them.
3. Dump or log the predictions.
Args:
preds (List[Dict]): Predictions of the model.
visualization (np.ndarray, optional): Visualized predictions.
Defaults to None.
return_datasample (bool): Whether to use Datasample to store
inference results. If False, dict will be used.
Defaults to False.
            print_result (bool): Whether to print the inference result w/o
                visualization to the console. Defaults to False.
            no_save_pred (bool): Whether to force not to save prediction
                results. Defaults to False.
pred_out_dir (str): Directory to save the inference results w/o
visualization. If left as empty, no file will be saved.
Defaults to ''.
Returns:
dict: Inference and visualization results with key ``predictions``
and ``visualization``.
- ``visualization`` (Any): Returned by :meth:`visualize`.
- ``predictions`` (dict or DataSample): Returned by
:meth:`forward` and processed in :meth:`postprocess`.
If ``return_datasample=False``, it usually should be a
json-serializable dict containing only basic data elements such
as strings and numbers.
"""
if no_save_pred is True:
pred_out_dir = ''
result_dict = {}
results = preds
if not return_datasample:
results = []
for pred in preds:
result = self.pred2dict(pred, pred_out_dir)
results.append(result)
elif pred_out_dir != '':
print_log(
'Currently does not support saving datasample '
'when return_datasample is set to True. '
'Prediction results are not saved!',
level=logging.WARNING)
# Add img to the results after printing and dumping
result_dict['predictions'] = results
if print_result:
print(result_dict)
result_dict['visualization'] = visualization
return result_dict
# TODO: The data format and fields saved in json need further discussion.
# Maybe should include model name, timestamp, filename, image info etc.
def pred2dict(self,
data_sample: Det3DDataSample,
pred_out_dir: str = '') -> Dict:
"""Extract elements necessary to represent a prediction into a
dictionary.
        The returned dict should contain only basic data elements such as
        strings and numbers so that it is guaranteed to be json-serializable.
Args:
            data_sample (:obj:`Det3DDataSample`): Predictions of the model.
            pred_out_dir (str): Dir to save the inference results w/o
visualization. If left as empty, no file will be saved.
Defaults to ''.
Returns:
dict: Prediction results.
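                For a 3D detector, the dict typically looks like the sketch
                below (illustrative values; the exact box tensor layout
                depends on the concrete box type)::

                    {
                        'labels_3d': [0, 1],
                        'scores_3d': [0.95, 0.87],
                        'bboxes_3d': [[x, y, z, dx, dy, dz, yaw], ...],
                        'box_type_3d': 'LiDAR'
                    }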
"""
result = {}
if 'pred_instances_3d' in data_sample:
pred_instances_3d = data_sample.pred_instances_3d.numpy()
result = {
'labels_3d': pred_instances_3d.labels_3d.tolist(),
'scores_3d': pred_instances_3d.scores_3d.tolist(),
'bboxes_3d': pred_instances_3d.bboxes_3d.tensor.cpu().tolist()
}
if 'pred_pts_seg' in data_sample:
pred_pts_seg = data_sample.pred_pts_seg.numpy()
result['pts_semantic_mask'] = \
pred_pts_seg.pts_semantic_mask.tolist()
if data_sample.box_mode_3d == Box3DMode.LIDAR:
result['box_type_3d'] = 'LiDAR'
elif data_sample.box_mode_3d == Box3DMode.CAM:
result['box_type_3d'] = 'Camera'
elif data_sample.box_mode_3d == Box3DMode.DEPTH:
result['box_type_3d'] = 'Depth'
if pred_out_dir != '':
if 'lidar_path' in data_sample:
lidar_path = osp.basename(data_sample.lidar_path)
lidar_path = osp.splitext(lidar_path)[0]
out_json_path = osp.join(pred_out_dir, 'preds',
lidar_path + '.json')
elif 'img_path' in data_sample:
img_path = osp.basename(data_sample.img_path)
img_path = osp.splitext(img_path)[0]
out_json_path = osp.join(pred_out_dir, 'preds',
img_path + '.json')
else:
out_json_path = osp.join(
pred_out_dir, 'preds',
                    f'{str(self.num_predicted_frames).zfill(8)}.json')
dump(result, out_json_path)
return result