#
# Toyota Motor Europe NV/SA and its affiliated companies retain all intellectual
# property and proprietary rights in and to this software and related documentation.
# Any commercial use, reproduction, disclosure or distribution of this software and
# related documentation without an express license agreement from Toyota Motor Europe NV/SA
# is strictly prohibited.
#

from vhap.util.log import get_logger
from typing import Literal
from tqdm import tqdm
import face_alignment
import numpy as np
import matplotlib.path as mpltPath
from fdlite import (
    FaceDetection,
    FaceLandmark,
    face_detection_to_roi,
    IrisLandmark,
    iris_roi_from_face_landmarks,
)

logger = get_logger(__name__)


class LandmarkDetectorFA:
    IMAGE_FILE_NAME = "image_0000.png"
    LMK_FILE_NAME = "keypoints_static_0000.json"

    def __init__(
        self,
        face_detector: Literal["sfd", "blazeface"] = "sfd",
    ):
        """
        Initializes the 68-point facial landmark detector.

        :param face_detector: face detection backend used by face_alignment,
            either "sfd" or "blazeface"
        """
        logger.info("Initialize FaceAlignment module...")

        # 68 facial landmark detector
        self.fa = face_alignment.FaceAlignment(
            face_alignment.LandmarksType.TWO_HALF_D,
            face_detector=face_detector,
            flip_input=True,
            device="cuda",
        )

    def detect_single_image(self, img):
        bbox = self.fa.face_detector.detect_from_image(img)

        if len(bbox) == 0:
            lmks = np.zeros([68, 3]) - 1  # set to -1 when landmarks are unavailable
        else:
            if len(bbox) > 1:
                # if multiple boxes are detected, use the one with the highest confidence
                bbox = [bbox[np.argmax(np.array(bbox)[:, -1])]]
            lmks = self.fa.get_landmarks_from_image(img, detected_faces=bbox)[0]
            lmks = np.concatenate([lmks, np.ones_like(lmks[:, :1])], axis=1)

        if (lmks[:, :2] == -1).sum() > 0:
            lmks[:, 2:] = 0.0
        else:
            lmks[:, 2:] = 1.0

        # normalize landmark and bounding box coordinates by the image size
        h, w = img.shape[:2]
        lmks[:, 0] /= w
        lmks[:, 1] /= h
        if len(bbox) > 0:
            bbox[0][[0, 2]] /= w
            bbox[0][[1, 3]] /= h

        return bbox, lmks

    def detect_dataset(self, dataloader):
        """
        Annotates each frame with 68 facial landmarks
        :return: dict mapping camera id and timestep id to a landmarks numpy array,
            and the same for bounding boxes
        """
        landmarks = {}
        bboxes = {}

        logger.info("Begin annotating landmarks...")
        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            scale_factor = item["scale_factor"][0]

            logger.info(
                f"Annotate facial landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()

            bbox, lmks = self.detect_single_image(img)
            if len(bbox) == 0:
                logger.error(
                    f"No bbox found for frame: {timestep_id}, camera: {camera_id}. Setting landmarks to all -1."
                )

            if camera_id not in landmarks:
                landmarks[camera_id] = {}
            if camera_id not in bboxes:
                bboxes[camera_id] = {}
            landmarks[camera_id][timestep_id] = lmks
            bboxes[camera_id][timestep_id] = bbox[0] if len(bbox) > 0 else np.zeros(5) - 1

        return landmarks, bboxes
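    # The iris annotation below chains three fdlite detectors: FaceDetection locates the
    # face, FaceLandmark provides the landmarks used to derive per-eye regions of
    # interest, and IrisLandmark localizes the iris within each eye ROI.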
    def annotate_iris_landmarks(self, dataloader):
        """
        Annotates each frame with 2 iris landmarks
        :return: dict mapping camera id and timestep id to a landmarks numpy array
        """
        # iris detector
        detect_faces = FaceDetection()
        detect_face_landmarks = FaceLandmark()
        detect_iris_landmarks = IrisLandmark()

        landmarks = {}

        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            scale_factor = item["scale_factor"][0]

            if camera_id not in landmarks:
                landmarks[camera_id] = {}

            logger.info(
                f"Annotate iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()

            height, width = img.shape[:2]
            img_size = (width, height)

            face_detections = detect_faces(img)
            if len(face_detections) != 1:
                logger.error("Empty iris landmarks (type 1)")
                landmarks[camera_id][timestep_id] = None
            else:
                for face_detection in face_detections:
                    try:
                        face_roi = face_detection_to_roi(face_detection, img_size)
                    except ValueError:
                        logger.error("Empty iris landmarks (type 2)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    face_landmarks = detect_face_landmarks(img, face_roi)
                    if len(face_landmarks) == 0:
                        logger.error("Empty iris landmarks (type 3)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    iris_rois = iris_roi_from_face_landmarks(face_landmarks, img_size)
                    if len(iris_rois) != 2:
                        logger.error("Empty iris landmarks (type 4)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    lmks = []
                    failed = False
                    for iris_roi in iris_rois[::-1]:
                        try:
                            iris_landmarks = detect_iris_landmarks(img, iris_roi).iris[0:1]
                        except np.linalg.LinAlgError:
                            logger.error("Failed to get iris landmarks")
                            landmarks[camera_id][timestep_id] = None
                            failed = True
                            break

                        for landmark in iris_landmarks:
                            lmks.append([landmark.x * width, landmark.y * height, 1.0])

                    if failed:
                        # skip this frame if iris detection failed for either eye
                        break

                    # normalize iris landmarks by the image size
                    lmks = np.array(lmks, dtype=np.float32)
                    h, w = img.shape[:2]
                    lmks[:, 0] /= w
                    lmks[:, 1] /= h
                    landmarks[camera_id][timestep_id] = lmks

        return landmarks
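    # The consistency check below treats the six facial landmarks of an eye as a polygon
    # and requires the detected iris center to lie inside it (matplotlib's
    # Path.contains_points). Illustrative example:
    #   mpltPath.Path([[0, 0], [1, 0], [1, 1], [0, 1]]).contains_points([[0.5, 0.5]])  # -> [True]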
    def iris_consistency(self, lm_iris, lm_eye):
        """
        Checks if the landmarks for the eye and the iris are consistent
        :param lm_iris: (1, 3) array with the iris landmark
        :param lm_eye: (6, 3) array with the eye contour landmarks
        :return: True if the iris landmark lies inside the eye polygon
        """
        lm_iris = lm_iris[:, :2]
        lm_eye = lm_eye[:, :2]

        polygon_eye = mpltPath.Path(lm_eye)
        valid = polygon_eye.contains_points(lm_iris)

        return valid[0]

    def annotate_landmarks(self, dataloader, add_iris=False):
        """
        Annotates each frame with landmarks for face and iris. Assumes frames have been extracted.
        :param add_iris: whether to additionally annotate and validate iris landmarks
        :return:
        """
        lmks_face, bboxes_faces = self.detect_dataset(dataloader)

        if add_iris:
            lmks_iris = self.annotate_iris_landmarks(dataloader)

            # check consistency of iris landmarks and facial keypoints
            for camera_id, lmk_face_camera in lmks_face.items():
                for timestep_id in lmk_face_camera.keys():
                    discard_iris_lmks = False
                    bboxes_face_i = bboxes_faces[camera_id][timestep_id]
                    # bounding boxes are set to all -1 when no face was detected
                    if not np.all(bboxes_face_i == -1):
                        lmks_face_i = lmks_face[camera_id][timestep_id]
                        lmks_iris_i = lmks_iris[camera_id][timestep_id]
                        if lmks_iris_i is not None:
                            # validate iris landmarks
                            left_face = lmks_face_i[36:42]
                            right_face = lmks_face_i[42:48]

                            right_iris = lmks_iris_i[:1]
                            left_iris = lmks_iris_i[1:]

                            if not (
                                self.iris_consistency(left_iris, left_face)
                                and self.iris_consistency(right_iris, right_face)
                            ):
                                logger.error(
                                    f"Inconsistent iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
                                )
                                discard_iris_lmks = True
                        else:
                            logger.error(
                                f"No iris landmarks detected for timestep: {timestep_id}, camera: {camera_id}"
                            )
                            discard_iris_lmks = True
                    else:
                        logger.error(
                            f"Discarding iris landmarks because no face landmark is available for timestep: {timestep_id}, camera: {camera_id}"
                        )
                        discard_iris_lmks = True

                    if discard_iris_lmks:
                        lmks_iris[camera_id][timestep_id] = (
                            np.zeros([2, 3]) - 1
                        )  # set to -1 for inconsistent iris landmarks

        # assemble the per-camera arrays and save them to an npz file
        for camera_id, lmk_face_camera in lmks_face.items():
            bounding_box = []
            face_landmark_2d = []
            iris_landmark_2d = []
            for timestep_id in lmk_face_camera.keys():
                bounding_box.append(bboxes_faces[camera_id][timestep_id][None])
                face_landmark_2d.append(lmks_face[camera_id][timestep_id][None])
                if add_iris:
                    iris_landmark_2d.append(lmks_iris[camera_id][timestep_id][None])

            lmk_dict = {
                "bounding_box": bounding_box,
                "face_landmark_2d": face_landmark_2d,
            }
            if len(iris_landmark_2d) > 0:
                lmk_dict["iris_landmark_2d"] = iris_landmark_2d

            for k, v in lmk_dict.items():
                if len(v) > 0:
                    lmk_dict[k] = np.concatenate(v, axis=0)

            out_path = dataloader.dataset.get_property_path(
                "landmark2d/face-alignment", camera_id=camera_id
            )
            logger.info(f"Saving landmarks to: {out_path}")
            if not out_path.parent.exists():
                out_path.parent.mkdir(parents=True)
            np.savez(out_path, **lmk_dict)


if __name__ == "__main__":
    import tyro
    from tqdm import tqdm
    from torch.utils.data import DataLoader
    from vhap.config.base import DataConfig, import_module

    cfg = tyro.cli(DataConfig)
    dataset = import_module(cfg._target)(
        cfg=cfg,
        img_to_tensor=False,
        batchify_all_views=True,
    )
    dataset.items = dataset.items[:2]

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=4)

    detector = LandmarkDetectorFA()
    detector.annotate_landmarks(dataloader)
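# Note (illustrative sketch, not part of the pipeline above): the saved npz can be
# inspected with numpy. The path below is hypothetical; the real one comes from
# dataset.get_property_path("landmark2d/face-alignment", camera_id=...).
#   data = np.load("landmark2d/face-alignment.npz")
#   data["bounding_box"]       # (T, 5): normalized x1, y1, x2, y2 plus detector score (-1 if no face)
#   data["face_landmark_2d"]   # (T, 68, 3): normalized x, y and a 0/1 validity flag
#   data["iris_landmark_2d"]   # (T, 2, 3): only present when add_iris=True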