LTX-Video-0-9-6-HFIE / handler_NOT_WORKING.py
jbilcke-hf's picture
jbilcke-hf HF Staff
Rename handler_using_custom_code.py to handler_NOT_WORKING.py
70c0c83 verified
raw
history blame
26.7 kB
from dataclasses import dataclass
from pathlib import Path
import logging
import base64
import random
import gc
import os
import numpy as np
import torch
from typing import Dict, Any, Optional, List, Union, Tuple
import json
from safetensors import safe_open
from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.schedulers.rf import RectifiedFlowScheduler, TimestepShifter
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer
from varnish import Varnish
from varnish.utils import is_truthy, process_input_image
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Get token from environment
hf_token = os.getenv("HF_API_TOKEN")
# Constraints
MAX_LARGE_SIDE = 1280
MAX_SMALL_SIDE = 768 # should be 720 but it must be divisible by 32
MAX_FRAMES = (8 * 21) + 1 # visual glitches appear after about 169 frames, so we cap it
# Check environment variable for pipeline support
support_image_prompt = is_truthy(os.getenv("SUPPORT_INPUT_IMAGE_PROMPT"))
@dataclass
class GenerationConfig:
"""Configuration for video generation"""
# general content settings
prompt: str = ""
negative_prompt: str = "saturated, highlight, overexposed, highlighted, overlit, shaking, too bright, worst quality, inconsistent motion, blurry, jittery, distorted, cropped, watermarked, watermark, logo, subtitle, subtitles, lowres"
# video model settings (will be used during generation of the initial raw video clip)
width: int = 768
height: int = 416
# this is a hack to fool LTX-Video into believing our input image is an actual video frame with poor encoding quality
# after a quick benchmark using the value 70 seems like a sweet spot
input_image_quality: int = 70
# users may tend to always set this to the max, to get as much useable content as possible (which is MAX_FRAMES ie. 257).
# The value must be a multiple of 8, plus 1 frame.
# visual glitches appear after about 169 frames, so we don't need more actually
num_frames: int = (8 * 14) + 1
# values between 3.0 and 4.0 are nice
guidance_scale: float = 3.5
num_inference_steps: int = 50
# reproducible generation settings
seed: int = -1 # -1 means random seed
# varnish settings (will be used for post-processing after the raw video clip has been generated
fps: int = 30 # FPS of the final video (only applied at the very end, when converting to mp4)
double_num_frames: bool = False # if True, the number of frames will be multiplied by 2 using RIFE
super_resolution: bool = False # if True, the resolution will be multiplied by 2 using Real_ESRGAN
grain_amount: float = 0.0 # be careful, adding film grain can negatively impact video compression
# audio settings
enable_audio: bool = False # Whether to generate audio
audio_prompt: str = "" # Text prompt for audio generation
audio_negative_prompt: str = "voices, voice, talking, speaking, speech" # Negative prompt for audio generation
# The range of the CRF scale is 0–51, where:
# 0 is lossless (for 8 bit only, for 10 bit use -qp 0)
# 23 is the default
# 51 is worst quality possible
# A lower value generally leads to higher quality, and a subjectively sane range is 17–28.
# Consider 17 or 18 to be visually lossless or nearly so;
# it should look the same or nearly the same as the input but it isn't technically lossless.
# The range is exponential, so increasing the CRF value +6 results in roughly half the bitrate / file size, while -6 leads to roughly twice the bitrate.
quality: int = 18
# STG (Spatiotemporal Guidance) settings
stg_scale: float = 1.0
stg_rescale: float = 0.7
stg_mode: str = "attention_values" # Can be "attention_values", "attention_skip", "residual", or "transformer_block"
stg_skip_layers: str = "19" # Comma-separated list of layers to block for spatiotemporal guidance
# VAE noise augmentation
decode_timestep: float = 0.05
decode_noise_scale: float = 0.025
# Other advanced settings
image_cond_noise_scale: float = 0.15
mixed_precision: bool = True # Use mixed precision for inference
stochastic_sampling: bool = False # Use stochastic sampling
# Sampling settings
sampler: Optional[str] = None # "uniform" or "linear-quadratic" or None (use default from checkpoint)
# Prompt enhancement
enhance_prompt: bool = False # Whether to enhance the prompt using an LLM
prompt_enhancement_words_threshold: int = 50 # Enhance prompt only if it has fewer words than this
def validate_and_adjust(self) -> 'GenerationConfig':
"""Validate and adjust parameters to meet constraints"""
# First check if it's one of our explicitly allowed resolutions
if not ((self.width == MAX_LARGE_SIDE and self.height == MAX_SMALL_SIDE) or
(self.width == MAX_SMALL_SIDE and self.height == MAX_LARGE_SIDE)):
# For other resolutions, ensure total pixels don't exceed max
MAX_TOTAL_PIXELS = MAX_SMALL_SIDE * MAX_LARGE_SIDE # or 921600 = 1280 * 720
# If total pixels exceed maximum, scale down proportionally
total_pixels = self.width * self.height
if total_pixels > MAX_TOTAL_PIXELS:
scale = (MAX_TOTAL_PIXELS / total_pixels) ** 0.5
self.width = max(128, min(MAX_LARGE_SIDE, round(self.width * scale / 32) * 32))
self.height = max(128, min(MAX_LARGE_SIDE, round(self.height * scale / 32) * 32))
else:
# Round dimensions to nearest multiple of 32
self.width = max(128, min(MAX_LARGE_SIDE, round(self.width / 32) * 32))
self.height = max(128, min(MAX_LARGE_SIDE, round(self.height / 32) * 32))
# Adjust number of frames to be in format 8k + 1
k = (self.num_frames - 1) // 8
self.num_frames = min((k * 8) + 1, MAX_FRAMES)
# Set random seed if not specified
if self.seed == -1:
self.seed = random.randint(0, 2**32 - 1)
# Set up STG parameters
if self.stg_mode.lower() == "stg_av" or self.stg_mode.lower() == "attention_values":
self.stg_mode = "attention_values"
elif self.stg_mode.lower() == "stg_as" or self.stg_mode.lower() == "attention_skip":
self.stg_mode = "attention_skip"
elif self.stg_mode.lower() == "stg_r" or self.stg_mode.lower() == "residual":
self.stg_mode = "residual"
elif self.stg_mode.lower() == "stg_t" or self.stg_mode.lower() == "transformer_block":
self.stg_mode = "transformer_block"
# Convert STG skip layers from string to list of integers
if isinstance(self.stg_skip_layers, str):
self.stg_skip_layers = [int(x.strip()) for x in self.stg_skip_layers.split(",")]
# Check if we should enhance the prompt
if self.enhance_prompt and self.prompt:
prompt_word_count = len(self.prompt.split())
if prompt_word_count >= self.prompt_enhancement_words_threshold:
logger.info(f"Prompt has {prompt_word_count} words, which exceeds the threshold of {self.prompt_enhancement_words_threshold}. Prompt enhancement disabled.")
self.enhance_prompt = False
return self
def load_image_to_tensor_with_resize_and_crop(
image_input: Union[str, bytes],
target_height: int = 512,
target_width: int = 768,
quality: int = 100
) -> torch.Tensor:
"""Load and process an image into a tensor.
Args:
image_input: Either a file path (str) or image data (bytes)
target_height: Desired height of output tensor
target_width: Desired width of output tensor
quality: JPEG quality to use when re-encoding (to simulate lower quality images)
"""
from PIL import Image
import io
import numpy as np
# Handle base64 data URI
if isinstance(image_input, str) and image_input.startswith('data:'):
header, encoded = image_input.split(",", 1)
image_data = base64.b64decode(encoded)
image = Image.open(io.BytesIO(image_data)).convert("RGB")
# Handle raw bytes
elif isinstance(image_input, bytes):
image = Image.open(io.BytesIO(image_input)).convert("RGB")
# Handle file path
elif isinstance(image_input, str):
image = Image.open(image_input).convert("RGB")
else:
raise ValueError("image_input must be either a file path, bytes, or base64 data URI")
# Apply JPEG compression if quality < 100 (to simulate a video frame)
if quality < 100:
buffer = io.BytesIO()
image.save(buffer, format="JPEG", quality=quality)
buffer.seek(0)
image = Image.open(buffer).convert("RGB")
input_width, input_height = image.size
aspect_ratio_target = target_width / target_height
aspect_ratio_frame = input_width / input_height
if aspect_ratio_frame > aspect_ratio_target:
new_width = int(input_height * aspect_ratio_target)
new_height = input_height
x_start = (input_width - new_width) // 2
y_start = 0
else:
new_width = input_width
new_height = int(input_width / aspect_ratio_target)
x_start = 0
y_start = (input_height - new_height) // 2
image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height))
image = image.resize((target_width, target_height))
frame_tensor = torch.tensor(np.array(image)).permute(2, 0, 1).float()
frame_tensor = (frame_tensor / 127.5) - 1.0
# Create 5D tensor: (batch_size=1, channels=3, num_frames=1, height, width)
return frame_tensor.unsqueeze(0).unsqueeze(2)
def calculate_padding(
source_height: int, source_width: int, target_height: int, target_width: int
) -> tuple[int, int, int, int]:
"""Calculate padding to reach target dimensions"""
# Calculate total padding needed
pad_height = target_height - source_height
pad_width = target_width - source_width
# Calculate padding for each side
pad_top = pad_height // 2
pad_bottom = pad_height - pad_top # Handles odd padding
pad_left = pad_width // 2
pad_right = pad_width - pad_left # Handles odd padding
# Return padded tensor
# Padding format is (left, right, top, bottom)
padding = (pad_left, pad_right, pad_top, pad_bottom)
return padding
def prepare_conditioning(
conditioning_media_paths: List[str],
conditioning_strengths: List[float],
conditioning_start_frames: List[int],
height: int,
width: int,
num_frames: int,
input_image_quality: int = 100,
pipeline: Optional[LTXVideoPipeline] = None,
) -> Optional[List[ConditioningItem]]:
"""Prepare conditioning items based on input media paths and their parameters"""
conditioning_items = []
for path, strength, start_frame in zip(
conditioning_media_paths, conditioning_strengths, conditioning_start_frames
):
# Load and process the conditioning image
frame_tensor = load_image_to_tensor_with_resize_and_crop(
path, height, width, quality=input_image_quality
)
# Trim frame count if needed
if pipeline:
frame_count = 1 # For image inputs, it's always 1
frame_count = pipeline.trim_conditioning_sequence(
start_frame, frame_count, num_frames
)
conditioning_items.append(
ConditioningItem(frame_tensor, start_frame, strength)
)
return conditioning_items
def create_ltx_video_pipeline(
config: GenerationConfig,
device: str = "cuda"
) -> LTXVideoPipeline:
"""Create and configure the LTX video pipeline"""
# Get the absolute paths for the model components
current_dir = Path.cwd()
ckpt_path = "./ltxv-2b-0.9.6-distilled-04-25.safetensors"
# Get allowed inference steps from config if available
allowed_inference_steps = None
assert os.path.exists(
ckpt_path
), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist"
with safe_open(ckpt_path, framework="pt") as f:
metadata = f.metadata()
config_str = metadata.get("config")
configs = json.loads(config_str)
allowed_inference_steps = configs.get("allowed_inference_steps", None)
# Initialize model components
vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
transformer = Transformer3DModel.from_pretrained(ckpt_path)
# Use constructor if sampler is specified, otherwise use from_pretrained
if config.sampler:
scheduler = RectifiedFlowScheduler(
sampler=("Uniform" if config.sampler.lower() == "uniform" else "LinearQuadratic")
)
else:
scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)
text_encoder = T5EncoderModel.from_pretrained("./text_encoder")
patchifier = SymmetricPatchifier(patch_size=1)
tokenizer = T5Tokenizer.from_pretrained("./tokenizer")
# Move models to the correct device
vae = vae.to(device)
transformer = transformer.to(device)
text_encoder = text_encoder.to(device)
# Set up precision
vae = vae.to(torch.bfloat16)
transformer = transformer.to(torch.bfloat16)
text_encoder = text_encoder.to(torch.bfloat16)
# Initialize prompt enhancer components if needed
prompt_enhancer_components = {
"prompt_enhancer_image_caption_model": None,
"prompt_enhancer_image_caption_processor": None,
"prompt_enhancer_llm_model": None,
"prompt_enhancer_llm_tokenizer": None
}
if config.enhance_prompt:
try:
# Use default models or ones specified by config
prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained(
"MiaoshouAI/Florence-2-large-PromptGen-v2.0",
trust_remote_code=True
)
prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained(
"MiaoshouAI/Florence-2-large-PromptGen-v2.0",
trust_remote_code=True
)
prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained(
"unsloth/Llama-3.2-3B-Instruct",
torch_dtype="bfloat16",
)
prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained(
"unsloth/Llama-3.2-3B-Instruct",
)
prompt_enhancer_components = {
"prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model,
"prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor,
"prompt_enhancer_llm_model": prompt_enhancer_llm_model,
"prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer
}
except Exception as e:
logger.warning(f"Failed to load prompt enhancer models: {e}")
config.enhance_prompt = False
# Construct the pipeline
pipeline = LTXVideoPipeline(
transformer=transformer,
patchifier=patchifier,
text_encoder=text_encoder,
tokenizer=tokenizer,
scheduler=scheduler,
vae=vae,
allowed_inference_steps=allowed_inference_steps,
**prompt_enhancer_components
)
return pipeline
class EndpointHandler:
"""Handler for the LTX Video endpoint"""
def __init__(self, model_path: str = ""):
"""Initialize the endpoint handler
Args:
model_path: Path to model weights (not used, as weights are in current directory)
"""
# Enable TF32 for potential speedup on Ampere GPUs
torch.backends.cuda.matmul.allow_tf32 = True
# Initialize Varnish for post-processing
self.varnish = Varnish(
device="cuda",
model_base_dir="varnish",
enable_mmaudio=False, # Disable audio generation for now, since it is broken
)
# The actual LTX pipeline will be loaded during inference to save memory
self.pipeline = None
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process inference requests
Args:
data: Request data containing inputs and parameters
Returns:
Dictionary with generated video and metadata
"""
# Extract inputs and parameters
inputs = data.get("inputs", {})
# Support both formats:
# 1. {"inputs": {"prompt": "...", "image": "..."}}
# 2. {"inputs": "..."} (prompt only)
if isinstance(inputs, str):
input_prompt = inputs
input_image = None
else:
input_prompt = inputs.get("prompt", "")
input_image = inputs.get("image")
params = data.get("parameters", {})
if not input_prompt and not input_image:
raise ValueError("Either prompt or image must be provided")
# Create and validate configuration
config = GenerationConfig(
# general content settings
prompt=input_prompt,
negative_prompt=params.get("negative_prompt", GenerationConfig.negative_prompt),
# video model settings
width=params.get("width", GenerationConfig.width),
height=params.get("height", GenerationConfig.height),
input_image_quality=params.get("input_image_quality", GenerationConfig.input_image_quality),
num_frames=params.get("num_frames", GenerationConfig.num_frames),
guidance_scale=params.get("guidance_scale", GenerationConfig.guidance_scale),
num_inference_steps=params.get("num_inference_steps", GenerationConfig.num_inference_steps),
# STG settings
stg_scale=params.get("stg_scale", GenerationConfig.stg_scale),
stg_rescale=params.get("stg_rescale", GenerationConfig.stg_rescale),
stg_mode=params.get("stg_mode", GenerationConfig.stg_mode),
stg_skip_layers=params.get("stg_skip_layers", GenerationConfig.stg_skip_layers),
# VAE noise settings
decode_timestep=params.get("decode_timestep", GenerationConfig.decode_timestep),
decode_noise_scale=params.get("decode_noise_scale", GenerationConfig.decode_noise_scale),
image_cond_noise_scale=params.get("image_cond_noise_scale", GenerationConfig.image_cond_noise_scale),
# reproducible generation settings
seed=params.get("seed", GenerationConfig.seed),
# varnish settings
fps=params.get("fps", GenerationConfig.fps),
double_num_frames=params.get("double_num_frames", GenerationConfig.double_num_frames),
super_resolution=params.get("super_resolution", GenerationConfig.super_resolution),
grain_amount=params.get("grain_amount", GenerationConfig.grain_amount),
enable_audio=params.get("enable_audio", GenerationConfig.enable_audio),
audio_prompt=params.get("audio_prompt", GenerationConfig.audio_prompt),
audio_negative_prompt=params.get("audio_negative_prompt", GenerationConfig.audio_negative_prompt),
quality=params.get("quality", GenerationConfig.quality),
# advanced settings
mixed_precision=params.get("mixed_precision", GenerationConfig.mixed_precision),
stochastic_sampling=params.get("stochastic_sampling", GenerationConfig.stochastic_sampling),
sampler=params.get("sampler", GenerationConfig.sampler),
# prompt enhancement
enhance_prompt=params.get("enhance_prompt", GenerationConfig.enhance_prompt),
prompt_enhancement_words_threshold=params.get(
"prompt_enhancement_words_threshold",
GenerationConfig.prompt_enhancement_words_threshold
),
).validate_and_adjust()
try:
with torch.amp.autocast(device_type='cuda', dtype=torch.bfloat16), torch.no_grad():
# Set random seeds for reproducibility
random.seed(config.seed)
np.random.seed(config.seed)
torch.manual_seed(config.seed)
generator = torch.Generator(device='cuda').manual_seed(config.seed)
# Create pipeline if not already created
if self.pipeline is None:
self.pipeline = create_ltx_video_pipeline(config)
# Prepare conditioning items if an image is provided
conditioning_items = None
if input_image:
conditioning_items = [
ConditioningItem(
load_image_to_tensor_with_resize_and_crop(
input_image,
config.height,
config.width,
quality=config.input_image_quality
),
0, # Start frame
1.0 # Conditioning strength
)
]
# Set up spatiotemporal guidance strategy
if config.stg_mode == "attention_values":
skip_layer_strategy = SkipLayerStrategy.AttentionValues
elif config.stg_mode == "attention_skip":
skip_layer_strategy = SkipLayerStrategy.AttentionSkip
elif config.stg_mode == "residual":
skip_layer_strategy = SkipLayerStrategy.Residual
elif config.stg_mode == "transformer_block":
skip_layer_strategy = SkipLayerStrategy.TransformerBlock
# Generate video with LTX pipeline
result = self.pipeline(
height=config.height,
width=config.width,
num_frames=config.num_frames,
frame_rate=config.fps,
prompt=config.prompt,
negative_prompt=config.negative_prompt,
guidance_scale=config.guidance_scale,
num_inference_steps=config.num_inference_steps,
generator=generator,
output_type="pt", # Return as PyTorch tensor
skip_layer_strategy=skip_layer_strategy,
skip_block_list=config.stg_skip_layers,
stg_scale=config.stg_scale,
do_rescaling=config.stg_rescale != 1.0,
rescaling_scale=config.stg_rescale,
conditioning_items=conditioning_items,
decode_timestep=config.decode_timestep,
decode_noise_scale=config.decode_noise_scale,
image_cond_noise_scale=config.image_cond_noise_scale,
mixed_precision=config.mixed_precision,
is_video=True,
vae_per_channel_normalize=True,
stochastic_sampling=config.stochastic_sampling,
enhance_prompt=config.enhance_prompt,
)
# Get the generated frames
frames = result.images
# Process the generated frames with Varnish
import asyncio
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Prepare frames for Varnish (denormalize to 0-255 range)
frames = frames * 127.5 + 127.5
frames = frames.to(torch.uint8)
# Process with Varnish for post-processing
varnish_result = loop.run_until_complete(
self.varnish(
frames,
fps=config.fps,
double_num_frames=config.double_num_frames,
super_resolution=config.super_resolution,
grain_amount=config.grain_amount,
enable_audio=config.enable_audio,
audio_prompt=config.audio_prompt or config.prompt,
audio_negative_prompt=config.audio_negative_prompt,
)
)
# Get the final video as a data URI
video_uri = loop.run_until_complete(
varnish_result.write(
type="data-uri",
quality=config.quality
)
)
# Prepare metadata about the generated video
metadata = {
"width": varnish_result.metadata.width,
"height": varnish_result.metadata.height,
"num_frames": varnish_result.metadata.frame_count,
"fps": varnish_result.metadata.fps,
"duration": varnish_result.metadata.duration,
"seed": config.seed,
"prompt": config.prompt,
}
# Clean up to prevent CUDA OOM errors
del result
torch.cuda.empty_cache()
gc.collect()
return {
"video": video_uri,
"content-type": "video/mp4",
"metadata": metadata
}
except Exception as e:
# Log the error and reraise
import traceback
error_message = f"Error generating video: {str(e)}\n{traceback.format_exc()}"
logger.error(error_message)
raise RuntimeError(error_message)