from collections import defaultdict
import math
import os

import numpy as np
import tensorflow as tf
import torch
from datasets import load_dataset
from PIL import Image
from tensorboard.plugins import projector
from transformers import CLIPProcessor, CLIPModel, AutoModel, AutoProcessor

# === Step 1: Load Dataset and Build Compact Question Labels ===
rename_qsn = {
    "Are there any abnormalities in the image? Check all that are present.": "🧬 Abnorm",
    "Are there any anatomical landmarks in the image? Check all that are present.": "📍 Landmark",
    "Are there any instruments in the image? Check all that are present.": "🛠️ Instrum",
    "Have all polyps been removed?": "❌ Polyps_Removed",
    "Is this finding easy to detect?": "🔍 Easy_Detect",
    "Is there a green/black box artefact?": "🟩 Box_Artifact",
    "Is there text?": "🔤 Has_Text",
    "What type of polyp is present?": "🔬 Polyp_Type",
    "What type of procedure is the image taken from?": "🏥 Proc_Type",
    "What is the size of the polyp?": "📏 Polyp_Size",
    "How many findings are present?": "🧾 Find_Count",
    "How many polyps are in the image?": "🔢 Polyp_Count",
    "Where in the image is the instrument?": "📌 Instrum_Loc",
    "Where in the image is the abnormality?": "📌 Abnorm_Loc",
    "Where in the image is the anatomical landmark?": "📌 Landmark_Loc",
    "How many instrumnets are in the image?": "🔢 Instrum_Count",  # typo matches the dataset's question text
    "What color is the abnormality? If more than one separate with ;": "🎨 Abnorm_Color",
    "What color is the anatomical landmark? If more than one separate with ;": "🎨 Landmark_Color",
    "Does this image contain any finding?": "📸 Has_Finding",
    "none": "🚫 Nan",
}

ds = load_dataset("SimulaMet-HOST/Kvasir-VQA")["raw"]

# Group all Q/A pairs per image so each image gets a single metadata string.
qas = defaultdict(dict)
for q, a, img_id in zip(ds["question"], ds["answer"], ds["img_id"]):
    qas[img_id][rename_qsn[q]] = a
sorted_qas = {
    img_id: dict(sorted(questions.items())) for img_id, questions in qas.items()
}

# === Step 2: Prepare Log Directory ===
log_dir = "logs/projector1"
os.makedirs(log_dir, exist_ok=True)


def create_sprite_image(dataset, save_path="sprite.png", image_column="image",
                        size=(100, 100), max_images=6500):
    """Tile up to max_images thumbnails into a single square sprite image."""
    imgs = []
    for i, x in enumerate(dataset):
        if i >= max_images:
            break
        img = x[image_column].resize(size).convert("RGB")
        imgs.append(np.asarray(img) / 255.0)
    imgs = np.array(imgs)
    n = math.ceil(math.sqrt(len(imgs)))  # grid side length
    # Pad with white tiles so the grid is exactly n x n.
    pad = ((0, n**2 - len(imgs)), (0, 0), (0, 0), (0, 0))
    imgs = np.pad(imgs, pad, constant_values=1)
    imgs = imgs.reshape((n, n, size[1], size[0], 3)).transpose(
        0, 2, 1, 3, 4).reshape(n * size[1], n * size[0], 3)
    Image.fromarray((imgs * 255).astype(np.uint8)).save(save_path)


# Keep one row per unique img_id (the dict comprehension retains the last index
# seen for each id), so embeddings and sprite tiles line up one-to-one.
dsx = ds.select({v: k for k, v in enumerate(ds["img_id"])}.values())
# dsx = dsx.select(range(10))

device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name).to(device)
processor = CLIPProcessor.from_pretrained(model_name)
# model_name = "ikim-uk-essen/BiomedCLIP_ViT_patch16_224"
# model = AutoModel.from_pretrained(model_name).to(device)
# processor = AutoProcessor.from_pretrained(model_name)

# Name the sprite after the model so runs with different encoders don't collide.
sprite_name = f"{model_name.replace('/', '__')}_sprite.png"
create_sprite_image(dsx, save_path=os.path.join(log_dir, sprite_name))


def get_emb(batch):
    inputs = processor(images=batch["image"], return_tensors="pt",
                       padding=True).to(device)
    with torch.no_grad():
        # feats = model(**inputs).last_hidden_state[:, 0, :]  # for BiomedCLIP
        feats = model.get_image_features(**inputs)  # for CLIP
    return {"emb": feats.cpu().numpy()}


dsx = dsx.map(get_emb, batched=True, batch_size=512)
"all_embeddings.npz"), embeddings=np.array(dsx["emb"]), metadata=np.array(list(zip(dsx["img_id"], dsx["source"], dsx["question"], dsx["answer"])))) np.savetxt(os.path.join(log_dir, "vectors.tsv"), np.array(dsx["emb"]), delimiter="\t") # breakpoint() # === Step 3: Save Embeddings to TensorFlow Variable === embeddings_np = np.array(dsx["emb"]) embedding_tensor = tf.Variable(embeddings_np, name="image_embeddings") checkpoint = tf.train.Checkpoint(embedding=embedding_tensor) checkpoint.save(os.path.join(log_dir, "embedding.ckpt")) # === Step 4: Write metadata.tsv (WITH HEADERS) === metadata_path = os.path.join(log_dir, "metadata.tsv") with open(metadata_path, "w", encoding="utf-8") as f: f.write("source\tQ/A\timg_hash\n") # header row for img_id, source, question, answer in zip(dsx["img_id"], dsx["source"], dsx["question"], dsx["answer"]): img_hash = str(img_id).replace("\t", " ").replace("\n", " ") img_id = " | ".join(f"{k}: {v}" for k, v in qas.get(img_id, {}).items()) source = str(source).replace("\t", " ").replace("\n", " ") question = str(question).replace("\t", " ").replace("\n", " ") answer = str(answer).replace("\t", " ").replace("\n", " ") f.write(f"{source}\t{img_id}\t{img_hash}\n") # === Step 5: Projector Config === config = projector.ProjectorConfig() embedding = config.embeddings.add() embedding.tensor_name = embedding_tensor.name # should be 'image_embeddings' embedding.metadata_path = "metadata.tsv" # relative to log_dir # relative to log_dir embedding.sprite.image_path = "openai__clip-vit-large-patch14-336_sprite.png" embedding.sprite.single_image_dim.extend( [100, 100]) # size of each image in the sprite projector.visualize_embeddings(log_dir, config) # tf.compat.v1.disable_eager_execution() # saver = tf.compat.v1.train.Saver([ tf.Variable(1.0, name="var1"), tf.Variable(2.0, name="var2")]) # with tf.compat.v1.Session() as sess: # sess.run(tf.compat.v1.global_variables_initializer()) # saver.save(sess, os.path.join(log_dir, "model.ckpt"), 1) # === Step 6: Launch TensorBoard Command === print("โœ… All done! Launch TensorBoard using:") print(f"tensorboard --logdir={log_dir}")