|
""" |
|
Core data processing and analysis logic for the PharmaCircle AI Data Analyst. |
|
|
|
This module orchestrates the main analysis workflow: |
|
1. Takes a user's natural language query. |
|
2. Uses the LLM to generate a structured analysis plan. |
|
3. Executes parallel queries against Solr for quantitative and qualitative data. |
|
4. Generates a data visualization using the LLM. |
|
5. Synthesizes the findings into a comprehensive, user-facing report. |
|
""" |
|
|
|
import concurrent.futures
import copy
import datetime
import json
import os
import re
import urllib
import urllib.parse

import google.generativeai as genai
import matplotlib.pyplot as plt
import pandas as pd
import pysolr
import seaborn as sns

import config
|
|
|
from llm_prompts import ( |
|
get_analysis_plan_prompt, |
|
get_synthesis_report_prompt, |
|
get_visualization_code_prompt |
|
) |
|
from extract_results import get_search_list_params |
|
|
|
def parse_suggestions_from_report(report_text):
    """Pull the numbered follow-up suggestions out of a markdown report.

    Finds the "Deeper Dive: Suggested Follow-up Analyses" or
    "Suggestions for Further Exploration" section heading (case-insensitive)
    and returns the numbered items beneath it as a list of stripped strings.
    Returns an empty list when no such section exists.
    """
    heading_pattern = (
        r"### (?:Deeper Dive: Suggested Follow-up Analyses"
        r"|Suggestions for Further Exploration)\s*\n(.*?)$"
    )
    section = re.search(heading_pattern, report_text, re.DOTALL | re.IGNORECASE)
    if section is None:
        return []
    # Each suggestion is a line starting with "1.", "2.", ... inside the section.
    numbered_items = re.findall(r"^\s*\d+\.\s*(.*)", section.group(1), re.MULTILINE)
    return [item.strip() for item in numbered_items]
|
|
|
def llm_generate_analysis_plan_with_history(llm_model, natural_language_query, chat_history):
    """
    Generates a complete analysis plan from a user query, considering chat history
    and dynamic field suggestions from an external API.

    Returns a 4-tuple (plan, mapped_search_fields, core_name, intent):
      - plan: parsed JSON analysis plan (dict), or None on any failure.
      - mapped_search_fields: the API's suggested fields after name mapping
        (None when the intent is not 'search_list'; [] on API error).
      - core_name: target Solr core (None when aborted before selection).
      - intent: the API-classified intent string, or 'api_error' when the
        field-suggestion call itself failed.
    """
    search_fields, search_name, field_mappings = [], "", {}
    intent = None
    try:
        # External call: classify the query and suggest searchable fields.
        intent, search_fields, search_name, field_mappings = get_search_list_params(natural_language_query)
        print(f"API returned intent: '{intent}', core: '{search_name}' with {len(search_fields)} fields and {len(field_mappings)} mappings.")

        # Only 'search_list' intents are analyzable; everything else aborts early.
        if intent != 'search_list':
            print(f"API returned intent '{intent}' which is not 'search_list'. Aborting analysis.")
            return None, None, None, intent

    except Exception as e:
        # NOTE(review): the message says "Proceeding without them" but the
        # function actually aborts here — confirm which behavior is intended.
        print(f"Warning: Could not retrieve dynamic search fields. Proceeding without them. Error: {e}")
        return None, [], None, 'api_error'

    # Fall back to the 'news' core when the API did not name one.
    core_name = search_name if search_name else 'news'

    # Translate API field names to Solr field names via the provided mappings;
    # fields without a mapping are passed through unchanged.
    mapped_search_fields = []
    if search_fields and field_mappings:
        for field in search_fields:
            original_name = field.get('field_name')
            mapped_field = field.copy()
            if original_name in field_mappings:
                mapped_field['field_name'] = field_mappings[original_name]
                print(f"Mapped field '{original_name}' to '{mapped_field['field_name']}'")
            mapped_search_fields.append(mapped_field)
    else:
        mapped_search_fields = search_fields

    prompt = get_analysis_plan_prompt(natural_language_query, chat_history, mapped_search_fields, core_name)

    try:
        response = llm_model.generate_content(prompt)
        # Strip markdown ```json fences the LLM often wraps around its output.
        cleaned_text = re.sub(r'```json\s*|\s*```', '', response.text, flags=re.MULTILINE | re.DOTALL).strip()
        plan = json.loads(cleaned_text)
        return plan, mapped_search_fields, core_name, intent
    except json.JSONDecodeError as e:
        # 'response' may be unbound if generate_content itself raised.
        raw_response_text = response.text if 'response' in locals() else 'N/A'
        print(f"Error decoding JSON from LLM response: {e}\nRaw Response:\n{raw_response_text}")
        return None, mapped_search_fields, core_name, intent
    except Exception as e:
        raw_response_text = response.text if 'response' in locals() else 'N/A'
        print(f"Error in llm_generate_analysis_plan_with_history: {e}\nRaw Response:\n{raw_response_text}")
        return None, mapped_search_fields, core_name, intent
|
|
|
def execute_quantitative_query(solr_client, plan):
    """Run the plan's JSON facet (aggregate) request against Solr.

    Returns a (facets, public_url) pair where facets is the raw Solr
    'facets' dict and public_url is the query URL rewritten to the public
    Solr host. Returns (None, None) when the plan has no usable
    quantitative request or the query fails.
    """
    has_facet_request = (
        plan
        and 'quantitative_request' in plan
        and 'json.facet' in plan.get('quantitative_request', {})
    )
    if not has_facet_request:
        print("Skipping quantitative query due to incomplete plan.")
        return None, None
    try:
        # NOTE(review): the default query '*_*' looks unusual — Solr's
        # match-all is normally '*:*'; confirm this default is intentional.
        query_params = {
            "q": plan.get('query_filter', '*_*'),
            "rows": 0,
            "json.facet": json.dumps(plan['quantitative_request']['json.facet']),
        }
        local_url = f"{solr_client.url}/select?{urllib.parse.urlencode(query_params)}"
        print(f"[DEBUG] Solr QUANTITATIVE query URL (local): {local_url}")

        # Rewrite the local tunnel address into the public Solr address so the
        # URL is usable outside this host.
        public_url = local_url.replace(
            f'http://127.0.0.1:{config.LOCAL_BIND_PORT}',
            f'http://{config.REMOTE_SOLR_HOST}:{config.REMOTE_SOLR_PORT}',
        )

        response = solr_client.search(**query_params)
        return response.raw_response.get("facets", {}), public_url
    except pysolr.SolrError as e:
        print(f"Solr Error in quantitative query on core {solr_client.url}: {e}")
        return None, None
    except Exception as e:
        print(f"Unexpected error in quantitative query: {e}")
        return None, None
|
|
|
def execute_qualitative_query(solr_client, plan):
    """Run the plan's grouping request to fetch representative documents.

    Returns a (grouped_results, public_url) pair, or (None, None) when the
    plan has no qualitative request or the query fails.
    """
    if not plan or 'qualitative_request' not in plan:
        print("Skipping qualitative query due to incomplete plan.")
        return None, None
    try:
        # Deep-copy so spreading the request into params can't mutate the plan.
        grouping_request = copy.deepcopy(plan['qualitative_request'])
        query_params = {
            "q": plan.get('query_filter', '*_*'),
            "rows": 5,
            "fl": "*,score",
            **grouping_request,
        }
        local_url = f"{solr_client.url}/select?{urllib.parse.urlencode(query_params)}"
        print(f"[DEBUG] Solr QUALITATIVE query URL (local): {local_url}")

        # Rewrite the local tunnel address into the public Solr address so the
        # URL is usable outside this host.
        public_url = local_url.replace(
            f'http://127.0.0.1:{config.LOCAL_BIND_PORT}',
            f'http://{config.REMOTE_SOLR_HOST}:{config.REMOTE_SOLR_PORT}',
        )

        response = solr_client.search(**query_params)
        return response.grouped, public_url
    except pysolr.SolrError as e:
        print(f"Solr Error in qualitative query on core {solr_client.url}: {e}")
        return None, None
    except Exception as e:
        print(f"Unexpected error in qualitative query: {e}")
        return None, None
|
|
|
def llm_synthesize_enriched_report_stream(llm_model, query, quantitative_data, qualitative_data, plan):
    """
    Stream an enriched report that synthesizes quantitative aggregates with
    qualitative example documents.

    Yields report text chunks as the LLM produces them; on any error, yields
    a single user-facing apology message instead.
    """
    synthesis_prompt = get_synthesis_report_prompt(query, quantitative_data, qualitative_data, plan)
    try:
        for piece in llm_model.generate_content(synthesis_prompt, stream=True):
            yield piece.text
    except Exception as e:
        print(f"Error in llm_synthesize_enriched_report_stream: {e}")
        yield "Sorry, an error occurred while generating the report. Please check the logs for details."
|
|
|
def llm_generate_visualization_code(llm_model, query_context, facet_data):
    """Ask the LLM for Python plotting code for the given facet data.

    Returns the generated code with markdown fences stripped, or None on
    failure. Temperature 0 is used for deterministic code generation.
    """
    prompt = get_visualization_code_prompt(query_context, facet_data)
    response = None
    try:
        deterministic = genai.types.GenerationConfig(temperature=0)
        response = llm_model.generate_content(prompt, generation_config=deterministic)
        # Drop any ```python ... ``` fencing around the returned code.
        return re.sub(r'^```python\s*|```$', '', response.text, flags=re.MULTILINE)
    except Exception as e:
        raw_response_text = response.text if response is not None else 'N/A'
        print(f"Error in llm_generate_visualization_code: {e}\nRaw response: {raw_response_text}")
        return None
|
|
|
def execute_viz_code_and_get_path(viz_code, facet_data):
    """Execute LLM-generated visualization code and save the resulting plot.

    The code is expected to build a matplotlib figure and bind it to a
    variable named 'fig'; that figure is saved under /tmp/plots and the
    image path is returned. Returns None when viz_code is empty, the code
    fails, or no 'fig' object is produced.

    SECURITY: this exec()s model-generated code with no sandboxing — it must
    only run in a trusted/isolated environment.
    """
    if not viz_code:
        return None

    print("\n--- WARNING: Executing LLM-generated code. ---")

    try:
        # exist_ok avoids the check-then-create race of exists()+makedirs().
        os.makedirs('/tmp/plots', exist_ok=True)
        # Timestamp in the filename keeps successive plots from clobbering
        # each other.
        plot_path = f"/tmp/plots/plot_{datetime.datetime.now().timestamp()}.png"

        # Expose only the data and plotting libraries to the generated code.
        exec_globals = {'facet_data': facet_data, 'plt': plt, 'sns': sns, 'pd': pd}

        exec(viz_code, exec_globals)

        # The generated code's contract is to leave its figure in 'fig'.
        fig = exec_globals.get('fig')
        if fig:
            fig.savefig(plot_path, bbox_inches='tight')
            plt.close(fig)
            print("--- LLM-generated code executed successfully. ---")
            return plot_path
        else:
            print("--- LLM-generated code did not produce a 'fig' object. ---")
            return None

    except Exception as e:
        print(f"\n--- ERROR executing visualization code: ---")
        print(f"Error: {e}")
        print(f"--- Code---\n{viz_code}")
        print("-----------------------------------------")
        return None