Spaces:
Sleeping
Sleeping
""" | |
Defines the Gradio user interface and manages the application's state | |
and event handling. | |
This module is responsible for the presentation layer of the application. | |
It creates the interactive components and orchestrates the analysis workflow | |
by calling functions from the data_processing module. | |
""" | |
import gradio as gr | |
import json | |
import concurrent.futures | |
import threading | |
from data_processing import ( | |
llm_generate_analysis_plan_with_history, | |
execute_quantitative_query, | |
execute_qualitative_query, | |
llm_synthesize_enriched_report_stream, | |
llm_generate_visualization_code, | |
execute_viz_code_and_get_path, | |
parse_suggestions_from_report | |
) | |
# Module-level lock shared by every UI session. create_ui() temporarily
# rewrites the shared Solr client's ``url`` attribute to point at a
# per-request core; this lock serializes that rewrite so concurrent requests
# cannot observe (or restore) each other's URL.
solr_lock = threading.Lock()
def create_ui(llm_model, solr_client):
    """
    Builds the Gradio UI and wires up all the event handlers.

    Args:
        llm_model: The initialized Google Gemini model client.
        solr_client: The initialized pysolr client.

    Returns:
        The assembled ``gr.Blocks`` application (built, not launched).
    """
    # NOTE(review): the leading 'π' / 'β' glyphs in several UI strings below
    # look like mis-decoded emoji or arrows. They are reproduced verbatim;
    # confirm the intended characters against the original file encoding.
    # NOTE(review): widget nesting was reconstructed from a flattened listing;
    # plot/report are assumed to sit in the right-hand column — confirm.

    def _hidden_update():
        # Always build a fresh update object: one object must not be shared
        # between several output components of the same event.
        return gr.update(value=None, visible=False)

    with gr.Blocks(theme=gr.themes.Soft(), css="footer {display: none !important}") as demo:
        # Per-session dict; created lazily by process_analysis_flow.
        state = gr.State()

        with gr.Row():
            with gr.Column(scale=4):
                gr.Markdown("# PharmaCircle AI Data Analyst")
            with gr.Column(scale=1):
                clear_button = gr.Button(
                    "π Start New Analysis", variant="primary")

        gr.Markdown("Ask a question to begin your analysis. I will generate an analysis plan, retrieve quantitative and qualitative data, create a visualization, and write an enriched report.")

        with gr.Row():
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(
                    label="Analysis Chat Log", height=700, show_copy_button=True)
                msg_textbox = gr.Textbox(
                    placeholder="Ask a question, e.g., 'Show me the top 5 companies by total deal value in 2023'", label="Your Question", interactive=True)
            with gr.Column(scale=2):
                with gr.Accordion("Dynamic Field Suggestions", open=False):
                    suggestions_display = gr.Markdown(
                        "Suggestions from the external API will appear here...", visible=True)
                with gr.Accordion("Generated Analysis Plan", open=False):
                    plan_display = gr.Markdown(
                        "Plan will appear here...", visible=True)
                with gr.Accordion("Retrieved Quantitative Data", open=False):
                    quantitative_url_display = gr.Markdown(
                        "Quantitative URL will appear here...", visible=False)
                    quantitative_data_display = gr.Markdown(
                        "Aggregate data will appear here...", visible=False)
                with gr.Accordion("Retrieved Qualitative Data (Examples)", open=False):
                    qualitative_url_display = gr.Markdown(
                        "Qualitative URL will appear here...", visible=False)
                    qualitative_data_display = gr.Markdown(
                        "Example data will appear here...", visible=False)
                with gr.Accordion("Token Usage", open=False):
                    token_summary_box = gr.Markdown(visible=False)
                plot_display = gr.Image(
                    label="Visualization", type="filepath", visible=False)
                report_display = gr.Markdown(
                    "Report will be streamed here...", visible=False)

        def process_analysis_flow(user_input, history, state):
            """
            Manages the conversation and yields UI updates.

            This is a generator: every ``yield`` is an 11-tuple matching the
            ``outputs`` list of ``msg_textbox.submit`` below, in this order:
            chat history, session state, plot, report, plan, quantitative
            URL, quantitative data, qualitative URL, qualitative data, token
            summary, field suggestions.

            Args:
                user_input: Text submitted from the question box.
                history: Current chatbot history (list of message pairs) or None.
                state: Session dict with 'query_count' and 'last_suggestions',
                    or None on the first call.
            """
            # Only the report counters need a default: they are filled in
            # solely if the report stream emits a "tokens" item.
            report_in = report_out = report_total = None

            if state is None:
                state = {'query_count': 0, 'last_suggestions': []}
            if history is None:
                history = []

            # Reset all displays at the beginning of a new flow
            yield (
                history, state,
                *(_hidden_update() for _ in range(8)),
                gr.update(value="Suggestions from the external API will appear here...", visible=False),
            )

            # Tolerate a missing value as well as a blank one.
            query_context = (user_input or "").strip()
            if not query_context:
                history.append((user_input, "Please enter a question to analyze."))
                yield (history, state, None, None, None, None, None, None, None, None, None)
                return

            history.append((user_input, f"Analyzing: '{query_context}'\n\n*Generating analysis plan...*"))
            yield (history, state, None, None, None, None, None, None, None, None, None)

            # Generate plan, get search field suggestions, and intent.
            (analysis_plan, mapped_search_fields, core_name, intent,
             plan_in, plan_out, plan_total) = llm_generate_analysis_plan_with_history(
                llm_model, query_context, history)

            # Update and display search field suggestions in its own accordion
            if mapped_search_fields:
                suggestion_lines = "\n".join(
                    f"- `{field['field_name']}`: `{field['field_value']}`"
                    for field in mapped_search_fields
                )
                suggestions_md = "**API Suggestions (with mappings applied):**\n" + suggestion_lines
                suggestions_display_update = gr.update(value=suggestions_md, visible=True)
            else:
                suggestions_display_update = gr.update(value="No suggestions were returned from the external API.", visible=True)

            if not analysis_plan:
                if intent and intent != 'search_list':
                    message = f"I am sorry, I can only perform analysis for 'search_list' type queries. Your query was identified as a '{intent}', which is not supported."
                else:
                    message = "I'm sorry, I couldn't generate a valid analysis plan. Please try rephrasing your question."
                history.append((None, message))
                yield (history, state, None, None, None, None, None, None, None, None, suggestions_display_update)
                return

            history.append((None, f"β Analysis plan generated for core: **`{core_name}`**"))
            plan_summary = (
                "\n"
                f"* **Analysis Dimension:** `{analysis_plan.get('analysis_dimension')}`\n"
                f"* **Analysis Measure:** `{analysis_plan.get('analysis_measure')}`\n"
                f"* **Query Filter:** `{analysis_plan.get('query_filter')}`\n"
            )
            history.append((None, plan_summary))
            formatted_plan = f"**Full Analysis Plan (Core: `{core_name}`):**\n```json\n{json.dumps(analysis_plan, indent=2)}\n```"

            def plan_update():
                # Fresh update per yield, as each yield re-sends the plan panel.
                return gr.update(value=formatted_plan, visible=True)

            yield (history, state, None, None, plan_update(), None, None, None, None, None, suggestions_display_update)

            history.append((None, "*Executing queries for aggregates and examples...*"))
            yield (history, state, None, None, plan_update(), None, None, None, None, None, suggestions_display_update)

            # --- DYNAMIC CORE SWITCH (Thread-safe) ---
            aggregate_data, quantitative_url = None, None
            example_data, qualitative_url = None, None
            # The lock is held across switch, queries and restore: the client
            # is shared, so no other request may see the temporary URL.
            with solr_lock:
                original_solr_url = solr_client.url
                try:
                    # Replace the last path component (the core name). Strip a
                    # trailing slash first so the core is replaced, not appended.
                    base_url = original_solr_url.rstrip('/').rsplit('/', 1)[0]
                    solr_client.url = f"{base_url}/{core_name}"
                    print(f"[INFO] Switched Solr client to core: {core_name} at URL: {solr_client.url}")

                    # Execute queries in parallel
                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        future_agg = executor.submit(execute_quantitative_query, solr_client, analysis_plan)
                        future_ex = executor.submit(execute_qualitative_query, solr_client, analysis_plan)
                        aggregate_data, quantitative_url = future_agg.result()
                        example_data, qualitative_url = future_ex.result()
                finally:
                    # --- IMPORTANT: Reset client to default URL ---
                    solr_client.url = original_solr_url
                    print(f"[INFO] Reset Solr client to default URL: {original_solr_url}")

            if not aggregate_data or aggregate_data.get('count', 0) == 0:
                history.append((None, f"No data was found for your query in the '{core_name}' core. Please try a different question."))
                yield (history, state, None, None, plan_update(), None, None, None, None, None, suggestions_display_update)
                return

            # Display retrieved data
            quantitative_url_update = gr.update(value=f"**Solr URL:** [{quantitative_url}]({quantitative_url})", visible=True)
            qualitative_url_update = gr.update(value=f"**Solr URL:** [{qualitative_url}]({qualitative_url})", visible=True)
            formatted_agg_data = f"**Quantitative (Aggregate) Data:**\n```json\n{json.dumps(aggregate_data, indent=2)}\n```"
            formatted_qual_data = f"**Qualitative (Example) Data:**\n```json\n{json.dumps(example_data, indent=2)}\n```"
            qual_data_display_update = gr.update(value=formatted_qual_data, visible=True)

            def agg_update():
                # Fresh update per yield for the aggregate-data panel.
                return gr.update(value=formatted_agg_data, visible=True)

            yield (
                history, state, None, None, plan_update(),
                quantitative_url_update, agg_update(),
                qualitative_url_update, qual_data_display_update,
                None, suggestions_display_update,
            )

            history.append((None, "β Data retrieved. Generating visualization and final report..."))
            yield (
                history, state, None, None, plan_update(),
                quantitative_url_update, agg_update(),
                qualitative_url_update, qual_data_display_update,
                None, suggestions_display_update,
            )

            # Generate viz and report
            # NOTE(review): the future is awaited immediately, so the
            # visualization call finishes before report streaming starts and
            # the executor provides no overlap. Ordering kept as-is; confirm
            # whether concurrent LLM calls are intended before changing it.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                viz_future = executor.submit(llm_generate_visualization_code, llm_model, query_context, aggregate_data)
                viz_code, viz_in, viz_out, viz_total = viz_future.result()

            report_text = ""
            # Snapshot of the chat shown while streaming; the finished report
            # is appended to the real history only once the stream ends.
            stream_history = history[:]
            report_stream = llm_synthesize_enriched_report_stream(llm_model, query_context, aggregate_data, example_data, analysis_plan)
            for item in report_stream:
                if item["text"] is not None:
                    report_text += item["text"]
                    yield (
                        stream_history, state, None,
                        gr.update(value=report_text, visible=True),
                        plan_update(),
                        quantitative_url_update, agg_update(),
                        qualitative_url_update, qual_data_display_update,
                        None, suggestions_display_update,
                    )
                elif item["tokens"] is not None:
                    report_in = item["tokens"]["input"]
                    report_out = item["tokens"]["output"]
                    report_total = item["tokens"]["total"]
            history.append((None, report_text))

            plot_path = execute_viz_code_and_get_path(viz_code, aggregate_data)
            output_plot = gr.update(value=plot_path, visible=True) if plot_path else gr.update(visible=False)
            if not plot_path:
                history.append((None, "*I was unable to generate a plot for this data.*\n"))

            # filter(None, ...) skips counters that were never reported.
            cumulative_tokens = sum(filter(None, [plan_total, report_total, viz_total]))
            total_input = sum(filter(None, [plan_in, report_in, viz_in]))
            total_output = sum(filter(None, [plan_out, report_out, viz_out]))
            # presumably 0.3 / 2.5 are prices per million input / output
            # tokens — TODO confirm against current model pricing.
            expected_cost = round((total_input*0.3+total_output*2.5)/1000000, 3)

            token_summary_md = "\n".join([
                f"**Analysis Plan Tokens** β Prompt: `{plan_in or '-'}`, Output: `{plan_out or '-'}`, Total: `{plan_total or '-'}`",
                f"**Report Tokens** β Prompt: `{report_in or '-'}`, Output: `{report_out or '-'}`, Total: `{report_total or '-'}`",
                f"**Visualization Tokens** β Prompt: `{viz_in or '-'}`, Output: `{viz_out or '-'}`, Total: `{viz_total or '-'}`",
                f"**Cumulative Tokens** β `{cumulative_tokens}`",
                f"**Expected Cost** β `{expected_cost}$`",
            ])
            token_summary_box_update = gr.update(value=token_summary_md, visible=True)

            yield (
                history, state, output_plot, gr.update(value=report_text),
                plan_update(),
                quantitative_url_update, agg_update(),
                qualitative_url_update, qual_data_display_update,
                token_summary_box_update, suggestions_display_update,
            )

            state['query_count'] += 1
            state['last_suggestions'] = parse_suggestions_from_report(report_text)

            next_prompt = "Analysis complete. What would you like to explore next?"
            history.append((None, next_prompt))
            yield (
                history, state, output_plot, gr.update(value=report_text),
                plan_update(),
                quantitative_url_update, agg_update(),
                qualitative_url_update, qual_data_display_update,
                token_summary_box_update, suggestions_display_update,
            )

        def reset_all():
            """Resets the entire UI for a new analysis session.

            Returns a 12-tuple matching ``clear_button.click`` outputs: empty
            chat, cleared state, empty question box, then nine hidden panels.
            """
            return (
                [],
                None,
                "",
                *(_hidden_update() for _ in range(9)),
            )

        msg_textbox.submit(
            fn=process_analysis_flow,
            inputs=[msg_textbox, chatbot, state],
            outputs=[chatbot, state, plot_display, report_display, plan_display, quantitative_url_display,
                     quantitative_data_display, qualitative_url_display, qualitative_data_display, token_summary_box, suggestions_display],
        ).then(
            # Clear the question box once the analysis flow has finished.
            lambda: gr.update(value=""),
            None,
            [msg_textbox],
            queue=False,
        )

        clear_button.click(
            fn=reset_all,
            inputs=None,
            outputs=[chatbot, state, msg_textbox, plot_display, report_display, plan_display, quantitative_url_display,
                     quantitative_data_display, qualitative_url_display, qualitative_data_display, token_summary_box, suggestions_display],
            queue=False
        )

    return demo