File size: 6,057 Bytes
920dfd0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import nest_asyncio
import asyncio
import aiofiles
from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent
import logging
# Patch asyncio via nest_asyncio at import time (per the library, this permits
# nested/re-entrant event-loop use, e.g. when a loop is already running).
nest_asyncio.apply()
# Configure logging once for the process: INFO level, with timestamp, logger
# name, level and message in every record.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-wide logger used by the handler class and the functions below.
logger = logging.getLogger('transcription_system')
class TranscriptHandler(TranscriptResultStreamHandler):
    """Turn finalized Amazon Transcribe results into speaker-labelled lines.

    Every finished utterance is printed, appended to ``self.results`` and
    appended to the text file at ``output_file_path``.

    Attributes:
        results: Emitted lines, in order, formatted as
            ``"Speaker <label>: <text>"``.
        processing_complete: ``False`` until :meth:`set_complete` is called.
        output_file_path: Path of the transcript text file.
    """

    def __init__(self, output_stream, output_file_path="./transcribe_texts"):
        """Initialise the handler and truncate the output file.

        Args:
            output_stream: Result stream handed to the base handler.
            output_file_path: File that receives one line per utterance.

        Raises:
            OSError: If the output file cannot be created or truncated.
        """
        super().__init__(output_stream)
        self.results = []
        self.processing_complete = False
        self.output_file_path = output_file_path
        # Opening in 'w' mode creates/truncates the file, so each run starts
        # from an empty transcript.
        with open(self.output_file_path, 'w', encoding="utf-8"):
            pass

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """Split each finalized result into per-speaker utterances and emit them.

        Only non-partial results whose ``channel_id`` is ``'ch_1'`` are
        processed. Within an alternative, items are accumulated until the
        speaker label changes; each accumulated run is emitted as one line.

        Args:
            transcript_event: Event delivered by the transcription stream.
        """
        for result in transcript_event.transcript.results:
            if result.is_partial or result.channel_id != 'ch_1':
                continue
            for alt in result.alternatives:
                current_speaker = None
                utterance = []
                for item in alt.items:
                    # Items without a speaker label stay attached to the
                    # utterance currently being built.
                    if item.speaker is not None:
                        if current_speaker is None:
                            current_speaker = item.speaker
                        if current_speaker != item.speaker:
                            # Speaker changed: flush what was collected so far.
                            self._emit(current_speaker, utterance)
                            current_speaker = item.speaker
                            utterance = []
                    # Words are separated by a space; other item types are
                    # attached directly to the preceding content.
                    if item.item_type == 'pronunciation' and utterance:
                        utterance.append(' ')
                    utterance.append(item.content)
                # Output the last utterance
                if utterance:
                    self._emit(current_speaker, utterance)

    def _emit(self, speaker, utterance):
        """Print, store and persist one finished utterance.

        Args:
            speaker: Speaker label to put in front of the text (may be None
                when no item carried a label).
            utterance: List of string fragments making up the text.
        """
        transcript = f"Speaker {speaker}: {''.join(utterance).strip()}"
        print(f"\n{transcript}")
        self.results.append(transcript)
        self._append_to_file(transcript)

    def _append_to_file(self, transcript):
        """Append one transcript line to the output file.

        Failures are logged and swallowed so that a file problem does not
        interrupt event handling.
        """
        try:
            with open(self.output_file_path, 'a', encoding='utf-8') as f:
                f.write(transcript + "\n")
        except Exception as e:
            logger.error(f"Error occurred while writing to file: {str(e)}")

    def set_complete(self):
        """Mark processing as complete and append a completion marker to the file.

        A failure to write the marker is logged and swallowed;
        ``processing_complete`` is set regardless.
        """
        self.processing_complete = True
        try:
            with open(self.output_file_path, 'a', encoding="utf-8") as f:
                f.write("\n----STT work complete---\n")
        except Exception as e:
            logger.error(f"Error occurred while writing completion marker: {str(e)}")
async def process_audio_file(file_path, region="ap-northeast-2", sample_rate=32000, language=None, content_type=None):
    """Stream an audio file to Amazon Transcribe and collect the transcript.

    Args:
        file_path: Path of the audio file (``str`` or path-like). The first
            44 bytes are skipped as a WAV header and the rest is sent as PCM.
        region: AWS region passed to the streaming client.
        sample_rate: Sample rate in Hz reported to the service.
        language: Language code. When ``None`` it is chosen automatically:
            ``"en-US"`` if ``content_type`` is ``"Bundesliga Fan Experience"``
            or the path contains ``"bundesliga"`` (case-insensitive),
            otherwise ``"ko-KR"``.
        content_type: Optional content label used only for language selection.

    Returns:
        The ``TranscriptHandler`` holding the emitted lines in ``results``.

    Raises:
        Exceptions from starting the stream or from handling events propagate
        to the caller. Errors while reading or sending audio are logged and
        swallowed.
    """
    logger.info(f"Starting transcription for file '{file_path}'")
    if language is None:
        # str() lets path-like objects take part in the filename heuristic
        # instead of failing on a missing .lower() method.
        if content_type == "Bundesliga Fan Experience" or "bundesliga" in str(file_path).lower():
            language = "en-US"
            logger.info("English content detected: changing language setting to 'en-us'")
        else:
            language = "ko-KR"
            logger.info("Default language setting: 'ko-KR'")
    client = TranscribeStreamingClient(region=region)
    stream = await client.start_stream_transcription(
        language_code=language,
        media_sample_rate_hz=sample_rate,
        media_encoding="pcm",
        enable_partial_results_stabilization=True,
        partial_results_stability="high",
        show_speaker_label=True,
        enable_channel_identification=True,
        number_of_channels=2
    )
    handler = TranscriptHandler(stream.output_stream)

    async def write_chunks():
        """Send the file to the service in 16 KiB chunks, then close the input stream."""
        try:
            async with aiofiles.open(file_path, 'rb') as afp:
                # Skip WAV header
                # NOTE(review): assumes a 44-byte header; files with extra
                # RIFF chunks have a longer one - confirm for your inputs.
                await afp.seek(44)
                while True:
                    chunk = await afp.read(1024*16)
                    if not chunk:
                        break
                    await stream.input_stream.send_audio_event(audio_chunk=chunk)
                    # Pause between chunks (presumably to pace the upload
                    # close to real time - confirm).
                    await asyncio.sleep(0.125)
        except Exception as e:
            logger.error(f"Error occurred while writing chunks: {str(e)}")
        finally:
            # Always signal end-of-audio, even after a failure above, so the
            # event handler is not left waiting on a half-open stream.
            try:
                await stream.input_stream.end_stream()
            except Exception as e:
                logger.error(f"Error occurred while writing chunks: {str(e)}")

    await asyncio.gather(write_chunks(), handler.handle_events())
    handler.set_complete()
    logger.info(f"Transcription complete: {len(handler.results)} utterance segments processed")
    return handler
def run_transcription(file_path, content_type=None):
    """Run ``process_audio_file`` to completion on a dedicated event loop.

    Blocking entry point intended for worker threads (e.g. a
    ThreadPoolExecutor): a fresh loop is created, installed as the current
    loop, used for the single transcription job and closed afterwards.

    Args:
        file_path: Audio file to transcribe.
        content_type: Optional content label forwarded for language selection.

    Returns:
        The handler object produced by ``process_audio_file``.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        job = process_audio_file(file_path, content_type=content_type)
        # The handler object itself is handed back to the caller.
        return event_loop.run_until_complete(job)
    finally:
        # Close the loop whether the job succeeded or raised.
        event_loop.close()