File size: 6,889 Bytes
920dfd0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import concurrent.futures
import os
import time
import logging
import threading
from realtime_video_analysis import run_transcription
from analyze_claude import analyze_with_claude
from google_search import grounding_with_google_search
# Configure process-wide logging once, at import time, and create the logger
# shared by every function in this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('backend_system')

# Module-level accumulators: periodic_claude_analysis appends each analysis
# result to the first list, periodic_google_search appends each search result
# to the second.
analysis_results: list = []
search_results: list = []
def periodic_claude_analysis(party, output_file_path="./transcribe_texts"):
    """Repeatedly send the transcript file's full text to Claude for analysis.

    Timeline:
    - poll every 2 seconds until ``output_file_path`` exists;
    - sleep 30 seconds so that some transcript text can accumulate;
    - then, every 10 seconds, read the whole file and, when it is not blank,
      pass it together with ``party`` to ``analyze_with_claude``, print the
      result and append it to the module-level ``analysis_results`` list.

    The loop stops once the text ``----STT work complete---`` is found in the
    file. Exceptions raised by the analysis call or while reading the file
    are logged and the loop keeps going.
    """
    logger.info("Starting Claude analysis task")

    # Phase 1: block until the transcript file shows up.
    while True:
        if os.path.exists(output_file_path):
            break
        logger.info("Waiting for transcription file to be created...")
        time.sleep(2)

    # Phase 2: fixed warm-up before the first analysis.
    logger.info("Waiting 30 seconds for initial transcription collection...")
    time.sleep(30)
    logger.info("Wait complete, starting analysis")

    # Phase 3: periodic analysis of the whole file.
    banner = "=" * 50
    run_number = 0
    while True:
        try:
            if not os.path.exists(output_file_path):
                # The file vanished: retry after 5 seconds, skipping the
                # regular 10-second pause at the bottom of the loop.
                logger.warning("Transcription file is missing. Waiting...")
                time.sleep(5)
                continue

            with open(output_file_path, "r", encoding="utf-8") as transcript_file:
                transcript = transcript_file.read()

            if not transcript.strip():
                logger.info("No content in file. Waiting...")
            else:
                run_number += 1
                logger.info(f"Starting analysis #{run_number}: Read content from file")
                try:
                    verdict = analyze_with_claude(transcript, party)
                    print("\n" + banner)
                    print(f"Analysis result #{run_number} - {time.strftime('%Y-%m-%d %H:%M:%S')}")
                    print(banner)
                    print(verdict)
                    print(banner + "\n")
                    analysis_results.append(verdict)
                except Exception as analysis_error:
                    # A failed analysis must not end the task; log and retry
                    # on the next cycle.
                    logger.error(f"Error occurred during Claude summarization: {analysis_error!s}")

            # The marker is checked after the analysis so that the final
            # transcript is still analyzed once before stopping.
            if "----STT work complete---" in transcript:
                break
        except Exception as read_error:
            logger.error(f"Error occurred while reading file: {read_error!s}")

        time.sleep(10)

    logger.info("Claude analysis task complete")
def periodic_google_search(party, output_file_path="./transcribe_texts"):
    """Periodically run a grounded Google search on the tail of the transcript.

    Timeline:
    - poll every 2 seconds until ``output_file_path`` exists;
    - sleep 30 seconds so that some transcript text can accumulate;
    - then, every 10 seconds, read the file, take its last 5 lines and, when
      they are not blank, pass them together with ``party`` to
      ``grounding_with_google_search``, print the result and append it to the
      module-level ``search_results`` list.

    The loop stops once the text ``----STT work complete---`` is found
    anywhere in the file (the same condition periodic_claude_analysis uses).
    Exceptions raised by the search call or while reading the file are logged
    and the loop keeps going.
    """
    logger.info("Starting Google search task")
    search_count = 0

    # Wait until the file is created
    while not os.path.exists(output_file_path):
        logger.info("Waiting for transcription file...")
        time.sleep(2)

    # Initial 30-second wait
    logger.info("Waiting 30 seconds for initial transcription collection...")
    time.sleep(30)
    logger.info("Wait complete, starting Google search")

    while True:
        try:
            if not os.path.exists(output_file_path):
                # The file vanished: retry after 5 seconds, skipping the
                # regular 10-second pause at the bottom of the loop.
                logger.warning("Transcription file is missing. Waiting...")
                time.sleep(5)
                continue

            with open(output_file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Only the last 5 lines are searched; slicing already copes with
            # files shorter than that. Lines are joined with a newline so the
            # last word of one line is not fused onto the first word of the
            # next.
            current_content = "\n".join(content.splitlines()[-5:]).strip()

            if current_content:
                search_count += 1
                logger.info(f"Google Search #{search_count} Start: Analyzing last 5 lines in STT file")
                try:
                    # Keyword extraction and search with Gemini
                    search_result = grounding_with_google_search(current_content, party)
                    print("\n" + "=" * 50)
                    print(f"Google Search Result #{search_count} - {time.strftime('%Y-%m-%d %H:%M:%S')}")
                    print("=" * 50)
                    print(search_result)
                    print("=" * 50 + "\n")
                    search_results.append(search_result)
                except Exception as e:
                    # A failed search must not end the task; log and retry on
                    # the next cycle.
                    logger.error(f"Error occurred during Google search: {str(e)}")
            else:
                logger.info("No content in file. Waiting...")

            # Test the whole file, not just the 5-line tail: if anything
            # pushed the marker out of the tail, this task would never stop.
            if "----STT work complete---" in content:
                logger.info("Completion marker detected. Google search complete.")
                break
        except Exception as e:
            logger.error(f"Error occurred while reading file: {str(e)}")

        # Wait 10 seconds
        time.sleep(10)

    logger.info("Google search task complete")
def main(party=None):
    """Run transcription and both periodic analysis loops in parallel.

    ``party`` selects the audio input. Any value not listed below, including
    the default ``None``, is treated as "국민의힘" with no audio file.

    Returns the string "Analysis complete" after all three tasks finish. An
    exception raised inside a task is re-raised here by ``Future.result()``.
    """
    # Known selections and the audio file handed to run_transcription.
    # None means no file is passed; presumably run_transcription then uses a
    # non-file source - confirm against realtime_video_analysis.
    known_inputs = (
        ("더불어민주당", None),
        ("Agents for Amazon Bedrock", './data/summit_sungwoo.wav'),
        ("Bundesliga Fan Experience", './data/aws_bundesliga.wav'),
        ("AWS_2024_recap", './data/aws.wav'),
    )
    for known_party, known_audio in known_inputs:
        if party == known_party:
            audio_file = known_audio
            break
    else:
        # No match: fall back to the default party.
        audio_file = None
        party = "국민의힘"

    output_file_path = './transcribe_texts'
    logger.info("Backend system started")

    # Three workers: the transcriber plus the two loops that read the file
    # path passed to them.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_transcription, audio_file, party),
            executor.submit(periodic_claude_analysis, party, output_file_path),
            executor.submit(periodic_google_search, party, output_file_path),
        ]
        # Block on each task in submission order.
        for future in futures:
            future.result()

    logger.info("All tasks complete")
    return "Analysis complete"
# Script entry point: run the pipeline with the default party selection.
if __name__ == "__main__":
    results = main()