|
import glob |
|
import json |
|
import os |
|
import skimage |
|
import numpy as np |
|
from pathlib import Path |
|
from natsort import natsorted |
|
from PIL import Image |
|
from relighting.image_processor import pil_square_image |
|
from tqdm.auto import tqdm |
|
import random |
|
import itertools |
|
from abc import ABC, abstractmethod |
|
|
|
class Dataset(ABC): |
|
def __init__(self, |
|
resolution=(1024, 1024), |
|
force_square=True, |
|
return_image_path=False, |
|
return_dict=False, |
|
): |
|
""" |
|
Resoution is (WIDTH, HEIGHT) |
|
""" |
|
self.resolution = resolution |
|
self.force_square = force_square |
|
self.return_image_path = return_image_path |
|
self.return_dict = return_dict |
|
self.scene_data = [] |
|
self.meta_data = [] |
|
self.boundary_info = [] |
|
|
|
@abstractmethod |
|
def _load_data_path(self): |
|
pass |
|
|
|
def __len__(self): |
|
return len(self.scene_data) |
|
|
|
def __getitem__(self, idx): |
|
image = Image.open(self.scene_data[idx]) |
|
if self.force_square: |
|
image = pil_square_image(image, self.resolution) |
|
else: |
|
image = image.resize(self.resolution) |
|
|
|
if self.return_dict: |
|
d = { |
|
"image": image, |
|
"path": self.scene_data[idx] |
|
} |
|
if len(self.boundary_info) > 0: |
|
d["boundary"] = self.boundary_info[idx] |
|
|
|
return d |
|
elif self.return_image_path: |
|
return image, self.scene_data[idx] |
|
else: |
|
return image |
|
|
|
class GeneralLoader(Dataset): |
|
def __init__(self, |
|
root=None, |
|
num_samples=None, |
|
res_threshold=((1024, 1024)), |
|
apply_threshold=False, |
|
random_shuffle=False, |
|
process_id = 0, |
|
process_total = 1, |
|
limit_input = 0, |
|
**kwargs, |
|
): |
|
super().__init__(**kwargs) |
|
self.root = root |
|
self.res_threshold = res_threshold |
|
self.apply_threshold = apply_threshold |
|
self.has_meta = False |
|
|
|
if self.root is not None: |
|
if not os.path.exists(self.root): |
|
raise Exception(f"Dataset {self.root} does not exist.") |
|
|
|
paths = natsorted( |
|
list(glob.glob(os.path.join(self.root, "*.png"))) + \ |
|
list(glob.glob(os.path.join(self.root, "*.jpg"))) |
|
) |
|
self.scene_data = self._load_data_path(paths, num_samples=num_samples) |
|
|
|
if random_shuffle: |
|
SEED = 0 |
|
random.Random(SEED).shuffle(self.scene_data) |
|
random.Random(SEED).shuffle(self.boundary_info) |
|
|
|
if limit_input > 0: |
|
self.scene_data = self.scene_data[:limit_input] |
|
self.boundary_info = self.boundary_info[:limit_input] |
|
|
|
|
|
if process_total > 1: |
|
self.scene_data = self.scene_data[process_id::process_total] |
|
self.boundary_info = self.boundary_info[process_id::process_total] |
|
print(f"Process {process_id} has {len(self.scene_data)} samples") |
|
|
|
def _load_data_path(self, paths, num_samples=None): |
|
if os.path.exists(os.path.splitext(paths[0])[0] + ".json") or os.path.exists(os.path.splitext(paths[-1])[0] + ".json"): |
|
self.has_meta = True |
|
|
|
if self.has_meta: |
|
|
|
TARGET_KEY = "chrome_mask256" |
|
for path in paths: |
|
with open(os.path.splitext(path)[0] + ".json") as f: |
|
meta = json.load(f) |
|
self.meta_data.append(meta) |
|
boundary = { |
|
"x": meta[TARGET_KEY]["x"], |
|
"y": meta[TARGET_KEY]["y"], |
|
"size": meta[TARGET_KEY]["w"], |
|
} |
|
self.boundary_info.append(boundary) |
|
|
|
|
|
scene_data = paths |
|
if self.apply_threshold: |
|
scene_data = [] |
|
for path in tqdm(paths): |
|
img = Image.open(path) |
|
if (img.size[0] >= self.res_threshold[0]) and (img.size[1] >= self.res_threshold[1]): |
|
scene_data.append(path) |
|
|
|
if num_samples is not None: |
|
max_idx = min(num_samples, len(scene_data)) |
|
scene_data = scene_data[:max_idx] |
|
|
|
return scene_data |
|
|
|
@classmethod |
|
def from_image_paths(cls, paths, *args, **kwargs): |
|
dataset = cls(*args, **kwargs) |
|
dataset.scene_data = dataset._load_data_path(paths) |
|
return dataset |
|
|
|
class ALPLoader(Dataset): |
|
def __init__(self, |
|
root=None, |
|
num_samples=None, |
|
res_threshold=((1024, 1024)), |
|
apply_threshold=False, |
|
**kwargs, |
|
): |
|
super().__init__(**kwargs) |
|
self.root = root |
|
self.res_threshold = res_threshold |
|
self.apply_threshold = apply_threshold |
|
self.has_meta = False |
|
|
|
if self.root is not None: |
|
if not os.path.exists(self.root): |
|
raise Exception(f"Dataset {self.root} does not exist.") |
|
|
|
dirs = natsorted(list(glob.glob(os.path.join(self.root, "*")))) |
|
self.scene_data = self._load_data_path(dirs) |
|
|
|
def _load_data_path(self, dirs): |
|
self.scene_names = [Path(dir).name for dir in dirs] |
|
|
|
scene_data = [] |
|
for dir in dirs: |
|
pseudo_probe_dirs = natsorted(list(glob.glob(os.path.join(dir, "*")))) |
|
pseudo_probe_dirs = [dir for dir in pseudo_probe_dirs if "gt" not in dir] |
|
data = [os.path.join(dir, "images", "0.png") for dir in pseudo_probe_dirs] |
|
scene_data.append(data) |
|
|
|
scene_data = list(itertools.chain(*scene_data)) |
|
return scene_data |
|
|
|
class MultiIlluminationLoader(Dataset): |
|
def __init__(self, |
|
root, |
|
mask_probe=True, |
|
mask_boundingbox=False, |
|
**kwargs, |
|
): |
|
""" |
|
@params resolution (tuple): (width, height) - resolution of the image |
|
@params force_square: will add black border to make the image square while keeping the aspect ratio |
|
@params mask_probe: mask the probe with the mask in the dataset |
|
|
|
""" |
|
super().__init__(**kwargs) |
|
self.root = root |
|
self.mask_probe = mask_probe |
|
self.mask_boundingbox = mask_boundingbox |
|
|
|
if self.root is not None: |
|
dirs = natsorted(list(glob.glob(os.path.join(self.root, "*")))) |
|
self.scene_data = self._load_data_path(dirs) |
|
|
|
def _load_data_path(self, dirs): |
|
self.scene_names = [Path(dir).name for dir in dirs] |
|
|
|
data = {} |
|
for dir in dirs: |
|
chrome_probes = natsorted(list(glob.glob(os.path.join(dir, "probes", "*chrome*.jpg")))) |
|
gray_probes = natsorted(list(glob.glob(os.path.join(dir, "probes", "*gray*.jpg")))) |
|
scenes = natsorted(list(glob.glob(os.path.join(dir, "dir_*.jpg")))) |
|
|
|
with open(os.path.join(dir, "meta.json")) as f: |
|
meta_data = json.load(f) |
|
|
|
bbox_chrome = meta_data["chrome"]["bounding_box"] |
|
bbox_gray = meta_data["gray"]["bounding_box"] |
|
|
|
mask_chrome = os.path.join(dir, "mask_chrome.png") |
|
mask_gray = os.path.join(dir, "mask_gray.png") |
|
|
|
scene_name = Path(dir).name |
|
data[scene_name] = { |
|
"scenes": scenes, |
|
"chrome_probes": chrome_probes, |
|
"gray_probes": gray_probes, |
|
"bbox_chrome": bbox_chrome, |
|
"bbox_gray": bbox_gray, |
|
"mask_chrome": mask_chrome, |
|
"mask_gray": mask_gray, |
|
} |
|
return data |
|
|
|
def _mask_probe(self, image, mask): |
|
""" |
|
mask probe with a png file in dataset |
|
""" |
|
image_anticheat = skimage.img_as_float(np.array(image)) |
|
mask_np = skimage.img_as_float(np.array(mask))[..., None] |
|
image_anticheat = ((1.0 - mask_np) * image_anticheat) + (0.5 * mask_np) |
|
image_anticheat = Image.fromarray(skimage.img_as_ubyte(image_anticheat)) |
|
return image_anticheat |
|
|
|
def _mask_boundingbox(self, image, bbox): |
|
""" |
|
mask image with the bounding box for anti-cheat |
|
""" |
|
bbox = {k:int(np.round(v/4.0)) for k,v in bbox.items()} |
|
x, y, w, h = bbox["x"], bbox["y"], bbox["w"], bbox["h"] |
|
image_anticheat = skimage.img_as_float(np.array(image)) |
|
image_anticheat[y:y+h, x:x+w] = 0.5 |
|
image_anticheat = Image.fromarray(skimage.img_as_ubyte(image_anticheat)) |
|
return image_anticheat |
|
|
|
def __getitem__(self, scene_name): |
|
data = self.scene_data[scene_name] |
|
|
|
mask_chrome = Image.open(data["mask_chrome"]) |
|
mask_gray = Image.open(data["mask_gray"]) |
|
images = [] |
|
for path in data["scenes"]: |
|
image = Image.open(path) |
|
if self.mask_probe: |
|
image = self._mask_probe(image, mask_chrome) |
|
image = self._mask_probe(image, mask_gray) |
|
if self.mask_boundingbox: |
|
image = self._mask_boundingbox(image, data["bbox_chrome"]) |
|
image = self._mask_boundingbox(image, data["bbox_gray"]) |
|
|
|
if self.force_square: |
|
image = pil_square_image(image, self.resolution) |
|
else: |
|
image = image.resize(self.resolution) |
|
images.append(image) |
|
|
|
chrome_probes = [Image.open(path) for path in data["chrome_probes"]] |
|
gray_probes = [Image.open(path) for path in data["gray_probes"]] |
|
bbox_chrome = data["bbox_chrome"] |
|
bbox_gray = data["bbox_gray"] |
|
|
|
return images, chrome_probes, gray_probes, bbox_chrome, bbox_gray |
|
|
|
|
|
def calculate_ball_info(self, scene_name): |
|
|
|
ball_data = [] |
|
for mtype in ['bbox_chrome', 'bbox_gray']: |
|
info = self.scene_data[scene_name][mtype] |
|
|
|
|
|
|
|
x = info['x'] / 4 |
|
y = info['y'] / 4 |
|
w = info['w'] / 4 |
|
h = info['h'] / 4 |
|
|
|
|
|
|
|
if self.force_square: |
|
h_ratio = (512.0 * 2.0 / 3.0) / 1000.0 |
|
w_ratio = 512.0 / 1500.0 |
|
else: |
|
h_ratio = self.resolution[0] / 1000.0 |
|
w_ratio = self.resolution[1] / 1500.0 |
|
|
|
x = x * w_ratio |
|
y = y * h_ratio |
|
w = w * w_ratio |
|
h = h * h_ratio |
|
|
|
if self.force_square: |
|
|
|
top_border_height = 512.0 * (1/6) |
|
y = y + top_border_height |
|
|
|
|
|
|
|
|
|
w = int(np.round(w)) |
|
h = int(np.round(h)) |
|
if w > h: |
|
r = h |
|
x = x + (w - h) / 2.0 |
|
else: |
|
r = w |
|
y = y + (h - w) / 2.0 |
|
|
|
x = int(np.round(x)) |
|
y = int(np.round(y)) |
|
|
|
ball_data.append((x, y, r)) |
|
|
|
return ball_data |
|
|
|
def calculate_bbox_info(self, scene_name): |
|
|
|
bbox_data = [] |
|
for mtype in ['bbox_chrome', 'bbox_gray']: |
|
info = self.scene_data[scene_name][mtype] |
|
|
|
|
|
|
|
x = info['x'] / 4 |
|
y = info['y'] / 4 |
|
w = info['w'] / 4 |
|
h = info['h'] / 4 |
|
|
|
|
|
|
|
if self.force_square: |
|
h_ratio = (512.0 * 2.0 / 3.0) / 1000.0 |
|
w_ratio = 512.0 / 1500.0 |
|
else: |
|
h_ratio = self.resolution[0] / 1000.0 |
|
w_ratio = self.resolution[1] / 1500.0 |
|
|
|
x = x * w_ratio |
|
y = y * h_ratio |
|
w = w * w_ratio |
|
h = h * h_ratio |
|
|
|
if self.force_square: |
|
|
|
top_border_height = 512.0 * (1/6) |
|
y = y + top_border_height |
|
|
|
|
|
w = int(np.round(w)) |
|
h = int(np.round(h)) |
|
x = int(np.round(x)) |
|
y = int(np.round(y)) |
|
|
|
bbox_data.append((x, y, w, h)) |
|
|
|
return bbox_data |
|
|
|
""" |
|
DO NOT remove this! |
|
This is for evaluating results from Multi-Illumination generated from the old version |
|
""" |
|
def calculate_ball_info_legacy(self, scene_name): |
|
|
|
ball_data = [] |
|
for mtype in ['bbox_chrome', 'bbox_gray']: |
|
info = self.scene_data[scene_name][mtype] |
|
|
|
|
|
|
|
x = info['x'] / 4 |
|
y = info['y'] / 4 |
|
w = info['w'] / 4 |
|
h = info['h'] / 4 |
|
|
|
|
|
h_ratio = 384.0 / 1000.0 |
|
w_ratio = 512.0 / 1500.0 |
|
x = x * w_ratio |
|
y = y * h_ratio |
|
w = w * w_ratio |
|
h = h * h_ratio |
|
|
|
|
|
top_border_height = 512.0 * (1/8) |
|
|
|
y = y + top_border_height |
|
|
|
|
|
|
|
r = np.max(np.array([w, h])) |
|
|
|
x = int(np.round(x)) |
|
y = int(np.round(y)) |
|
r = int(np.round(r)) |
|
|
|
ball_data.append((y, x, r)) |
|
|
|
return ball_data |