# Spaces:
# Runtime error
# Runtime error
import mimetypes
import os

import requests
# Host and port of the Talker service; both can be overridden through the
# environment.
talker_service_host = os.environ.get("TALKER_SERVICE_HOST", "localhost")
# The default is a string so the value has the same type whether or not the
# environment variable is set (os.environ values are always strings).
talker_service_port = os.environ.get("TALKER_SERVICE_PORT", "8003")

# API endpoint URLs, both derived from a single base so they cannot drift apart.
_TALKER_BASE_URL = f"http://{talker_service_host}:{talker_service_port}"
CHANGE_MODEL_URL = f"{_TALKER_BASE_URL}/talker_change_model/"
TALKER_RESPONSE_URL = f"{_TALKER_BASE_URL}/talker_response/"
def change_talker_model(model_name, timeout=(10, None)):
    """Ask the Talker service to switch to a different model.

    Sends a POST to ``CHANGE_MODEL_URL`` with ``model_name`` as a query
    parameter and prints the outcome.

    Args:
        model_name: Name of the model the service should load.
        timeout: Passed straight to ``requests.post``. The default bounds the
            connection attempt to 10 seconds (so an unreachable host fails
            fast instead of hanging) and leaves the read unbounded, as before.

    Returns:
        True if the service answered with a non-error HTTP status, False if
        the request failed. Request errors are printed, not raised.
    """
    params = {"model_name": model_name}
    try:
        response = requests.post(CHANGE_MODEL_URL, params=params, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.RequestException as e:
        print(f"Model change failed: {e}")
        return False

    # A successful status with a non-JSON body is still a success; fall back
    # to the raw text instead of crashing or misreporting a failure.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    print(f"Model change successful: {detail}")
    return True
def request_talker_response(source_image_path, driven_audio_path, payload, output_video_path, timeout=(10, None)):
    """Upload an image and an audio clip to the Talker service and save the video.

    Posts the generation settings as form fields and the two files as a
    multipart upload to ``TALKER_RESPONSE_URL``, then writes the response body
    to ``output_video_path``.

    Args:
        source_image_path: Path of the source image to upload.
        driven_audio_path: Path of the driving audio file to upload.
        payload: Mapping of generation settings; missing keys fall back to the
            defaults below. ``None`` is treated as an empty mapping.
        output_video_path: Where the returned video bytes are written.
        timeout: Passed straight to ``requests.post``. The default bounds the
            connection attempt to 10 seconds and leaves the read unbounded,
            since generation time is not known here.

    Returns:
        True if the video was received and written, False if the HTTP request
        failed. Request errors are printed, not raised.

    Raises:
        OSError: If an input file cannot be opened or the output file cannot
            be written (e.g. ``FileNotFoundError``).
    """
    payload = payload or {}

    # Prepare payload as form data (non-string values are stringified)
    data = {
        'preprocess_type': payload.get('preprocess_type', 'crop'),
        'is_still_mode': str(payload.get('is_still_mode', False)),
        'enhancer': str(payload.get('enhancer', False)),
        'batch_size': str(payload.get('batch_size', 4)),
        'size_of_image': str(payload.get('size_of_image', 256)),
        'pose_style': str(payload.get('pose_style', 0)),
        'facerender': payload.get('facerender', 'facevid2vid'),
        'exp_weight': str(payload.get('exp_weight', 1.0)),
        'blink_every': str(payload.get('blink_every', True)),
        'talker_method': payload.get('talker_method', 'SadTalker'),
        'fps': str(payload.get('fps', 30)),
    }

    # Label each upload by its real extension instead of always claiming JPEG/WAV.
    image_mime = _guess_mime_type(source_image_path, 'image/jpeg')
    audio_mime = _guess_mime_type(driven_audio_path, 'audio/wav')

    # The input files only need to stay open for the duration of the upload.
    with open(source_image_path, 'rb') as image_file, open(driven_audio_path, 'rb') as audio_file:
        files = {
            'source_image': (os.path.basename(source_image_path), image_file, image_mime),
            'driven_audio': (os.path.basename(driven_audio_path), audio_file, audio_mime),
        }
        try:
            # Sending POST request to the server with form data and files
            response = requests.post(TALKER_RESPONSE_URL, data=data, files=files, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
        except requests.RequestException as e:
            print(f"Talker response failed: {e}")
            return False

    with open(output_video_path, 'wb') as video_file:
        video_file.write(response.content)
    print(f"Talker response successful, video saved as: {output_video_path}")
    return True


def _guess_mime_type(path, default):
    """Return the MIME type implied by the extension of *path*, or *default* if unknown."""
    path = os.fspath(path)
    # The stdlib table may report WAV under a different audio subtype; keep
    # the 'audio/wav' label this client has always sent for .wav uploads.
    if os.path.splitext(path)[1].lower() == '.wav':
        return 'audio/wav'
    guessed, _encoding = mimetypes.guess_type(path)
    return guessed or default
def main():
    """Switch the service through each test model and request one video per model.

    Outputs are written to the ``outputs`` directory as
    ``output_video_<model>.mp4``.
    """
    # Models to test
    models = [
        "SadTalker",
        "Wav2Lip",
        "Wav2Lipv2",
        "NeRFTalk",
    ]
    result_dir = "outputs"
    os.makedirs(result_dir, exist_ok=True)

    # Generation settings shared by every model; only "talker_method" varies,
    # so the rest is built once outside the loop.
    base_payload = {
        "preprocess_type": "crop",
        "is_still_mode": False,
        "enhancer": False,
        "batch_size": 4,
        "size_of_image": 256,
        "pose_style": 0,
        "facerender": "facevid2vid",
        "exp_weight": 1.0,
        "blink_every": True,
        "fps": 30,
    }

    # Loop to change model and generate Talker response
    for model_name in models:
        print(f"Switching to model: {model_name}")
        change_talker_model(model_name)

        # Request Talker to generate video
        payload = {**base_payload, "talker_method": model_name}
        request_talker_response(
            "inputs/example.png",
            "answer.wav",  # make sure this audio file path is correct
            payload,
            os.path.join(result_dir, f"output_video_{model_name}.mp4"),
        )
        print("\n" + "-" * 50 + "\n")


if __name__ == "__main__":
    main()