|
import nest_asyncio |
|
import asyncio |
|
import aiofiles |
|
from amazon_transcribe.client import TranscribeStreamingClient |
|
from amazon_transcribe.handlers import TranscriptResultStreamHandler |
|
from amazon_transcribe.model import TranscriptEvent |
|
import logging |
|
|
|
|
|
# Patch asyncio via nest_asyncio before any event loop is used in this module.
nest_asyncio.apply()

# Module-wide logging: INFO level, timestamped records tagged with the
# logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Shared logger used by every function and class below.
logger = logging.getLogger('transcription_system')
|
|
|
class TranscriptHandler(TranscriptResultStreamHandler):
    """Handler that turns Amazon Transcribe events into speaker-labelled lines.

    Every finished utterance is printed, stored in ``self.results`` and
    appended to the text file at ``self.output_file_path``.
    """

    def __init__(self, output_stream, output_file_path="./transcribe_texts",
                 channel_id="ch_1"):
        """Initialise the handler and create/empty the output file.

        Args:
            output_stream: Transcript result stream handed to the base class.
            output_file_path: Text file that receives one line per utterance.
            channel_id: Only results whose ``channel_id`` equals this value
                are processed. Defaults to ``'ch_1'``.

        Raises:
            OSError: If the output file cannot be opened for writing.
        """
        super().__init__(output_stream)
        # Utterance lines ("Speaker <label>: <text>") in arrival order.
        self.results = []
        # Flipped to True by set_complete().
        self.processing_complete = False
        self.output_file_path = output_file_path
        self.channel_id = channel_id

        # Opening in 'w' mode is enough to create or truncate the file.
        with open(self.output_file_path, 'w', encoding="utf-8"):
            pass

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """Emit the utterances contained in one transcript event.

        Partial results and results from other channels are skipped. For each
        alternative of a remaining result, items are grouped into runs of the
        same speaker and each run is emitted as one line.
        """
        for result in transcript_event.transcript.results:
            if result.is_partial or result.channel_id != self.channel_id:
                continue
            for alt in result.alternatives:
                # The generator is consumed lazily, so each line is emitted as
                # soon as its speaker run ends.
                for transcript in self._group_by_speaker(alt.items):
                    self._emit(transcript)

    @staticmethod
    def _group_by_speaker(items):
        """Yield one ``"Speaker <label>: <text>"`` line per run of one speaker."""
        current_speaker = None
        utterance = []

        for item in items:
            # Items whose speaker is None never start a new run; their content
            # is attached to whatever run is currently open.
            if item.speaker is not None:
                if current_speaker is None:
                    current_speaker = item.speaker

                if current_speaker != item.speaker:
                    yield f"Speaker {current_speaker}: {''.join(utterance).strip()}"
                    current_speaker = item.speaker
                    utterance = []

            # Separate 'pronunciation' items with a space; other item types
            # are appended directly to the preceding content.
            if item.item_type == 'pronunciation' and utterance:
                utterance.append(' ')
            utterance.append(item.content)

        if utterance:
            yield f"Speaker {current_speaker}: {''.join(utterance).strip()}"

    def _emit(self, transcript):
        """Print, record and persist one finished utterance line."""
        print(f"\n{transcript}")
        self.results.append(transcript)
        self._append_to_file(transcript)

    def _append_to_file(self, transcript):
        """Append one STT line to the output file; failures are only logged."""
        try:
            with open(self.output_file_path, 'a', encoding='utf-8') as f:
                f.write(transcript + "\n")
        except Exception as e:
            # Best effort: a write failure must not abort the transcription.
            logger.error("Error occurred while writing to file: %s", e)

    def set_complete(self):
        """Mark processing as complete and append a completion marker to the file."""
        self.processing_complete = True

        try:
            with open(self.output_file_path, 'a', encoding="utf-8") as f:
                f.write("\n----STT work complete---\n")
        except Exception as e:
            # Best effort, same as _append_to_file.
            logger.error("Error occurred while writing completion marker: %s", e)
|
|
|
async def process_audio_file(file_path, region="ap-northeast-2", sample_rate=32000, language=None, content_type=None):
    """Stream an audio file to Amazon Transcribe and collect the transcript.

    Args:
        file_path: Path of the audio file to send (``str`` or path-like).
        region: AWS region passed to ``TranscribeStreamingClient``.
        sample_rate: Value passed as ``media_sample_rate_hz``.
        language: Language code for the stream. When ``None`` it becomes
            ``"en-US"`` if ``content_type`` is ``"Bundesliga Fan Experience"``
            or the path contains ``"bundesliga"``, otherwise ``"ko-KR"``.
        content_type: Optional content label, used only to pick the language.

    Returns:
        The ``TranscriptHandler`` that received the transcript events.
    """
    # Presumably the size of a canonical WAV header -- TODO confirm that all
    # input files use that layout.
    header_bytes = 44
    chunk_bytes = 1024 * 16
    # Delay between chunks; presumably paces the upload close to real time
    # for the default audio format -- TODO confirm.
    chunk_delay_s = 0.125

    logger.info("Starting transcription for file '%s'", file_path)

    if language is None:
        # str() lets path-like objects through the substring check as well.
        if content_type == "Bundesliga Fan Experience" or "bundesliga" in str(file_path).lower():
            language = "en-US"
            logger.info("English content detected: changing language setting to 'en-us'")
        else:
            language = "ko-KR"
            logger.info("Default language setting: 'ko-KR'")

    client = TranscribeStreamingClient(region=region)

    stream = await client.start_stream_transcription(
        language_code=language,
        media_sample_rate_hz=sample_rate,
        media_encoding="pcm",
        enable_partial_results_stabilization=True,
        partial_results_stability="high",
        show_speaker_label=True,
        enable_channel_identification=True,
        number_of_channels=2
    )

    handler = TranscriptHandler(stream.output_stream)

    async def write_chunks():
        """Send the file body to the stream in fixed-size chunks."""
        try:
            try:
                async with aiofiles.open(file_path, 'rb') as afp:
                    await afp.seek(header_bytes)

                    while True:
                        chunk = await afp.read(chunk_bytes)
                        if not chunk:
                            break
                        await stream.input_stream.send_audio_event(audio_chunk=chunk)
                        await asyncio.sleep(chunk_delay_s)
            finally:
                # Always close the input side, even when reading or sending
                # failed, so the stream is not left open while
                # handle_events() is still being awaited.
                await stream.input_stream.end_stream()
        except Exception as e:
            # Best effort: log and let handle_events() finish with whatever
            # audio was sent.
            logger.error("Error occurred while writing chunks: %s", e)

    # Upload audio and consume transcript events concurrently.
    await asyncio.gather(write_chunks(), handler.handle_events())

    handler.set_complete()
    logger.info("Transcription complete: %d utterance segments processed", len(handler.results))
    return handler
|
|
|
def run_transcription(file_path, content_type=None):
    """Run ``process_audio_file`` to completion from synchronous code.

    A fresh event loop is created, installed as the current loop, used to
    drive the coroutine, and closed afterwards (also on error). Intended to
    be called from a ThreadPoolExecutor worker.

    Returns:
        Whatever ``process_audio_file`` returns.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        transcription = process_audio_file(file_path, content_type=content_type)
        return event_loop.run_until_complete(transcription)
    finally:
        event_loop.close()