"""Inference endpoint handler for LTX-Video generation with Varnish post-processing."""

import asyncio
import base64
import gc
import io
import json
import logging
import os
import random
import traceback
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Dict, Any, Optional, List, Union, Tuple

import numpy as np
import torch
from safetensors import safe_open
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer

from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.schedulers.rf import RectifiedFlowScheduler, TimestepShifter
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy

from varnish import Varnish
from varnish.utils import is_truthy, process_input_image

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get token from environment
hf_token = os.getenv("HF_API_TOKEN")

# Constraints
MAX_LARGE_SIDE = 1280
MAX_SMALL_SIDE = 768  # should be 720 but it must be divisible by 32
MAX_FRAMES = (8 * 21) + 1  # visual glitches appear after about 169 frames, so we cap it

# Check environment variable for pipeline support
support_image_prompt = is_truthy(os.getenv("SUPPORT_INPUT_IMAGE_PROMPT"))

# Accepted spellings of the STG mode (short aliases and canonical names),
# mapped to the canonical name used by the rest of this module.
_STG_MODE_ALIASES = {
    "stg_av": "attention_values",
    "attention_values": "attention_values",
    "stg_as": "attention_skip",
    "attention_skip": "attention_skip",
    "stg_r": "residual",
    "residual": "residual",
    "stg_t": "transformer_block",
    "transformer_block": "transformer_block",
}


def _snap_dimension(value: float) -> int:
    """Round *value* to the nearest multiple of 32, clamped to [128, MAX_LARGE_SIDE]."""
    return max(128, min(MAX_LARGE_SIDE, round(value / 32) * 32))


@dataclass
class GenerationConfig:
    """Configuration for video generation"""

    # general content settings
    prompt: str = ""
    negative_prompt: str = "saturated, highlight, overexposed, highlighted, overlit, shaking, too bright, worst quality, inconsistent motion, blurry, jittery, distorted, cropped, watermarked, watermark, logo, subtitle, subtitles, lowres"

    # video model settings (will be used during generation of the initial raw video clip)
    width: int = 768
    height: int = 416

    # this is a hack to fool LTX-Video into believing our input image is an actual
    # video frame with poor encoding quality
    # after a quick benchmark using the value 70 seems like a sweet spot
    input_image_quality: int = 70

    # users may tend to always set this to the max, to get as much useable content
    # as possible (which is MAX_FRAMES ie. 169).
    # The value must be a multiple of 8, plus 1 frame.
    # visual glitches appear after about 169 frames, so we don't need more actually
    num_frames: int = (8 * 14) + 1

    # values between 3.0 and 4.0 are nice
    guidance_scale: float = 3.5

    num_inference_steps: int = 50

    # reproducible generation settings
    seed: int = -1  # -1 means random seed

    # varnish settings (will be used for post-processing after the raw video clip has been generated)
    fps: int = 30  # FPS of the final video (only applied at the very end, when converting to mp4)
    double_num_frames: bool = False  # if True, the number of frames will be multiplied by 2 using RIFE
    super_resolution: bool = False  # if True, the resolution will be multiplied by 2 using Real_ESRGAN
    grain_amount: float = 0.0  # be careful, adding film grain can negatively impact video compression

    # audio settings
    enable_audio: bool = False  # Whether to generate audio
    audio_prompt: str = ""  # Text prompt for audio generation
    audio_negative_prompt: str = "voices, voice, talking, speaking, speech"  # Negative prompt for audio generation

    # The range of the CRF scale is 0–51, where:
    # 0 is lossless (for 8 bit only, for 10 bit use -qp 0)
    # 23 is the default
    # 51 is worst quality possible
    # A lower value generally leads to higher quality, and a subjectively sane range is 17–28.
    # Consider 17 or 18 to be visually lossless or nearly so;
    # it should look the same or nearly the same as the input but it isn't technically lossless.
    # The range is exponential, so increasing the CRF value +6 results in roughly half
    # the bitrate / file size, while -6 leads to roughly twice the bitrate.
    quality: int = 18

    # STG (Spatiotemporal Guidance) settings
    stg_scale: float = 1.0
    stg_rescale: float = 0.7
    stg_mode: str = "attention_values"  # Can be "attention_values", "attention_skip", "residual", or "transformer_block"
    # Comma-separated list of layers to block for spatiotemporal guidance.
    # validate_and_adjust() replaces the string with a list of ints.
    stg_skip_layers: Union[str, List[int]] = "19"

    # VAE noise augmentation
    decode_timestep: float = 0.05
    decode_noise_scale: float = 0.025

    # Other advanced settings
    image_cond_noise_scale: float = 0.15
    mixed_precision: bool = True  # Use mixed precision for inference
    stochastic_sampling: bool = False  # Use stochastic sampling

    # Sampling settings
    sampler: Optional[str] = None  # "uniform" or "linear-quadratic" or None (use default from checkpoint)

    # Prompt enhancement
    enhance_prompt: bool = False  # Whether to enhance the prompt using an LLM
    prompt_enhancement_words_threshold: int = 50  # Enhance prompt only if it has fewer words than this

    def validate_and_adjust(self) -> 'GenerationConfig':
        """Validate and adjust parameters to meet constraints.

        Mutates the instance in place:
          * width/height are snapped to multiples of 32 within [128, MAX_LARGE_SIDE]
            and scaled down if their product exceeds MAX_SMALL_SIDE * MAX_LARGE_SIDE;
          * num_frames is rounded down to the form 8k + 1, within [1, MAX_FRAMES];
          * a seed of -1 (or None) is replaced by a random 32-bit seed;
          * stg_mode aliases are normalised to their canonical name;
          * stg_skip_layers is parsed from a comma-separated string into a list of ints;
          * enhance_prompt is switched off when the prompt already has at least
            prompt_enhancement_words_threshold words.

        Returns:
            The same instance, to allow chaining.

        Raises:
            ValueError: if stg_mode is not a recognised mode/alias, or if
                stg_skip_layers contains a non-integer token.
        """
        # First check if it's one of our explicitly allowed resolutions
        is_max_landscape = self.width == MAX_LARGE_SIDE and self.height == MAX_SMALL_SIDE
        is_max_portrait = self.width == MAX_SMALL_SIDE and self.height == MAX_LARGE_SIDE
        if not (is_max_landscape or is_max_portrait):
            # For other resolutions, ensure total pixels don't exceed max
            max_total_pixels = MAX_SMALL_SIDE * MAX_LARGE_SIDE  # or 921600 = 1280 * 720
            total_pixels = self.width * self.height
            if total_pixels > max_total_pixels:
                # Scale both sides down proportionally
                scale = (max_total_pixels / total_pixels) ** 0.5
            else:
                scale = 1.0
            self.width = _snap_dimension(self.width * scale)
            self.height = _snap_dimension(self.height * scale)

        # Adjust number of frames to be in format 8k + 1. k is floored at 0 so a
        # zero/negative request yields a single frame instead of a negative count.
        k = max(0, (self.num_frames - 1) // 8)
        self.num_frames = min((k * 8) + 1, MAX_FRAMES)

        # Set random seed if not specified
        if self.seed is None or self.seed == -1:
            self.seed = random.randint(0, 2**32 - 1)

        # Normalise the STG mode; reject unknown values here instead of failing
        # later with an unbound strategy variable.
        mode_key = str(self.stg_mode).strip().lower()
        if mode_key not in _STG_MODE_ALIASES:
            raise ValueError(
                f"Unknown stg_mode {self.stg_mode!r}; expected one of "
                f"{sorted(set(_STG_MODE_ALIASES.values()))}"
            )
        self.stg_mode = _STG_MODE_ALIASES[mode_key]

        # Convert STG skip layers from string to list of integers
        # (blank tokens such as a trailing comma are ignored)
        if isinstance(self.stg_skip_layers, str):
            tokens = (token.strip() for token in self.stg_skip_layers.split(","))
            self.stg_skip_layers = [int(token) for token in tokens if token]

        # Check if we should enhance the prompt
        if self.enhance_prompt and self.prompt:
            prompt_word_count = len(self.prompt.split())
            if prompt_word_count >= self.prompt_enhancement_words_threshold:
                logger.info(f"Prompt has {prompt_word_count} words, which exceeds the threshold of {self.prompt_enhancement_words_threshold}. Prompt enhancement disabled.")
                self.enhance_prompt = False

        return self


def load_image_to_tensor_with_resize_and_crop(
    image_input: Union[str, bytes],
    target_height: int = 512,
    target_width: int = 768,
    quality: int = 100
) -> torch.Tensor:
    """Load and process an image into a tensor.

    The image is converted to RGB, optionally re-encoded as JPEG, center-cropped
    to the target aspect ratio, resized, and scaled to the [-1, 1] range.

    Args:
        image_input: Either a file path (str), a base64 data URI (str starting
            with "data:"), or image data (bytes)
        target_height: Desired height of output tensor
        target_width: Desired width of output tensor
        quality: JPEG quality to use when re-encoding (to simulate lower quality images)

    Returns:
        A float tensor of shape (1, 3, 1, target_height, target_width).

    Raises:
        ValueError: if image_input is of an unsupported type or is a data URI
            without a "," separator.
    """
    from PIL import Image

    # NOTE(review): any str that is not a data URI is opened as a local file path.
    # If image_input can come from an untrusted request, consider restricting this.
    if isinstance(image_input, str) and image_input.startswith('data:'):
        # Handle base64 data URI: everything after the first comma is the payload
        _, separator, encoded = image_input.partition(",")
        if not separator:
            raise ValueError("Malformed data URI: missing ',' before the base64 payload")
        image = Image.open(io.BytesIO(base64.b64decode(encoded))).convert("RGB")
    elif isinstance(image_input, bytes):
        # Handle raw bytes
        image = Image.open(io.BytesIO(image_input)).convert("RGB")
    elif isinstance(image_input, str):
        # Handle file path
        image = Image.open(image_input).convert("RGB")
    else:
        raise ValueError("image_input must be either a file path, bytes, or base64 data URI")

    # Apply JPEG compression if quality < 100 (to simulate a video frame)
    if quality < 100:
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG", quality=quality)
        buffer.seek(0)
        image = Image.open(buffer).convert("RGB")

    input_width, input_height = image.size
    aspect_ratio_target = target_width / target_height
    aspect_ratio_frame = input_width / input_height

    if aspect_ratio_frame > aspect_ratio_target:
        # Source is wider than the target: keep full height, trim the sides
        new_width = max(1, int(input_height * aspect_ratio_target))
        new_height = input_height
        x_start = (input_width - new_width) // 2
        y_start = 0
    else:
        # Source is taller than (or matches) the target: keep full width, trim top/bottom
        new_width = input_width
        new_height = max(1, int(input_width / aspect_ratio_target))
        x_start = 0
        y_start = (input_height - new_height) // 2

    image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height))
    image = image.resize((target_width, target_height))

    # HWC uint8 -> CHW float in [-1, 1]
    frame_tensor = torch.tensor(np.array(image)).permute(2, 0, 1).float()
    frame_tensor = (frame_tensor / 127.5) - 1.0

    # Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width)
    return frame_tensor.unsqueeze(0).unsqueeze(2)


def calculate_padding(
    source_height: int,
    source_width: int,
    target_height: int,
    target_width: int
) -> tuple[int, int, int, int]:
    """Calculate padding to reach target dimensions.

    Returns:
        A (left, right, top, bottom) tuple. When the total padding along an axis
        is odd, the extra pixel goes to the bottom/right side.
    """
    # Calculate total padding needed
    pad_height = target_height - source_height
    pad_width = target_width - source_width

    # Calculate padding for each side
    pad_top = pad_height // 2
    pad_bottom = pad_height - pad_top  # Handles odd padding
    pad_left = pad_width // 2
    pad_right = pad_width - pad_left  # Handles odd padding

    # Padding format is (left, right, top, bottom)
    return (pad_left, pad_right, pad_top, pad_bottom)


def prepare_conditioning(
    conditioning_media_paths: List[str],
    conditioning_strengths: List[float],
    conditioning_start_frames: List[int],
    height: int,
    width: int,
    num_frames: int,
    input_image_quality: int = 100,
    pipeline: Optional[LTXVideoPipeline] = None,
) -> Optional[List[ConditioningItem]]:
    """Prepare conditioning items based on input media paths and their parameters.

    The three input lists are consumed in lockstep with zip(), so extra entries
    in a longer list are ignored.

    Returns:
        One ConditioningItem per (path, strength, start_frame) triple; an empty
        list when no media paths are given.
    """
    conditioning_items = []
    for path, strength, start_frame in zip(
        conditioning_media_paths, conditioning_strengths, conditioning_start_frames
    ):
        # Load and process the conditioning image
        frame_tensor = load_image_to_tensor_with_resize_and_crop(
            path, height, width, quality=input_image_quality
        )

        if pipeline:
            # For image inputs the frame count is always 1.
            # NOTE(review): the trimmed count is not used for single images; the call
            # is kept in case trim_conditioning_sequence validates its arguments.
            pipeline.trim_conditioning_sequence(start_frame, 1, num_frames)

        conditioning_items.append(
            ConditioningItem(frame_tensor, start_frame, strength)
        )

    return conditioning_items


def create_ltx_video_pipeline(
    config: GenerationConfig,
    device: str = "cuda"
) -> LTXVideoPipeline:
    """Create and configure the LTX video pipeline.

    Loads the VAE, transformer, scheduler, text encoder and tokenizer from paths
    relative to the current working directory and, when config.enhance_prompt is
    set, the prompt-enhancer models. If the enhancer models fail to load, a
    warning is logged and config.enhance_prompt is set to False.

    Args:
        config: Generation settings; only `sampler` and `enhance_prompt` are read.
        device: Device the VAE, transformer and text encoder are moved to.

    Raises:
        FileNotFoundError: if the checkpoint file does not exist.
    """
    ckpt_path = "./ltxv-2b-0.9.6-distilled-04-25.safetensors"

    # Explicit check rather than `assert`, which is stripped under `python -O`
    if not os.path.exists(ckpt_path):
        raise FileNotFoundError(f"Ckpt path {ckpt_path} does not exist")

    # Get allowed inference steps from the checkpoint metadata if available
    with safe_open(ckpt_path, framework="pt") as f:
        metadata = f.metadata() or {}
    config_str = metadata.get("config")
    configs = json.loads(config_str) if config_str else {}
    allowed_inference_steps = configs.get("allowed_inference_steps", None)

    # Initialize model components
    vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
    transformer = Transformer3DModel.from_pretrained(ckpt_path)

    # Use constructor if sampler is specified, otherwise use from_pretrained
    if config.sampler:
        scheduler = RectifiedFlowScheduler(
            sampler=("Uniform" if config.sampler.lower() == "uniform" else "LinearQuadratic")
        )
    else:
        scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)

    text_encoder = T5EncoderModel.from_pretrained("./text_encoder")
    patchifier = SymmetricPatchifier(patch_size=1)
    tokenizer = T5Tokenizer.from_pretrained("./tokenizer")

    # Move models to the correct device
    vae = vae.to(device)
    transformer = transformer.to(device)
    text_encoder = text_encoder.to(device)

    # Set up precision
    vae = vae.to(torch.bfloat16)
    transformer = transformer.to(torch.bfloat16)
    text_encoder = text_encoder.to(torch.bfloat16)

    # Initialize prompt enhancer components if needed
    prompt_enhancer_components = {
        "prompt_enhancer_image_caption_model": None,
        "prompt_enhancer_image_caption_processor": None,
        "prompt_enhancer_llm_model": None,
        "prompt_enhancer_llm_tokenizer": None
    }

    if config.enhance_prompt:
        try:
            # NOTE(review): trust_remote_code=True executes code shipped with the
            # model repository; make sure these repositories are trusted/pinned.
            prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained(
                "MiaoshouAI/Florence-2-large-PromptGen-v2.0",
                trust_remote_code=True
            )
            prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained(
                "MiaoshouAI/Florence-2-large-PromptGen-v2.0",
                trust_remote_code=True
            )
            prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained(
                "unsloth/Llama-3.2-3B-Instruct",
                torch_dtype="bfloat16",
            )
            prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained(
                "unsloth/Llama-3.2-3B-Instruct",
            )

            prompt_enhancer_components = {
                "prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model,
                "prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor,
                "prompt_enhancer_llm_model": prompt_enhancer_llm_model,
                "prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer
            }
        except Exception as e:
            # Best effort: generation still works without prompt enhancement
            logger.warning(f"Failed to load prompt enhancer models: {e}")
            config.enhance_prompt = False

    # Construct the pipeline
    pipeline = LTXVideoPipeline(
        transformer=transformer,
        patchifier=patchifier,
        text_encoder=text_encoder,
        tokenizer=tokenizer,
        scheduler=scheduler,
        vae=vae,
        allowed_inference_steps=allowed_inference_steps,
        **prompt_enhancer_components
    )

    return pipeline


def _build_generation_config(prompt: str, params: Dict[str, Any]) -> GenerationConfig:
    """Build a validated GenerationConfig from the request prompt and parameters.

    Every GenerationConfig field except `prompt` may be overridden by a key of
    the same name in *params*; missing keys keep the dataclass default.
    """
    overrides = {
        field.name: params[field.name]
        for field in fields(GenerationConfig)
        if field.name != "prompt" and field.name in params
    }
    return GenerationConfig(prompt=prompt, **overrides).validate_and_adjust()


def _resolve_skip_layer_strategy(stg_mode: str) -> SkipLayerStrategy:
    """Map a canonical STG mode name to its SkipLayerStrategy member."""
    strategies = {
        "attention_values": SkipLayerStrategy.AttentionValues,
        "attention_skip": SkipLayerStrategy.AttentionSkip,
        "residual": SkipLayerStrategy.Residual,
        "transformer_block": SkipLayerStrategy.TransformerBlock,
    }
    if stg_mode not in strategies:
        raise ValueError(f"Unsupported stg_mode: {stg_mode!r}")
    return strategies[stg_mode]


class EndpointHandler:
    """Handler for the LTX Video endpoint"""

    def __init__(self, model_path: str = ""):
        """Initialize the endpoint handler

        Args:
            model_path: Path to model weights (not used, as weights are in current directory)
        """
        # Enable TF32 for potential speedup on Ampere GPUs
        torch.backends.cuda.matmul.allow_tf32 = True

        # Initialize Varnish for post-processing
        self.varnish = Varnish(
            device="cuda",
            model_base_dir="varnish",
            enable_mmaudio=False,  # Disable audio generation for now, since it is broken
        )

        # The actual LTX pipeline will be loaded during inference to save memory
        self.pipeline = None
        # Whether the cached pipeline was built with the prompt-enhancer models
        self._prompt_enhancer_loaded = False

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process inference requests

        Args:
            data: Request data containing inputs and parameters. Two input
                formats are supported:
                1. {"inputs": {"prompt": "...", "image": "..."}}
                2. {"inputs": "..."} (prompt only)

        Returns:
            Dictionary with the generated video (as a data URI under "video"),
            its "content-type", and "metadata".

        Raises:
            ValueError: if neither a prompt nor an image is provided, or if the
                parameters fail validation.
            RuntimeError: if generation or post-processing fails; the original
                exception is chained as the cause.
        """
        # Extract inputs and parameters (an explicit null is treated as empty)
        inputs = data.get("inputs") or {}
        if isinstance(inputs, str):
            input_prompt = inputs
            input_image = None
        else:
            input_prompt = inputs.get("prompt", "")
            input_image = inputs.get("image")

        params = data.get("parameters") or {}

        if not input_prompt and not input_image:
            raise ValueError("Either prompt or image must be provided")

        # Create and validate configuration
        config = _build_generation_config(input_prompt, params)

        # Pre-bind so the `finally` clause can release them on any exit path
        result = frames = varnish_result = None
        try:
            with torch.amp.autocast(device_type='cuda', dtype=torch.bfloat16), torch.no_grad():
                # Set random seeds for reproducibility
                random.seed(config.seed)
                np.random.seed(config.seed)
                torch.manual_seed(config.seed)
                generator = torch.Generator(device='cuda').manual_seed(config.seed)

                # Create pipeline if not already created
                if self.pipeline is None:
                    self.pipeline = create_ltx_video_pipeline(config)
                    # create_ltx_video_pipeline clears enhance_prompt if loading failed
                    self._prompt_enhancer_loaded = bool(config.enhance_prompt)
                elif config.enhance_prompt and not getattr(self, "_prompt_enhancer_loaded", False):
                    # The cached pipeline has no enhancer models; asking it to
                    # enhance the prompt would fail, so fall back gracefully.
                    logger.warning(
                        "Prompt enhancement requested, but the pipeline was loaded "
                        "without prompt enhancer models. Prompt enhancement disabled."
                    )
                    config.enhance_prompt = False

                # Prepare conditioning items if an image is provided
                conditioning_items = None
                if input_image:
                    conditioning_items = [
                        ConditioningItem(
                            load_image_to_tensor_with_resize_and_crop(
                                input_image,
                                config.height,
                                config.width,
                                quality=config.input_image_quality
                            ),
                            0,  # Start frame
                            1.0  # Conditioning strength
                        )
                    ]

                # Set up spatiotemporal guidance strategy
                skip_layer_strategy = _resolve_skip_layer_strategy(config.stg_mode)

                # Generate video with LTX pipeline
                result = self.pipeline(
                    height=config.height,
                    width=config.width,
                    num_frames=config.num_frames,
                    frame_rate=config.fps,
                    prompt=config.prompt,
                    negative_prompt=config.negative_prompt,
                    guidance_scale=config.guidance_scale,
                    num_inference_steps=config.num_inference_steps,
                    generator=generator,
                    output_type="pt",  # Return as PyTorch tensor
                    skip_layer_strategy=skip_layer_strategy,
                    skip_block_list=config.stg_skip_layers,
                    stg_scale=config.stg_scale,
                    do_rescaling=config.stg_rescale != 1.0,
                    rescaling_scale=config.stg_rescale,
                    conditioning_items=conditioning_items,
                    decode_timestep=config.decode_timestep,
                    decode_noise_scale=config.decode_noise_scale,
                    image_cond_noise_scale=config.image_cond_noise_scale,
                    mixed_precision=config.mixed_precision,
                    is_video=True,
                    vae_per_channel_normalize=True,
                    stochastic_sampling=config.stochastic_sampling,
                    enhance_prompt=config.enhance_prompt,
                )

                # Get the generated frames
                frames = result.images

                # Process the generated frames with Varnish
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)

                # Prepare frames for Varnish (denormalize to 0-255 range).
                # Clamp before the integer cast: converting out-of-range floats
                # to uint8 does not saturate.
                frames = (frames * 127.5 + 127.5).clamp(0, 255)
                frames = frames.to(torch.uint8)

                # Process with Varnish for post-processing
                varnish_result = loop.run_until_complete(
                    self.varnish(
                        frames,
                        fps=config.fps,
                        double_num_frames=config.double_num_frames,
                        super_resolution=config.super_resolution,
                        grain_amount=config.grain_amount,
                        enable_audio=config.enable_audio,
                        audio_prompt=config.audio_prompt or config.prompt,
                        audio_negative_prompt=config.audio_negative_prompt,
                    )
                )

                # Get the final video as a data URI
                video_uri = loop.run_until_complete(
                    varnish_result.write(
                        type="data-uri",
                        quality=config.quality
                    )
                )

                # Prepare metadata about the generated video
                metadata = {
                    "width": varnish_result.metadata.width,
                    "height": varnish_result.metadata.height,
                    "num_frames": varnish_result.metadata.frame_count,
                    "fps": varnish_result.metadata.fps,
                    "duration": varnish_result.metadata.duration,
                    "seed": config.seed,
                    "prompt": config.prompt,
                }

                return {
                    "video": video_uri,
                    "content-type": "video/mp4",
                    "metadata": metadata
                }

        except Exception as e:
            # Log the error and reraise, keeping the original exception as the cause
            error_message = f"Error generating video: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_message)
            raise RuntimeError(error_message) from e
        finally:
            # Clean up to prevent CUDA OOM errors, on success and on failure alike.
            # Drop tensor references first so the allocator can actually free them.
            result = frames = varnish_result = None
            gc.collect()
            torch.cuda.empty_cache()