Spaces:
Running
Running
from flask import render_template, request, jsonify, current_app | |
from app import db # Assuming db is initialized in app's __init__.py | |
from app.models import Case, Country, InterrogationSession, GeneratedQuestion, InterrogationResponse, Report, CaseStatus | |
from app.utils.groq_client import GroqClient | |
import json | |
# This will be the main blueprint or directly in app.routes if not using blueprints | |
# For simplicity, adding routes directly here. In a larger app, use Blueprints. | |
def register_routes(app): | |
def dashboard():
    """Render the dashboard page with headline counts and recent cases.

    Queries the number of active cases, the total number of interrogation
    sessions, the number of reports joined to completed cases, the five most
    recently created cases, and all countries ordered by name, then passes
    them to the ``dashboard.html`` template.
    """
    completed_reports = Report.query.join(Case).filter(
        Case.status == CaseStatus.COMPLETED
    )
    context = {
        'active_cases_count': Case.query.filter_by(status=CaseStatus.ACTIVE).count(),
        'total_interrogations': InterrogationSession.query.count(),
        'completed_reports_count': completed_reports.count(),
        # Hard-coded placeholder; not yet derived from closed/completed cases.
        'resolution_rate': "85%",
        'recent_cases': Case.query.order_by(Case.created_at.desc()).limit(5).all(),
        # Countries are handed to the template so it can offer a selection.
        'countries': Country.query.order_by(Country.name).all(),
    }
    return render_template('dashboard.html', **context)
def manage_cases():
    """List all cases, or create a new case from submitted form data.

    On POST, reads ``case_id_display``, ``case_type``, ``suspect_name``,
    ``profile_details``, ``evidence_summary`` and ``country_id`` from the
    form, stores a new ``Case`` with status ``PENDING`` and returns a JSON
    body containing the new case's id with HTTP 201. If the database write
    fails, the transaction is rolled back and a JSON error is returned with
    HTTP 500. For any other method, renders ``cases.html`` with every case.
    """
    if request.method != 'POST':
        return render_template('cases.html', cases=Case.query.all())

    form = request.form

    # Only run the COUNT query when the form did not supply a usable display
    # id; a blank field is treated the same as a missing one.
    case_id_display = form.get('case_id_display')
    if not case_id_display or not case_id_display.strip():
        case_id_display = f"C-NEW-{db.session.query(Case).count() + 1}"

    new_case = Case(
        case_id_display=case_id_display,
        case_type=form.get('case_type'),
        suspect_name=form.get('suspect_name'),
        profile_details=form.get('profile_details'),
        evidence_summary=form.get('evidence_summary'),
        status=CaseStatus.PENDING,  # Default status
        # Form values arrive as strings; a blank or non-numeric value
        # becomes None instead of being passed through as text.
        country_id=form.get('country_id', type=int),
    )

    try:
        db.session.add(new_case)
        db.session.commit()
    except Exception as e:
        # Leave the session usable for later requests after a failed write.
        db.session.rollback()
        current_app.logger.error(f"Error creating case: {e}")
        return jsonify({'error': str(e)}), 500

    return jsonify({'message': 'Case created successfully', 'case_id': new_case.id}), 201
def view_case(case_id):
    """Render the detail page for one case; responds 404 if it is not found.

    The template ``case_detail.html`` receives the case and all countries.
    """
    context = {
        'case': Case.query.get_or_404(case_id),
        'countries': Country.query.all(),
    }
    return render_template('case_detail.html', **context)
def generate_questions_route(case_id):
    """Generate interrogation questions for a case and store them.

    Responds 404 if the case does not exist and 400 if the case lacks
    profile details or an evidence summary. Otherwise asks ``GroqClient``
    for a list of question texts, attaches them to the case's first
    interrogation session (creating one if none exists) and returns JSON of
    the form ``{'case_id': ..., 'questions': [{'id': ..., 'text': ...}]}``.
    Any failure rolls back the transaction and returns a JSON error with
    HTTP 500.
    """
    case = Case.query.get_or_404(case_id)
    if not (case.profile_details and case.evidence_summary):
        return jsonify({'error': 'Case profile and evidence summary are required to generate questions.'}), 400

    groq_cli = GroqClient()
    try:
        questions_list = groq_cli.generate_interrogation_questions(
            case_details=f"Case Type: {case.case_type}, ID: {case.case_id_display}",
            suspect_profile=case.profile_details,
            evidence_summary=case.evidence_summary,
        )
        # Treat a missing result as "no questions" rather than failing.
        question_texts = list(questions_list or [])

        # Reuse the case's existing interrogation session, or create one.
        interrogation_session = InterrogationSession.query.filter_by(case_id=case.id).first()
        if interrogation_session is None:
            interrogation_session = InterrogationSession(
                case_id=case.id,
                summary_notes="Initial question generation session.",
            )
            db.session.add(interrogation_session)
            # Flush so the new session has a primary key for the questions.
            db.session.flush()

        question_rows = [
            GeneratedQuestion(
                interrogation_session_id=interrogation_session.id,
                question_text=q_text,
                category="AI Generated",
            )
            for q_text in question_texts
        ]
        db.session.add_all(question_rows)
        # Flush assigns each row its id, so the ids are read from the objects
        # themselves instead of re-querying the newest rows and guessing.
        db.session.flush()
        generated_q_objects = [
            {'id': row.id, 'text': q_text}
            for row, q_text in zip(question_rows, question_texts)
        ]
        db.session.commit()

        return jsonify({'case_id': case.id, 'questions': generated_q_objects}), 200
    except Exception as e:
        # Discard any partial writes so the session stays usable.
        db.session.rollback()
        current_app.logger.error(f"Error in /generate_questions/{case_id}: {e}")
        return jsonify({'error': str(e)}), 500
def _build_interrogation_summary(case_id, user_summary):
    """Return *user_summary* followed by every recorded session, question and response for the case."""
    parts = [user_summary]  # Start with the user-provided summary
    sessions = InterrogationSession.query.filter_by(case_id=case_id).all()
    if not sessions:
        parts.append("\nNo formal interrogation sessions found in the database for this case.")
        return "".join(parts)

    parts.append("\n\n--- Recorded Interrogation Details ---")
    for sess_idx, session in enumerate(sessions, start=1):
        # A session without a date must not abort the whole report.
        if session.session_date:
            date_label = session.session_date.strftime("%Y-%m-%d")
        else:
            date_label = "Unknown"
        parts.append(f"\nSession {sess_idx} (Date: {date_label}):\n")
        if session.summary_notes:
            parts.append(f"Session Notes: {session.summary_notes}\n")

        questions = GeneratedQuestion.query.filter_by(interrogation_session_id=session.id).all()
        if not questions:
            parts.append("No specific questions recorded for this session.\n")
            continue

        parts.append("Questions and Responses:\n")
        for q_idx, q in enumerate(questions, start=1):
            parts.append(f" Q{q_idx}: {q.question_text}\n")
            responses = InterrogationResponse.query.filter_by(generated_question_id=q.id).all()
            if not responses:
                parts.append(" A: No response recorded.\n")
                continue
            for r_idx, r in enumerate(responses, start=1):
                parts.append(f" A{r_idx}: {r.response_text} (Tags: {r.tags or 'N/A'})\n")
    return "".join(parts)


def _extract_report_fields(report_json_str):
    """Return ``(summary, recommendations)`` text taken from the LLM's JSON report.

    Both values fall back to an error message when the output is not valid
    JSON or is valid JSON that is not an object.
    """
    summary = "Error parsing LLM JSON output."
    recommendations = "Error parsing LLM JSON output."
    try:
        report_data = json.loads(report_json_str)
    except json.JSONDecodeError as e:
        current_app.logger.error(f"Failed to parse report JSON from LLM: {e}. Raw: {report_json_str}")
        return summary, recommendations
    if not isinstance(report_data, dict):
        # Valid JSON of the wrong shape is handled like a parse failure so
        # the raw output can still be stored.
        current_app.logger.error(f"LLM report JSON is not an object. Raw: {report_json_str}")
        return summary, recommendations

    # Simplified extraction: a brief overview plus the recommendations.
    cs = report_data.get("caseSummary", {})
    if isinstance(cs, dict):
        summary = cs.get("briefOverview", "Summary not found in JSON.")
    else:  # caseSummary is a plain value rather than an object
        summary = str(cs)

    recom = report_data.get("recommendations", {})
    if isinstance(recom, dict):
        # Store the whole recommendations object as a JSON string.
        recommendations = json.dumps(recom)
    else:
        recommendations = str(recom)
    return summary, recommendations


def generate_report_route(case_id):
    """Generate and store an LLM report for a case.

    Expects a JSON body with ``country_id`` (required) and an optional
    ``interrogation_summary``. Responds 404 if the case does not exist and
    400 if the country id is missing or unknown. Otherwise builds a summary
    from the recorded sessions, asks ``GroqClient`` for a report, stores it
    as a ``Report``, marks the case ``COMPLETED`` and returns JSON with the
    report id and the raw LLM output. Any failure rolls back the transaction
    and returns a JSON error with HTTP 500.
    """
    case = Case.query.get_or_404(case_id)

    # A missing, malformed or non-object body is treated as empty so the
    # caller gets the 400 below rather than an unhandled error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    selected_country_id = data.get('country_id')
    interrogation_summary_text = data.get('interrogation_summary')
    if interrogation_summary_text is None:
        interrogation_summary_text = 'No detailed interrogation summary provided by user yet.'

    if not selected_country_id:
        return jsonify({'error': 'Country ID is required for report generation.'}), 400
    selected_country = Country.query.get(selected_country_id)
    if not selected_country:
        return jsonify({'error': 'Invalid Country ID.'}), 400

    # Consolidate interrogation data for the LLM.
    full_interrogation_summary = _build_interrogation_summary(
        case.id, str(interrogation_summary_text)
    )

    groq_cli = GroqClient()
    try:
        report_json_str = groq_cli.generate_report_and_recommendations(
            interrogation_summary=full_interrogation_summary,
            profile_details=case.profile_details or "Not provided",
            evidence_summary=case.evidence_summary or "Not provided",
            selected_country_name=selected_country.name,
        )

        # The raw output is stored even when it cannot be parsed; in that
        # case the summary and recommendations hold error messages.
        report_content_summary_for_db, recommendations_for_db = _extract_report_fields(report_json_str)

        new_report = Report(
            case_id=case.id,
            llm_json_output=report_json_str,
            report_content_summary=report_content_summary_for_db,
            recommendations=recommendations_for_db,
            country_id=selected_country.id,
        )
        db.session.add(new_report)
        case.status = CaseStatus.COMPLETED  # Optionally update case status
        db.session.commit()

        return jsonify({'message': 'Report generated successfully', 'report_id': new_report.id, 'raw_json_report': report_json_str}), 200
    except Exception as e:
        # Discard any partial writes so the session stays usable.
        db.session.rollback()
        current_app.logger.error(f"Error in /generate_report/{case_id}: {e}")
        return jsonify({'error': str(e)}), 500
def get_countries():
    """Return every country, ordered by name, as a JSON array.

    Each element carries the country's ``id``, ``name`` and ``region``.
    """
    payload = []
    for country in Country.query.order_by(Country.name).all():
        payload.append({
            'id': country.id,
            'name': country.name,
            'region': country.region,
        })
    return jsonify(payload)
# Add a route to view a specific report | |
def view_report(report_id):
    """Render one stored report; responds 404 if it is not found.

    The stored LLM output is a JSON string and is parsed for the template.
    When it cannot be parsed, the template receives a dict holding an error
    message and the raw stored value instead.
    """
    report = Report.query.get_or_404(report_id)
    try:
        parsed_output = json.loads(report.llm_json_output)
    except (json.JSONDecodeError, TypeError):
        parsed_output = {
            "error": "Could not parse report data",
            "raw": report.llm_json_output,
        }
    return render_template(
        'report_detail.html',
        report=report,
        report_data=parsed_output,
    )
# Placeholder for other routes from sidebar | |
def interrogations_page():
    """Render ``interrogations.html`` with every interrogation session."""
    return render_template(
        'interrogations.html',
        sessions=InterrogationSession.query.all(),
    )
# Renamed to avoid conflict with /report/<id> | |
def reports_list_page():
    """Render ``reports_list.html`` with every stored report."""
    return render_template(
        'reports_list.html',
        reports=Report.query.all(),
    )
def regional_guidelines_page():
    """Render ``regional_guidelines.html`` with every country."""
    return render_template(
        'regional_guidelines.html',
        countries=Country.query.all(),
    )
# Simple placeholder for other menu items | |
def _make_placeholder_view(page_name):
    """Return a view that serves the 'coming soon' text for *page_name*."""
    def placeholder_view():
        return f"{page_name.capitalize()} page coming soon! This is a placeholder."
    return placeholder_view


# Register one URL per remaining menu item, using the item name as both the
# path segment and the endpoint name.
for item_route in ('team', 'analytics', 'evidence', 'settings', 'help'):
    app.add_url_rule(f'/{item_route}', item_route, _make_placeholder_view(item_route))