Real-time-AI-Video-Summarization-Service / realtime_video_analysis.py
dyryu1208
commit
920dfd0
import asyncio
import logging
import struct

import aiofiles
import nest_asyncio
from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent
# Patch asyncio globally so event loops can be nested (run_until_complete may
# be called while another loop is already running).
nest_asyncio.apply()

# Configure root logging at import time, then grab the logger shared by this
# module's functions and classes.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('transcription_system')
class TranscriptHandler(TranscriptResultStreamHandler):
    """Handler class for processing Amazon Transcribe events.

    Each finalized (non-partial) result on the selected channel is split into
    per-speaker utterances formatted as ``"Speaker <label>: <text>"``. Every
    utterance is printed, stored in ``self.results`` and appended to the
    output file.
    """

    def __init__(self, output_stream, output_file_path="./transcribe_texts", channel_id='ch_1'):
        """Create the handler and truncate the output file.

        Args:
            output_stream: Result stream forwarded to the base handler.
            output_file_path: Text file receiving one utterance per line. It is
                created if missing and emptied here.
            channel_id: Only results whose ``channel_id`` equals this value are
                recorded. ``None`` records results from every channel.
        """
        super().__init__(output_stream)
        # Finalized "Speaker X: ..." lines, in the order they were emitted.
        self.results = []
        self.processing_complete = False
        self.output_file_path = output_file_path
        self.channel_id = channel_id
        # Opening in 'w' mode truncates leftovers from a previous run.
        with open(self.output_file_path, 'w', encoding="utf-8") as f:
            f.write("")

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """Emit one utterance per speaker turn for each finalized result."""
        for result in transcript_event.transcript.results:
            # Partial results may still be revised; only final ones are recorded.
            if result.is_partial:
                continue
            if self.channel_id is not None and result.channel_id != self.channel_id:
                continue
            for alt in result.alternatives:
                current_speaker = None
                utterance = []
                for item in alt.items:
                    if item.speaker is not None:
                        if current_speaker is None:
                            current_speaker = item.speaker
                        elif item.speaker != current_speaker:
                            # Speaker turn boundary: flush the previous speaker's words.
                            self._emit_utterance(current_speaker, utterance)
                            current_speaker = item.speaker
                            utterance = []
                    # Separate 'pronunciation' items with a space; any other item
                    # type is appended directly to the preceding text.
                    if item.item_type == 'pronunciation' and utterance:
                        utterance.append(' ')
                    utterance.append(item.content)
                # Output the last utterance of this alternative.
                if utterance:
                    self._emit_utterance(current_speaker, utterance)

    def _emit_utterance(self, speaker, utterance):
        """Format one speaker turn, then print, record and persist it."""
        transcript = f"Speaker {speaker}: {''.join(utterance).strip()}"
        print(f"\n{transcript}")
        self.results.append(transcript)
        self._append_to_file(transcript)

    def _append_to_file(self, transcript):
        """Append STT script to file (best effort: failures are logged, not raised)."""
        try:
            with open(self.output_file_path, 'a', encoding='utf-8') as f:
                f.write(transcript + "\n")
        except Exception as e:
            logger.error(f"Error occurred while writing to file: {str(e)}")

    def set_complete(self):
        """Indicate that transcription processing is complete.

        Sets ``processing_complete`` and appends a completion marker to the
        output file (best effort: write failures are logged, not raised).
        """
        self.processing_complete = True
        try:
            with open(self.output_file_path, 'a', encoding="utf-8") as f:
                f.write("\n----STT work complete---\n")
        except Exception as e:
            logger.error(f"Error occurred while writing completion marker: {str(e)}")
async def _wav_data_offset(afp):
    """Return the byte offset at which audio samples start in an open WAV file.

    Walks the RIFF chunk list looking for the ``data`` chunk, so headers that
    carry extra chunks (e.g. ``LIST``) are skipped correctly. Falls back to 44,
    the size of the canonical minimal header, when the file is not a RIFF/WAVE
    container or no ``data`` chunk header is found.
    """
    riff = await afp.read(12)
    if len(riff) < 12 or riff[:4] != b'RIFF' or riff[8:12] != b'WAVE':
        return 44
    offset = 12
    while True:
        await afp.seek(offset)
        header = await afp.read(8)
        if len(header) < 8:
            return 44
        chunk_id, chunk_size = struct.unpack('<4sI', header)
        offset += 8
        if chunk_id == b'data':
            return offset
        # RIFF chunk bodies are padded to an even number of bytes.
        offset += chunk_size + (chunk_size & 1)


async def process_audio_file(file_path, region="ap-northeast-2", sample_rate=32000, language=None, content_type=None):
    """Asynchronous function to process audio files and generate transcripts.

    Streams the audio payload of a WAV file to Amazon Transcribe and collects
    the finalized, speaker-labelled utterances.

    Args:
        file_path: Path (str or ``pathlib.Path``) to a WAV file. The payload is
            sent as-is with ``media_encoding="pcm"`` and 2 channels; it is assumed
            (not verified) to match ``sample_rate`` and that channel count.
        region: AWS region for the streaming client.
        sample_rate: Sample rate in Hz reported to Transcribe.
        language: Transcribe language code. When ``None``, it is inferred:
            ``"en-US"`` for Bundesliga content, otherwise ``"ko-KR"``.
        content_type: Optional content label, used only for language inference.

    Returns:
        The ``TranscriptHandler`` whose ``results`` hold the utterances.
    """
    logger.info(f"Starting transcription for file '{file_path}'")
    if language is None:
        # str() so pathlib.Path arguments work as well as plain strings.
        if content_type == "Bundesliga Fan Experience" or "bundesliga" in str(file_path).lower():
            language = "en-US"
            logger.info("English content detected: changing language setting to 'en-US'")
        else:
            language = "ko-KR"
            logger.info("Default language setting: 'ko-KR'")
    client = TranscribeStreamingClient(region=region)
    stream = await client.start_stream_transcription(
        language_code=language,
        media_sample_rate_hz=sample_rate,
        media_encoding="pcm",
        enable_partial_results_stabilization=True,
        partial_results_stability="high",
        show_speaker_label=True,
        enable_channel_identification=True,
        number_of_channels=2
    )
    handler = TranscriptHandler(stream.output_stream)

    async def write_chunks():
        """Send the file's audio payload in chunks, then close the input stream."""
        try:
            async with aiofiles.open(file_path, 'rb') as afp:
                # Skip the WAV header, wherever the data chunk actually begins.
                await afp.seek(await _wav_data_offset(afp))
                while True:
                    chunk = await afp.read(1024*16)
                    if not chunk:
                        break
                    await stream.input_stream.send_audio_event(audio_chunk=chunk)
                    # Pace uploads; 16 KiB is ~0.128 s of audio assuming 16-bit
                    # stereo at 32 kHz (TODO confirm against the input files).
                    await asyncio.sleep(0.125)
        except Exception as e:
            logger.error(f"Error occurred while writing chunks: {str(e)}")
        finally:
            # Always close the input side even after an error; otherwise
            # handle_events() keeps waiting for results that will never come.
            try:
                await stream.input_stream.end_stream()
            except Exception as e:
                logger.error(f"Error occurred while ending the audio stream: {str(e)}")

    await asyncio.gather(write_chunks(), handler.handle_events())
    handler.set_complete()
    logger.info(f"Transcription complete: {len(handler.results)} utterance segments processed")
    return handler
def run_transcription(file_path, content_type=None):
    """Synchronous wrapper function to run in ThreadPoolExecutor.

    Runs ``process_audio_file`` on a fresh event loop owned by the calling
    thread. Before returning, it cancels leftover tasks, finalizes async
    generators, uninstalls the loop from the thread and closes it.

    Args:
        file_path: Audio file to transcribe.
        content_type: Optional content label forwarded for language inference.

    Returns:
        The ``TranscriptHandler`` produced by ``process_audio_file``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Return the handler object itself.
        return loop.run_until_complete(process_audio_file(file_path, content_type=content_type))
    finally:
        try:
            # If process_audio_file raised, sibling tasks may still be pending;
            # cancel and drain them so the loop closes cleanly.
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Don't leave a closed loop installed as this (possibly pooled)
            # thread's current event loop.
            asyncio.set_event_loop(None)
            loop.close()