|
import concurrent.futures |
|
import os |
|
import time |
|
import logging |
|
import threading |
|
from realtime_video_analysis import run_transcription |
|
from analyze_claude import analyze_with_claude |
|
from google_search import grounding_with_google_search |
|
|
|
|
|
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger shared by every task in this module.
logger = logging.getLogger('backend_system')

# Outputs accumulated by the Claude-analysis and Google-search tasks; they are
# appended to from the worker threads started in main().
analysis_results, search_results = [], []
|
|
|
# Line the transcription side is expected to write when speech-to-text is
# finished; both polling tasks stop after the pass in which they see it.
# NOTE(review): the dashes are asymmetric (4 leading, 3 trailing); this must
# match the writer in realtime_video_analysis exactly - confirm.
COMPLETION_MARKER = "----STT work complete---"

# How many trailing transcript lines form each Google-search request.
SEARCH_TAIL_LINES = 5

# Audio file handed to run_transcription for each recognised party/session.
# The two Korean party names (Democratic Party of Korea / People Power Party)
# get no file. NOTE(review): presumably run_transcription then uses a live
# source - confirm in realtime_video_analysis.
PARTY_AUDIO_FILES = {
    "더불어민주당": None,
    "국민의힘": None,
    "Agents for Amazon Bedrock": './data/summit_sungwoo.wav',
    "Bundesliga Fan Experience": './data/aws_bundesliga.wav',
    "AWS_2024_recap": './data/aws.wav',
}

# Party used when main() receives None or a name not in PARTY_AUDIO_FILES.
DEFAULT_PARTY = "국민의힘"


def _is_stopped(stop_event):
    """Return True if ``stop_event`` was supplied and has been set."""
    return stop_event is not None and stop_event.is_set()


def _pause(seconds, stop_event=None):
    """Sleep ``seconds``; with a ``stop_event``, return as soon as it is set."""
    if stop_event is None:
        time.sleep(seconds)
    else:
        stop_event.wait(seconds)


def _print_banner(title, body):
    """Print ``body`` between '=' rules, headed by ``title`` and the local time."""
    rule = "=" * 50
    print("\n" + rule)
    print(f"{title} - {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(rule)
    print(body)
    print(rule + "\n")


def _wait_for_transcript(path, stop_event, waiting_message):
    """Wait for ``path`` to exist, then give transcription 30 s to fill it.

    Polls every 2 s, logging ``waiting_message`` each time.

    Returns:
        True when the caller should start polling the file; False if
        ``stop_event`` was set while the file still did not exist (i.e.
        transcription ended without ever creating it).
    """
    while not os.path.exists(path):
        # Re-check existence so a file created just before transcription
        # ended is not skipped.
        if _is_stopped(stop_event) and not os.path.exists(path):
            return False
        logger.info(waiting_message)
        _pause(2, stop_event)

    logger.info("Waiting 30 seconds for initial transcription collection...")
    _pause(30, stop_event)
    return True


def _poll_transcript(path, stop_event, process):
    """Call ``process(text)`` with the file's full text every 10 seconds.

    If the file disappears, waits 5 s and retries. Any exception in a pass is
    logged and polling continues, so one bad pass does not kill the thread.

    Returns:
        True after the pass whose text contains ``COMPLETION_MARKER``;
        False after one last pass once ``stop_event`` has been set.
    """
    while True:
        # Sampled before reading: once the event is set, this iteration's read
        # should already include everything transcription wrote (assuming it
        # flushed before returning), so it is the final pass.
        final_pass = _is_stopped(stop_event)
        try:
            if not os.path.exists(path):
                if final_pass:
                    return False
                logger.warning("Transcription file is missing. Waiting...")
                _pause(5, stop_event)
                continue

            with open(path, "r", encoding="utf-8") as f:
                content = f.read()

            process(content)

            if COMPLETION_MARKER in content:
                return True
        except Exception as e:
            logger.exception(f"Error occurred while reading file: {e}")

        if final_pass:
            return False
        _pause(10, stop_event)


def periodic_claude_analysis(party, output_file_path="./transcribe_texts", stop_event=None):
    """Read the whole transcript file and analyse it with Claude, repeatedly.

    - Waits (polling every 2 s) for the file to exist, then 30 s before the
      first analysis.
    - Re-reads and analyses the full file every 10 s thereafter.
    - Stops after the pass whose text contains ``COMPLETION_MARKER``, or after
      one final pass once ``stop_event`` is set.

    Each result is printed and appended to the module-level ``analysis_results``.

    Args:
        party: Forwarded to ``analyze_with_claude``.
        output_file_path: Transcript file written by the transcription task.
        stop_event: Optional ``threading.Event`` to set when transcription has
            ended (successfully or not). Without it the task only stops on the
            completion marker.
    """
    logger.info("Starting Claude analysis task")
    analysis_count = 0

    def analyse(content):
        nonlocal analysis_count
        if not content.strip():
            logger.info("No content in file. Waiting...")
            return
        analysis_count += 1
        logger.info(f"Starting analysis #{analysis_count}: Read content from file")
        try:
            analysis_result = analyze_with_claude(content, party)
        except Exception as e:
            logger.exception(f"Error occurred during Claude summarization: {e}")
            return
        _print_banner(f"Analysis result #{analysis_count}", analysis_result)
        analysis_results.append(analysis_result)

    if _wait_for_transcript(output_file_path, stop_event,
                            "Waiting for transcription file to be created..."):
        logger.info("Wait complete, starting analysis")
        _poll_transcript(output_file_path, stop_event, analyse)

    logger.info("Claude analysis task complete")


def periodic_google_search(party, output_file_path="./transcribe_texts", stop_event=None):
    """Send the latest transcript lines to ``grounding_with_google_search``, repeatedly.

    - Waits (polling every 2 s) for the file to exist, then 30 s before the
      first search.
    - Every 10 s thereafter, re-reads the file and searches with its last
      ``SEARCH_TAIL_LINES`` lines, joined with newlines.
    - Stops after the pass whose file text contains ``COMPLETION_MARKER``, or
      after one final pass once ``stop_event`` is set.

    Each result is printed and appended to the module-level ``search_results``.

    Args:
        party: Forwarded to ``grounding_with_google_search``.
        output_file_path: Transcript file written by the transcription task.
        stop_event: Optional ``threading.Event`` to set when transcription has
            ended (successfully or not). Without it the task only stops on the
            completion marker.
    """
    logger.info("Starting Google search task")
    search_count = 0

    def search(content):
        nonlocal search_count
        # Keep line breaks: joining with "" glued the last word of one line
        # to the first word of the next.
        query = "\n".join(content.splitlines()[-SEARCH_TAIL_LINES:]).strip()
        if not query:
            logger.info("No content in file. Waiting...")
            return
        search_count += 1
        logger.info(
            f"Google Search #{search_count} Start: "
            f"Analyzing last {SEARCH_TAIL_LINES} lines in STT file"
        )
        try:
            search_result = grounding_with_google_search(query, party)
        except Exception as e:
            logger.exception(f"Error occurred during Google search: {e}")
            return
        _print_banner(f"Google Search Result #{search_count}", search_result)
        search_results.append(search_result)

    if _wait_for_transcript(output_file_path, stop_event, "Waiting for transcription file..."):
        logger.info("Wait complete, starting Google search")
        if _poll_transcript(output_file_path, stop_event, search):
            logger.info("Completion marker detected. Google search complete.")

    logger.info("Google search task complete")


def main(party=None):
    """Run transcription, Claude analysis and Google search in parallel.

    Args:
        party: A key of ``PARTY_AUDIO_FILES``. ``None`` or an unrecognised
            value falls back to ``DEFAULT_PARTY`` with no audio file; an
            unrecognised non-None value is also logged as a warning.

    Returns:
        The string "Analysis complete".

    Raises:
        Whatever ``run_transcription`` raises. The polling tasks are told to
        stop first, so the thread pool shuts down instead of hanging.
    """
    if party in PARTY_AUDIO_FILES:
        audio_file = PARTY_AUDIO_FILES[party]
    else:
        if party is not None:
            logger.warning(f"Unknown party {party!r}; falling back to {DEFAULT_PARTY}")
        audio_file = None
        party = DEFAULT_PARTY

    output_file_path = './transcribe_texts'

    logger.info("Backend system started")

    stop_event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        transcription = executor.submit(run_transcription, audio_file, party)
        # However transcription ends (return or raise), tell the polling tasks
        # no more text is coming so they do one last pass and exit. Without
        # this, a failed transcription leaves them polling forever and the
        # executor's shutdown on leaving this block never returns.
        transcription.add_done_callback(lambda _future: stop_event.set())
        analysis = executor.submit(
            periodic_claude_analysis, party, output_file_path, stop_event=stop_event
        )
        search = executor.submit(
            periodic_google_search, party, output_file_path, stop_event=stop_event
        )

        transcription.result()
        analysis.result()
        search.result()

    logger.info("All tasks complete")
    return "Analysis complete"
|
|
|
if __name__ == "__main__":
    # Script entry point: run the full pipeline with the default party selection.
    results = main(party=None)