import gc
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime

import gradio as gr
import numpy as np
import pandas as pd
import torch
from mega import Mega
from pydub import AudioSegment

# WhisperSpeech is optional; keep a flag the UI can check before offering TTS.
try:
    from whisperspeech.pipeline import Pipeline as TTS
    whisperspeak_on = True
except ImportError:
    whisperspeak_on = False


class CachedModels:
    """Cache the public model spreadsheet locally and map filenames to Hugging Face URLs."""

    def __init__(self):
        csv_url = "https://docs.google.com/spreadsheets/d/1tAUaQrEHYgRsm1Lvrnj14HFHDwJWl0Bd9x0QePewNco/export?format=csv&gid=1977693859"
        if os.path.exists("spreadsheet.csv"):
            self.cached_data = pd.read_csv("spreadsheet.csv")
        else:
            self.cached_data = pd.read_csv(csv_url)
            self.cached_data.to_csv("spreadsheet.csv", index=False)

        # Map each model filename to the first Hugging Face URL found in its row.
        self.models = {}
        for _, row in self.cached_data.iterrows():
            filename = row['Filename']
            url = None
            for value in row.values:
                if isinstance(value, str) and "huggingface" in value:
                    url = value
                    break
            if url:
                self.models[filename] = url

    def get_models(self):
        return self.models

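# Usage sketch (hypothetical caller, not part of the original app): build the
# cache once at startup and feed the mapping to a dropdown.
#
#   model_urls = CachedModels().get_models()
#   choices = sorted(model_urls)           # filenames for the UI
#   url = model_urls.get("SomeVoice.pth")  # direct Hugging Face link, or None
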
def show(path, ext, on_error=None):
    """List the files in `path` that end with `ext`; return `on_error` if listing fails."""
    try:
        return [f for f in os.listdir(path) if f.endswith(ext)]
    except Exception:
        return on_error

def run_subprocess(command):
    try:
        subprocess.run(command, check=True)
        return True, None
    except Exception as e:
        return False, e

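# Usage sketch (hypothetical values) for the two helpers above:
#
#   weights = show('assets/weights', '.pth', on_error=[])
#   ok, err = run_subprocess(["ffmpeg", "-version"])
#   if not ok:
#       gr.Warning(f"ffmpeg missing: {err}")
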
def download_from_url(url=None, model=None):
    if not url:
        try:
            # No direct link given: look the model name up in the cached
            # spreadsheet (assumed intent of the original broken lookup).
            url = CachedModels().get_models()[model]
        except (KeyError, TypeError):
            gr.Warning("Failed")
            return ''
    if not model:
        try:
            model = url.split('/')[-1].split('?')[0]
        except Exception:
            gr.Warning('Please name the model')
            return ''
    model = model.replace('.pth', '').replace('.index', '').replace('.zip', '')
    url = url.replace('/blob/main/', '/resolve/main/').strip()

    for directory in ["downloads", "unzips", "zip"]:
        os.makedirs(directory, exist_ok=True)

    try:
        if url.endswith('.pth'):
            # Direct weight link: save straight into assets/weights.
            subprocess.run(["wget", url, "-O", f'assets/weights/{model}.pth'], check=True)
        elif url.endswith('.index'):
            os.makedirs(f'logs/{model}', exist_ok=True)
            subprocess.run(["wget", url, "-O", f'logs/{model}/added_{model}.index'], check=True)
        else:
            # Everything else lands in downloads/ and is sorted afterwards.
            if url.endswith('.zip'):
                subprocess.run(["wget", url, "-O", f'downloads/{model}.zip'], check=True)
            elif "drive.google.com" in url:
                # gdown's --fuzzy flag accepts the full share link; the original
                # url.split('/')[0] kept only "https:" and could never work.
                subprocess.run(["gdown", url, "--fuzzy", "-O", f'downloads/{model}'], check=True)
            elif "mega.nz" in url:
                Mega().download_url(url, 'downloads')
            else:
                subprocess.run(["wget", url, "-O", f'downloads/{model}'], check=True)

            downloaded_file = next((f for f in os.listdir("downloads")), None)
            if not downloaded_file:
                gr.Warning("Failed to download file")
                return 'Failed'
            if downloaded_file.endswith(".zip"):
                shutil.unpack_archive(f'downloads/{downloaded_file}', "unzips", 'zip')
                for root, _, files in os.walk('unzips'):
                    for file in files:
                        file_path = os.path.join(root, file)
                        if file.endswith(".index"):
                            os.makedirs(f'logs/{model}', exist_ok=True)
                            shutil.copy2(file_path, f'logs/{model}')
                        elif file.endswith(".pth") and "G_" not in file and "D_" not in file:
                            shutil.copy(file_path, f'assets/weights/{model}.pth')
            elif downloaded_file.endswith(".pth"):
                shutil.copy(f'downloads/{downloaded_file}', f'assets/weights/{model}.pth')
            elif downloaded_file.endswith(".index"):
                os.makedirs(f'logs/{model}', exist_ok=True)
                shutil.copy(f'downloads/{downloaded_file}', f'logs/{model}/added_{model}.index')

        gr.Info("Done")
        return 'Done'
    except Exception as e:
        gr.Warning(f"There's been an error: {str(e)}")
        return 'Failed'
    finally:
        # Cleanup only: a `return` here would swallow the result (and any
        # exception) from the blocks above, as the original version did.
        shutil.rmtree("downloads", ignore_errors=True)
        shutil.rmtree("unzips", ignore_errors=True)
        shutil.rmtree("zip", ignore_errors=True)

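# Usage sketch (hypothetical links): any of these forms should work.
#
#   download_from_url("https://huggingface.co/acct/repo/blob/main/voice.zip", "voice")
#   download_from_url("https://drive.google.com/file/d/FILE_ID/view", "voice")
#   download_from_url(model="SomeVoice.pth")  # resolved via the cached spreadsheet
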
def speak(audio, text):
    """Run the GPT-SoVITS demo script and return the path of the generated audio."""
    print(f"({audio}, {text})")
    current_dir = os.getcwd()
    os.chdir('./gpt_sovits_demo')
    try:
        process = subprocess.Popen([
            "python", "./zero.py",
            "--input_file", audio,
            "--audio_lang", "English",
            "--text", text,
            "--text_lang", "English"
        ], stdout=subprocess.PIPE, text=True)

        # zero.py is expected to print "(path, finished)" tuples; watch for them.
        for line in process.stdout:
            line = line.strip()
            if "All keys matched successfully" in line:
                continue
            if line.startswith("(") and line.endswith(")"):
                path, finished = line[1:-1].split(", ")
                if finished == "True":  # the flag arrives as text, so compare as text
                    return path
        return None
    finally:
        # Restore the working directory on every exit path.
        os.chdir(current_dir)

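# Usage sketch (hypothetical paths): clone a reference voice onto new text.
#
#   out_path = speak("audios/reference.wav", "Hello there!")
#   if out_path is None:
#       gr.Warning("zero.py produced no output")
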
def whisperspeak(text, tts_lang, cps=10.5):
    if not whisperspeak_on:
        return None
    # Build the pipeline once and cache it at module level; the original
    # locals() check could never find a previous instance inside a fresh call.
    global tts_pipe
    if "tts_pipe" not in globals():
        tts_pipe = TTS(t2s_ref='whisperspeech/whisperspeech:t2s-v1.95-small-8lang.model',
                       s2a_ref='whisperspeech/whisperspeech:s2a-v1.95-medium-7lang.model')
    # Silence fastprogress bars, which only clutter the console logs.
    from fastprogress.fastprogress import master_bar, progress_bar
    master_bar.update = lambda *args, **kwargs: None
    progress_bar.update = lambda *args, **kwargs: None

    os.makedirs("audios", exist_ok=True)
    output = f"audios/tts_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    tts_pipe.generate_to_file(output, text, cps=cps, lang=tts_lang)
    return os.path.abspath(output)

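# Usage sketch: synthesize English speech at the default speaking rate.
#
#   wav_path = whisperspeak("Testing one two three", tts_lang="en")
#   # cps (characters per second) trades speed for clarity, e.g. cps=12.0
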
def stereo_process(audio1, audio2, choice):
    """Widen a mono signal into pseudo-stereo by slightly delaying the left channel."""
    audio = audio1 if choice == "Input" else audio2
    print(audio)
    sample_rate, audio_array = audio
    if len(audio_array.shape) == 1:  # mono input
        audio_bytes = audio_array.tobytes()
        segment = AudioSegment(
            data=audio_bytes,
            sample_width=audio_array.dtype.itemsize,
            frame_rate=sample_rate,
            channels=1
        )
        samples = np.array(segment.get_array_of_samples())
        # 0.6 ms Haas-style delay applied to the left channel only.
        delay_samples = int(segment.frame_rate * (0.6 / 1000.0))
        left_channel = np.zeros_like(samples)
        right_channel = samples
        left_channel[delay_samples:] = samples[:-delay_samples]
        stereo_samples = np.column_stack((left_channel, right_channel))
        return (sample_rate, stereo_samples.astype(np.int16))
    else:
        # Already stereo: pass through untouched.
        return audio

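# Usage sketch with gradio-style (sample_rate, ndarray) tuples:
#
#   mono = (44100, np.zeros(44100, dtype=np.int16))        # 1 s of silence
#   rate, wide = stereo_process(mono, None, choice="Input")
#   assert wide.shape[1] == 2  # left channel delayed ~0.6 ms
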
def sr_process(audio1, audio2, choice):
    """Upsample the chosen audio with AudioSR: split into chunks, enhance, re-merge."""
    torch.cuda.empty_cache()
    gc.collect()
    # Free the cached TTS pipeline, if whisperspeak() created one.
    if "tts_pipe" in globals():
        del globals()["tts_pipe"]
    audio = audio1 if choice == "Input" else audio2
    sample_rate, audio_array = audio
    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sample_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1 if len(audio_array.shape) == 1 else 2
    )
    os.makedirs('TEMP', exist_ok=True)
    temp_file = os.path.join('TEMP', f'{choice}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.wav')
    audio_segment.export(temp_file, format="wav")
    output_folder = "SR"
    model_name = "speech"
    suffix = "_ldm"
    guidance_scale = 2.7
    ddim_steps = 50
    venv_dir = "audiosr"  # separate virtualenv that has audiosr installed

    def split_audio(input_file, output_folder, chunk_duration=5.12):
        # AudioSR works on short clips, so cut the input into ~5 s chunks.
        os.makedirs(output_folder, exist_ok=True)
        ffmpeg_command = [
            "ffmpeg", "-i", input_file,
            "-f", "segment", "-segment_time", str(chunk_duration),
            "-c:a", "pcm_s16le", f"{output_folder}/out%03d.wav",
        ]
        subprocess.run(ffmpeg_command, check=True)

    def create_file_list(output_folder):
        file_list = os.path.join(output_folder, "file_list.txt")
        with open(file_list, "w") as f:
            for filename in sorted(os.listdir(output_folder)):
                if filename.endswith(".wav"):
                    f.write(os.path.join(output_folder, filename) + "\n")
        return file_list

    def run_audiosr(file_list, model_name, suffix, guidance_scale, ddim_steps, output_folder, venv_dir):
        command = f"{venv_dir}/bin/python -m audiosr --input_file_list {file_list} --model_name {model_name} --suffix {suffix} --guidance_scale {guidance_scale} --ddim_steps {ddim_steps} --save_path {output_folder}"
        try:
            subprocess.run(command, shell=True, check=True, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            print(f"Error running audiosr: {e.stderr.decode()}")

    split_audio(temp_file, output_folder)
    file_list = create_file_list(output_folder)
    run_audiosr(file_list, model_name, suffix, guidance_scale, ddim_steps, output_folder, venv_dir)

    # Collect the enhanced chunks (audiosr appends the suffix) and merge them.
    time.sleep(1)
    processed_chunks = []
    for root, dirs, files in os.walk(output_folder):
        for file in sorted(files):
            if file.startswith("out") and file.endswith(f"{suffix}.wav"):
                chunk_file = os.path.join(root, file)
                processed_chunks.append(AudioSegment.from_wav(chunk_file))

    if processed_chunks:
        merged_audio = sum(processed_chunks)
        output_file = os.path.join(output_folder, f"{choice}_merged{suffix}.wav")
        merged_audio.export(output_file, format="wav")

        display_file = AudioSegment.from_file(output_file)
        sample_rate = display_file.frame_rate
        audio_array = np.array(display_file.get_array_of_samples())
        if display_file.channels == 2:
            # pydub interleaves stereo samples; reshape them for gradio display.
            audio_array = audio_array.reshape((-1, 2))
        return (sample_rate, audio_array)
    else:
        print(f"Error: Could not find any processed audio chunks in {output_folder}")
        return None
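# Usage sketch (hypothetical tuple input): enhance the "Output" slot.
# Assumes ffmpeg on PATH and an "audiosr" virtualenv as configured above.
#
#   enhanced = sr_process(None, (16000, some_int16_array), choice="Output")
#   if enhanced:
#       rate, data = enhanced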