daihui.zhang committed
Commit 6f13b8c · 1 Parent(s): 485d8e3

Change pipelines to multiprocessing

config.py CHANGED
@@ -13,6 +13,7 @@ logging.getLogger("pywhispercpp").setLevel(logging.WARNING)
 
 BASE_DIR = pathlib.Path(__file__).parent
 MODEL_DIR = BASE_DIR / "moyoyo_asr_models"
+ASSERT_DIR = BASE_DIR / "assets"
 # punctuation marks
 SENTENCE_END_MARKERS = ['.', '!', '?', '。', '!', '?', ';', ';', ':', ':']
 PAUSE_END_MARKERS = [',', ',', '、']
transcribe/pipelines/base.py CHANGED
@@ -1,6 +1,7 @@
 
 from dataclasses import dataclass, field
-from multiprocessing import Process
+from multiprocessing import Process, Queue
+from multiprocessing import Event
 
 @dataclass
 class Segment:
@@ -19,10 +20,20 @@ class MetaItem:
 
 
 class BasePipe(Process):
-    def __init__(self, in_queue, out_queue) -> None:
+    def __init__(self, in_queue=None, out_queue=None) -> None:
         super().__init__()  # Initialize the Process class
-        self._in_queue = in_queue
-        self._out_queue = out_queue
+        self._in_queue = in_queue if in_queue else Queue()
+        self._out_queue = out_queue if out_queue else Queue()
+        self._ready = Event()
+
+    def set_ready(self):
+        self._ready.set()
+
+    def is_ready(self):
+        return self._ready.is_set()
+
+    def wait(self):
+        self._ready.wait()
 
     @property
     def output_queue(self):
@@ -42,9 +53,10 @@ class BasePipe(Process):
 
     def run(self):
         self.init()
+        self.set_ready()
         while True:
-            item = self._in_queue.get()
+            item = self.input_queue.get()
             if item is None:  # Check for termination signal
                 break
             out_item = self.process(item)
-            self._out_queue.put(out_item)
+            self.output_queue.put(out_item)
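
The run() loop now performs setup in the child process via init(), signals readiness, and then pulls items until it receives None. As a rough sketch of that contract, a hypothetical stage might look like the following (UpperCasePipe is not part of this commit, and the import path is assumed from the transcribe/pipelines/base.py layout):

# Hypothetical pipeline stage illustrating the BasePipe contract; not part of this commit.
from transcribe.pipelines.base import BasePipe, MetaItem

class UpperCasePipe(BasePipe):
    def init(self):
        # one-time heavy setup (e.g. model loading) runs here, inside the child process
        pass

    def process(self, item: MetaItem) -> MetaItem:
        # transform the payload; run() puts the return value on the output queue
        item.transcribe_content = item.transcribe_content.upper()
        return item

if __name__ == "__main__":
    # (with the spawn start method the class must live in an importable module)
    pipe = UpperCasePipe()   # queues are now created internally when none are passed in
    pipe.daemon = True
    pipe.start()
    pipe.wait()              # blocks until init() has finished and set_ready() was called
    pipe.input_queue.put(MetaItem(transcribe_content="hello",
                                  source_language="en",
                                  destination_language="zh"))
    print(pipe.output_queue.get().transcribe_content)
    pipe.input_queue.put(None)  # None is the termination signal for run()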
transcribe/pipelines/pipe_translate.py CHANGED
@@ -4,6 +4,7 @@ from llama_cpp import Llama
 from ..translator import QwenTranslator
 from config import LLM_MODEL_PATH, LLM_SYS_PROMPT
 
+
 class TranslatePipe(BasePipe):
     translator = None
 
transcribe/pipelines/pipe_whisper.py CHANGED
@@ -7,7 +7,7 @@ from ..whisper import WhisperCPP
 class WhisperPipe(BasePipe):
     whisper = None
 
-    def __init__(self, in_queue, out_queue) -> None:
+    def __init__(self, in_queue=None, out_queue=None) -> None:
         super().__init__(in_queue, out_queue)
 
 
transcribe/transcription.py CHANGED
@@ -176,8 +176,8 @@ class TranscriptionServer:
         frame_data = websocket.recv()
         if frame_data == b"END_OF_AUDIO":
             return False
-        return np.frombuffer(frame_data, dtype=np.int16).astype(np.float32) / 32768.0
-        # return np.frombuffer(frame_data, dtype=np.float32)
+        # return np.frombuffer(frame_data, dtype=np.int16).astype(np.float32) / 32768.0
+        return np.frombuffer(frame_data, dtype=np.float32)
 
 
     def handle_new_connection(self, websocket):
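
With this change the server reads each incoming websocket frame as float32 samples rather than int16, so clients now need to send normalized float32 PCM. A minimal client-side sketch of that conversion (the capture and send code around it is assumed):

import numpy as np

def frame_to_bytes(int16_pcm: np.ndarray) -> bytes:
    # scale 16-bit PCM into the float32 [-1.0, 1.0] range the server now expects
    return (int16_pcm.astype(np.float32) / 32768.0).tobytes()

# websocket.send(frame_to_bytes(chunk)) for each captured chunk,
# then websocket.send(b"END_OF_AUDIO") when the stream ends.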
transcribe/translatepipes.py CHANGED
@@ -5,21 +5,16 @@ import config
 class TranslatePipes:
     def __init__(self) -> None:
 
-        self.whisper_input_q = mp.Queue()
-        self.translate_input_q = mp.Queue()
-        self.result_queue = mp.Queue()
+        # self.whisper_input_q = mp.Queue()
+        # self.translate_input_q = mp.Queue()
+        # self.result_queue = mp.Queue()
 
         # whisper transcription
-        self._whisper_pipe = WhisperPipe(
-            in_queue=self.whisper_input_q,
-            out_queue=self.translate_input_q
-        )
+        self._whisper_pipe = WhisperPipe()
 
         # llm translation
-        self._translate_pipe = TranslatePipe(
-            in_queue=self.translate_input_q,
-            out_queue=self.result_queue,
-        )
+        self._translate_pipe = TranslatePipe()
+
 
         self._whisper_pipe.daemon = True
         self._whisper_pipe.start()
@@ -27,12 +22,15 @@ class TranslatePipes:
         self._translate_pipe.daemon = True
         self._translate_pipe.start()
 
+    def wait_ready(self):
+        self._whisper_pipe.wait()
+        self._translate_pipe.wait()
 
     def translate(self, text, src_lang, dst_lang) -> MetaItem:
         item = MetaItem(
             transcribe_content=text,
-            source_language=src_lang,
-            destination_language=dst_lang)
+            source_language=src_lang,
+            destination_language=dst_lang)
         self._translate_pipe.input_queue.put(item)
         return self._translate_pipe.output_queue.get()
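A minimal usage sketch of the rebuilt class, assuming the module is importable as transcribe.translatepipes:

from transcribe.translatepipes import TranslatePipes

pipes = TranslatePipes()   # starts WhisperPipe and TranslatePipe as daemon processes
pipes.wait_ready()         # blocks until both workers have run init() and signalled ready
result = pipes.translate("hello world", "en", "zh")
print(result.translate_content)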
 
transcribe/translator.py CHANGED
@@ -8,12 +8,8 @@ class QwenTranslator:
     def __init__(self, model_path, system_prompt="") -> None:
         self.llm = Llama(
             model_path=model_path,
-            # n_gpu_layers=-1, # Uncomment to use GPU acceleration
-            # seed=1337, # Uncomment to set a specific seed
-            # n_ctx=2048, # Uncomment to increase the context window
             chat_format="chatml",
-            verbose=False
-        )
+            verbose=False)
         self.sys_prompt = system_prompt
 
     def to_message(self, prompt, src_lang, dst_lang):
@@ -26,9 +22,7 @@ class QwenTranslator:
 
     def translate(self, prompt, src_lang, dst_lang) -> str:
         message = self.to_message(prompt, src_lang, dst_lang)
-        start_time = time.monotonic()
-        output = self.llm.create_chat_completion(messages=message, temperature=0.9)
-
+        output = self.llm.create_chat_completion(messages=message, temperature=0)
        return output['choices'][0]['message']['content']
 
     def __call__(self, prompt,*args, **kwargs):
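
Dropping the temperature from 0.9 to 0 makes the completion effectively greedy, so the same transcript should translate the same way on every call. For reference, a standalone sketch of such a call with llama-cpp-python; the model path and messages below are placeholders, not the project's actual prompt:

from llama_cpp import Llama

llm = Llama(model_path="path/to/qwen.gguf", chat_format="chatml", verbose=False)
messages = [
    {"role": "system", "content": "Translate the user's text from English to Chinese."},
    {"role": "user", "content": "Good morning."},
]
output = llm.create_chat_completion(messages=messages, temperature=0)
print(output["choices"][0]["message"]["content"])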
transcribe/whisper.py CHANGED
@@ -20,7 +20,7 @@ class WhisperCPP:
 
 
     def warmup(cls, warmup_steps=1):
-        mel, _, = soundfile.read("assets/jfk.flac")
+        mel, _, = soundfile.read(f"{config.ASSERT_DIR}/jfk.flac")
         for _ in range(warmup_steps):
             cls.model.transcribe(mel, print_progress=False)
 
@@ -35,8 +35,7 @@ class WhisperCPP:
     def transcribe(self, audio_buffer:bytes, language):
         max_len, prompt = self.config_language(language)
         audio_buffer = np.frombuffer(audio_buffer, dtype=np.float32)
-        print("audio buffer got:", len(audio_buffer))
-        output = self.model.transcribe(
+        output = self.model.transcribe(
             audio_buffer,
             initial_prompt=prompt,
             language=language,
transcribe/whisper_llm_serve.py CHANGED
@@ -20,9 +20,15 @@ from queue import Queue
 from scipy.io.wavfile import write
 from api_model import TransResult, Message
 from .utils import log_block
+from .translatepipes import TranslatePipes
 
 logger = getLogger("TranslatorApp")
 
+translate_pipes = TranslatePipes()
+translate_pipes.wait_ready()
+
+logger.info("Pipeline is ready.")
+
 
 def save_to_wave(filename, data:np.ndarray, sample_rate=16000):
     write(filename, sample_rate, data)
@@ -30,9 +36,6 @@ class TripleTextBuffer:
     def __init__(self, size=2):
         self.history = collections.deque(maxlen=size)
 
-    def _clean(self):
-        self.history.clear()
-
     def add_entry(self, text, index):
         """
         text: the text content
@@ -56,7 +59,7 @@ class TripleTextBuffer:
         # print("compare: ", text1, text2, " => ", sim_12)
         # sim_23 = self.text_similarity(text2, text3)
         if sim_12 >= similarity_threshold:
-            self._clean()
+            self.history.clear()
             return idx2
         return None
 
@@ -112,71 +115,6 @@ class SegmentManager:
         self.commit_segment()
 
 
-class PywhisperInference:
-    whisper_model = None
-    llm_model = None
-    # vad_model = None
-
-    @classmethod
-    def initializer(cls, event:mp.Event, warmup=True):
-        models_dir = config.MODEL_DIR.as_posix()
-        cls.whisper_model = Model(
-            model=config.WHISPER_MODEL,
-            models_dir=models_dir,
-            print_realtime=False,
-            print_progress=False,
-            print_timestamps=False,
-        )
-        if warmup:
-            cls.warmup()
-
-        # init llamacpp
-        cls.llm_model = QwenTranslator(config.LLM_MODEL_PATH, config.LLM_SYS_PROMPT)
-        # cls.vad_model = VoiceActivityDetector()
-        event.set()
-
-
-    @classmethod
-    def init(cls):
-        pass
-
-
-    @classmethod
-    def warmup(cls, warmup_steps=1):
-        mel, _, = soundfile.read("assets/jfk.flac")
-        for _ in range(warmup_steps):
-            cls.whisper_model.transcribe(mel, print_progress=False)
-
-    @staticmethod
-    def config_language(language):
-        if language == "zh":
-            return config.MAX_LENTH_ZH, config.WHISPER_PROMPT_ZH
-        elif language == "en":
-            return config.MAX_LENGTH_EN, config.WHISPER_PROMPT_EN
-        raise ValueError(f"Unsupported language : {language}")
-
-    @classmethod
-    def transcribe(cls, audio_buffer, language):
-        max_len, prompt = cls.config_language(language)
-        audio_buffer = np.frombuffer(audio_buffer, dtype=np.float32)
-        return cls.whisper_model.transcribe(
-            audio_buffer,
-            initial_prompt=prompt,
-            language=language,
-            token_timestamps=True,
-            max_len=max_len
-        )
-
-    @classmethod
-    def translate(cls, context: str, src_lang, dst_lang):
-        return cls.llm_model.translate(context, src_lang, dst_lang)
-
-    # @classmethod
-    # def voice_detect(cls, audio_buffer):
-    #     audio_buffer = np.frombuffer(audio_buffer, dtype=np.float32)
-    #     return cls.vad_model(audio_buffer)
-
-
 
 class PyWhiperCppServe(ServeClientBase):
 
@@ -189,30 +127,23 @@ class PyWhiperCppServe(ServeClientBase):
         self._text_buffer = TripleTextBuffer()
         # store transcription data
         self._segment_manager = SegmentManager()
+        self._ready_state = mp.Event()
+
         self.lock = threading.Lock()
         self.frames_np = None
         self.sample_rate = 16000
         # self._audio_queue = Queue()
         # start receiving audio only after the process has initialized
-        self._ready_state = mp.Event()
-        self._pool = PPool(
-            max_workers=1,
-            initializer=PywhisperInference.initializer,
-            initargs=(self._ready_state,)
-        )
-        self._pool.submit(PywhisperInference.init)
+
         logger.info('Create a process to process audio.')
         self.send_ready_state()
-        # self.load_frame_thread = threading.Thread(target=self.load_frame_from_queue)
-        # self.load_frame_thread.daemon = True
-        # self.load_frame_thread.start()
 
         self.trans_thread = threading.Thread(target=self.speech_to_text)
         self.trans_thread.daemon = True
        self.trans_thread.start()
 
     def send_ready_state(self):
-        self._ready_state.wait()
+        # self._ready_state.wait()
         self.websocket.send(json.dumps({
             "uid": self.client_uid,
             "message": self.SERVER_READY,
@@ -223,20 +154,6 @@ class PyWhiperCppServe(ServeClientBase):
         self.language = src_lang
         self.dst_lang = dst_lang
 
-    # def load_frame_from_queue(self):
-    #     while True:
-    #         frame_np = self._audio_queue.get()
-    #         fut = self._pool.submit(PywhisperInference.voice_detect, frame_np.tobytes())
-    #         output = fut.result()
-    #         logger.info(f"VAD: {output}")
-    #         if output == True:
-    #             with self.lock:
-    #                 if self.frames_np is None:
-    #                     self.frames_np = frame_np.copy()
-    #                 else:
-    #                     self.frames_np = np.append(self.frames_np,frame_np)
-
-
     def add_frames(self, frame_np):
         # self._audio_queue.put(frame_np)
         with self.lock:
@@ -261,10 +178,8 @@ class PyWhiperCppServe(ServeClientBase):
         log_block("Audio buffer length", f"{audio_buffer.shape[0]/self.sample_rate:.2f}", "s")
         start_time = time.perf_counter()
 
-        transcribe_fut = self._pool.submit(
-            PywhisperInference.transcribe, audio_buffer.tobytes(), self.language)
-
-        segments = transcribe_fut.result()
+        item = translate_pipes.transcrible(audio_buffer.tobytes(), self.language)
+        segments = item.segments
         log_block("Whisper transcrible time", f"{(time.perf_counter() - start_time):.3f}", "s")
 
         return segments
@@ -275,12 +190,9 @@ class PyWhiperCppServe(ServeClientBase):
         # return "sample english"
         log_block("LLM translate input", f"{text}")
         start_time = time.perf_counter()
-        translate_fut = self._pool.submit(
-            PywhisperInference.translate, text, self.language, self.dst_lang)
-
-        ret = translate_fut.result()
+        ret = translate_pipes.translate(text, self.language, self.dst_lang)
         log_block("LLM translate time", f"{(time.perf_counter() - start_time):.3f}", "s")
-        return ret
+        return ret.translate_content
 
     def _segments_split(self, segments, audio_buffer: np.ndarray):
         """Split the sequence at the first punctuation mark into a watch segment and the remainder"""
@@ -327,23 +239,22 @@ class PyWhiperCppServe(ServeClientBase):
         return None, left_watch_string, right_watch_string, is_end_sentence
 
     def speech_to_text(self):
-        # c = 0
+        c = 0
         while True:
             if self.exit:
                 logger.info("Exiting speech to text thread")
-                self._pool.shutdown(wait=False, cancel_futures=True)
                 break
 
             if self.frames_np is None:
                 time.sleep(0.02)  # wait for any audio to arrive
                 continue
-
+
             audio_buffer = self.get_audio_chunk_for_processing()
             # c+= 1
             # name = f"dev-{c}.wav"
             # save_to_wave(name, audio_buffer)
             try:
-                # logger.info(f"Audio buffer length: {len(audio_buffer) / self.sample_rate:.2f}s")
+                logger.info(f"Audio buffer length: {len(audio_buffer) / self.sample_rate:.2f}s")
                 segments = self.transcribe_audio(audio_buffer)
                 for tran_result in self.handle_transcription_output(segments, audio_buffer):
                     self.send_to_client(tran_result)
@@ -352,8 +263,6 @@ class PyWhiperCppServe(ServeClientBase):
         except Exception as e:
             logger.error(f"{e}")
 
-
-
     def handle_transcription_output(self, segments, audio_buffer):
         texts = "".join(i.text for i in segments)
         if not len(texts):
@@ -423,7 +332,4 @@ class PyWhiperCppServe(ServeClientBase):
         return padded_audio.copy()
 
     def cleanup(self):
-        logger.info("start shut down worker pool.")
-        self._pool.shutdown(wait=False, cancel_futures=True)
-        logger.info("shut down worker pool success.")
         return super().cleanup()#
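
transcribe_audio() above now goes through translate_pipes.transcrible(...), a helper whose definition is not part of this diff. Judging from the translate() method shown in transcribe/translatepipes.py, it presumably wraps the audio in a MetaItem, pushes it through the whisper pipe, and returns the resulting item, whose .segments the caller reads. A hedged sketch of what such a method might look like; the field name carrying the raw audio ("audio") is a guess:

# Hypothetical sketch of TranslatePipes.transcrible(); this method is not shown in the commit.
def transcrible(self, audio_bytes: bytes, src_lang: str) -> MetaItem:
    item = MetaItem(audio=audio_bytes, source_language=src_lang)  # "audio" field name is assumed
    self._whisper_pipe.input_queue.put(item)
    return self._whisper_pipe.output_queue.get()  # caller reads .segments off the result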