import os
import sys
import cv2
import math
import copy
import hashlib
import imageio
import numpy as np
import pandas as pd
from scipy import interpolate
from PIL import Image, ImageEnhance, ImageFile
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset

# Allow PIL to load images whose file payload is truncated instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True

sys.path.append("./")
from external.landmark_detection.lib.dataset.augmentation import Augmentation
from external.landmark_detection.lib.dataset.encoder import get_encoder


class AlignmentDataset(Dataset):
    """Face-alignment dataset backed by a tab-separated index file.

    Each row of the TSV provides: image path, 5-point landmarks, the full
    target landmark set, a face scale, a face center (w, h) and optional
    integer tags. `__getitem__` loads the image, runs geometric/photometric
    augmentation, and returns the normalized image tensor plus landmark
    labels (and, when `use_AAM` is on, point heatmaps and edge maps).
    """

    def __init__(self,
                 tsv_flie,  # NOTE(review): parameter name is a typo for "tsv_file"; kept for keyword-caller compatibility
                 image_dir="",
                 transform=None,
                 width=256,
                 height=256,
                 channels=3,
                 means=(127.5, 127.5, 127.5),
                 scale=1 / 127.5,
                 classes_num=None,
                 crop_op=True,
                 aug_prob=0.0,
                 edge_info=None,
                 flip_mapping=None,
                 is_train=True,
                 encoder_type='default',
                 ):
        """Read the TSV index and build the augmentation/encoding pipeline.

        Args:
            tsv_flie: path to the tab-separated annotation file.
            image_dir: directory prepended to relative image paths.
            transform: unused here but stored for external callers.
            width, height: output image size (must be equal).
            channels: expected image channel count.
            means, scale: per-channel normalization: (pixel - mean) * scale.
            classes_num: sequence whose first element is the landmark count.
            crop_op: when True use a tight crop (target_face_scale=1.0),
                otherwise a looser 1.25 crop.
            aug_prob: probability of applying augmentation.
            edge_info: iterable of (is_closed, landmark_indices) describing
                the edges to rasterize in `_generate_edgemap`.
            flip_mapping: landmark index pairs swapped on horizontal flip.
            is_train: enables training-time augmentation.
            encoder_type: heatmap encoder variant passed to `get_encoder`.
        """
        super(AlignmentDataset, self).__init__()
        self.use_AAM = True
        self.encoder_type = encoder_type
        self.encoder = get_encoder(height, width, encoder_type=encoder_type)
        self.items = pd.read_csv(tsv_flie, sep="\t")
        self.image_dir = image_dir
        self.landmark_num = classes_num[0]
        self.transform = transform

        self.image_width = width
        self.image_height = height
        self.channels = channels
        # The augmentation pipeline takes a single `image_size`, so the
        # dataset only supports square outputs.
        assert self.image_width == self.image_height

        self.means = means
        self.scale = scale
        self.aug_prob = aug_prob
        self.edge_info = edge_info
        self.is_train = is_train

        # Canonical 5-point template (eyes, nose, mouth corners) in a
        # 512x512 frame, mapped to [-1, 1] coordinates.
        std_lmk_5pts = np.array([
            196.0, 226.0, 316.0, 226.0, 256.0, 286.0,
            220.0, 360.4, 292.0, 360.4], np.float32) / 256.0 - 1.0
        std_lmk_5pts = np.reshape(std_lmk_5pts, (5, 2))  # [-1 1]
        target_face_scale = 1.0 if crop_op else 1.25

        self.augmentation = Augmentation(
            is_train=self.is_train,
            aug_prob=self.aug_prob,
            image_size=self.image_width,
            crop_op=crop_op,
            std_lmk_5pts=std_lmk_5pts,
            target_face_scale=target_face_scale,
            flip_rate=0.5,
            flip_mapping=flip_mapping,
            random_shift_sigma=0.05,
            random_rot_sigma=math.pi / 180 * 18,
            random_scale_sigma=0.1,
            random_gray_rate=0.2,
            random_occ_rate=0.4,
            random_blur_rate=0.3,
            random_gamma_rate=0.2,
            random_nose_fusion_rate=0.2)

    def _circle(self, img, pt, sigma=1.0, label_type='Gaussian'):
        """Stamp a small Gaussian (or Cauchy-like) blob centered at `pt` into `img`.

        `img` is a 2-D float array; values inside the stamped window are
        overwritten (not max-merged) with 255 * kernel. Returns `img`.
        """
        # Check that any part of the gaussian is in-bounds.
        tmp_size = sigma * 3
        ul = [int(pt[0] - tmp_size), int(pt[1] - tmp_size)]
        br = [int(pt[0] + tmp_size + 1), int(pt[1] + tmp_size + 1)]
        if (ul[0] > img.shape[1] - 1 or ul[1] > img.shape[0] - 1
                or br[0] - 1 < 0 or br[1] - 1 < 0):
            # Entirely out of bounds: return the image untouched.
            return img

        # Generate the kernel on a (size x size) grid.
        size = 2 * tmp_size + 1
        x = np.arange(0, size, 1, np.float32)
        y = x[:, np.newaxis]
        x0 = y0 = size // 2
        # The gaussian is not normalized; the center value equals 1.
        if label_type == 'Gaussian':
            g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * sigma ** 2))
        else:
            g = sigma / (((x - x0) ** 2 + (y - y0) ** 2 + sigma ** 2) ** 1.5)

        # Usable kernel range (clipped against the image border).
        g_x = max(0, -ul[0]), min(br[0], img.shape[1]) - ul[0]
        g_y = max(0, -ul[1]), min(br[1], img.shape[0]) - ul[1]
        # Corresponding image range.
        img_x = max(0, ul[0]), min(br[0], img.shape[1])
        img_y = max(0, ul[1]), min(br[1], img.shape[0])

        img[img_y[0]:img_y[1], img_x[0]:img_x[1]] = \
            255 * g[g_y[0]:g_y[1], g_x[0]:g_x[1]]
        return img

    def _polylines(self, img, lmks, is_closed, color=255, thickness=1,
                   draw_mode=cv2.LINE_AA, interpolate_mode=cv2.INTER_AREA,
                   scale=4):
        """Draw an anti-aliased polyline through `lmks` on single-channel `img`.

        Renders at `scale`x resolution and downsamples back, which yields
        smoother (softly blurred) lines than drawing at native resolution.
        Returns the drawn image (the input array itself is not modified).
        """
        h, w = img.shape
        img_scale = cv2.resize(img, (w * scale, h * scale),
                               interpolation=interpolate_mode)
        # +0.5 rounds to the nearest integer pixel in the upscaled frame.
        lmks_scale = (lmks * scale + 0.5).astype(np.int32)
        cv2.polylines(img_scale, [lmks_scale], is_closed, color,
                      thickness * scale, draw_mode)
        img = cv2.resize(img_scale, (w, h), interpolation=interpolate_mode)
        return img

    def _generate_edgemap(self, points, scale=0.25, thickness=1):
        """Rasterize each edge group in `self.edge_info` into a stacked edge map.

        Each landmark subset is spline-interpolated, clipped to the image,
        drawn as a polyline, normalized to [0, 1], and finally resized to
        `scale` times the image resolution. Returns a float tensor of shape
        (num_edges, h*scale, w*scale) (squeezed).
        """
        h, w = self.image_height, self.image_width
        edgemaps = []
        for is_closed, indices in self.edge_info:
            edgemap = np.zeros([h, w], dtype=np.float32)
            # align_corners: False.
            part = copy.deepcopy(points[np.array(indices)])
            part = self._fit_curve(part, is_closed)
            part[:, 0] = np.clip(part[:, 0], 0, w - 1)
            part[:, 1] = np.clip(part[:, 1], 0, h - 1)
            edgemap = self._polylines(edgemap, part, is_closed, 255, thickness)
            edgemaps.append(edgemap)
        edgemaps = np.stack(edgemaps, axis=0) / 255.0
        edgemaps = torch.from_numpy(edgemaps).float().unsqueeze(0)
        edgemaps = F.interpolate(edgemaps,
                                 size=(int(w * scale), int(h * scale)),
                                 mode='bilinear',
                                 align_corners=False).squeeze()
        return edgemaps

    def _fit_curve(self, lmks, is_closed=False, density=5):
        """Densify a landmark polyline with a cubic B-spline.

        Inserts `density` interpolated points per original segment. Falls
        back to the raw landmarks on any interpolation failure (e.g. too few
        points or degenerate geometry).
        """
        try:
            x = lmks[:, 0].copy()
            y = lmks[:, 1].copy()
            if is_closed:
                # Repeat the first point so the periodic spline closes.
                x = np.append(x, x[0])
                y = np.append(y, y[0])
            tck, u = interpolate.splprep([x, y], s=0, per=is_closed, k=3)
            # bins = (x.shape[0] - 1) * density + 1
            # lmk_x, lmk_y = interpolate.splev(np.linspace(0, 1, bins), f)
            intervals = np.array([])
            for i in range(len(u) - 1):
                intervals = np.concatenate(
                    (intervals,
                     np.linspace(u[i], u[i + 1], density, endpoint=False)))
            if not is_closed:
                intervals = np.concatenate((intervals, [u[-1]]))
            lmk_x, lmk_y = interpolate.splev(intervals, tck, der=0)
            # der_x, der_y = interpolate.splev(intervals, tck, der=1)
            curve_lmks = np.stack([lmk_x, lmk_y], axis=-1)
            # curve_ders = np.stack([der_x, der_y], axis=-1)
            # origin_indices = np.arange(0, curve_lmks.shape[0], density)
            return curve_lmks
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; any spline failure degrades to the raw points.
            return lmks

    def _image_id(self, image_path):
        """Return the MD5 hex digest of the image file's bytes."""
        if not os.path.exists(image_path):
            image_path = os.path.join(self.image_dir, image_path)
        # `with` fixes a file-handle leak in the original
        # (`open(...).read()` never closed the file).
        with open(image_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()

    def _load_image(self, image_path):
        """Load an image as HWC / BGR / [0-255] uint8, or None on failure.

        Tries OpenCV first, then imageio (RGB, converted to BGR), then the
        first frame of an animated GIF. Each fallback is best-effort.
        """
        if not os.path.exists(image_path):
            image_path = os.path.join(self.image_dir, image_path)
        try:
            # img = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_COLOR)#HWC, BGR, [0-255]
            img = cv2.imread(image_path, cv2.IMREAD_COLOR)  # HWC, BGR, [0-255]
            assert img is not None and len(img.shape) == 3 and img.shape[2] == 3
        except Exception:
            try:
                img = imageio.imread(image_path)  # HWC, RGB, [0-255]
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # HWC, BGR, [0-255]
                assert img is not None and len(img.shape) == 3 and img.shape[2] == 3
            except Exception:
                try:
                    gifImg = imageio.mimread(image_path)  # BHWC, RGB, [0-255]
                    img = gifImg[0]  # HWC, RGB, [0-255]
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # HWC, BGR, [0-255]
                    assert img is not None and len(img.shape) == 3 and img.shape[2] == 3
                except Exception:
                    img = None
        return img

    def _compose_rotate_and_scale(self, angle, scale, shift_xy,
                                  from_center, to_center):
        """Build a 3x3 homogeneous matrix: rotate by `angle` (radians) and
        scale by `scale` about `from_center`, then translate so the result
        is centered at `to_center` plus `shift_xy`."""
        cosv = math.cos(angle)
        sinv = math.sin(angle)

        fx, fy = from_center
        tx, ty = to_center

        acos = scale * cosv
        asin = scale * sinv

        a0 = acos
        a1 = -asin
        a2 = tx - acos * fx + asin * fy + shift_xy[0]

        b0 = asin
        b1 = acos
        b2 = ty - asin * fx - acos * fy + shift_xy[1]

        rot_scale_m = np.array([
            [a0, a1, a2],
            [b0, b1, b2],
            [0.0, 0.0, 1.0]
        ], np.float32)
        return rot_scale_m

    def _transformPoints2D(self, points, matrix):
        """
        points (nx2), matrix (3x3) -> points (nx2)

        Applies a homogeneous 2-D transform and divides out the w component.
        """
        dtype = points.dtype

        # nx3: append homogeneous coordinate.
        points = np.concatenate([points, np.ones_like(points[:, [0]])], axis=1)
        points = points @ np.transpose(matrix)  # nx3
        points = points[:, :2] / points[:, [2, 2]]
        return points.astype(dtype)

    def _transformPerspective(self, image, matrix, target_shape):
        """
        image, matrix3x3 -> transformed_image

        Warps with bilinear sampling; out-of-frame pixels become 0 (black).
        """
        return cv2.warpPerspective(
            image, matrix,
            dsize=(target_shape[1], target_shape[0]),
            flags=cv2.INTER_LINEAR, borderValue=0)

    def _norm_points(self, points, h, w, align_corners=False):
        """Map pixel coordinates to [-1, +1] (clamped).

        align_corners=True treats pixel centers as [0, SIZE-1];
        False treats the pixel grid as spanning [-0.5, SIZE-0.5].
        """
        if align_corners:
            # [0, SIZE-1] -> [-1, +1]
            des_points = points / torch.tensor([w - 1, h - 1]).to(points).view(1, 2) * 2 - 1
        else:
            # [-0.5, SIZE-0.5] -> [-1, +1]
            des_points = (points * 2 + 1) / torch.tensor([w, h]).to(points).view(1, 2) - 1
        des_points = torch.clamp(des_points, -1, 1)
        return des_points

    def _denorm_points(self, points, h, w, align_corners=False):
        """Inverse of `_norm_points`: map [-1, +1] back to pixel coordinates."""
        if align_corners:
            # [-1, +1] -> [0, SIZE-1]
            des_points = (points + 1) / 2 * torch.tensor([w - 1, h - 1]).to(points).view(1, 1, 2)
        else:
            # [-1, +1] -> [-0.5, SIZE-0.5]
            des_points = ((points + 1) * torch.tensor([w, h]).to(points).view(1, 1, 2) - 1) / 2
        return des_points

    def __len__(self):
        """Number of annotated samples in the TSV index."""
        return len(self.items)

    def __getitem__(self, index):
        """Load, augment and encode sample `index`.

        Returns a dict with:
            image_path: resolved path of the source image.
            label: [normalized landmarks] (+ pointmap, edgemap when use_AAM).
            matrix: the 3x3 augmentation transform applied to the image.
            data: CHW BGR float tensor normalized via (x - mean) * scale.
            tags: optional per-sample integer tags (empty array if absent).
        """
        sample = dict()

        image_path = self.items.iloc[index, 0]
        landmarks_5pts = self.items.iloc[index, 1]
        landmarks_5pts = np.array(
            list(map(float, landmarks_5pts.split(","))),
            dtype=np.float32).reshape(5, 2)
        landmarks_target = self.items.iloc[index, 2]
        landmarks_target = np.array(
            list(map(float, landmarks_target.split(","))),
            dtype=np.float32).reshape(self.landmark_num, 2)
        scale = float(self.items.iloc[index, 3])
        center_w, center_h = (float(self.items.iloc[index, 4]),
                              float(self.items.iloc[index, 5]))
        if len(self.items.iloc[index]) > 6:
            tags = np.array(list(map(
                lambda x: int(float(x)),
                self.items.iloc[index, 6].split(","))))
        else:
            tags = np.array([])

        # image & keypoints alignment
        image_path = image_path.replace('\\', '/')
        # wflw testset
        image_path = image_path.replace(
            '//msr-facestore/Workspace/MSRA_EP_Allergan/users/yanghuan/training_data/wflw/rawImages/',
            '')
        # trainset
        image_path = image_path.replace('./rawImages/', '')
        image_path = os.path.join(self.image_dir, image_path)

        # image path
        sample["image_path"] = image_path

        img = self._load_image(image_path)  # HWC, BGR, [0, 255]
        assert img is not None

        # augmentation
        # landmarks_target = [-0.5, edge-0.5]
        img, landmarks_target, matrix = \
            self.augmentation.process(img, landmarks_target, landmarks_5pts,
                                      scale, center_w, center_h)

        landmarks = self._norm_points(torch.from_numpy(landmarks_target),
                                      self.image_height, self.image_width)

        sample["label"] = [landmarks, ]
        if self.use_AAM:
            pointmap = self.encoder.generate_heatmap(landmarks_target)
            edgemap = self._generate_edgemap(landmarks_target)
            sample["label"] += [pointmap, edgemap]

        sample['matrix'] = matrix

        # image normalization
        img = img.transpose(2, 0, 1).astype(np.float32)  # CHW, BGR, [0, 255]
        img[0, :, :] = (img[0, :, :] - self.means[0]) * self.scale
        img[1, :, :] = (img[1, :, :] - self.means[1]) * self.scale
        img[2, :, :] = (img[2, :, :] - self.means[2]) * self.scale
        sample["data"] = torch.from_numpy(img)  # CHW, BGR, [-1, 1]
        sample["tags"] = tags
        return sample