lisonallen commited on
Commit
1082c60
·
1 Parent(s): da097bc

增强调试信息和异常处理,优化FIFO队列操作,确保生成过程中的信号推送和中断逻辑更加可靠

Browse files
Files changed (2) hide show
  1. app.py +70 -14
  2. diffusers_helper/thread_utils.py +50 -3
app.py CHANGED
@@ -672,10 +672,23 @@ def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_wind
672
 
673
  try:
674
  # 首先检查是否有停止信号
675
- if stream.input_queue.top() == 'end':
676
- print("检测到停止信号,中断采样过程...")
677
- stream.output_queue.push(('end', None))
678
- raise KeyboardInterrupt('用户主动结束任务')
 
 
 
 
 
 
 
 
 
 
 
 
 
679
 
680
  preview = d['denoised']
681
  preview = vae_decode_fake(preview)
@@ -688,12 +701,15 @@ def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_wind
688
  hint = f'Sampling {current_step}/{steps}'
689
  desc = f'Total generated frames: {int(max(0, total_generated_latent_frames * 4 - 3))}, Video length: {max(0, (total_generated_latent_frames * 4 - 3) / 30) :.2f} seconds (FPS-30). The video is being extended now ...'
690
  stream.output_queue.push(('progress', (preview, desc, make_progress_bar_html(percentage, hint))))
691
- except KeyboardInterrupt:
692
  # 捕获并重新抛出中断异常,确保它能传播到采样函数
 
 
693
  raise
694
  except Exception as e:
695
- print(f"回调函数中出错: {e}")
696
  # 不中断采样过程
 
697
  return
698
 
699
  try:
@@ -701,6 +717,7 @@ def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_wind
701
  print(f"开始采样,设备: {device}, 数据类型: {transformer.dtype}, 使用TeaCache: {use_teacache and not cpu_fallback_mode}")
702
 
703
  try:
 
704
  generated_latents = sample_hunyuan(
705
  transformer=transformer,
706
  sampler='unipc',
@@ -732,20 +749,26 @@ def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_wind
732
  callback=callback,
733
  )
734
 
735
- print(f"采样完成,用时: {time.time() - sampling_start_time:.2f}秒")
736
- except KeyboardInterrupt:
737
  # 用户主动中断
738
- print("用户主动中断采样过程")
 
739
 
740
  # 如果已经有生成的视频,返回最后生成的视频
741
  if last_output_filename:
 
742
  stream.output_queue.push(('file', last_output_filename))
743
  error_msg = "用户中断生成过程,但已生成部分视频"
744
  else:
 
745
  error_msg = "用户中断生成过程,未生成视频"
746
 
 
747
  stream.output_queue.push(('error', error_msg))
 
748
  stream.output_queue.push(('end', None))
 
749
  return
750
  except Exception as e:
751
  print(f"采样过程中出错: {e}")
@@ -850,26 +873,39 @@ def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_wind
850
  if is_last_section:
851
  break
852
  except Exception as e:
853
- print(f"处理过程中出现错误: {e}")
 
854
  traceback.print_exc()
 
 
 
 
855
 
856
  if not high_vram and not cpu_fallback_mode:
857
  try:
 
858
  unload_complete_models(
859
  text_encoder, text_encoder_2, image_encoder, vae, transformer
860
  )
861
- except Exception:
 
 
862
  pass
863
 
864
  # 如果已经有生成的视频,返回最后生成的视频
865
  if last_output_filename:
 
866
  stream.output_queue.push(('file', last_output_filename))
 
 
867
 
868
  # 返回错误信息
869
  error_msg = f"处理过程中出现错误: {e}"
 
870
  stream.output_queue.push(('error', error_msg))
871
 
872
  # 确保总是返回end信号
 
873
  stream.output_queue.push(('end', None))
874
  return
875
 
@@ -1032,12 +1068,32 @@ else:
1032
 
1033
  def end_process():
1034
  """停止生成过程函数 - 通过在队列中推送'end'信号来中断生成"""
1035
- print("用户点击了停止按钮,发送停止信号...")
1036
  # 确保stream已初始化
1037
  if 'stream' in globals() and stream is not None:
1038
- stream.input_queue.push('end')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1039
  else:
1040
- print("警告: stream未初始化,无法发送停止信号")
1041
  return None
1042
 
1043
 
 
672
 
673
  try:
674
  # 首先检查是否有停止信号
675
+ print(f"【调试】回调函数: 步骤 {d['i']}, 检查是否有停止信号")
676
+ try:
677
+ queue_top = stream.input_queue.top()
678
+ print(f"【调试】回调函数: 队列顶部信号 = {queue_top}")
679
+
680
+ if queue_top == 'end':
681
+ print("【调试】回调函数: 检测到停止信号,准备中断...")
682
+ try:
683
+ stream.output_queue.push(('end', None))
684
+ print("【调试】回调函数: 成功向输出队列推送end信号")
685
+ except Exception as e:
686
+ print(f"【调试】回调函数: 向输出队列推送end信号失败: {e}")
687
+
688
+ print("【调试】回调函数: 即将抛出KeyboardInterrupt异常")
689
+ raise KeyboardInterrupt('用户主动结束任务')
690
+ except Exception as e:
691
+ print(f"【调试】回调函数: 检查队列顶部信号出错: {e}")
692
 
693
  preview = d['denoised']
694
  preview = vae_decode_fake(preview)
 
701
  hint = f'Sampling {current_step}/{steps}'
702
  desc = f'Total generated frames: {int(max(0, total_generated_latent_frames * 4 - 3))}, Video length: {max(0, (total_generated_latent_frames * 4 - 3) / 30) :.2f} seconds (FPS-30). The video is being extended now ...'
703
  stream.output_queue.push(('progress', (preview, desc, make_progress_bar_html(percentage, hint))))
704
+ except KeyboardInterrupt as e:
705
  # 捕获并重新抛出中断异常,确保它能传播到采样函数
706
+ print(f"【调试】回调函数: 捕获到KeyboardInterrupt: {e}")
707
+ print("【调试】回调函数: 重新抛出中断异常,确保传播到采样函数")
708
  raise
709
  except Exception as e:
710
+ print(f"【调试】回调函数中出错: {e}")
711
  # 不中断采样过程
712
+ print(f"【调试】回调函数: 步骤 {d['i']} 完成")
713
  return
714
 
715
  try:
 
717
  print(f"开始采样,设备: {device}, 数据类型: {transformer.dtype}, 使用TeaCache: {use_teacache and not cpu_fallback_mode}")
718
 
719
  try:
720
+ print("【调试】开始sample_hunyuan采样流程")
721
  generated_latents = sample_hunyuan(
722
  transformer=transformer,
723
  sampler='unipc',
 
749
  callback=callback,
750
  )
751
 
752
+ print(f"【调试】采样完成,用时: {time.time() - sampling_start_time:.2f}秒")
753
+ except KeyboardInterrupt as e:
754
  # 用户主动中断
755
+ print(f"【调试】捕获到KeyboardInterrupt: {e}")
756
+ print("【调试】用户主动中断采样过程,处理中断逻辑")
757
 
758
  # 如果已经有生成的视频,返回最后生成的视频
759
  if last_output_filename:
760
+ print(f"【调试】已有部分生成视频: {last_output_filename},返回此视频")
761
  stream.output_queue.push(('file', last_output_filename))
762
  error_msg = "用户中断生成过程,但已生成部分视频"
763
  else:
764
+ print("【调试】没有部分生成视频,返回中断消息")
765
  error_msg = "用户中断生成过程,未生成视频"
766
 
767
+ print(f"【调试】推送错误消息: {error_msg}")
768
  stream.output_queue.push(('error', error_msg))
769
+ print("【调试】推送end信号")
770
  stream.output_queue.push(('end', None))
771
+ print("【调试】中断处理完成,返回")
772
  return
773
  except Exception as e:
774
  print(f"采样过程中出错: {e}")
 
873
  if is_last_section:
874
  break
875
  except Exception as e:
876
+ print(f"【调试】处理过程中出现错误: {e}, 类型: {type(e)}")
877
+ print(f"【调试】错误详情:")
878
  traceback.print_exc()
879
+
880
+ # 检查是否是中断类型异常
881
+ if isinstance(e, KeyboardInterrupt):
882
+ print("【调试】捕获到外层KeyboardInterrupt异常")
883
 
884
  if not high_vram and not cpu_fallback_mode:
885
  try:
886
+ print("【调试】尝试卸载模型以释放资源")
887
  unload_complete_models(
888
  text_encoder, text_encoder_2, image_encoder, vae, transformer
889
  )
890
+ print("【调试】模型卸载成功")
891
+ except Exception as unload_error:
892
+ print(f"【调试】卸载模型时出错: {unload_error}")
893
  pass
894
 
895
  # 如果已经有生成的视频,返回最后生成的视频
896
  if last_output_filename:
897
+ print(f"【调试】外层异常处理: 返回已生成的部分视频 {last_output_filename}")
898
  stream.output_queue.push(('file', last_output_filename))
899
+ else:
900
+ print("【调试】外层异常处理: 未找到已生成的视频")
901
 
902
  # 返回错误信息
903
  error_msg = f"处理过程中出现错误: {e}"
904
+ print(f"【调试】外层异常处理: 推送错误信息: {error_msg}")
905
  stream.output_queue.push(('error', error_msg))
906
 
907
  # 确保总是返回end信号
908
+ print("【调试】工作函数结束,推送end信号")
909
  stream.output_queue.push(('end', None))
910
  return
911
 
 
1068
 
1069
  def end_process():
1070
  """停止生成过程函数 - 通过在队列中推送'end'信号来中断生成"""
1071
+ print("【调试】用户点击了停止按钮,发送停止信号...")
1072
  # 确保stream已初始化
1073
  if 'stream' in globals() and stream is not None:
1074
+ # 在推送前检查队列状态
1075
+ try:
1076
+ current_top = stream.input_queue.top()
1077
+ print(f"【调试】当前队列顶部信号: {current_top}")
1078
+ except Exception as e:
1079
+ print(f"【调试】检查队列状态出错: {e}")
1080
+
1081
+ # 推送end信号
1082
+ try:
1083
+ stream.input_queue.push('end')
1084
+ print("【调试】成功推送end信号到队列")
1085
+
1086
+ # 验证信号是否成功推送
1087
+ try:
1088
+ current_top_after = stream.input_queue.top()
1089
+ print(f"【调试】推送后队列顶部信号: {current_top_after}")
1090
+ except Exception as e:
1091
+ print(f"【调试】验证推送后队列状态出错: {e}")
1092
+
1093
+ except Exception as e:
1094
+ print(f"【调试】推送end信号到队列失败: {e}")
1095
  else:
1096
+ print("【调试】警告: stream未初始化,无法发送停止信号")
1097
  return None
1098
 
1099
 
diffusers_helper/thread_utils.py CHANGED
@@ -44,28 +44,42 @@ class FIFOQueue:
44
  def __init__(self):
45
  self.queue = []
46
  self.lock = Lock()
 
47
 
48
  def push(self, item):
 
49
  with self.lock:
50
  self.queue.append(item)
 
51
 
52
  def pop(self):
 
53
  with self.lock:
54
  if self.queue:
55
- return self.queue.pop(0)
 
 
 
56
  return None
57
 
58
  def top(self):
 
59
  with self.lock:
60
  if self.queue:
61
- return self.queue[0]
 
 
 
62
  return None
63
 
64
  def next(self):
 
65
  while True:
66
  with self.lock:
67
  if self.queue:
68
- return self.queue.pop(0)
 
 
69
 
70
  time.sleep(0.001)
71
 
@@ -74,3 +88,36 @@ class AsyncStream:
74
  def __init__(self):
75
  self.input_queue = FIFOQueue()
76
  self.output_queue = FIFOQueue()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
  def __init__(self):
45
  self.queue = []
46
  self.lock = Lock()
47
+ print("【调试】创建新的FIFOQueue")
48
 
49
  def push(self, item):
50
+ print(f"【调试】FIFOQueue.push: 准备添加项目: {item}")
51
  with self.lock:
52
  self.queue.append(item)
53
+ print(f"【调试】FIFOQueue.push: 成功添加项目: {item}, 当前队列长度: {len(self.queue)}")
54
 
55
  def pop(self):
56
+ print("【调试】FIFOQueue.pop: 准备弹出队列首项")
57
  with self.lock:
58
  if self.queue:
59
+ item = self.queue.pop(0)
60
+ print(f"【调试】FIFOQueue.pop: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
61
+ return item
62
+ print("【调试】FIFOQueue.pop: 队列为空,返回None")
63
  return None
64
 
65
  def top(self):
66
+ print("【调试】FIFOQueue.top: 准备查看队列首项")
67
  with self.lock:
68
  if self.queue:
69
+ item = self.queue[0]
70
+ print(f"【调试】FIFOQueue.top: 队列首项为: {item}, 当前队列长度: {len(self.queue)}")
71
+ return item
72
+ print("【调试】FIFOQueue.top: 队列为空,返回None")
73
  return None
74
 
75
  def next(self):
76
+ print("【调试】FIFOQueue.next: 等待弹出队列首项")
77
  while True:
78
  with self.lock:
79
  if self.queue:
80
+ item = self.queue.pop(0)
81
+ print(f"【调试】FIFOQueue.next: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
82
+ return item
83
 
84
  time.sleep(0.001)
85
 
 
88
  def __init__(self):
89
  self.input_queue = FIFOQueue()
90
  self.output_queue = FIFOQueue()
91
+
92
+
93
+ class InterruptibleStreamData:
94
+ def __init__(self):
95
+ self.input_queue = FIFOQueue()
96
+ self.output_queue = FIFOQueue()
97
+ print("【调试】创建新的InterruptibleStreamData,初始化输入输出队列")
98
+
99
+ # 推送数据至输出队列
100
+ def push_output(self, item):
101
+ print(f"【调试】InterruptibleStreamData.push_output: 准备推送输出: {type(item)}")
102
+ self.output_queue.push(item)
103
+ print(f"【调试】InterruptibleStreamData.push_output: 成功推送输出")
104
+
105
+ # 获取下一个输出数据
106
+ def get_output(self):
107
+ print("【调试】InterruptibleStreamData.get_output: 准备获取下一个输出数据")
108
+ item = self.output_queue.next()
109
+ print(f"【调试】InterruptibleStreamData.get_output: 获取到输出数据: {type(item)}")
110
+ return item
111
+
112
+ # 推送数据至输入队列
113
+ def push_input(self, item):
114
+ print(f"【调试】InterruptibleStreamData.push_input: 准备推送输入: {type(item)}")
115
+ self.input_queue.push(item)
116
+ print(f"【调试】InterruptibleStreamData.push_input: 成功推送输入")
117
+
118
+ # 获取下一个输入数据
119
+ def get_input(self):
120
+ print("【调试】InterruptibleStreamData.get_input: 准备获取下一个输入数据")
121
+ item = self.input_queue.next()
122
+ print(f"【调试】InterruptibleStreamData.get_input: 获取到输入数据: {type(item)}")
123
+ return item