Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import tempfile | |
import warnings | |
import time | |
import shutil | |
import requests | |
from urllib.parse import urlparse, unquote | |
from pathlib import Path | |
import torch | |
import torchaudio | |
import yt_dlp | |
from contextlib import contextmanager | |
# Ignore every warning for the lifetime of the process so library noise does
# not end up in the application's console output.
warnings.simplefilter("ignore")

# NOTE(review): presumably read by huggingface_hub to silence its
# symlink-support warning -- confirm against the hub documentation.
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
@contextmanager
def suppress_stdout_stderr():
    """Temporarily send ``sys.stdout`` and ``sys.stderr`` to ``os.devnull``.

    Intended for use as ``with suppress_stdout_stderr(): ...``. The
    ``@contextmanager`` decorator is required for that: without it the call
    returns a plain generator, which has no ``__enter__``/``__exit__`` and
    cannot be used in a ``with`` statement.

    Only the Python-level ``sys.stdout``/``sys.stderr`` objects are swapped;
    output written directly to the underlying file descriptors (for example
    by a child process) is not affected.

    Yields:
        None. The original streams are restored on exit, including when the
        body raises.
    """
    with open(os.devnull, "w") as devnull:
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        sys.stdout = devnull
        sys.stderr = devnull
        try:
            yield
        finally:
            # Restore before devnull is closed so nothing is ever left
            # pointing at a closed file.
            sys.stdout = old_stdout
            sys.stderr = old_stderr
class SimpleAudioExtractor:
    """Produce a WAV file from a local file, a direct media URL, or a Loom URL.

    Every public entry point returns the path of an audio file on disk.
    Converted or extracted output is written into a fresh temporary directory
    that the caller owns and is responsible for deleting.
    """

    def __init__(self):
        # File extensions (lower-case, with leading dot) used both to classify
        # sources and to decide how a file has to be processed.
        self.supported_video_formats = ['.mp4', '.webm', '.avi', '.mov', '.mkv', '.m4v']
        self.supported_audio_formats = ['.mp3', '.wav', '.m4a', '.aac', '.ogg', '.flac']
        # Browser-like User-Agent sent with HTTP downloads and passed to yt-dlp.
        self.user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'

    def extract_audio_from_source(self, source):
        """Extract audio from file path, direct media URL, or Loom URL.

        The source is classified in this order: existing local file, URL whose
        path ends in a supported media extension, URL containing ``loom.com``.

        Args:
            source: Local file path or URL.

        Returns:
            Path to the resulting audio file.

        Raises:
            Exception: If the source matches none of the supported kinds, or
                if processing the matched kind fails.
        """
        start_time = time.time()

        if self._is_file_path(source):
            print(f"π Processing uploaded file: {source}")
            return self._process_local_file(source, start_time)

        if self._is_direct_media_url(source):
            print(f"π Processing direct media URL: {source}")
            return self._download_direct_media(source, start_time)

        if self._is_loom_url(source):
            print(f"π₯ Processing Loom URL: {source}")
            return self._extract_from_loom(source, start_time)

        raise Exception("Unsupported URL format. Please use Loom URLs or direct media links.")

    def _is_file_path(self, source):
        """Return True if ``source`` names an existing regular file."""
        try:
            path = Path(source)
            return path.exists() and path.is_file()
        except (TypeError, ValueError, OSError):
            # TypeError: not path-like (e.g. None); ValueError: embedded NUL;
            # OSError: e.g. a very long URL rejected as "file name too long".
            return False

    def _is_direct_media_url(self, url):
        """Return True if the URL path ends in a supported media extension."""
        try:
            path = unquote(urlparse(url.lower()).path)
        except (AttributeError, TypeError, ValueError):
            # Not a string, or a URL that urlparse rejects.
            return False
        known_formats = self.supported_video_formats + self.supported_audio_formats
        return path.endswith(tuple(known_formats))

    def _is_loom_url(self, url):
        """Return True if ``loom.com`` occurs anywhere in the URL (case-insensitive)."""
        return 'loom.com' in url.lower()

    def _process_local_file(self, file_path, start_time):
        """Return a WAV path for a local audio or video file.

        A ``.wav`` input is returned unchanged; other supported audio formats
        are converted with torchaudio; supported video formats have their
        audio track extracted.

        Raises:
            Exception: If the extension is unsupported or processing fails.
        """
        try:
            file_ext = Path(file_path).suffix.lower()

            if file_ext == '.wav':
                end_time = time.time()
                print(f"[β±οΈ] Audio file processing took {end_time - start_time:.2f} seconds.")
                return file_path
            if file_ext in self.supported_audio_formats:
                return self._convert_to_wav(file_path, start_time)
            if file_ext in self.supported_video_formats:
                return self._extract_audio_from_video_file(file_path, start_time)
            raise Exception(f"Unsupported file format: {file_ext}")
        except Exception as e:
            raise Exception(f"Failed to process local file: {str(e)}") from e

    def _download_direct_media(self, url, start_time):
        """Download a media URL and return a WAV path for its audio.

        The download goes into a temporary directory. That directory is kept
        only when the downloaded file itself is the result (a ``.wav``); in
        every other case the result lives in a different directory created by
        the conversion step, so the download directory is removed.

        Raises:
            Exception: If the download or the subsequent processing fails.
        """
        temp_dir = tempfile.mkdtemp()
        keep_download = False
        try:
            headers = {
                'User-Agent': self.user_agent,
                'Accept': '*/*',
                'Accept-Language': 'en-US,en;q=0.9',
                'Connection': 'keep-alive',
            }
            known_formats = self.supported_video_formats + self.supported_audio_formats

            # The context manager releases the streamed connection even if
            # writing the file fails part-way through.
            with requests.get(url, headers=headers, stream=True, timeout=60) as response:
                response.raise_for_status()

                # Unquote the path so percent-encoded extensions are seen the
                # same way _is_direct_media_url sees them.
                url_ext = Path(unquote(urlparse(url).path)).suffix.lower()
                if url_ext in known_formats:
                    ext = url_ext
                else:
                    # Fall back to the Content-Type header; default to video.
                    content_type = response.headers.get('content-type', '').lower()
                    if 'video' in content_type:
                        ext = '.mp4'
                    elif 'audio' in content_type:
                        ext = '.mp3'
                    else:
                        ext = '.mp4'

                downloaded_file = os.path.join(temp_dir, f'downloaded{ext}')
                with open(downloaded_file, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

            print(f"β Downloaded {os.path.getsize(downloaded_file) / 1024 / 1024:.1f}MB")

            if ext == '.wav':
                # The download itself is the result, so its directory must stay.
                keep_download = True
                end_time = time.time()
                print(f"[β±οΈ] Direct audio download took {end_time - start_time:.2f} seconds.")
                return downloaded_file
            if ext in self.supported_audio_formats:
                return self._convert_to_wav(downloaded_file, start_time)
            return self._extract_audio_from_video_file(downloaded_file, start_time)
        except Exception as e:
            raise Exception(f"Failed to download direct media: {str(e)}") from e
        finally:
            if not keep_download:
                shutil.rmtree(temp_dir, ignore_errors=True)

    def _extract_from_loom(self, url, start_time):
        """Download a Loom video's audio as WAV using yt-dlp.

        Returns:
            Path of the first ``.wav`` file found in the download directory.

        Raises:
            Exception: If yt-dlp fails or produces no ``.wav`` file.
        """
        temp_dir = tempfile.mkdtemp()
        try:
            ydl_opts = {
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'wav',
                    'preferredquality': '192',
                }],
                'outtmpl': os.path.join(temp_dir, 'loom_audio.%(ext)s'),
                'quiet': True,
                'no_warnings': True,
                'noplaylist': True,
                'http_headers': {
                    'User-Agent': self.user_agent,
                },
            }

            with suppress_stdout_stderr():
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([url])

            for name in os.listdir(temp_dir):
                if name.endswith('.wav'):
                    audio_path = os.path.join(temp_dir, name)
                    end_time = time.time()
                    print(f"[β±οΈ] Loom audio extraction took {end_time - start_time:.2f} seconds.")
                    return audio_path

            raise Exception("Audio file not found after Loom extraction")
        except Exception as e:
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise Exception(f"Failed to extract from Loom: {str(e)}") from e

    def _extract_audio_from_video_file(self, video_file, start_time):
        """Extract 16 kHz mono WAV audio from a video using FFmpeg.

        If FFmpeg cannot be run, times out, or does not produce the output
        file, the method falls back to ``_convert_to_wav`` (torchaudio).

        Raises:
            Exception: Only from the torchaudio fallback, if it fails as well.
        """
        import subprocess

        temp_dir = tempfile.mkdtemp()
        output_audio = os.path.join(temp_dir, 'extracted_audio.wav')
        cmd = [
            'ffmpeg', '-i', video_file,
            '-vn',                    # no video
            '-acodec', 'pcm_s16le',   # uncompressed WAV
            '-ar', '16000',           # 16kHz sample rate
            '-ac', '1',               # mono
            '-y',                     # overwrite output file
            output_audio,
        ]

        try:
            # Output is captured as bytes and never decoded: decoding FFmpeg's
            # log as text could raise on non-UTF-8 metadata and wrongly send a
            # successful run down the fallback path.
            result = subprocess.run(cmd, capture_output=True, timeout=300)
            ffmpeg_ok = result.returncode == 0 and os.path.exists(output_audio)
        except (OSError, ValueError, subprocess.SubprocessError):
            # FFmpeg missing / not executable, invalid argument, or timeout.
            ffmpeg_ok = False

        if ffmpeg_ok:
            end_time = time.time()
            print(f"[β±οΈ] Audio extraction from video took {end_time - start_time:.2f} seconds.")
            return output_audio

        # The fallback writes into its own directory, so this one is dead weight.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return self._convert_to_wav(video_file, start_time)

    def _convert_to_wav(self, audio_file, start_time):
        """Convert a media file to 16 kHz mono WAV using torchaudio.

        Returns:
            Path of ``converted_audio.wav`` inside a new temporary directory.

        Raises:
            Exception: If loading, resampling, or saving fails.
        """
        temp_dir = None
        try:
            waveform, sample_rate = torchaudio.load(audio_file)

            # Average the channels down to one if there are several.
            if waveform.shape[0] > 1:
                waveform = torch.mean(waveform, dim=0, keepdim=True)

            if sample_rate != 16000:
                waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform)

            temp_dir = tempfile.mkdtemp()
            output_wav = os.path.join(temp_dir, 'converted_audio.wav')
            torchaudio.save(output_wav, waveform, 16000)

            end_time = time.time()
            print(f"[β±οΈ] Audio conversion took {end_time - start_time:.2f} seconds.")
            return output_wav
        except Exception as e:
            # Do not leave an empty or partial output directory behind.
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
            raise Exception(f"Failed to convert audio to WAV: {str(e)}") from e
def chunk_audio_1min(waveform, sample_rate, short_audio_threshold=30):
    """Split a waveform into 1-minute chunks along its second dimension.

    Args:
        waveform: Tensor indexed as ``waveform[:, start:end]``; its length is
            taken from ``waveform.size(1)``.
        sample_rate: Samples per second of ``waveform``.
        short_audio_threshold: Audio lasting at most this many seconds is
            returned as a single chunk without splitting.

    Returns:
        A list of tensors. Audio longer than the threshold is cut every 60
        seconds; a trailing piece shorter than 10 seconds is dropped, one of
        exactly 10 seconds is kept. If no piece reaches 10 seconds (possible
        only when ``short_audio_threshold`` is below 10), the whole waveform
        is returned as one chunk instead of an empty list.
    """
    total_samples = waveform.size(1)
    duration_sec = total_samples / sample_rate

    # Short audio (at or below the threshold) is never split.
    if duration_sec <= short_audio_threshold:
        print(f"π¦ Short audio ({duration_sec:.2f}s), keeping as single chunk")
        return [waveform]

    chunk_samples = 60 * sample_rate       # 1-minute chunks
    min_chunk_samples = 10 * sample_rate   # shortest piece worth keeping

    # Slicing past the end is clamped, so the last piece is simply shorter.
    pieces = (
        waveform[:, start:start + chunk_samples]
        for start in range(0, total_samples, chunk_samples)
    )
    # ">=" so that a piece of exactly 10 seconds counts as "at least 10 seconds".
    chunks = [piece for piece in pieces if piece.size(1) >= min_chunk_samples]

    if not chunks:
        # Never discard the entire recording just because it is under the
        # minimum chunk length.
        print(f"Audio ({duration_sec:.2f}s) is below the minimum chunk length, keeping as single chunk")
        return [waveform]

    print(f"π¦ Created {len(chunks)} 1-minute chunks")
    return chunks
def prepare_audio(video_source, short_audio_threshold=30):
    """Extract audio from a source and split it into chunks.

    The source is handed to ``SimpleAudioExtractor``; the resulting file is
    loaded, resampled to 16 kHz when its rate differs, averaged down to one
    channel when it has several, and passed to ``chunk_audio_1min``.

    Args:
        video_source: Local path or URL accepted by
            ``SimpleAudioExtractor.extract_audio_from_source``.
        short_audio_threshold: Forwarded to ``chunk_audio_1min``.

    Returns:
        On success, a dict with ``success`` (True), ``chunks``,
        ``audio_path``, ``duration_minutes`` and ``total_chunks``.
        On any exception, a dict with ``success`` (False), ``error``,
        ``chunks`` (empty list) and ``audio_path`` (None). Exceptions are
        reported this way rather than raised.
    """
    target_rate = 16000
    try:
        print("π΅ Extracting audio from source...")
        audio_path = SimpleAudioExtractor().extract_audio_from_source(video_source)
        print(f"β Audio extracted to: {audio_path}")

        print("π― Loading and preparing audio...")
        prep_started = time.time()
        waveform, sample_rate = torchaudio.load(audio_path)

        # Resample first, then collapse channels (same order as always).
        if sample_rate != target_rate:
            resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_rate)
            waveform = resampler(waveform)
            sample_rate = target_rate
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)

        prep_elapsed = time.time() - prep_started
        print(f"[β±οΈ] Audio preparation took {prep_elapsed:.2f} seconds.")

        duration_minutes = waveform.size(1) / sample_rate / 60

        print(f"π§© Creating chunks (short audio threshold: {short_audio_threshold}s)...")
        chunk_started = time.time()
        chunks = chunk_audio_1min(waveform, sample_rate, short_audio_threshold)
        chunk_elapsed = time.time() - chunk_started
        chunk_count = len(chunks)
        print(f"[β±οΈ] Chunking took {chunk_elapsed:.2f} seconds. Total chunks: {chunk_count}")
    except Exception as e:
        failure_text = str(e)
        print(f"β Error in audio preparation.: {failure_text}")
        return {
            "success": False,
            "error": failure_text,
            "chunks": [],
            "audio_path": None
        }

    return {
        "success": True,
        "chunks": chunks,
        "audio_path": audio_path,
        "duration_minutes": duration_minutes,
        "total_chunks": chunk_count
    }