Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import os | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
import base64 | |
import re | |
import numpy as np | |
from llama_index.llms.groq import Groq | |
from llama_index.core.query_pipeline import ( | |
QueryPipeline as QP, | |
Link, | |
InputComponent, | |
) | |
from llama_index.experimental.query_engine.pandas import ( | |
PandasInstructionParser, | |
) | |
from llama_index.core import PromptTemplate | |
# Example datasets | |
EXAMPLE_DATASETS = { | |
"Hotel Bookings": "hotel_bookings.csv", | |
} | |
def load_dataframe(file_path): | |
try: | |
if isinstance(file_path, str): | |
# If it's a URL or file path | |
df = pd.read_csv(file_path) | |
else: | |
# If it's an uploaded file | |
df = pd.read_csv(file_path.name) | |
return df, f"Successfully loaded dataset with {df.shape[0]} rows and {df.shape[1]} columns." | |
except Exception as e: | |
return None, f"Error loading dataset: {str(e)}" | |
def create_query_pipeline(df, api_key, model="llama-3.3-70b-versatile"): | |
# Create Groq LLM with the provided API key | |
try: | |
llm = Groq(model=model, api_key=api_key) | |
except Exception as e: | |
return None, f"Error initializing Groq LLM: {str(e)}" | |
instruction_str = ( | |
"1. Convert the query to executable Python code using Pandas.\n" | |
"2. The final line of code should be a Python expression that can be called with the `eval()` function.\n" | |
"3. The code should represent a solution to the query.\n" | |
"4. PRINT ONLY THE EXPRESSION.\n" | |
"5. Do not quote the expression.\n" | |
) | |
pandas_prompt_str = ( | |
"You are working with a pandas dataframe in Python.\n" | |
"The name of the dataframe is `df`.\n" | |
"This is the result of `print(df.head())`:\n" | |
"{df_str}\n\n" | |
"Follow these instructions:\n" | |
"{instruction_str}\n" | |
"Query: {query_str}\n\n" | |
"Expression:" | |
) | |
response_synthesis_prompt_str = ( | |
"Given an input question, synthesize a response from the query results.\n" | |
"Query: {query_str}\n\n" | |
"Pandas Instructions (optional):\n{pandas_instructions}\n\n" | |
"Pandas Output: {pandas_output}\n\n" | |
"Response: " | |
) | |
pandas_prompt = PromptTemplate(pandas_prompt_str).partial_format( | |
instruction_str=instruction_str, df_str=df.head(5) | |
) | |
pandas_output_parser = PandasInstructionParser(df) | |
response_synthesis_prompt = PromptTemplate(response_synthesis_prompt_str) | |
qp = QP( | |
modules={ | |
"input": InputComponent(), | |
"pandas_prompt": pandas_prompt, | |
"llm1": llm, | |
"pandas_output_parser": pandas_output_parser, | |
"response_synthesis_prompt": response_synthesis_prompt, | |
"llm2": llm, | |
}, | |
verbose=True, | |
) | |
qp.add_chain(["input", "pandas_prompt", "llm1", "pandas_output_parser"]) | |
qp.add_links( | |
[ | |
Link("input", "response_synthesis_prompt", dest_key="query_str"), | |
Link( | |
"llm1", "response_synthesis_prompt", dest_key="pandas_instructions" | |
), | |
Link( | |
"pandas_output_parser", | |
"response_synthesis_prompt", | |
dest_key="pandas_output", | |
), | |
] | |
) | |
qp.add_link("response_synthesis_prompt", "llm2") | |
return qp, "Query pipeline created successfully!" | |
def enhance_visualization(df, query): | |
""" | |
Create an enhanced visualization based on the dataframe and query | |
This function attempts to create a better visualization with proper labels and formatting | |
""" | |
try: | |
# Close any existing figures to avoid conflicts | |
plt.close('all') | |
# Create a new figure with larger size for better quality | |
plt.figure(figsize=(12, 8), dpi=100) | |
# Time-related visualization handling (for bookings over time, trends, etc.) | |
if any(term in query.lower() for term in ['trend', 'time', 'year', 'month', 'booking', 'reservation']): | |
# Try to detect date columns | |
date_cols = [col for col in df.columns if any(term in col.lower() for term in | |
['date', 'year', 'month', 'time', 'arrival', 'reservation'])] | |
if 'arrival_date_year' in df.columns and 'arrival_date_month' in df.columns: | |
try: | |
# Create a year-month based visualization | |
# Convert month names to numbers for sorting | |
month_order = { | |
'January': 1, 'February': 2, 'March': 3, 'April': 4, 'May': 5, 'June': 6, | |
'July': 7, 'August': 8, 'September': 9, 'October': 10, 'November': 11, 'December': 12 | |
} | |
# Count bookings by year and month | |
booking_counts = df.groupby(['arrival_date_year', 'arrival_date_month']).size().reset_index(name='count') | |
# Add month order for sorting | |
booking_counts['month_order'] = booking_counts['arrival_date_month'].map(month_order) | |
booking_counts = booking_counts.sort_values(['arrival_date_year', 'month_order']) | |
# Create pivot table for visualization | |
pivot_data = booking_counts.pivot(index='arrival_date_year', columns='arrival_date_month', values='count') | |
# Reorder columns by month | |
months = sorted(booking_counts['arrival_date_month'].unique(), key=lambda x: month_order.get(x, 13)) | |
if len(months) > 0: # Check if the months list is not empty | |
pivot_data = pivot_data[months] | |
# Plot the data | |
ax = pivot_data.plot(kind='bar', figsize=(14, 8), width=0.8) | |
# Enhance the plot | |
plt.title('Bookings by Month and Year', fontsize=16) | |
plt.xlabel('Year', fontsize=14) | |
plt.ylabel('Number of Bookings', fontsize=14) | |
plt.legend(title='Month', fontsize=12) | |
plt.grid(axis='y', linestyle='--', alpha=0.7) | |
plt.tight_layout() | |
# Add value labels on top of bars | |
for container in ax.containers: | |
ax.bar_label(container, fontsize=9, fmt='%d') | |
else: | |
return None # No months data found | |
except Exception as e: | |
print(f"Error in time visualization: {str(e)}") | |
return None | |
elif len(date_cols) > 0 and any(col in df.columns for col in date_cols): | |
try: | |
# Handle other time-based visualizations | |
date_col = [col for col in date_cols if col in df.columns][0] | |
df_count = df.groupby(date_col).size().reset_index(name='count') | |
plt.bar(df_count[date_col], df_count['count'], color='steelblue') | |
plt.title(f'Distribution by {date_col}', fontsize=16) | |
plt.xlabel(date_col, fontsize=14) | |
plt.ylabel('Count', fontsize=14) | |
plt.grid(axis='y', linestyle='--', alpha=0.7) | |
plt.xticks(rotation=45) | |
plt.tight_layout() | |
except Exception as e: | |
print(f"Error in date column visualization: {str(e)}") | |
return None | |
else: | |
# Default time visualization if we can't find specific columns | |
return None # Let matplotlib handle it | |
# Distribution visualization (for questions about distributions) | |
elif any(term in query.lower() for term in ['distribution', 'histogram', 'spread']): | |
try: | |
numeric_cols = df.select_dtypes(include=['number']).columns.tolist() | |
if len(numeric_cols) > 0: | |
# Choose a relevant column based on query or the first numeric column | |
target_col = None | |
for col in numeric_cols: | |
if col.lower() in query.lower(): | |
target_col = col | |
break | |
if target_col is None and numeric_cols: | |
target_col = numeric_cols[0] | |
if target_col: | |
# Create histogram | |
plt.hist(df[target_col].dropna(), bins=30, color='steelblue', edgecolor='black', alpha=0.7) | |
plt.title(f'Distribution of {target_col}', fontsize=16) | |
plt.xlabel(target_col, fontsize=14) | |
plt.ylabel('Frequency', fontsize=14) | |
plt.grid(axis='y', linestyle='--', alpha=0.7) | |
plt.tight_layout() | |
else: | |
return None # Let matplotlib handle it | |
else: | |
return None # Let matplotlib handle it | |
except Exception as e: | |
print(f"Error in distribution visualization: {str(e)}") | |
return None | |
# Comparison visualization (for questions comparing categories) | |
elif any(term in query.lower() for term in ['compare', 'comparison', 'versus', 'vs', 'most', 'least']): | |
try: | |
categorical_cols = df.select_dtypes(include=['object']).columns.tolist() | |
if len(categorical_cols) > 0: | |
# Choose a relevant column based on query or the first categorical column | |
target_col = None | |
for col in categorical_cols: | |
if col.lower() in query.lower(): | |
target_col = col | |
break | |
if target_col is None and categorical_cols: | |
target_col = categorical_cols[0] | |
if target_col: | |
# Get top categories by count | |
top_categories = df[target_col].value_counts().nlargest(10) | |
# Create bar chart | |
plt.bar(top_categories.index, top_categories.values, color='steelblue') | |
plt.title(f'Top Categories by {target_col}', fontsize=16) | |
plt.xlabel(target_col, fontsize=14) | |
plt.ylabel('Count', fontsize=14) | |
plt.grid(axis='y', linestyle='--', alpha=0.7) | |
plt.xticks(rotation=45, ha='right') | |
plt.tight_layout() | |
else: | |
return None # Let matplotlib handle it | |
else: | |
return None # Let matplotlib handle it | |
except Exception as e: | |
print(f"Error in comparison visualization: {str(e)}") | |
return None | |
else: | |
# For other types of queries, let the default matplotlib handle it | |
return None | |
# Save figure to buffer | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png') | |
buf.seek(0) | |
# Create an image from the buffer | |
img = Image.open(buf) | |
plt.close('all') # Close the figure to free memory | |
return img | |
except Exception as e: | |
print(f"Error in enhance_visualization: {str(e)}") | |
plt.close('all') # Make sure to close any figures in case of error | |
return None | |
def process_query(query, api_key, df, model_choice): | |
if df is None: | |
return "Please load a dataset first.", None | |
if not api_key: | |
return "Please provide your Groq API key.", None | |
try: | |
# First, try to create an enhanced visualization based on the query | |
enhanced_img = enhance_visualization(df, query) | |
# Create and run the query pipeline | |
pipeline, message = create_query_pipeline(df, api_key, model_choice) | |
if pipeline is None: | |
return message, None | |
# Run the query | |
response = pipeline.run(query_str=query) | |
# If we already have an enhanced visualization, use it | |
if enhanced_img is not None: | |
return response.message.content, enhanced_img | |
# Otherwise check if any matplotlib figures were created by the query | |
figures = plt.get_fignums() | |
if figures: | |
try: | |
# Improve any existing figure if possible | |
fig = plt.figure(figures[0]) | |
axes = fig.axes | |
if axes and len(axes) > 0: # Make sure axes list isn't empty | |
ax = axes[0] | |
# Add grid lines | |
ax.grid(axis='y', linestyle='--', alpha=0.7) | |
# Enhance title and labels if they exist | |
if ax.get_title(): | |
ax.set_title(ax.get_title(), fontsize=16) | |
if ax.get_xlabel(): | |
ax.set_xlabel(ax.get_xlabel(), fontsize=14) | |
if ax.get_ylabel(): | |
ax.set_ylabel(ax.get_ylabel(), fontsize=14) | |
# Handle legend if it exists | |
if ax.get_legend(): | |
ax.legend(fontsize=12) | |
fig.tight_layout() | |
# Save the figure to a bytes buffer | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png', dpi=100) | |
buf.seek(0) | |
# Create an image from the buffer | |
img = Image.open(buf) | |
plt.close('all') # Close the figure to free memory | |
return response.message.content, img | |
except Exception as e: | |
plt.close('all') | |
# Log the error but continue without crashing | |
print(f"Visualization error: {str(e)}") | |
return response.message.content, None | |
else: | |
# No visualization was generated | |
return response.message.content, None | |
except Exception as e: | |
plt.close('all') # Make sure to close any figures in case of error | |
return f"Error processing query: {str(e)}", None | |
def handle_example_selection(example_name): | |
if example_name in EXAMPLE_DATASETS: | |
file_path = EXAMPLE_DATASETS[example_name] | |
df, message = load_dataframe(file_path) | |
return df, message, gr.update(value=f"Dataset preview:\n{df.head().to_string()}") | |
return None, "Please select a valid example dataset.", gr.update(value="") | |
def handle_file_upload(file): | |
if file is not None: | |
df, message = load_dataframe(file) | |
return df, message, gr.update(value=f"Dataset preview:\n{df.head().to_string()}") | |
return None, "No file uploaded.", gr.update(value="") | |
# Create Gradio interface | |
with gr.Blocks(title="Pandas Data Analysis with Groq LLM") as app: | |
gr.Markdown("# Pandas Data Analysis with Groq LLM") | |
gr.Markdown("Upload your CSV data or choose an example dataset, then ask questions about it.") | |
# State variables | |
df_state = gr.State(value=None) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
with gr.Group(): | |
gr.Markdown("### Data Selection") | |
with gr.Tab("Upload Data"): | |
file_input = gr.File(label="Upload CSV File", file_types=[".csv"]) | |
upload_button = gr.Button("Load Uploaded Data") | |
with gr.Tab("Example Datasets"): | |
example_dropdown = gr.Dropdown( | |
choices=list(EXAMPLE_DATASETS.keys()), | |
label="Select Example Dataset" | |
) | |
example_button = gr.Button("Load Example Dataset") | |
data_status = gr.Textbox(label="Data Loading Status", interactive=False) | |
with gr.Group(): | |
gr.Markdown("### Groq API Configuration") | |
api_key = gr.Textbox( | |
label="Enter your Groq API Key", | |
placeholder="gsk_...", | |
type="password" | |
) | |
model_choice = gr.Dropdown( | |
choices=["llama-3.3-70b-versatile", "mixtral-8x7b-32768", "gemma-7b-it"], | |
value="llama-3.3-70b-versatile", | |
label="Select Groq Model" | |
) | |
with gr.Column(scale=1): | |
data_preview = gr.Textbox(label="Dataset Preview", interactive=False, lines=10) | |
query_input = gr.Textbox( | |
label="Ask a question about your data", | |
placeholder="e.g., What is the trend of monthly bookings over time?", | |
lines=2 | |
) | |
query_button = gr.Button("Submit Query") | |
# Output display with tabs for text and visualization | |
with gr.Tabs(): | |
with gr.TabItem("Text Response"): | |
response_output = gr.Textbox(label="Response", interactive=False, lines=10) | |
with gr.TabItem("Visualization"): | |
image_output = gr.Image(label="Data Visualization", interactive=False) | |
# Handle events | |
upload_button.click( | |
handle_file_upload, | |
inputs=[file_input], | |
outputs=[df_state, data_status, data_preview] | |
) | |
example_button.click( | |
handle_example_selection, | |
inputs=[example_dropdown], | |
outputs=[df_state, data_status, data_preview] | |
) | |
query_button.click( | |
process_query, | |
inputs=[query_input, api_key, df_state, model_choice], | |
outputs=[response_output, image_output] | |
) | |
gr.Markdown(""" | |
### Instructions | |
1. Upload your CSV file or select an example dataset | |
2. Enter your Groq API key (get one at [https://console.groq.com](https://console.groq.com)) | |
3. Ask questions about your data in natural language | |
4. Get AI-powered insights and visualizations based on your data | |
### Example Questions | |
- What is the trend of monthly bookings over time? | |
- What's the distribution of stay duration? | |
- Which country has the most bookings? | |
- Is there a correlation between lead time and cancellations? | |
- Show me bookings by month and year | |
""") | |
# Launch the app | |
if __name__ == "__main__": | |
app.launch() |