LVM-Med / object_detection /convert_to_coco.py
duynhm's picture
Initial commit
be2715b
raw
history blame
10.3 kB
# based on https://www.kaggle.com/sreevishnudamodaran/vinbigdata-fusing-bboxes-coco-dataset#Building-COCO-DATASET
import os
from pathlib import Path
from datetime import datetime
import shutil
from collections import Counter
import warnings
import json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import cv2 as cv
from tqdm import tqdm
from ensemble_boxes import weighted_boxes_fusion
warnings.filterwarnings("ignore", category=UserWarning)
def plot_img(img, size=(18, 18), is_rgb=True, title="", cmap='gray'):
plt.figure(figsize=size)
plt.imshow(img, cmap=cmap)
plt.suptitle(title)
plt.show()
def plot_imgs(imgs, cols=2, size=10, is_rgb=True, title="", cmap='gray', img_size=None):
rows = len(imgs)//cols + 1
fig = plt.figure(figsize=(cols*size, rows*size))
for i, img in enumerate(imgs):
if img_size is not None:
img = cv.resize(img, img_size)
fig.add_subplot(rows, cols, i+1)
plt.imshow(img, cmap=cmap)
plt.suptitle(title)
def draw_bbox(image, box, label, color, thickness=3):
alpha = 0.1
alpha_box = 0.4
overlay_bbox = image.copy()
overlay_text = image.copy()
output = image.copy()
text_width, text_height = cv.getTextSize(label.upper(), cv.FONT_HERSHEY_SIMPLEX, 0.6, 1)[0]
cv.rectangle(overlay_bbox, (box[0], box[1]), (box[2], box[3]),
color, -1)
cv.addWeighted(overlay_bbox, alpha, output, 1 - alpha, 0, output)
cv.rectangle(overlay_text, (box[0], box[1]-7-text_height), (box[0]+text_width+2, box[1]),
(0, 0, 0), -1)
cv.addWeighted(overlay_text, alpha_box, output, 1 - alpha_box, 0, output)
cv.rectangle(output, (box[0], box[1]), (box[2], box[3]),
color, thickness)
cv.putText(output, label.upper(), (box[0], box[1]-5),
cv.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv.LINE_AA)
return output
def normalize_bboxes(df):
df['x_min'] = df.apply(lambda row: (row.x_min)/row.width * 512, axis =1)
df['y_min'] = df.apply(lambda row: (row.y_min)/row.height * 512, axis =1)
df['x_max'] = df.apply(lambda row: (row.x_max)/row.width * 512, axis =1)
df['y_max'] = df.apply(lambda row: (row.y_max)/row.height * 512, axis =1)
df['x_mid'] = df.apply(lambda row: (row.x_max+row.x_min)/2 * 512, axis =1)
df['y_mid'] = df.apply(lambda row: (row.y_max+row.y_min)/2 * 512, axis =1)
df['w'] = df.apply(lambda row: (row.x_max-row.x_min), axis =1)
df['h'] = df.apply(lambda row: (row.y_max-row.y_min), axis =1)
df['area'] = df['w']*df['h']
return df
labels = [
"__ignore__",
"Aortic_enlargement",
"Atelectasis",
"Calcification",
"Cardiomegaly",
"Consolidation",
"ILD",
"Infiltration",
"Lung_Opacity",
"Nodule/Mass",
"Other_lesion",
"Pleural_effusion",
"Pleural_thickening",
"Pneumothorax",
"Pulmonary_fibrosis"
]
label2color = [[59, 238, 119], [222, 21, 229], [94, 49, 164], [206, 221, 133], [117, 75, 3],
[210, 224, 119], [211, 176, 166], [63, 7, 197], [102, 65, 77], [194, 134, 175],
[209, 219, 50], [255, 44, 47], [89, 125, 149], [110, 27, 100]]
viz_labels = labels[1:]
now = datetime.now()
data = dict(
info=dict(
description=None,
url=None,
version=None,
year=now.year,
contributor=None,
date_created=now.strftime('%Y-%m-%d %H:%M:%S.%f'),
),
licenses=[dict(
url=None,
id=0,
name=None,
)],
images=[],
type='instances',
annotations=[],
categories=[],
)
class_name_to_id = {}
for i, each_label in enumerate(labels):
class_id = i - 1 # starts with -1
class_name = each_label
if class_id == -1:
assert class_name == '__ignore__'
continue
class_name_to_id[class_name] = class_id
data['categories'].append(dict(
supercategory=None,
id=class_id,
name=class_name,
))
train_out_dir = 'data/train'
valid_out_dir = 'data/valid'
test_out_dir = 'data/test'
for dir in [train_out_dir, valid_out_dir, test_out_dir]:
if Path(dir).exists():
shutil.rmtree(dir)
os.makedirs(dir)
train_out_file = 'data/train_annotations.json'
valid_out_file = 'data/valid_annotations.json'
test_out_file = 'data/test_annotations.json'
all_images_folder = 'vinbigdata/train'
all_files = os.listdir(all_images_folder)
all_files = np.sort(np.array(all_files))
data_train = data.copy()
data_valid = data.copy()
data_test = data.copy()
for data in [data_train, data_valid, data_test]:
data['images'] = []
data['annotations'] = []
all_annotations = pd.read_csv('vinbigdata/train.csv')
all_annotations = all_annotations[all_annotations.class_id != 14]
all_annotations['image_path'] = all_annotations['image_id'].map(lambda id:
os.path.join(all_images_folder, str(id) + '.png'))
normalize_bboxes(all_annotations)
all_image_paths = all_annotations['image_path'].unique()
np.random.seed(1)
indices = np.arange(len(all_image_paths))
np.random.shuffle(indices)
# train, valid, test
splits = [0.7, 0.1, 0.2]
train_split_index = int(splits[0] * len(indices))
valid_split_index = int((splits[0] + splits[1]) * len(indices))
train_paths = all_image_paths[:train_split_index]
valid_paths = all_image_paths[train_split_index:valid_split_index]
test_paths = all_image_paths[valid_split_index:]
print(f'train: {len(train_paths)}, test: {len(test_paths)}, valid: {len(valid_paths)}')
folders = [train_out_dir, valid_out_dir, test_out_dir]
paths = [train_paths, valid_paths, test_paths]
data_dicts = [data_train, data_valid, data_test]
out_files = [train_out_file, valid_out_file, test_out_file]
# parameters for weighted box fusion
iou_thr = 0.2
skip_box_thr = 0.0001
for (folder, paths, data, out_file) in zip(folders, paths, data_dicts, out_files):
print(f'Saving to {folder}...')
viz_images = []
for i, path in tqdm(enumerate(paths)):
img_array = cv.imread(path)
image_basename = Path(path).stem
shutil.copy2(path, folder)
## Add Images to annotation
data['images'].append(dict(
license=0,
url=None,
file_name=os.path.join(folder.split('/')[-1], image_basename+ '.png'),
height=img_array.shape[0],
width=img_array.shape[1],
date_captured=None,
id=i
))
img_annotations = all_annotations[all_annotations.image_id==image_basename]
boxes_viz = img_annotations[['x_min', 'y_min', 'x_max', 'y_max']].to_numpy().tolist()
labels_viz = img_annotations['class_id'].to_numpy().tolist()
## Visualize Original Bboxes every 500th
if (i%500==0):
img_before = img_array.copy()
for box, label in zip(boxes_viz, labels_viz):
x_min, y_min, x_max, y_max = (box[0], box[1], box[2], box[3])
color = label2color[int(label)]
img_before = draw_bbox(img_before, list(np.int_(box)), viz_labels[label], color)
viz_images.append(img_before)
boxes_list = []
scores_list = []
labels_list = []
weights = []
boxes_single = []
labels_single = []
cls_ids = img_annotations['class_id'].unique().tolist()
count_dict = Counter(img_annotations['class_id'].tolist())
for cid in cls_ids:
## Performing Fusing operation only for multiple bboxes with the same label
if count_dict[cid]==1:
labels_single.append(cid)
boxes_single.append(img_annotations[img_annotations.class_id==cid][['x_min', 'y_min', 'x_max', 'y_max']].to_numpy().squeeze().tolist())
else:
cls_list =img_annotations[img_annotations.class_id==cid]['class_id'].tolist()
labels_list.append(cls_list)
bbox = img_annotations[img_annotations.class_id==cid][['x_min', 'y_min', 'x_max', 'y_max']].to_numpy()
## Normalizing Bbox by Image Width and Height
bbox = bbox/(img_array.shape[1], img_array.shape[0], img_array.shape[1], img_array.shape[0])
bbox = np.clip(bbox, 0, 1)
boxes_list.append(bbox.tolist())
scores_list.append(np.ones(len(cls_list)).tolist())
weights.append(1)
## Perform WBF
boxes, scores, box_labels = weighted_boxes_fusion(boxes_list=boxes_list, scores_list=scores_list,
labels_list=labels_list, weights=weights,
iou_thr=iou_thr, skip_box_thr=skip_box_thr)
boxes = boxes*(img_array.shape[1], img_array.shape[0], img_array.shape[1], img_array.shape[0])
boxes = boxes.round(1).tolist()
box_labels = box_labels.astype(int).tolist()
boxes.extend(boxes_single)
box_labels.extend(labels_single)
for box, label in zip(boxes, box_labels):
x_min, y_min, x_max, y_max = (box[0], box[1], box[2], box[3])
area = round((x_max-x_min)*(y_max-y_min),1)
bbox =[
round(x_min, 1),
round(y_min, 1),
round((x_max-x_min), 1),
round((y_max-y_min), 1)
]
data['annotations'].append(dict( id=len(data['annotations']), image_id=i,
category_id=int(label), area=area, bbox=bbox,
iscrowd=0))
## Visualize Bboxes after operation every 500th
if (i%500==0):
img_after = img_array.copy()
for box, label in zip(boxes, box_labels):
color = label2color[int(label)]
img_after = draw_bbox(img_after, list(np.int_(box)), viz_labels[label], color)
viz_images.append(img_after)
plot_imgs(viz_images, cmap=None, size=40)
plt.figtext(0.3, 0.9,"Original Bboxes", va="top", ha="center", size=15)
plt.figtext(0.73, 0.9,"WBF", va="top", ha="center", size=15)
plt.show()
with open(out_file, 'w') as f:
json.dump(data, f, indent=4)