import os
import sys

current_dir = os.path.dirname(os.path.abspath(__file__))
print('add current dir to sys.path', current_dir)
sys.path.append(current_dir)

from sparktts.models.audio_tokenizer import BiCodecTokenizer
from transformers import AutoTokenizer, AutoModelForCausalLM
import soundfile as sf
import numpy as np
import torch
from utilities import generate_embeddings


def generate_speech(model, tokenizer, text, bicodec, prompt_text=None, prompt_audio=None,
                    max_new_tokens=3000, do_sample=True, top_k=50, top_p=0.95,
                    temperature=1.0, device="cuda:0"):
    """
    Generate speech for a piece of text.

    Args:
        model: the language model
        tokenizer: the text tokenizer
        text: text to synthesize
        bicodec: a BiCodecTokenizer instance
        prompt_text: prompt transcript (optional)
        prompt_audio: prompt audio array (optional)
        max_new_tokens: maximum number of tokens to generate
        do_sample: whether to use sampling
        top_k: top-k sampling parameter
        top_p: top-p sampling parameter
        temperature: sampling temperature
        device: device to run on

    Returns:
        wav: the generated audio waveform
    """
    # Per the training code, eos_token_id = model.config.vocab_size - 1
    eos_token_id = model.config.vocab_size - 1
    print(f"EOS token ID: {eos_token_id}")

    # Build the input embeddings
    embeddings = generate_embeddings(
        model=model,
        tokenizer=tokenizer,
        text=text,
        bicodec=bicodec,
        prompt_text=prompt_text,
        prompt_audio=prompt_audio
    )

    print("Starting speech generation...")
    print(f"Input embeddings shape: {embeddings['input_embs'].shape}")

    global_tokens = embeddings['global_tokens'].unsqueeze(0)

    # Put the model in evaluation mode
    print(f'embeddings dtype: {embeddings["input_embs"].dtype}')
    model.eval()

    with torch.no_grad():
        # Use the model's generate method
        generated_outputs = model.generate(
            inputs_embeds=embeddings['input_embs'],
            attention_mask=torch.ones((1, embeddings['input_embs'].shape[1]), dtype=torch.long, device=device),
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            top_k=top_k,
            top_p=top_p,
            temperature=temperature,
            eos_token_id=eos_token_id,
            pad_token_id=tokenizer.pad_token_id if hasattr(tokenizer, 'pad_token_id') else tokenizer.eos_token_id,
            use_cache=True
        )

    print(f"generated_outputs: {generated_outputs}")
    print(f"Generated token count: {generated_outputs.shape}")
    print(f"Generated token IDs: {generated_outputs.tolist()}")

    # Use the generated token IDs directly as semantic tokens, dropping the trailing EOS.
    # Note: these IDs live in the model's vocabulary, not the original tokenizer's.
    semantic_tokens_tensor = generated_outputs[:, :-1]

    print(f"Semantic tokens shape: {semantic_tokens_tensor.shape}")
    print(f"Global tokens shape: {global_tokens.shape}")

    # Decode the tokens back to audio with BiCodec
    with torch.no_grad():
        wav = bicodec.detokenize(global_tokens, semantic_tokens_tensor)

    print(f"Generated audio shape: {wav.shape}")
    return wav


def trim_silence(wav, threshold=0.01, min_length=0.1, sample_rate=24000):
    """
    Trim leading and trailing silence from an audio clip.

    Args:
        wav: audio array (numpy array or torch.Tensor)
        threshold: amplitude below which a sample counts as silence
        min_length: minimum length to keep, in seconds
        sample_rate: sample rate

    Returns:
        trimmed_wav: the audio with silence trimmed
    """
    if isinstance(wav, torch.Tensor):
        wav = wav.cpu().numpy()

    # Make sure the audio is a 1D array
    if wav.ndim > 1:
        wav = wav.flatten()

    # Absolute amplitude of each sample
    audio_abs = np.abs(wav)

    # Find the first and last samples above the threshold
    above_threshold = audio_abs > threshold
    if not np.any(above_threshold):
        # The entire clip is silence; return it unchanged
        return wav

    start_idx = np.argmax(above_threshold)
    end_idx = len(wav) - np.argmax(above_threshold[::-1])

    # Enforce the minimum length
    min_samples = int(min_length * sample_rate)
    if end_idx - start_idx < min_samples:
        # Too short after trimming; expand symmetrically from the center
        center = (start_idx + end_idx) // 2
        start_idx = max(0, center - min_samples // 2)
        end_idx = min(len(wav), center + min_samples // 2)

    trimmed_wav = wav[start_idx:end_idx]

    print(f"Original audio length: {len(wav) / sample_rate:.2f}s")
    print(f"Length after trimming: {len(trimmed_wav) / sample_rate:.2f}s")
    print(f"Silence removed: {start_idx / sample_rate:.2f}s leading, {(len(wav) - end_idx) / sample_rate:.2f}s trailing")

    return trimmed_wav
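# --- Illustrative self-check (not part of the original script): exercises trim_silence on
# a synthetic 440 Hz tone. The tone frequency, 24 kHz rate, half-second pads, and the
# TRIM_SILENCE_SELFTEST guard are assumptions chosen for the demo; set
# TRIM_SILENCE_SELFTEST=1 in the environment to run it.
if os.environ.get('TRIM_SILENCE_SELFTEST'):
    _sr = 24000
    _tone = (0.5 * np.sin(2 * np.pi * 440 * np.arange(_sr) / _sr)).astype(np.float32)
    _pad = np.zeros(_sr // 2, dtype=np.float32)
    _trimmed = trim_silence(np.concatenate([_pad, _tone, _pad]),
                            threshold=0.01, min_length=0.1, sample_rate=_sr)
    # The silent pads should be removed, leaving roughly the one-second tone.
    assert len(_trimmed) < 2 * len(_pad) + len(_tone)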
device = 'cuda:0'
audio_tokenizer = BiCodecTokenizer(model_dir=current_dir, device=device)
print(audio_tokenizer)

tokenizer = AutoTokenizer.from_pretrained(current_dir, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(current_dir, trust_remote_code=True)
print(tokenizer)
print(model)

model = model.bfloat16().to(device)
model.eval()

# Prompt transcript (unused below: generate_speech is called with prompt_audio only)
prompt_text = "我们并不是通过物理移动手段找到星河的。"
prompt_audio_file = os.path.join(current_dir, 'kafka.wav')
prompt_audio, sampling_rate = sf.read(prompt_audio_file)
print(f"Loaded prompt audio from {prompt_audio_file}")
print(f"Original sampling rate: {sampling_rate}Hz")
print(f"Audio shape: {prompt_audio.shape}")

# Resample the prompt to the codec's expected rate if needed
target_sample_rate = audio_tokenizer.config['sample_rate']
if sampling_rate != target_sample_rate:
    print(f"Resampling from {sampling_rate}Hz to {target_sample_rate}Hz...")
    from librosa import resample
    prompt_audio = resample(prompt_audio, orig_sr=sampling_rate, target_sr=target_sample_rate)
    prompt_audio = np.array(prompt_audio, dtype=np.float32)
    print(f"Resampled audio shape: {prompt_audio.shape}")
else:
    print(f"Audio sampling rate already matches target ({target_sample_rate}Hz)")

# Scratch test sentences: each assignment overwrites the previous one, so only the
# final sentence is actually synthesized.
text = "1949年11月2日,二房他们已经接受了老爷子安排的:大房拿企业、二房拿钱的设定。富贵闲人他们也做了。在嫡长女和国资抢股权期间不出来搅局,就连老爷子的葬礼都没有露面,安安静静坐实老爷子一辈子的完美人设。大部分中国人只喝过农夫山泉怡宝娃哈哈康师傅之类的头部品牌水,没喝过别的杂牌。"

from tn.chinese.normalizer import Normalizer
normalizer = Normalizer(remove_erhua=False, full_to_half=False, overwrite_cache=True, remove_interjections=False)
from tn.english.normalizer import Normalizer as EnglishNormalizer
english_normalizer = EnglishNormalizer()

# text = "失败是成功之母,不要轻易放弃。心有猛虎,细嗅蔷薇。"
text = "近日,各高校录取通知书陆续发放,北京化工大学的录取通知书因一则新奇视频走红网络。视频中,一位新生收到通知书后,发现其材质坚固异常,竟然能用它干脆利落地切开西瓜,仅两次操作,西瓜就一分为二。这一操作瞬间让“北京化工大学录取通知书”话题登上热搜。"
text = "最近一则新闻让人唏嘘:一位坚持晨跑三年的54岁大叔,突然因脑梗离世。医生检查后发现,他的血管状况比同龄久坐人群还要糟糕!"
text = "他们坚信“生命在于运动”,却不知道有些跑步方式正在悄悄伤害身体。"
text = "清晨的公园里,总能看到一群中老年人挥汗如雨的跑步。"
text = "很多人喜欢空腹晨跑,认为这样减肥效果更好。但早晨血液黏稠度高,血压处于峰值时段,此时剧烈运动极易诱发心脑血管意外。理想运动时间应在下午4到6点,此时人体机能处于最佳状态。"
text = "从来没有人告诉我,原来长大以后,你可能只会做一份不怎么样的工作,将就着过一段不怎么样的生活。"
text = "每天必须跑够5公里的执念害了不少人。中老年人关节退变、血管弹性下降,过量跑步会导致半月板磨损,还会使血管内皮反复受到血流冲击。建议采用“谈话测试”:跑步时能正常对话不喘粗气为宜。"

text = normalizer.normalize(text)
print(f"text: {text}")

wav = generate_speech(model, tokenizer, text, audio_tokenizer, prompt_audio=prompt_audio, device=device)

# Trim leading/trailing silence
wav = trim_silence(wav, threshold=0.01, min_length=0.1, sample_rate=target_sample_rate)
sf.write('output.wav', wav, target_sample_rate)

english_text = "The latest improvement is that the model can now generate speech in English and Chinese."
english_text = english_normalizer.normalize(english_text)
print(f"english_text: {english_text}")

wav = generate_speech(model, tokenizer, english_text, audio_tokenizer, prompt_audio=prompt_audio, device=device)

# Trim leading/trailing silence
wav = trim_silence(wav, threshold=0.01, min_length=0.1, sample_rate=target_sample_rate)
sf.write('output_english.wav', wav, target_sample_rate)
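# --- Illustrative sketch (not part of the original script): reusing the same pipeline for
# a batch of sentences. The demo sentence, output file name, and BATCH_DEMO guard are
# assumptions; the normalize/generate/trim/write flow mirrors the single-sentence calls
# above. Set BATCH_DEMO=1 in the environment to run it.
if os.environ.get('BATCH_DEMO'):
    demo_texts = {
        'output_demo_0.wav': "书山有路勤为径,学海无涯苦作舟。",
    }
    for out_path, raw_text in demo_texts.items():
        demo_wav = generate_speech(model, tokenizer, normalizer.normalize(raw_text),
                                   audio_tokenizer, prompt_audio=prompt_audio, device=device)
        demo_wav = trim_silence(demo_wav, threshold=0.01, min_length=0.1,
                                sample_rate=target_sample_rate)
        sf.write(out_path, demo_wav, target_sample_rate)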