"""Backend that runs transcription alongside periodic Claude analysis and
Google search over the shared transcript file."""

import concurrent.futures
import logging
import os
import threading
import time

from realtime_video_analysis import run_transcription
from analyze_claude import analyze_with_claude
from google_search import grounding_with_google_search

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('backend_system')

# Sentinel text that ends both polling loops once it appears in the transcript.
# NOTE(review): presumably appended by run_transcription when STT finishes — confirm.
STT_COMPLETE_MARKER = "----STT work complete---"

# Transcript file shared by the transcription task and the polling tasks.
DEFAULT_OUTPUT_FILE = "./transcribe_texts"

# Seconds to wait before retrying when the transcript could not be read.
_RETRY_DELAY = 5

# Audio file passed to run_transcription for each selectable option.
# None means no file is passed (presumably live capture — TODO confirm).
_PARTY_AUDIO_FILES = {
    "더불어민주당": None,
    "Agents for Amazon Bedrock": "./data/summit_sungwoo.wav",
    "Bundesliga Fan Experience": "./data/aws_bundesliga.wav",
    "AWS_2024_recap": "./data/aws.wav",
}
# Option used when the requested one is unknown or None.
_DEFAULT_PARTY = "국민의힘"

# Results collected by the polling tasks (appended from worker threads).
analysis_results = []
search_results = []


def _wait_for_transcript(output_file_path, stop_event, initial_delay):
    """Wait for the transcript file to exist, then let text accumulate.

    Returns:
        True once the file exists and ``initial_delay`` has elapsed (or was cut
        short by ``stop_event``); False if ``stop_event`` was set before the
        file ever appeared, in which case the caller should give up.
    """
    while not os.path.exists(output_file_path):
        if stop_event.is_set():
            return False
        logger.info("Waiting for transcription file to be created...")
        stop_event.wait(2)
    logger.info("Waiting %s seconds for initial transcription collection...", initial_delay)
    # Event.wait instead of time.sleep so a stop request ends the delay early.
    stop_event.wait(initial_delay)
    return True


def _read_transcript(output_file_path):
    """Return the transcript's full text, or None if it could not be read.

    Failures are logged rather than raised so one bad read does not end the
    polling loop.
    """
    try:
        with open(output_file_path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.warning("Transcription file is missing. Waiting...")
    except (OSError, UnicodeDecodeError) as e:
        # The file is written concurrently by another task, so a read may
        # presumably catch a half-written multi-byte character.
        logger.error("Error occurred while reading file: %s", e)
    return None


def _print_result(title, body):
    """Print ``body`` under ``title``, framed by separator rules."""
    rule = "=" * 50
    # A single print call makes interleaving with the other task's output less likely.
    print(f"\n{rule}\n{title}\n{rule}\n{body}\n{rule}\n")


def periodic_claude_analysis(party, output_file_path=DEFAULT_OUTPUT_FILE, *,
                             stop_event=None, initial_delay=30, interval=10):
    """Periodically analyze the full transcript with Claude.

    Waits for the transcript file, waits ``initial_delay`` seconds for text to
    accumulate, then every ``interval`` seconds sends the whole file to
    ``analyze_with_claude``. Each result is printed and appended to the
    module-level ``analysis_results``.

    Args:
        party: Option forwarded to ``analyze_with_claude``.
        output_file_path: Transcript file to read.
        stop_event: Optional ``threading.Event``. Once set, the task does one
            final read/analysis and returns. Without it, the task only ends
            when ``STT_COMPLETE_MARKER`` appears in the transcript.
        initial_delay: Seconds to wait after the file first appears.
        interval: Seconds between analyses.
    """
    if stop_event is None:
        stop_event = threading.Event()  # never set, so wait() acts like sleep()
    logger.info("Starting Claude analysis task")
    analysis_count = 0

    if not _wait_for_transcript(output_file_path, stop_event, initial_delay):
        logger.warning("Transcription stopped before the transcript file appeared")
        logger.info("Claude analysis task complete")
        return
    logger.info("Wait complete, starting analysis")

    while True:
        # Sample the flag before reading: once it is set, this read happens after
        # run_transcription has returned, so it is the final pass.
        stopping = stop_event.is_set()
        current_content = _read_transcript(output_file_path)
        if current_content is not None:
            if current_content.strip():
                analysis_count += 1
                logger.info("Starting analysis #%d: Read content from file", analysis_count)
                try:
                    analysis_result = analyze_with_claude(current_content, party)
                except Exception:
                    # Best-effort: log and retry on the next tick.
                    logger.exception("Error occurred during Claude summarization")
                else:
                    _print_result(
                        f"Analysis result #{analysis_count} - {time.strftime('%Y-%m-%d %H:%M:%S')}",
                        analysis_result,
                    )
                    analysis_results.append(analysis_result)
            else:
                logger.info("No content in file. Waiting...")
            if STT_COMPLETE_MARKER in current_content:
                break
        if stopping:
            break
        stop_event.wait(_RETRY_DELAY if current_content is None else interval)

    logger.info("Claude analysis task complete")


def periodic_google_search(party, output_file_path=DEFAULT_OUTPUT_FILE, *,
                           stop_event=None, initial_delay=30, interval=10, tail_lines=5):
    """Periodically run keyword extraction + Google search on recent transcript lines.

    Waits for the transcript file, waits ``initial_delay`` seconds, then every
    ``interval`` seconds sends the last ``tail_lines`` lines of the file to
    ``grounding_with_google_search`` (keyword extraction and search with Gemini,
    per the original notes). Each result is printed and appended to the
    module-level ``search_results``.

    Args:
        party: Option forwarded to ``grounding_with_google_search``.
        output_file_path: Transcript file to read.
        stop_event: Optional ``threading.Event``. Once set, the task does one
            final read/search and returns. Without it, the task only ends when
            ``STT_COMPLETE_MARKER`` appears in the transcript.
        initial_delay: Seconds to wait after the file first appears.
        interval: Seconds between searches.
        tail_lines: Number of trailing lines sent to each search.
    """
    if stop_event is None:
        stop_event = threading.Event()  # never set, so wait() acts like sleep()
    logger.info("Starting Google search task")
    search_count = 0

    if not _wait_for_transcript(output_file_path, stop_event, initial_delay):
        logger.warning("Transcription stopped before the transcript file appeared")
        logger.info("Google search task complete")
        return
    logger.info("Wait complete, starting Google search")

    while True:
        # Sample the flag before reading so the final pass sees the finished transcript.
        stopping = stop_event.is_set()
        content = _read_transcript(output_file_path)
        if content is not None:
            lines = content.splitlines()
            # Join with newlines so words at line boundaries stay separated.
            recent = "\n".join(lines[max(len(lines) - tail_lines, 0):]).strip()
            if recent:
                search_count += 1
                logger.info("Google Search #%d Start: Analyzing last %d lines in STT file",
                            search_count, tail_lines)
                try:
                    search_result = grounding_with_google_search(recent, party)
                except Exception:
                    # Best-effort: log and retry on the next tick.
                    logger.exception("Error occurred during Google search")
                else:
                    _print_result(
                        f"Google Search Result #{search_count} - {time.strftime('%Y-%m-%d %H:%M:%S')}",
                        search_result,
                    )
                    search_results.append(search_result)
            else:
                logger.info("No content in file. Waiting...")
            # Check the whole file, not just the tail, so the marker is never missed.
            if STT_COMPLETE_MARKER in content:
                logger.info("Completion marker detected. Google search complete.")
                break
        if stopping:
            break
        stop_event.wait(_RETRY_DELAY if content is None else interval)

    logger.info("Google search task complete")


def main(party=None):
    """Run transcription, Claude analysis and Google search concurrently.

    Args:
        party: Selected option (from the clicked UI button, per the original
            comment). Known options pick an audio file; anything else,
            including None, falls back to "국민의힘" with no audio file.

    Returns:
        The string "Analysis complete" once all three tasks have finished.

    Raises:
        Whatever ``run_transcription`` or either polling task raises.
    """
    if party in _PARTY_AUDIO_FILES:
        audio_file = _PARTY_AUDIO_FILES[party]
    else:
        audio_file = None
        party = _DEFAULT_PARTY

    output_file_path = DEFAULT_OUTPUT_FILE
    logger.info("Backend system started")

    # Set once transcription returns or fails, so the polling tasks stop even if
    # STT_COMPLETE_MARKER is never written. Otherwise the executor's exit would
    # wait on them forever.
    stop_event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        transcription = executor.submit(run_transcription, audio_file, party)
        claude_task = executor.submit(
            periodic_claude_analysis, party, output_file_path, stop_event=stop_event)
        search_task = executor.submit(
            periodic_google_search, party, output_file_path, stop_event=stop_event)
        try:
            transcription.result()
        finally:
            stop_event.set()
        claude_task.result()
        search_task.result()

    logger.info("All tasks complete")
    return "Analysis complete"


if __name__ == "__main__":
    main()