# multi-doc / app.py
# datafreak's picture
# Update app.py
# dcd1c9e verified
import json
import logging
import os
import shutil
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Tuple

import gradio as gr
from dotenv import load_dotenv
from pinecone import Pinecone
# Read settings from a .env file (when present) into the process environment.
load_dotenv()

# Fail fast at import time if a mandatory setting is absent.
required_env_vars = ["PINECONE_API_KEY"]
missing_vars = [name for name in required_env_vars if not os.getenv(name)]
if missing_vars:
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

# Pinecone client referenced by the handler functions in this module.
pinecone_api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

# Directory that receives a copy of each file before it is uploaded.
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def parse_pinecone_timestamp(iso_string):
    """
    Parse an ISO 8601 timestamp string, tolerating nanosecond precision.

    The result is always timezone-aware so that every value returned by this
    function can be compared with every other one (mixing naive and aware
    datetimes raises ``TypeError`` during sorting).

    Args:
        iso_string (str): The ISO-formatted timestamp string. A trailing
            ``Z`` is accepted, and the fractional-seconds part may have any
            number of digits.

    Returns:
        datetime: The parsed, timezone-aware datetime. Timestamps without an
        explicit offset are treated as UTC. Empty or non-string input yields
        the earliest representable datetime (in UTC) so it sorts last when
        ordering newest-first.

    Raises:
        ValueError: If the string is not a valid ISO 8601 timestamp.
    """
    if not isinstance(iso_string, str) or not iso_string:
        # Sentinel for "no timestamp": aware, so it compares with parsed values.
        return datetime.min.replace(tzinfo=timezone.utc)

    text = iso_string.strip()

    # fromisoformat() on older Python versions does not understand 'Z'.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"

    head, dot, tail = text.partition(".")
    if dot:
        # Split "<digits><optional offset>" without relying on the position
        # of '+'/'-' (the date part also contains '-').
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        fraction, suffix = tail[:digit_count], tail[digit_count:]
        # Normalize to exactly six digits: truncate nanoseconds, pad short
        # fractions, which older fromisoformat() implementations require.
        fraction = fraction[:6].ljust(6, "0")
        text = f"{head}.{fraction}{suffix}"

    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def get_all_files():
    """
    Fetch every file of the Pinecone Assistant, newest first.

    The assistant name comes from ``PINECONE_ASSISTANT_NAME`` (default
    ``"gstminutes"``).

    Returns:
        list: File objects sorted by ``created_on`` in descending order.
        Files whose timestamp is missing or unparsable sort last. An empty
        list is returned when there are no files or when the listing fails
        (the failure is logged rather than silently discarded).
    """

    def _created_sort_key(file_obj):
        # A single malformed timestamp must not discard the whole listing.
        try:
            stamp = parse_pinecone_timestamp(getattr(file_obj, "created_on", ""))
        except (TypeError, ValueError):
            stamp = datetime.min
        # Naive and aware datetimes cannot be compared; treat naive as UTC.
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp

    try:
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        files_response = assistant.list_files()
        # The response is either a list itself or an object with `.files`.
        files_list = getattr(files_response, "files", files_response)
        if not files_list:
            return []

        return sorted(files_list, key=_created_sort_key, reverse=True)
    except Exception:
        # Callers expect a list; keep that contract but leave a trace.
        logging.getLogger(__name__).exception("Failed to list Pinecone Assistant files")
        return []
def get_file_choices():
    """
    Build the dropdown choices for the delete tab.

    Returns:
        list[tuple[str, str]]: ``(display_name, file_id)`` pairs in the order
        returned by ``get_all_files()``. The display name includes the upload
        time when it can be parsed. Returns an empty list when no files are
        available or an unexpected error occurs (the error is logged).
    """
    try:
        choices = []
        for file_obj in get_all_files():
            file_name = getattr(file_obj, 'name', 'Unknown File')
            file_id = getattr(file_obj, 'id', 'unknown')
            created_on = getattr(file_obj, 'created_on', '')

            display_name = file_name
            if created_on:
                # Best effort: an unparsable timestamp only drops the suffix.
                try:
                    created_formatted = parse_pinecone_timestamp(created_on).strftime('%Y-%m-%d %H:%M')
                except Exception:
                    pass
                else:
                    display_name = f"{file_name} (uploaded: {created_formatted})"

            choices.append((display_name, file_id))
        return choices
    except Exception:
        logging.getLogger(__name__).exception("Failed to build file choices")
        return []
def refresh_delete_dropdown():
    """Reload the delete dropdown; it is interactive only when files exist."""
    available = get_file_choices()
    has_files = bool(available)
    return gr.update(
        choices=available if has_files else [],
        value=None,
        interactive=has_files,
    )
def delete_selected_files(selected_file_ids, progress=gr.Progress()):
    """
    Delete the selected files from the Pinecone Assistant one by one.

    Args:
        selected_file_ids: File IDs chosen in the dropdown.
        progress: Gradio progress tracker used to report each step.

    Returns:
        tuple[str, str]: A markdown status summary and a markdown report that
        lists successful and failed deletions. On a missing selection or an
        unexpected failure the first item is an error message and the second
        is empty.
    """
    if not selected_file_ids:
        return "❌ **Error:** No files selected for deletion", ""

    try:
        progress(0.1, desc="πŸ”§ Initializing Pinecone Assistant...")
        assistant = pc.assistant.Assistant(
            assistant_name=os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        )

        # Map IDs to names so the report can show readable titles.
        names_by_id = {
            getattr(entry, 'id', ''): getattr(entry, 'name', 'Unknown')
            for entry in get_all_files()
        }

        def label_for(fid):
            return names_by_id.get(fid, f"File ID: {fid}")

        total_files = len(selected_file_ids)
        succeeded = []  # (name, id)
        failed = []     # (name, id, error text)

        progress(0.2, desc=f"πŸ—‘οΈ Starting deletion of {total_files} files...")

        for position, file_id in enumerate(selected_file_ids):
            try:
                file_name = label_for(file_id)
                fraction_done = 0.2 + (position / total_files) * 0.7
                progress(fraction_done, desc=f"πŸ—‘οΈ Deleting: {file_name}...")

                assistant.delete_file(file_id=file_id)
                succeeded.append((file_name, file_id))

                # Brief pause between consecutive deletions.
                time.sleep(0.2)
            except Exception as delete_error:
                failed.append((label_for(file_id), file_id, str(delete_error)))

        progress(1.0, desc="βœ… Deletion process completed!")

        status_message = "".join([
            "πŸ“Š **Deletion Complete**\n\n",
            f"βœ… **Successfully deleted:** {len(succeeded)} files\n",
            f"❌ **Failed to delete:** {len(failed)} files\n",
            f"πŸ“ **Total processed:** {total_files} files\n\n",
        ])

        report = ["## πŸ—‘οΈ **Deletion Results**\n\n"]
        if succeeded:
            report.append("### βœ… **Successfully Deleted Files:**\n")
            report.extend(f"- **{name}** (`{fid}`)\n" for name, fid in succeeded)
            report.append("\n")
        if failed:
            report.append("### ❌ **Failed Deletions:**\n")
            for name, fid, reason in failed:
                report.append(f"- **{name}** (`{fid}`)\n")
                report.append(f" - Error: {reason}\n")
            report.append("\n")

        return status_message, "".join(report)
    except Exception as e:
        return f"❌ **Critical Error during deletion:** {str(e)}", ""
def list_uploaded_files_paginated(page_num=0, progress=gr.Progress()):
    """
    Render one page (100 entries) of the uploaded-files listing.

    Args:
        page_num: Zero-based page index. Values outside the available range
            are clamped to the first/last page so the summary never shows an
            impossible range.
        progress: Gradio progress tracker used to report each step.

    Returns:
        tuple: ``(summary_md, details_md, pagination_text, prev_update,
        next_update)`` where the last two are ``gr.update`` objects toggling
        the visibility of the previous/next buttons. The pagination text has
        the form ``"... Page X of Y | Total: N files"``.
    """
    try:
        progress(0.1, desc="πŸ” Getting files...")

        all_files = get_all_files()
        if not all_files:
            progress(1.0, desc="βœ… Complete - No files found")
            return "πŸ“‹ **No files found in Pinecone Assistant**", "", "No files available", gr.update(visible=False), gr.update(visible=False)

        progress(0.5, desc="πŸ“Š Processing page...")

        # Pagination settings
        files_per_page = 100
        total_files = len(all_files)
        total_pages = (total_files + files_per_page - 1) // files_per_page

        # Clamp the requested page into [0, total_pages - 1]; all_files is
        # non-empty here, so total_pages is at least 1.
        page_num = max(0, min(page_num, total_pages - 1))

        start_idx = page_num * files_per_page
        end_idx = start_idx + files_per_page
        page_files = all_files[start_idx:end_idx]

        summary = f"πŸ“Š **Files Summary (Page {page_num + 1} of {total_pages})**\n\n"
        summary += f"πŸ“ **Total files:** {total_files}\n"
        summary += f"πŸ“‹ **Showing:** {start_idx + 1}-{min(end_idx, total_files)} of {total_files}\n\n"

        # One entry per file: title line followed by upload time and ID.
        detail_parts = [f"## πŸ“‹ **Latest Uploaded Files - Page {page_num + 1}**\n\n"]
        progress(0.8, desc="πŸ“ Formatting file titles...")

        for i, file_obj in enumerate(page_files, 1):
            global_index = start_idx + i
            try:
                file_name = getattr(file_obj, 'name', 'Unknown File')
                file_id = getattr(file_obj, 'id', 'Unknown ID')
                created_on = getattr(file_obj, 'created_on', '')

                # Best effort: an unparsable timestamp is shown as 'Unknown'.
                created_formatted = 'Unknown'
                if created_on:
                    try:
                        created_formatted = parse_pinecone_timestamp(created_on).strftime('%Y-%m-%d %H:%M')
                    except Exception:
                        created_formatted = 'Unknown'

                detail_parts.append(f"{global_index}. **{file_name}**\n")
                detail_parts.append(f" πŸ“… Uploaded: {created_formatted} | πŸ†” ID: `{file_id}`\n\n")
            except Exception:
                detail_parts.append(f"{global_index}. ❌ **Error loading file**\n\n")

        detailed_info = "".join(detail_parts)

        # This text is parsed again by the next/previous page handlers.
        pagination_info = f"πŸ“„ Page {page_num + 1} of {total_pages} | Total: {total_files} files"

        show_prev = page_num > 0
        show_next = page_num < total_pages - 1

        progress(1.0, desc="βœ… Page loaded successfully!")
        return summary, detailed_info, pagination_info, gr.update(visible=show_prev), gr.update(visible=show_next)
    except Exception as e:
        error_msg = f"❌ **Error retrieving file list:** {str(e)}"
        return error_msg, "", "Error", gr.update(visible=False), gr.update(visible=False)
def _page_index_from_info(current_page_info):
    """Return the zero-based page index found in 'Page X of Y' text, or None."""
    try:
        return int(current_page_info.split("Page ")[1].split(" of")[0]) - 1
    except (AttributeError, IndexError, ValueError):
        # Not a string, no "Page " marker, or a non-numeric page number.
        return None


def load_next_page(current_page_info):
    """
    Load the page after the one named in the pagination text.

    Args:
        current_page_info: Pagination text containing ``"Page X of Y"``.

    Returns:
        The result of ``list_uploaded_files_paginated`` for the next page, or
        for the first page when the text cannot be parsed.
    """
    current_page = _page_index_from_info(current_page_info)
    if current_page is None:
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(current_page + 1)


def load_prev_page(current_page_info):
    """
    Load the page before the one named in the pagination text.

    Args:
        current_page_info: Pagination text containing ``"Page X of Y"``.

    Returns:
        The result of ``list_uploaded_files_paginated`` for the previous page
        (never below the first), or for the first page when the text cannot
        be parsed.
    """
    current_page = _page_index_from_info(current_page_info)
    if current_page is None:
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(max(0, current_page - 1))
def refresh_file_list():
    """Return the interim status text shown while the file list is reloading."""
    interim_status = "πŸ”„ **Refreshing file list... Please wait**"
    return interim_status
def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()):
    """
    Upload the selected files to the Pinecone Assistant with per-file metadata.

    Args:
        files: Paths of the selected files (at most 10).
        *metadata_inputs: Flat sequence of metadata strings, three per file in
            file order: sections, keywords, description. Sections and keywords
            are comma-separated.
        progress: Gradio progress tracker used to report each step.

    Returns:
        tuple[str, str, str]: ``(status_md, details_md, processing_status_md)``.
        Every return path yields three values because the click handler is
        wired to three output components.
    """
    failure_status = "❌ **Processing failed with error**"

    if not files:
        return "❌ Error: No files selected", "", failure_status
    if len(files) > 10:
        return "❌ Error: Maximum 10 files allowed at a time", "", failure_status

    try:
        results = []
        errors = []
        total_files = len(files)

        progress(0, desc="πŸ”§ Initializing Pinecone Assistant...")
        time.sleep(0.5)  # Small delay to show the progress
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        for i, file_path in enumerate(files):
            try:
                filename = os.path.basename(file_path)
                progress((i / total_files), desc=f"πŸ“„ Processing {filename}... ({i+1}/{total_files})")

                # Three metadata slots per file: sections, keywords, description.
                sections_idx = i * 3
                keywords_idx = sections_idx + 1
                description_idx = sections_idx + 2

                # Check the last index so a short metadata tuple cannot raise.
                if description_idx < len(metadata_inputs):
                    sections = metadata_inputs[sections_idx] or ""
                    keywords = metadata_inputs[keywords_idx] or ""
                    description = metadata_inputs[description_idx] or ""
                else:
                    sections = keywords = description = ""

                # Files without any metadata are reported as failures.
                if not sections.strip() and not keywords.strip() and not description.strip():
                    errors.append({
                        "filename": filename,
                        "error": "❌ Error: No metadata provided"
                    })
                    continue

                progress((i / total_files), desc=f"🏷️ Preparing metadata for {filename}...")
                metadata = {
                    "sections": [s.strip() for s in sections.split(",") if s.strip()],
                    "keywords": [k.strip() for k in keywords.split(",") if k.strip()],
                    "description": description.strip()
                }

                progress((i / total_files), desc=f"πŸ“ Copying {filename} to uploads...")
                destination_path = os.path.join(UPLOAD_FOLDER, filename)
                shutil.copy2(file_path, destination_path)

                progress((i / total_files), desc=f"☁️ Uploading {filename} to Pinecone...")
                response = assistant.upload_file(
                    file_path=destination_path,
                    metadata=metadata,
                    timeout=None
                )

                results.append({
                    "filename": filename,
                    "status": "βœ… Success",
                    "metadata": metadata,
                    "response": str(response)
                })
            except Exception as file_error:
                # One failing file must not abort the rest of the batch.
                errors.append({
                    "filename": os.path.basename(file_path),
                    "error": f"❌ Error: {str(file_error)}"
                })

        progress(1.0, desc="βœ… Processing complete!")
        time.sleep(0.5)

        success_count = len(results)
        error_count = len(errors)

        status_message = f"πŸ“Š **Processing Complete**\n\n"
        status_message += f"βœ… **Successful uploads:** {success_count}\n"
        status_message += f"❌ **Failed uploads:** {error_count}\n"
        status_message += f"πŸ“ **Total files processed:** {len(files)}\n\n"

        detail_parts = ["## πŸ“‹ **Detailed Results**\n\n"]
        if results:
            detail_parts.append("### βœ… **Successful Uploads:**\n")
            for result in results:
                detail_parts.append(f"- **{result['filename']}**\n")
                detail_parts.append(f" - Sections: {', '.join(result['metadata']['sections'])}\n")
                detail_parts.append(f" - Keywords: {', '.join(result['metadata']['keywords'])}\n")
                detail_parts.append(f" - Description: {result['metadata']['description']}\n\n")
        if errors:
            detail_parts.append("### ❌ **Failed Uploads:**\n")
            for error in errors:
                detail_parts.append(f"- **{error['filename']}** - {error['error']}\n")
        detailed_results = "".join(detail_parts)

        # Only claim success when nothing failed.
        if error_count:
            final_status = f"❌ **Processing completed with {error_count} failed upload(s)**"
        else:
            final_status = "βœ… **Processing completed successfully!**"

        return status_message, detailed_results, final_status
    except Exception as e:
        error_msg = f"❌ **Critical Error:** {str(e)}"
        return error_msg, "", failure_status
def update_metadata_fields(files):
    """
    Compute visibility/label updates for the 30 metadata textboxes.

    Each selected file gets three visible fields (sections, keywords,
    description) labelled with its file name; every other field is hidden.
    With no files, or more than 10, all 30 fields are hidden.
    """
    if not files or len(files) > 10:
        return [gr.update(visible=False)] * 30

    updates = []
    for path in files:
        filename = os.path.basename(path)
        sections_update = gr.update(visible=True, label=f"πŸ“‘ Sections for {filename}", placeholder="e.g., Introduction, Financial Data, Compliance")
        keywords_update = gr.update(visible=True, label=f"πŸ” Keywords for {filename}", placeholder="e.g., GST, tax, compliance, revenue")
        description_update = gr.update(visible=True, label=f"πŸ“ Description for {filename}", placeholder="Brief description of this document")
        updates += [sections_update, keywords_update, description_update]

    # Pad with hidden updates so exactly 30 values are returned.
    unused_slots = 30 - len(updates)
    updates.extend(gr.update(visible=False) for _ in range(unused_slots))
    return updates[:30]
def clear_form():
    """
    Return reset values for the upload form.

    The list holds, in order: ``None`` for the file input, 30 empty strings
    for the metadata fields, two empty strings for the result panels and the
    "ready" status text.
    """
    reset_values = [None]
    reset_values.extend("" for _ in range(30))
    reset_values.extend(["", "", "🟒 **Ready to process documents**"])
    return reset_values
def clear_delete_form():
    """Empty the dropdown selection and blank both deletion result panels."""
    cleared_selection = gr.update(value=[])
    return cleared_selection, "", ""
def start_processing():
    """Return the status text displayed as soon as an upload run begins."""
    in_progress_status = "πŸ”„ **Processing documents... Please wait**"
    return in_progress_status
def finish_processing():
    """Return the status text for a finished upload run."""
    finished_status = "βœ… **Processing completed successfully!**"
    return finished_status
# Build the Gradio interface: three tabs (upload, delete, view) plus the
# event wiring that connects the widgets to the handler functions above.
with gr.Blocks(
    title="πŸ“„ Tax Document Ingestion System",
    theme=gr.themes.Soft(),
    css="""
.gradio-container {
max-width: 1400px !important;
margin: auto;
}
.upload-container {
border: 2px dashed #4CAF50;
border-radius: 10px;
padding: 20px;
text-align: center;
background-color: #f8f9fa;
}
.delete-container {
border: 2px dashed #f44336;
border-radius: 10px;
padding: 20px;
background-color: #ffebee;
}
.tab-nav {
margin-bottom: 20px;
}
"""
) as app:
    gr.Markdown(
        """
# πŸ“„ Tax Document Ingestion System
Upload, manage, and delete documents in the Pinecone Assistant for GST Minutes processing.
## πŸš€ Features:
- βœ… **Multiple file upload** - Select and upload multiple documents at once
- 🏷️ **Metadata tagging** - Add sections, keywords, and descriptions
- πŸ”„ **Batch processing** - All files processed with individual metadata
- πŸ—‘οΈ **File deletion** - Delete multiple files by selecting from dropdown
- πŸ“Š **File management** - View uploaded files with timestamps and metadata
- πŸ“‹ **Detailed reporting** - See success/failure status for each operation
---
"""
    )

    # Create tabs for different functionalities
    with gr.Tabs() as tabs:
        # Tab 1: Upload Documents
        with gr.TabItem("πŸ“€ Upload Documents", id="upload_tab"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### πŸ“ **File Upload**")
                    files_input = gr.File(
                        label="Select Documents (Max 10 files)",
                        file_count="multiple",
                        file_types=[".pdf", ".doc", ".docx", ".txt"],
                        elem_classes=["upload-container"]
                    )
                with gr.Column(scale=1):
                    gr.Markdown("### 🏷️ **Document Metadata (Individual for Each File)**")
                    gr.Markdown("*Upload files first, then metadata fields will appear for each document*")
                    # Dynamic metadata fields container
                    with gr.Column() as metadata_container:
                        # 30 hidden text fields: enough for 10 files with
                        # 3 fields each; update_metadata_fields reveals them.
                        metadata_fields = [
                            gr.Textbox(
                                label=f"Field {i}",
                                placeholder="",
                                visible=False,
                                lines=2
                            )
                            for i in range(30)
                        ]

            with gr.Row():
                with gr.Column(scale=1):
                    upload_btn = gr.Button(
                        "πŸš€ Upload Documents to Pinecone Assistant",
                        variant="primary",
                        size="lg"
                    )
                with gr.Column(scale=1):
                    clear_btn = gr.Button(
                        "πŸ—‘οΈ Clear Form",
                        variant="secondary",
                        size="lg"
                    )

            # Processing status indicator
            with gr.Row():
                processing_status = gr.Markdown(
                    value="🟒 **Ready to process documents**",
                    visible=True
                )

            gr.Markdown("---")

            # Results section
            with gr.Row():
                with gr.Column():
                    status_output = gr.Markdown(
                        label="πŸ“Š Upload Status",
                        value="*Ready to upload documents...*"
                    )
            with gr.Row():
                with gr.Column():
                    results_output = gr.Markdown(
                        label="πŸ“‹ Detailed Results",
                        value="",
                        max_height=400  # Add height limit for scrolling
                    )

        # Tab 2: Delete Documents
        with gr.TabItem("πŸ—‘οΈ Delete Documents", id="delete_tab"):
            gr.Markdown("### πŸ—‘οΈ **Delete Multiple Documents**")
            gr.Markdown("Select multiple files from the dropdown to delete them from the Pinecone Assistant.")

            with gr.Row():
                with gr.Column(scale=2):
                    file_dropdown = gr.Dropdown(
                        label="πŸ“‹ Select Files to Delete (Multiple Selection)",
                        choices=[],
                        multiselect=True,
                        interactive=False,
                        elem_classes=["delete-container"]
                    )
                with gr.Column(scale=1):
                    refresh_dropdown_btn = gr.Button(
                        "πŸ”„ Refresh File List",
                        variant="secondary",
                        size="lg"
                    )

            with gr.Row():
                with gr.Column(scale=1):
                    delete_btn = gr.Button(
                        "πŸ—‘οΈ Delete Selected Files",
                        variant="stop",
                        size="lg"
                    )
                with gr.Column(scale=1):
                    clear_delete_btn = gr.Button(
                        "β†Ί Clear Selection",
                        variant="secondary",
                        size="lg"
                    )

            gr.Markdown("---")

            # Delete results section
            with gr.Row():
                with gr.Column():
                    delete_status_output = gr.Markdown(
                        label="πŸ“Š Deletion Status",
                        value="*Select files to delete...*"
                    )
            with gr.Row():
                with gr.Column():
                    delete_results_output = gr.Markdown(
                        label="πŸ—‘οΈ Deletion Results",
                        value="",
                        max_height=400
                    )

        # Tab 3: View Uploaded Files
        with gr.TabItem("πŸ“‹ View Uploaded Files", id="view_tab"):
            gr.Markdown("### πŸ“‹ **Uploaded Files Management**")
            gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.")

            with gr.Row():
                refresh_btn = gr.Button(
                    "πŸ”„ Fetch Files",
                    variant="primary",
                    size="lg"
                )

            # File list status
            with gr.Row():
                file_list_status = gr.Markdown(
                    value="🟑 **Click 'Fetch Files' to load uploaded files**",
                    visible=True
                )

            gr.Markdown("---")

            # Pagination controls
            with gr.Row():
                prev_btn = gr.Button(
                    "⬅️ Previous 100",
                    variant="secondary",
                    visible=False
                )
                pagination_info = gr.Markdown(
                    value="πŸ“„ Page 1 of 1 | Total: 0 files",
                    elem_classes=["pagination-info"]
                )
                next_btn = gr.Button(
                    "Next 100 ➑️",
                    variant="secondary",
                    visible=False
                )

            # File list results
            with gr.Row():
                with gr.Column(scale=1):
                    file_summary = gr.Markdown(
                        label="πŸ“Š Files Summary",
                        value="*Click refresh to load file summary...*"
                    )
            with gr.Row():
                with gr.Column():
                    file_details = gr.Markdown(
                        label="πŸ“‹ File Details",
                        value="*Click refresh to load file details...*",
                        max_height=600  # Add height limit for scrolling
                    )

    # Event handlers for Upload tab
    # Update metadata fields when files are uploaded
    files_input.change(
        fn=update_metadata_fields,
        inputs=[files_input],
        outputs=metadata_fields
    )

    # Show processing status when upload starts, then run the upload
    upload_btn.click(
        fn=start_processing,
        outputs=[processing_status]
    ).then(
        fn=process_files_with_progress,
        inputs=[files_input] + metadata_fields,
        outputs=[status_output, results_output, processing_status]
    )

    clear_btn.click(
        fn=clear_form,
        outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status]
    )

    # Event handlers for Delete tab
    # Refresh dropdown with current files
    refresh_dropdown_btn.click(
        fn=refresh_delete_dropdown,
        outputs=[file_dropdown]
    )

    # Delete selected files, then reload the dropdown so files that were
    # just removed can no longer be selected.
    delete_btn.click(
        fn=delete_selected_files,
        inputs=[file_dropdown],
        outputs=[delete_status_output, delete_results_output]
    ).then(
        fn=refresh_delete_dropdown,
        outputs=[file_dropdown]
    )

    # Clear delete form
    clear_delete_btn.click(
        fn=clear_delete_form,
        outputs=[file_dropdown, delete_status_output, delete_results_output]
    )

    # Event handlers for View Files tab
    # Fetch files - load page 1, then replace the interim "please wait"
    # status so it does not stay on screen after loading has finished.
    refresh_btn.click(
        fn=refresh_file_list,
        outputs=[file_list_status]
    ).then(
        fn=list_uploaded_files_paginated,
        inputs=[],
        outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]
    ).then(
        fn=lambda: "**Fetch finished - see the results below**",
        outputs=[file_list_status]
    )

    # Next page button
    next_btn.click(
        fn=load_next_page,
        inputs=[pagination_info],
        outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]
    )

    # Previous page button
    prev_btn.click(
        fn=load_prev_page,
        inputs=[pagination_info],
        outputs=[file_summary, file_details, pagination_info, prev_btn, next_btn]
    )

    # Footer
    gr.Markdown(
        """
---
### πŸ’‘ **Usage Tips:**
**Upload Documents:**
- Select up to 10 PDF, DOC, DOCX, or TXT files at once
- Upload files first, then fill individual metadata for each document
- Each file gets its own sections, keywords, and description
- Check the results section for upload status
**Delete Documents:**
- Click 'Refresh File List' to load current files in dropdown
- Select multiple files using the dropdown (supports multi-select)
- Click 'Delete Selected Files' to remove them permanently
- View deletion results for success/failure status
**View Uploaded Files:**
- Click 'Fetch Files' to see all uploaded files
- View file details including upload timestamps and metadata
- Files are sorted by most recent first
- Use pagination to navigate through large file lists
### ⚠️ **Important Notes:**
- File deletion is **permanent** and cannot be undone
- Always verify your selection before deleting files
- The system maps file titles to IDs internally for deletion
### πŸ“ž **Support:**
For issues or questions, contact the development team.
"""
    )
if __name__ == "__main__":
    # Launch settings used when this file is run directly as a script.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
        "show_error": True,
    }
    app.launch(**launch_options)