"""
Defines the Gradio user interface and manages the application's state
and event handling.

This module is responsible for the presentation layer of the application.
It creates the interactive components and orchestrates the analysis workflow
by calling functions from the data_processing module.
"""
|
|
|
|
|
import gradio as gr |
|
import json |
|
import concurrent.futures |
|
import threading |
|
from data_processing import ( |
|
llm_generate_analysis_plan_with_history, |
|
execute_quantitative_query, |
|
execute_qualitative_query, |
|
llm_synthesize_enriched_report_stream, |
|
llm_generate_visualization_code, |
|
execute_viz_code_and_get_path, |
|
parse_suggestions_from_report |
|
) |
|
|
|
|
|
# Module-wide lock held by the analysis handler in create_ui while it
# temporarily repoints the shared Solr client at another core and runs its
# queries; the client's URL is restored before the lock is released.
solr_lock = threading.Lock()
|
|
|
|
|
def _solr_core_url(current_url, core_name):
    """Return ``current_url`` with its last path segment replaced by ``core_name``.

    Trailing slashes are ignored so that ``.../solr/core/`` and
    ``.../solr/core`` both resolve to ``.../solr/<core_name>``.
    """
    base_url = current_url.rstrip('/').rsplit('/', 1)[0]
    return f"{base_url}/{core_name}"


def _shown(value):
    """Build a fresh update payload that sets ``value`` and makes the component visible."""
    return gr.update(value=value, visible=True)


def _hidden():
    """Build a fresh update payload that clears a component and hides it."""
    return gr.update(value=None, visible=False)


def _ui_frame(history, state, plot=None, report=None, plan=None,
              quantitative_url=None, quantitative_data=None,
              qualitative_url=None, qualitative_data=None, suggestions=None):
    """Assemble one output tuple in the order of the submit handler's ``outputs`` list."""
    return (
        history,
        state,
        plot,
        report,
        plan,
        quantitative_url,
        quantitative_data,
        qualitative_url,
        qualitative_data,
        suggestions,
    )


def create_ui(llm_model, solr_client):
    """
    Builds the Gradio UI and wires up all the event handlers.

    Args:
        llm_model: The initialized Google Gemini model client.
        solr_client: The initialized pysolr client. Its ``url`` attribute is
            temporarily rewritten (under ``solr_lock``) to target the core
            chosen by the analysis plan, then restored.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft(), css="footer {display: none !important}") as demo:
        # Per-session dict: {'query_count': int, 'last_suggestions': list}.
        state = gr.State()

        with gr.Row():
            with gr.Column(scale=4):
                gr.Markdown("# PharmaCircle AI Data Analyst")
            with gr.Column(scale=1):
                clear_button = gr.Button(
                    "🔄 Start New Analysis", variant="primary")

        gr.Markdown("Ask a question to begin your analysis. I will generate an analysis plan, retrieve quantitative and qualitative data, create a visualization, and write an enriched report.")

        with gr.Row():
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(
                    label="Analysis Chat Log", height=700, show_copy_button=True)
                msg_textbox = gr.Textbox(
                    placeholder="Ask a question, e.g., 'Show me the top 5 companies by total deal value in 2023'", label="Your Question", interactive=True)

            # NOTE(review): the plot and report are placed below the accordions
            # in the right-hand column — confirm against the intended layout.
            with gr.Column(scale=2):
                with gr.Accordion("Dynamic Field Suggestions", open=False):
                    suggestions_display = gr.Markdown(
                        "Suggestions from the external API will appear here...", visible=True)
                with gr.Accordion("Generated Analysis Plan", open=False):
                    plan_display = gr.Markdown(
                        "Plan will appear here...", visible=True)
                with gr.Accordion("Retrieved Quantitative Data", open=False):
                    quantitative_url_display = gr.Markdown(
                        "Quantitative URL will appear here...", visible=False)
                    quantitative_data_display = gr.Markdown(
                        "Aggregate data will appear here...", visible=False)
                with gr.Accordion("Retrieved Qualitative Data (Examples)", open=False):
                    qualitative_url_display = gr.Markdown(
                        "Qualitative URL will appear here...", visible=False)
                    qualitative_data_display = gr.Markdown(
                        "Example data will appear here...", visible=False)
                plot_display = gr.Image(
                    label="Visualization", type="filepath", visible=False)
                report_display = gr.Markdown(
                    "Report will be streamed here...", visible=False)

        def process_analysis_flow(user_input, history, state):
            """
            Manages the conversation and yields UI updates.

            Generator used as the textbox submit handler. Each yielded tuple
            has ten items matching the handler's ``outputs`` list: chat
            history, session state, plot, report, plan, quantitative URL,
            quantitative data, qualitative URL, qualitative data, suggestions.

            Stages: plan generation -> Solr queries (aggregate + examples, in
            parallel) -> streamed report with visualization code generated
            concurrently -> plot rendering.
            """
            if state is None:
                state = {'query_count': 0, 'last_suggestions': []}
            if history is None:
                history = []

            # Clear and hide every result panel left over from the previous run.
            yield _ui_frame(
                history, state,
                plot=_hidden(),
                report=_hidden(),
                plan=_hidden(),
                quantitative_url=_hidden(),
                quantitative_data=_hidden(),
                qualitative_url=_hidden(),
                qualitative_data=_hidden(),
                suggestions=gr.update(value="Suggestions from the external API will appear here...", visible=False),
            )

            # Tolerate a None textbox value instead of raising AttributeError.
            query_context = (user_input or "").strip()
            if not query_context:
                history.append((user_input, "Please enter a question to analyze."))
                yield _ui_frame(history, state)
                return

            history.append((user_input, f"Analyzing: '{query_context}'\n\n*Generating analysis plan...*"))
            yield _ui_frame(history, state)

            # --- Stage 1: analysis plan -----------------------------------
            analysis_plan, mapped_search_fields, core_name, intent = llm_generate_analysis_plan_with_history(llm_model, query_context, history)

            if mapped_search_fields:
                suggestions_md = "**API Suggestions (with mappings applied):**\n" + "\n".join(
                    f"- `{field['field_name']}`: `{field['field_value']}`"
                    for field in mapped_search_fields
                )
            else:
                suggestions_md = "No suggestions were returned from the external API."

            if not analysis_plan:
                if intent and intent != 'search_list':
                    message = f"I am sorry, I can only perform analysis for 'search_list' type queries. Your query was identified as a '{intent}', which is not supported."
                else:
                    message = "I'm sorry, I couldn't generate a valid analysis plan. Please try rephrasing your question."
                history.append((None, message))
                yield _ui_frame(history, state, suggestions=_shown(suggestions_md))
                return

            history.append((None, f"✅ Analysis plan generated for core: **`{core_name}`**"))
            plan_summary = (
                "\n"
                f"* **Analysis Dimension:** `{analysis_plan.get('analysis_dimension')}`\n"
                f"* **Analysis Measure:** `{analysis_plan.get('analysis_measure')}`\n"
                f"* **Query Filter:** `{analysis_plan.get('query_filter')}`\n"
            )
            history.append((None, plan_summary))
            formatted_plan = f"**Full Analysis Plan (Core: `{core_name}`):**\n```json\n{json.dumps(analysis_plan, indent=2)}\n```"

            def plan_frame(chat, **panels):
                # Frame showing the plan and suggestions plus any extra panels.
                return _ui_frame(
                    chat, state,
                    plan=_shown(formatted_plan),
                    suggestions=_shown(suggestions_md),
                    **panels,
                )

            yield plan_frame(history)

            history.append((None, "*Executing queries for aggregates and examples...*"))
            yield plan_frame(history)

            # --- Stage 2: Solr queries ------------------------------------
            aggregate_data, quantitative_url = None, None
            example_data, qualitative_url = None, None
            # The Solr client is shared, so the URL swap, both queries and the
            # restore all happen while holding the module-level lock.
            with solr_lock:
                original_solr_url = solr_client.url
                try:
                    solr_client.url = _solr_core_url(original_solr_url, core_name)
                    print(f"[INFO] Switched Solr client to core: {core_name} at URL: {solr_client.url}")

                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        future_agg = executor.submit(execute_quantitative_query, solr_client, analysis_plan)
                        future_ex = executor.submit(execute_qualitative_query, solr_client, analysis_plan)
                        aggregate_data, quantitative_url = future_agg.result()
                        example_data, qualitative_url = future_ex.result()
                finally:
                    # Always restore the default core, even if a query raised.
                    solr_client.url = original_solr_url
                    print(f"[INFO] Reset Solr client to default URL: {original_solr_url}")

            if not aggregate_data or aggregate_data.get('count', 0) == 0:
                history.append((None, f"No data was found for your query in the '{core_name}' core. Please try a different question."))
                yield plan_frame(history)
                return

            quantitative_url_md = f"**Solr URL:** [{quantitative_url}]({quantitative_url})"
            qualitative_url_md = f"**Solr URL:** [{qualitative_url}]({qualitative_url})"
            formatted_agg_data = f"**Quantitative (Aggregate) Data:**\n```json\n{json.dumps(aggregate_data, indent=2)}\n```"
            formatted_qual_data = f"**Qualitative (Example) Data:**\n```json\n{json.dumps(example_data, indent=2)}\n```"

            def data_frame(chat, **panels):
                # Frame showing plan, suggestions and all four retrieved-data panels.
                return plan_frame(
                    chat,
                    quantitative_url=_shown(quantitative_url_md),
                    quantitative_data=_shown(formatted_agg_data),
                    qualitative_url=_shown(qualitative_url_md),
                    qualitative_data=_shown(formatted_qual_data),
                    **panels,
                )

            yield data_frame(history)

            history.append((None, "✅ Data retrieved. Generating visualization and final report..."))
            yield data_frame(history)

            # --- Stage 3: report (streamed) + visualization code ----------
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Generate the plotting code in the background while the
                # report text streams to the UI.
                viz_future = executor.submit(llm_generate_visualization_code, llm_model, query_context, aggregate_data)

                report_text = ""
                # Snapshot of the chat shown while streaming; the finished
                # report is appended to the real history afterwards.
                stream_history = history[:]
                for chunk in llm_synthesize_enriched_report_stream(llm_model, query_context, aggregate_data, example_data, analysis_plan):
                    report_text += chunk
                    yield data_frame(stream_history, report=_shown(report_text))

                history.append((None, report_text))
                viz_code = viz_future.result()

            plot_path = execute_viz_code_and_get_path(viz_code, aggregate_data)
            if not plot_path:
                history.append((None, "*I was unable to generate a plot for this data.*\n"))

            def final_frame():
                # Frame with the finished report and the plot (hidden if none).
                plot = _shown(plot_path) if plot_path else gr.update(visible=False)
                return data_frame(history, plot=plot, report=gr.update(value=report_text))

            yield final_frame()

            # --- Stage 4: bookkeeping -------------------------------------
            state['query_count'] += 1
            state['last_suggestions'] = parse_suggestions_from_report(report_text)
            next_prompt = "Analysis complete. What would you like to explore next?"
            history.append((None, next_prompt))
            yield final_frame()

        def reset_all():
            """Resets the entire UI for a new analysis session.

            Returns eleven values matching the clear button's ``outputs``:
            empty chat, cleared state, empty textbox, then eight hidden and
            cleared result panels.
            """
            return ([], None, "") + tuple(_hidden() for _ in range(8))

        msg_textbox.submit(
            fn=process_analysis_flow,
            inputs=[msg_textbox, chatbot, state],
            outputs=[chatbot, state, plot_display, report_display, plan_display, quantitative_url_display,
                     quantitative_data_display, qualitative_url_display, qualitative_data_display, suggestions_display],
        ).then(
            # Clear the question box once the analysis run has finished.
            lambda: gr.update(value=""),
            None,
            [msg_textbox],
            queue=False,
        )

        clear_button.click(
            fn=reset_all,
            inputs=None,
            outputs=[chatbot, state, msg_textbox, plot_display, report_display, plan_display, quantitative_url_display,
                     quantitative_data_display, qualitative_url_display, qualitative_data_display, suggestions_display],
            queue=False
        )

    return demo