import asyncio
import logging

import aiofiles
import nest_asyncio
from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent

# Enable support for nested asyncio event loops
nest_asyncio.apply()

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('transcription_system')

# Number of leading bytes skipped as the WAV header before audio is streamed.
_WAV_HEADER_BYTES = 44
# Size of each audio chunk read from the file and sent to the stream.
_AUDIO_CHUNK_BYTES = 1024 * 16
# Pause between chunks; presumably paces the upload near real time for the
# default sample rate -- TODO confirm against the audio format in use.
_CHUNK_INTERVAL_SECONDS = 0.125


class TranscriptHandler(TranscriptResultStreamHandler):
    """Handler that turns Amazon Transcribe events into per-speaker lines.

    Each finalized utterance is printed, stored in ``results`` and appended
    to ``output_file_path``.
    """

    def __init__(self, output_stream, output_file_path="./transcribe_texts"):
        """Set up result storage and start with an empty output file.

        Args:
            output_stream: Stream passed straight to the base handler.
            output_file_path: File that receives one line per utterance.
        """
        super().__init__(output_stream)
        self.results = []  # transcript lines in the order they were emitted
        self.processing_complete = False  # flipped by set_complete()
        self.output_file_path = output_file_path
        # Opening in 'w' mode truncates (or creates) the file for this run.
        with open(self.output_file_path, 'w', encoding="utf-8"):
            pass

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """Emit speaker-labelled lines for every final result on 'ch_1'.

        Partial results and results whose ``channel_id`` is not ``'ch_1'``
        are ignored.
        """
        for result in transcript_event.transcript.results:
            if result.is_partial or result.channel_id != 'ch_1':
                continue
            for alt in result.alternatives:
                self._emit_alternative(alt)

    def _emit_alternative(self, alt):
        """Split one alternative's items into utterances by speaker and emit them."""
        current_speaker = None
        utterance = []
        for item in alt.items:
            speaker = item.speaker
            # Items without a speaker never trigger a speaker switch; they
            # are simply attached to the utterance in progress.
            if speaker is not None:
                if current_speaker is None:
                    current_speaker = speaker
                elif current_speaker != speaker:
                    self._emit_utterance(current_speaker, utterance)
                    current_speaker = speaker
                    utterance = []
            # Separate words with a space; other item types are appended
            # directly to the preceding content.
            if item.item_type == 'pronunciation' and utterance:
                utterance.append(' ')
            utterance.append(item.content)
        # Output the last utterance
        if utterance:
            self._emit_utterance(current_speaker, utterance)

    def _emit_utterance(self, speaker, parts):
        """Print, record and persist one utterance for ``speaker``."""
        transcript = f"Speaker {speaker}: {''.join(parts).strip()}"
        print(f"\n{transcript}")
        self.results.append(transcript)
        self._append_to_file(transcript)

    def _append_to_file(self, transcript):
        """Append one transcript line to the output file.

        Failures are logged rather than raised so a file problem does not
        interrupt transcription.
        """
        try:
            with open(self.output_file_path, 'a', encoding='utf-8') as f:
                f.write(transcript + "\n")
        except Exception as e:
            logger.error("Error occurred while writing to file: %s", e)

    def set_complete(self):
        """Mark processing as complete and append a completion marker to the file."""
        self.processing_complete = True
        try:
            with open(self.output_file_path, 'a', encoding="utf-8") as f:
                f.write("\n----STT work complete---\n")
        except Exception as e:
            logger.error("Error occurred while writing completion marker: %s", e)


async def process_audio_file(file_path, region="ap-northeast-2", sample_rate=32000,
                             language=None, content_type=None):
    """Stream an audio file to Amazon Transcribe and collect the transcript.

    The stream is requested as two-channel PCM with channel identification
    and speaker labels enabled.

    Args:
        file_path: Path of the audio file; the first 44 bytes are skipped as
            the WAV header.
        region: Region passed to ``TranscribeStreamingClient``.
        sample_rate: Value sent as ``media_sample_rate_hz``.
        language: Language code. When ``None`` it is chosen automatically:
            ``"en-US"`` if ``content_type`` is ``"Bundesliga Fan Experience"``
            or the path contains ``"bundesliga"`` (case-insensitive),
            otherwise ``"ko-KR"``.
        content_type: Optional content label used only for language selection.

    Returns:
        The ``TranscriptHandler`` holding the collected results.
    """
    logger.info("Starting transcription for file '%s'", file_path)

    if language is None:
        # str() lets path-like objects be inspected as well as plain strings.
        if content_type == "Bundesliga Fan Experience" or "bundesliga" in str(file_path).lower():
            language = "en-US"
            logger.info("English content detected: changing language setting to 'en-us'")
        else:
            language = "ko-KR"
            logger.info("Default language setting: 'ko-KR'")

    client = TranscribeStreamingClient(region=region)
    stream = await client.start_stream_transcription(
        language_code=language,
        media_sample_rate_hz=sample_rate,
        media_encoding="pcm",
        enable_partial_results_stabilization=True,
        partial_results_stability="high",
        show_speaker_label=True,
        enable_channel_identification=True,
        number_of_channels=2,
    )

    handler = TranscriptHandler(stream.output_stream)

    async def write_chunks():
        """Send the file's audio in chunks, then always close the input stream."""
        try:
            async with aiofiles.open(file_path, 'rb') as afp:
                # Skip WAV header
                await afp.seek(_WAV_HEADER_BYTES)
                while True:
                    chunk = await afp.read(_AUDIO_CHUNK_BYTES)
                    if not chunk:
                        break
                    await stream.input_stream.send_audio_event(audio_chunk=chunk)
                    await asyncio.sleep(_CHUNK_INTERVAL_SECONDS)
        except Exception as e:
            logger.error("Error occurred while writing chunks: %s", e)
        finally:
            # End the stream even when reading/sending failed; otherwise the
            # input side stays open while the event handler is still being
            # awaited alongside this coroutine.
            try:
                await stream.input_stream.end_stream()
            except Exception as e:
                logger.error("Error occurred while writing chunks: %s", e)

    await asyncio.gather(write_chunks(), handler.handle_events())
    handler.set_complete()

    logger.info(
        "Transcription complete: %d utterance segments processed",
        len(handler.results),
    )
    return handler


def run_transcription(file_path, content_type=None):
    """Synchronous wrapper function to run in ThreadPoolExecutor.

    Runs ``process_audio_file`` to completion on a fresh event loop.

    Args:
        file_path: Audio file to transcribe.
        content_type: Optional content label forwarded for language selection.

    Returns:
        The ``TranscriptHandler`` produced by ``process_audio_file``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Return the handler object itself
        return loop.run_until_complete(
            process_audio_file(file_path, content_type=content_type)
        )
    finally:
        # Unregister before closing so this thread is not left with a closed
        # loop as its current event loop.
        asyncio.set_event_loop(None)
        loop.close()