English_Dialect_Classifier / audio_extractor.py
Amr-h's picture
,
ff8b257
import os
import sys
import tempfile
import warnings
import time
import shutil
import requests
from urllib.parse import urlparse, unquote
from pathlib import Path
import torch
import torchaudio
import yt_dlp
from contextlib import contextmanager
warnings.filterwarnings("ignore")
os.environ['HF_HUB_DISABLE_SYMLINKS_WARNING'] = '1'
@contextmanager
def suppress_stdout_stderr():
with open(os.devnull, "w") as devnull:
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = devnull
sys.stderr = devnull
try:
yield
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
class SimpleAudioExtractor:
def __init__(self):
self.supported_video_formats = ['.mp4', '.webm', '.avi', '.mov', '.mkv', '.m4v']
self.supported_audio_formats = ['.mp3', '.wav', '.m4a', '.aac', '.ogg', '.flac']
self.user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
def extract_audio_from_source(self, source):
"""Extract audio from file path, direct media URL, or Loom URL"""
start_time = time.time()
# Check if source is a file path
if self._is_file_path(source):
print(f"πŸ“ Processing uploaded file: {source}")
return self._process_local_file(source, start_time)
# Check if source is a direct media URL
if self._is_direct_media_url(source):
print(f"πŸ”— Processing direct media URL: {source}")
return self._download_direct_media(source, start_time)
# Check if source is a Loom URL
if self._is_loom_url(source):
print(f"πŸŽ₯ Processing Loom URL: {source}")
return self._extract_from_loom(source, start_time)
raise Exception("Unsupported URL format. Please use Loom URLs or direct media links.")
def _is_file_path(self, source):
"""Check if source is a local file path"""
try:
path = Path(source)
return path.exists() and path.is_file()
except:
return False
def _is_direct_media_url(self, url):
"""Check if URL points directly to a media file"""
try:
parsed = urlparse(url.lower())
path = unquote(parsed.path)
return any(path.endswith(ext) for ext in self.supported_video_formats + self.supported_audio_formats)
except:
return False
def _is_loom_url(self, url):
"""Check if URL is a Loom video"""
return 'loom.com' in url.lower()
def _process_local_file(self, file_path, start_time):
"""Process a local file (uploaded file)"""
try:
file_ext = Path(file_path).suffix.lower()
# If it's already an audio file, convert to WAV if needed
if file_ext in self.supported_audio_formats:
if file_ext == '.wav':
end_time = time.time()
print(f"[⏱️] Audio file processing took {end_time - start_time:.2f} seconds.")
return file_path
else:
return self._convert_to_wav(file_path, start_time)
# If it's a video file, extract audio
elif file_ext in self.supported_video_formats:
return self._extract_audio_from_video_file(file_path, start_time)
else:
raise Exception(f"Unsupported file format: {file_ext}")
except Exception as e:
raise Exception(f"Failed to process local file: {str(e)}")
def _download_direct_media(self, url, start_time):
"""Download direct media URL"""
temp_dir = tempfile.mkdtemp()
try:
headers = {
'User-Agent': self.user_agent,
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9',
'Connection': 'keep-alive',
}
response = requests.get(url, headers=headers, stream=True, timeout=60)
response.raise_for_status()
# Determine file extension from URL or content type
parsed_url = urlparse(url)
url_ext = Path(parsed_url.path).suffix.lower()
if url_ext in self.supported_video_formats + self.supported_audio_formats:
ext = url_ext
else:
# Try to get from content type
content_type = response.headers.get('content-type', '').lower()
if 'video' in content_type:
ext = '.mp4'
elif 'audio' in content_type:
ext = '.mp3'
else:
ext = '.mp4' # default
downloaded_file = os.path.join(temp_dir, f'downloaded{ext}')
with open(downloaded_file, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"βœ… Downloaded {os.path.getsize(downloaded_file) / 1024 / 1024:.1f}MB")
# Process the downloaded file
if ext in self.supported_audio_formats:
if ext == '.wav':
end_time = time.time()
print(f"[⏱️] Direct audio download took {end_time - start_time:.2f} seconds.")
return downloaded_file
else:
return self._convert_to_wav(downloaded_file, start_time)
else:
return self._extract_audio_from_video_file(downloaded_file, start_time)
except Exception as e:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
raise Exception(f"Failed to download direct media: {str(e)}")
def _extract_from_loom(self, url, start_time):
"""Extract audio from Loom URL using yt-dlp"""
temp_dir = tempfile.mkdtemp()
try:
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
'preferredquality': '192',
}],
'outtmpl': os.path.join(temp_dir, 'loom_audio.%(ext)s'),
'quiet': True,
'no_warnings': True,
'noplaylist': True,
'http_headers': {
'User-Agent': self.user_agent,
},
}
with suppress_stdout_stderr():
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
# Find the extracted audio file
for file in os.listdir(temp_dir):
if file.endswith('.wav'):
audio_path = os.path.join(temp_dir, file)
end_time = time.time()
print(f"[⏱️] Loom audio extraction took {end_time - start_time:.2f} seconds.")
return audio_path
raise Exception("Audio file not found after Loom extraction")
except Exception as e:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
raise Exception(f"Failed to extract from Loom: {str(e)}")
def _extract_audio_from_video_file(self, video_file, start_time):
"""Extract audio from video file using FFmpeg or torchaudio"""
temp_dir = tempfile.mkdtemp()
output_audio = os.path.join(temp_dir, 'extracted_audio.wav')
try:
# Try FFmpeg first
import subprocess
cmd = [
'ffmpeg', '-i', video_file,
'-vn', # no video
'-acodec', 'pcm_s16le', # uncompressed WAV
'-ar', '16000', # 16kHz sample rate
'-ac', '1', # mono
'-y', # overwrite output file
output_audio
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0 and os.path.exists(output_audio):
end_time = time.time()
print(f"[⏱️] Audio extraction from video took {end_time - start_time:.2f} seconds.")
return output_audio
else:
raise Exception("FFmpeg failed, trying torchaudio...")
except (FileNotFoundError, Exception):
# Fallback to torchaudio
return self._convert_to_wav(video_file, start_time)
def _convert_to_wav(self, audio_file, start_time):
"""Convert audio file to WAV format using torchaudio"""
try:
waveform, sample_rate = torchaudio.load(audio_file)
# Convert to mono if needed
if waveform.shape[0] > 1:
waveform = torch.mean(waveform, dim=0, keepdim=True)
# Resample to 16kHz if needed
if sample_rate != 16000:
waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform)
# Save as WAV
temp_dir = tempfile.mkdtemp()
output_wav = os.path.join(temp_dir, 'converted_audio.wav')
torchaudio.save(output_wav, waveform, 16000)
end_time = time.time()
print(f"[⏱️] Audio conversion took {end_time - start_time:.2f} seconds.")
return output_wav
except Exception as e:
raise Exception(f"Failed to convert audio to WAV: {str(e)}")
def chunk_audio_1min(waveform, sample_rate, short_audio_threshold=30):
"""Create 1-minute chunks from audio, handle short audio as single chunk"""
total_samples = waveform.size(1)
duration_sec = total_samples / sample_rate
# If audio is short (≀30 seconds by default), return as single chunk
if duration_sec <= short_audio_threshold:
print(f"πŸ“¦ Short audio ({duration_sec:.2f}s), keeping as single chunk")
return [waveform]
# For longer audio, use 1-minute chunks
chunk_length_sec = 60 # 1 minute chunks
chunk_samples = chunk_length_sec * sample_rate
chunks = []
for start in range(0, total_samples, chunk_samples):
end = min(start + chunk_samples, total_samples)
chunk = waveform[:, start:end]
# Only include chunks that are at least 10 seconds long
if chunk.size(1) > sample_rate * 10:
chunks.append(chunk)
print(f"πŸ“¦ Created {len(chunks)} 1-minute chunks")
return chunks
def prepare_audio(video_source, short_audio_threshold=30):
"""Main function to extract and prepare audio chunks, handling short audio as single segment"""
try:
print(f"🎡 Extracting audio from source...")
extractor = SimpleAudioExtractor()
audio_path = extractor.extract_audio_from_source(video_source)
print(f"βœ… Audio extracted to: {audio_path}")
print(f"🎯 Loading and preparing audio...")
start = time.time()
waveform, sample_rate = torchaudio.load(audio_path)
# Resample to 16kHz if needed
if sample_rate != 16000:
waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform)
sample_rate = 16000
# Convert to mono if needed
if waveform.shape[0] > 1:
waveform = torch.mean(waveform, dim=0, keepdim=True)
end = time.time()
print(f"[⏱️] Audio preparation took {end - start:.2f} seconds.")
# Calculate duration and create chunks
duration_minutes = waveform.size(1) / sample_rate / 60
print(f"🧩 Creating chunks (short audio threshold: {short_audio_threshold}s)...")
start = time.time()
chunks = chunk_audio_1min(waveform, sample_rate, short_audio_threshold)
end = time.time()
print(f"[⏱️] Chunking took {end - start:.2f} seconds. Total chunks: {len(chunks)}")
return {
"success": True,
"chunks": chunks,
"audio_path": audio_path,
"duration_minutes": duration_minutes,
"total_chunks": len(chunks)
}
except Exception as e:
print(f"❌ Error in audio preparation.: {str(e)}")
return {
"success": False,
"error": str(e),
"chunks": [],
"audio_path": None
}