# dyryu1208, commit 920dfd0
import concurrent.futures
import os
import time
import logging
import threading
from realtime_video_analysis import run_transcription
from analyze_claude import analyze_with_claude
from google_search import grounding_with_google_search
# Logging configuration applied at import time: INFO level, with timestamp,
# logger name and level in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("backend_system")

# Module-level accumulators: each periodic task appends its results here.
analysis_results = []
search_results = []
def periodic_claude_analysis(party, output_file_path="./transcribe_texts"):
    """
    Poll the transcript file and analyze its full contents with Claude.

    Timeline:
    - Poll every 2 seconds until ``output_file_path`` exists.
    - Sleep 30 seconds before the first analysis.
    - Then loop: read the whole file and, when it holds any non-whitespace
      text, pass it with ``party`` to ``analyze_with_claude``. The result is
      printed between divider lines and appended to the module-level
      ``analysis_results`` list. Each pass is followed by a 10-second sleep.

    The loop ends once the text "----STT work complete---" is found in the
    file. Errors raised by the analysis call or while reading the file are
    logged and the loop continues. If the file disappears, the loop retries
    after 5 seconds.

    Args:
        party: Passed through unchanged to ``analyze_with_claude``.
        output_file_path: Path of the transcript file to poll.
    """
    logger.info("Starting Claude analysis task")
    runs = 0

    # Nothing to analyze until the transcript file has been created.
    while not os.path.exists(output_file_path):
        logger.info("Waiting for transcription file to be created...")
        time.sleep(2)

    logger.info("Waiting 30 seconds for initial transcription collection...")
    time.sleep(30)
    logger.info("Wait complete, starting analysis")

    divider = "=" * 50
    while True:
        try:
            if not os.path.exists(output_file_path):
                logger.warning("Transcription file is missing. Waiting...")
                time.sleep(5)
                continue

            with open(output_file_path, "r", encoding="utf-8") as transcript:
                transcript_text = transcript.read()

            if not transcript_text.strip():
                logger.info("No content in file. Waiting...")
            else:
                runs += 1
                logger.info(f"Starting analysis #{runs}: Read content from file")
                try:
                    outcome = analyze_with_claude(transcript_text, party)
                    print("\n" + divider)
                    print(f"Analysis result #{runs} - {time.strftime('%Y-%m-%d %H:%M:%S')}")
                    print(divider)
                    print(outcome)
                    print(divider + "\n")
                    analysis_results.append(outcome)
                except Exception as err:
                    # A failed analysis must not stop the polling loop.
                    logger.error(f"Error occurred during Claude summarization: {str(err)}")

            # The marker is checked after the analysis so the final
            # transcript is still analyzed once before the loop exits.
            if "----STT work complete---" in transcript_text:
                break
        except Exception as err:
            logger.error(f"Error occurred while reading file: {str(err)}")

        time.sleep(10)

    logger.info("Claude analysis task complete")
def periodic_google_search(party, output_file_path="./transcribe_texts"):
    """
    Poll the transcript file and run a grounded Google search on its tail.

    Timeline:
    - Poll every 2 seconds until ``output_file_path`` exists.
    - Sleep 30 seconds before the first search.
    - Then loop: read the whole file, take its last five lines and, when
      they hold any non-whitespace text, pass them with ``party`` to
      ``grounding_with_google_search``. The result is printed between
      divider lines and appended to the module-level ``search_results``
      list. Each pass is followed by a 10-second sleep.

    The loop ends once the text "----STT work complete---" is found
    anywhere in the file. Errors raised by the search call or while reading
    the file are logged and the loop continues. If the file disappears, the
    loop retries after 5 seconds.

    Args:
        party: Passed through unchanged to ``grounding_with_google_search``.
        output_file_path: Path of the transcript file to poll.
    """
    logger.info("Starting Google search task")
    search_count = 0

    # Wait until the file is created
    while not os.path.exists(output_file_path):
        logger.info("Waiting for transcription file...")
        time.sleep(2)

    # Initial 30-second wait
    logger.info("Waiting 30 seconds for initial transcription collection...")
    time.sleep(30)
    logger.info("Wait complete, starting Google search")

    divider = "=" * 50
    while True:
        try:
            if not os.path.exists(output_file_path):
                logger.warning("Transcription file is missing. Waiting...")
                time.sleep(5)
                continue

            with open(output_file_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Only the last five lines are searched; slicing already copes
            # with shorter files. splitlines() drops the line terminators,
            # so the lines are rejoined with "\n" -- an empty separator
            # would fuse the last word of one line to the first word of
            # the next.
            current_content = "\n".join(content.splitlines()[-5:]).strip()

            if current_content:
                search_count += 1
                logger.info(f"Google Search #{search_count} Start: Analyzing last 5 lines in STT file")
                try:
                    search_result = grounding_with_google_search(current_content, party)
                    print("\n" + divider)
                    print(f"Google Search Result #{search_count} - {time.strftime('%Y-%m-%d %H:%M:%S')}")
                    print(divider)
                    print(search_result)
                    print(divider + "\n")
                    search_results.append(search_result)
                except Exception as e:
                    # A failed search must not stop the polling loop.
                    logger.error(f"Error occurred during Google search: {str(e)}")
            else:
                logger.info("No content in file. Waiting...")

            # Look for the marker in the whole file, not just the five-line
            # tail: if more than five lines follow the marker, a tail-only
            # check would never see it and the loop would never finish.
            if "----STT work complete---" in content:
                logger.info("Completion marker detected. Google search complete.")
                break
        except Exception as e:
            logger.error(f"Error occurred while reading file: {str(e)}")

        # Wait 10 seconds before the next pass
        time.sleep(10)

    logger.info("Google search task complete")
def main(party=None):
    """
    Run transcription, Claude analysis and Google search concurrently.

    ``party`` selects the audio file handed to ``run_transcription``. Any
    value that is not one of the known labels (including ``None``) is
    replaced by "국민의힘" and uses no audio file.

    Args:
        party: Label of the button that was clicked, or ``None``.

    Returns:
        The string "Analysis complete" once all three tasks have finished.
    """
    # Audio file per recognized label; None means no audio file is passed.
    audio_by_party = {
        "더불어민주당": None,
        "Agents for Amazon Bedrock": "./data/summit_sungwoo.wav",
        "Bundesliga Fan Experience": "./data/aws_bundesliga.wav",
        "AWS_2024_recap": "./data/aws.wav",
    }
    if party in audio_by_party:
        audio_file = audio_by_party[party]
    else:
        # Default or "국민의힘"
        audio_file = None
        party = "국민의힘"

    output_file_path = "./transcribe_texts"
    logger.info("Backend system started")

    # Submit the three tasks to a thread pool, then wait for each in turn;
    # result() re-raises any exception the task raised.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        pending = [
            executor.submit(run_transcription, audio_file, party),
            executor.submit(periodic_claude_analysis, party, output_file_path),
            executor.submit(periodic_google_search, party, output_file_path),
        ]
        for future in pending:
            future.result()

    logger.info("All tasks complete")
    return "Analysis complete"
# Script entry point: when executed directly, run the backend with no party
# argument and keep main()'s return value in `results`.
if __name__ == "__main__":
    results = main()