import os
import datetime
import requests
import re
import pandas as pd
import gradio as gr
import threading
import uuid
import queue
import time
import fitz # PyMuPDF for reading PDF files
from transformers import AutoTokenizer
from mistralai import Mistral
from huggingface_hub import InferenceClient
# ------------------------------
# Helper functions and globals
# ------------------------------
# Shared text that is appended to every prompt under "Sheet Data:".
# Written by load_file (extracted PDF text) and read by the query functions.
sheet_data = None
# Set by load_file to the uploaded file object it receives.
file_name = None
def debug_print(message: str):
    """Print *message* to stdout prefixed with an ISO-8601 timestamp.

    The stream is flushed on every call so log lines appear immediately.
    """
    stamp = datetime.datetime.now().isoformat()
    print("[{}] {}".format(stamp, message), flush=True)
def initialize_tokenizer():
    """Load the pretrained "gpt2" tokenizer used for token counting.

    Returns:
        The tokenizer object, or None when loading fails for any reason
        (the failure is logged through debug_print instead of raised).
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained("gpt2")
    except Exception as e:
        # Best effort: callers fall back to whitespace counting without it.
        debug_print("Failed to initialize tokenizer: " + str(e))
        tokenizer = None
    return tokenizer


# Created once at import time and shared by count_tokens.
global_tokenizer = initialize_tokenizer()
def count_tokens(text: str) -> int:
    """Return the number of tokens in *text*.

    Uses the module-level tokenizer when one is available; if it is missing
    or its encode call raises, falls back to counting whitespace-separated
    words.
    """
    if global_tokenizer:
        try:
            token_ids = global_tokenizer.encode(text)
        except Exception:
            # Tokenizer failed on this input: use the word-count fallback.
            pass
        else:
            return len(token_ids)
    return len(text.split())
def generate_response(prompt: str, model_name: str, sheet_data: str) -> str:
    """Send the prompt plus the loaded text to the selected model.

    Args:
        prompt: The user's question or instruction.
        model_name: UI model label; routed by substring ("Mistral" is
            checked first, then "Meta-Llama").
        sheet_data: Text appended to the prompt under a "Sheet Data:" header.

    Returns:
        The text generated by the model.

    Raises:
        ValueError: If the required API key environment variable is unset,
            or if *model_name* matches neither supported backend.
    """
    # Append the loaded text to the prompt.
    full_prompt = f"{prompt}\n\nSheet Data:\n{sheet_data}"

    if "Mistral" in model_name:
        api_key = os.getenv("MISTRAL_API_KEY")
        if not api_key:
            raise ValueError("MISTRAL_API_KEY environment variable not set.")
        chat_result = Mistral(api_key=api_key).chat.complete(
            model="mistral-small-latest",
            messages=[{"role": "user", "content": full_prompt}],
            temperature=0.7,
            top_p=0.95,
        )
        return chat_result.choices[0].message.content

    if "Meta-Llama" in model_name:
        token = os.getenv("HF_API_TOKEN")
        if not token:
            raise ValueError("HF_API_TOKEN environment variable not set.")
        hf_client = InferenceClient(token=token)
        generated = hf_client.text_generation(
            full_prompt,
            model="meta-llama/Meta-Llama-3-8B-Instruct",
            temperature=0.7,
            top_p=0.95,
            max_new_tokens=512,
        )
        return generated

    raise ValueError("Invalid model selection. Please choose either 'Mistral-API' or 'Meta-Llama-3'.")
def process_query(prompt: str, model_name: str):
    """Run one query against the selected model and report token usage.

    Args:
        prompt: The user's prompt.
        model_name: Model label forwarded to generate_response.

    Returns:
        A 3-tuple ``(response, input_token_label, output_token_label)`` where
        the labels are strings such as ``"Input tokens: 42"``.

    Raises:
        ValueError: Propagated from generate_response (missing API key or
            unknown model).
    """
    # Read the shared text without writing the placeholder back into the
    # global: a later check for "nothing loaded yet" must still see None.
    data = sheet_data if sheet_data is not None else get_sheet_data()

    # Same template generate_response uses, so the log and the token count
    # describe exactly what is sent to the model.
    full_prompt = f"{prompt}\n\nSheet Data:\n{data}"
    debug_print(f"Processing query with model {model_name}: {full_prompt}")

    response = generate_response(prompt, model_name, data)
    input_tokens = count_tokens(full_prompt)
    output_tokens = count_tokens(response)
    return response, f"Input tokens: {input_tokens}", f"Output tokens: {output_tokens}"
def ui_process_query(prompt, model_name):
    """UI-facing wrapper: forward the arguments to process_query unchanged."""
    outcome = process_query(prompt, model_name)
    return outcome
# ------------------------------
# Cleaning Functions
# ------------------------------
def clean_text(text: str, remove_spaces: bool, remove_headers_footers: bool, lowercase: bool, remove_special: bool) -> str:
    """Clean *text* according to the selected options.

    Args:
        text: The text to clean.
        remove_spaces: Collapse every run of whitespace (including newlines)
            into a single space and trim both ends.
        remove_headers_footers: Drop every line whose stripped content occurs
            more than once in the text (a simple heuristic for page headers
            and footers). Blank lines are kept.
        lowercase: Convert the text to lower case.
        remove_special: Remove every character that is not an ASCII letter,
            an ASCII digit or whitespace.

    Returns:
        The cleaned text.
    """
    from collections import Counter

    # Header/footer detection relies on the line structure, so it has to run
    # BEFORE whitespace collapsing turns the text into a single line.
    if remove_headers_footers:
        lines = text.split('\n')
        # Count on the stripped form so "Header" and "Header " are treated as
        # the same repeated line.
        occurrences = Counter(
            stripped for stripped in (line.strip() for line in lines) if stripped
        )
        text = "\n".join(line for line in lines if occurrences[line.strip()] <= 1)

    if remove_spaces:
        text = re.sub(r'\s+', ' ', text).strip()
    if lowercase:
        text = text.lower()
    if remove_special:
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return text
def execute_cleaning(text: str, remove_spaces: bool, remove_headers: bool, lowercase: bool, remove_special: bool) -> str:
    """Clean the given text for the UI, or report that there is nothing to clean.

    Returns the result of clean_text, or a fixed message when *text* is
    empty, missing, or only whitespace.
    """
    has_content = bool(text) and text.strip() != ""
    if has_content:
        return clean_text(text, remove_spaces, remove_headers, lowercase, remove_special)
    return "No text available for cleaning."
# ------------------------------
# Global variables for background jobs
# ------------------------------
# Job registry: job ID -> dict with "status", "type", "start_time", "query",
# "model" and, once finished, "result" and "end_time".
jobs = {}
# Worker threads put (job_id, result) pairs here; check_job_status drains it.
results_queue = queue.Queue()
# ID of the most recently submitted job (None until the first submission).
last_job_id = None
# ------------------------------
# Job management functions
# ------------------------------
def get_job_list():
    """Render the job registry as a Markdown bullet list, newest job first.

    Returns:
        A Markdown string with one bullet per job (ID, start time, status and
        either a query preview or a file-load marker), or a fixed hint when
        no jobs exist.
    """
    if not jobs:
        return "No jobs found. Submit a query or load files to create jobs."

    # Status icons; any status other than the two known ones gets the cross.
    status_icons = {"processing": "⏳", "completed": "✅"}

    parts = ["### π Submitted Jobs\n\n"]
    newest_first = sorted(
        jobs.items(),
        key=lambda item: item[1].get("start_time", 0),
        reverse=True,
    )
    for job_id, job_info in newest_first:
        status = job_info.get("status", "unknown")
        job_type = job_info.get("type", "unknown")
        query = job_info.get("query", "")
        start_time = job_info.get("start_time", 0)
        time_str = datetime.datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S")

        # Show at most 30 characters of the query; "N/A" when there is none.
        if query and len(query) > 30:
            query_preview = query[:30] + "..."
        else:
            query_preview = query or "N/A"

        status_formatted = f"{status_icons.get(status, '❌')} {status}"
        if job_type == "query":
            detail = f"Query: {query_preview}"
        else:
            detail = "File Load Job"
        parts.append(
            f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - {detail}\n"
        )
    return "".join(parts)
def get_sheet_data():
    """Return the shared loaded text, or a placeholder when it is empty or unset."""
    if sheet_data:
        return sheet_data
    return "No data loaded."
def process_in_background(job_id, func, args):
    """Thread entry point: run ``func(*args)`` and queue the outcome.

    Args:
        job_id: ID under which the result is reported.
        func: Callable doing the actual work.
        args: Positional arguments for *func*.

    A failure inside *func* is reported as a result too. Otherwise the
    exception would end the thread without queueing anything and the job
    would stay in "processing" forever.
    """
    try:
        result = func(*args)
    except Exception as exc:
        # This is the thread's top-level boundary, so the broad catch is
        # deliberate. The error is shaped like a normal result because
        # check_job_status reads result[0], result[1] and result[2].
        debug_print(f"Job {job_id} failed: {exc}")
        result = (f"Error: {exc}", "Input tokens: 0", "Output tokens: 0")
    results_queue.put((job_id, result))
    debug_print(f"Job {job_id} finished processing in background.")
def submit_query_async(query, model_choice=None):
    """Register a query job and start processing it on a background thread.

    Args:
        query: The user's prompt; empty or whitespace-only input is rejected.
        model_choice: Model label; defaults to "Mistral-API" when falsy.

    Returns:
        A 7-tuple for the UI: status message, job ID label, input token
        label, output token label, job ID, query text and the refreshed job
        list Markdown.
    """
    global last_job_id

    if not query or not query.strip():
        return ("Please enter a non-empty query", "", "Input tokens: 0", "Output tokens: 0", "", "", get_job_list())

    model = model_choice or "Mistral-API"
    job_id = str(uuid.uuid4())
    debug_print(f"Starting async job {job_id} for query: {query}")

    # Register the job BEFORE starting the worker: a result that arrives for
    # an ID missing from `jobs` is discarded by check_job_status.
    jobs[job_id] = {
        "status": "processing",
        "type": "query",
        "start_time": time.time(),
        "query": query,
        "model": model
    }
    last_job_id = job_id

    threading.Thread(
        target=process_in_background,
        args=(job_id, process_query, [query, model])
    ).start()

    return (
        f"π Query submitted and processing in the background (Job ID: {job_id}).\n\n"
        f"Use the 'Check Job Status' section to view results.",
        f"Job ID: {job_id}",
        f"Input tokens: {count_tokens(query)}",
        "Output tokens: pending",
        job_id,
        query,
        get_job_list()
    )
def job_selected(job_id):
    """Return ``(job_id, query_text)`` for a job, or a not-found message."""
    job = jobs.get(job_id)
    if job is None:
        return job_id, "Job not found"
    return job_id, job.get("query", "No query for this job")
def refresh_job_list():
    """Return the current job list Markdown (handler for the refresh button)."""
    job_list_md = get_job_list()
    return job_list_md
def sync_model_dropdown(value):
    """Return the selected value unchanged."""
    selected = value
    return selected
def check_job_status(job_id):
    """Collect finished results and report the state of one job.

    Args:
        job_id: ID of the job to look up. Surrounding whitespace is ignored.

    Returns:
        A 5-tuple ``(status_html, job_id_label, input_token_label,
        output_token_label, job_query)`` for the UI. The labels are empty
        strings when the ID is missing or unknown.
    """
    # Local import keeps this function self-contained; `html` is stdlib.
    import html

    if isinstance(job_id, str):
        job_id = job_id.strip()
    if not job_id:
        return "<div><p>Please enter a job ID.</p></div>", "", "", "", ""

    # Move every finished result from the queue into the job registry.
    while True:
        try:
            completed_id, result = results_queue.get_nowait()
        except queue.Empty:
            break
        if completed_id in jobs:
            jobs[completed_id]["status"] = "completed"
            jobs[completed_id]["result"] = result
            jobs[completed_id]["end_time"] = time.time()
            debug_print(f"Job {completed_id} completed and stored in jobs dictionary")

    job = jobs.get(job_id)
    if job is None:
        html_response = "<div><p>Job not found. Please check the ID and try again.</p></div>"
        return html_response, "", "", "", ""

    job_query = job.get("query", "No query available for this job")

    if job["status"] == "processing":
        elapsed_time = time.time() - job["start_time"]
        html_response = (
            "<div>"
            f"<p>⏳ Query is still being processed (elapsed: {elapsed_time:.1f}s). Please check again shortly.</p>"
            "</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            f"Input tokens: {count_tokens(job.get('query', ''))}",
            "Output tokens: pending",
            job_query
        )

    if job["status"] == "completed":
        result = job["result"]
        processing_time = job["end_time"] - job["start_time"]
        # The model output is untrusted text: escape it so it is shown
        # verbatim rather than interpreted as markup.
        response_text = html.escape(str(result[0]))
        html_response = (
            "<div>"
            f"<p>✅ Response: {response_text}</p>"
            f"<p>Processing time: {processing_time:.1f}s</p>"
            "</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            result[1],
            result[2],
            job_query
        )

    html_response = f"<div><p>Job status: {html.escape(str(job['status']))}</p></div>"
    return html_response, "", "", "", job_query
def cleanup_old_jobs():
    """Remove stale jobs from the registry.

    A completed job is stale more than 86400 seconds after its end time; a
    job still marked as processing is stale more than 172800 seconds after
    its start time.

    Returns:
        A 3-tuple: a summary message followed by two empty strings.
    """
    now = time.time()

    def _is_stale(job):
        state = job["status"]
        if state == "completed":
            return (now - job.get("end_time", 0)) > 86400
        if state == "processing":
            return (now - job.get("start_time", 0)) > 172800
        return False

    # Collect first, delete afterwards: the dict must not change while it is
    # being iterated.
    stale_ids = [job_id for job_id, job in jobs.items() if _is_stale(job)]
    for stale_id in stale_ids:
        del jobs[stale_id]

    removed = len(stale_ids)
    debug_print(f"Cleaned up {removed} old jobs. {len(jobs)} jobs remaining.")
    return f"Cleaned up {removed} old jobs", "", ""
# Dummy query helper: returns [i, i**2] rows for i in 1..max_value.
def run_query(max_value):
    """Return ``[n, n squared]`` rows for every n from 1 to *max_value* inclusive."""
    rows = []
    for n in range(1, max_value + 1):
        rows.append([n, n ** 2])
    return rows
def periodic_update(is_checked):
    """Auto-refresh handler: refresh the job list and the latest job's status.

    Args:
        is_checked: State of the auto-refresh checkbox.

    Returns:
        A 7-tuple ``(job_list_md, status_html, status_plain_text,
        job_id_label, input_token_label, output_token_label, job_query)``;
        seven empty strings when auto-refresh is off.
    """
    interval = 3 if is_checked else None
    debug_print(f"Auto-refresh checkbox is {'checked' if is_checked else 'unchecked'}, every={interval}")

    if not is_checked:
        return "", "", "", "", "", "", ""

    job_list_md = refresh_job_list()
    if last_job_id:
        job_status = check_job_status(last_job_id)
    else:
        job_status = ("No job ID available", "", "", "", "")

    # Imported here so bs4 is only required while auto-refresh is enabled.
    from bs4 import BeautifulSoup

    status_html, job_id_label, input_label, output_label, job_query = job_status
    # Plain-text rendering of the status HTML for the text box.
    if status_html:
        plain_text = BeautifulSoup(status_html, "html.parser").get_text()
    else:
        plain_text = ""
    return job_list_md, status_html, plain_text, job_id_label, input_label, output_label, job_query
# ------------------------------
# Gradio UI Layout: Scouting AI App
# ------------------------------
def load_file(file, page_start, page_end):
    """Extract the text of a page range from an uploaded PDF.

    Args:
        file: Uploaded file object from the Gradio File component (its
            ``name`` attribute is used as the path), or None.
        page_start: First page to read, 1-based, as typed by the user.
        page_end: Last page to read, 1-based and inclusive.

    Returns:
        The extracted text (one newline appended after each page), or a
        human-readable error message. On success the text is also stored in
        the module-level ``sheet_data``.
    """
    global sheet_data, file_name
    file_name = file
    if file is None or str(page_start).strip() == "" or str(page_end).strip() == "":
        return "Please upload a file and enter valid page numbers."
    try:
        first_page = int(page_start)
        last_page = int(page_end)
    except (TypeError, ValueError):
        return "Please upload a file and enter valid page numbers."
    try:
        # The context manager closes the document even when extraction fails.
        with fitz.open(file.name) as doc:
            page_count = len(doc)
            # Reject out-of-range input explicitly: a start below 1 would
            # otherwise turn into a negative index and read from the end.
            if not 1 <= first_page <= last_page <= page_count:
                return (
                    f"Invalid page range {first_page}-{last_page}: "
                    f"the document has {page_count} page(s)."
                )
            # Convert page numbers from 1-indexed to 0-indexed.
            text = "".join(
                doc[page_num].get_text() + "\n"
                for page_num in range(first_page - 1, last_page)
            )
    except Exception as e:
        return f"Error reading PDF: {str(e)}"
    sheet_data = text
    return text


# ------------------------------
# Gradio UI layout and event wiring
# ------------------------------
with gr.Blocks() as app:
    # App title and description
    gr.Markdown("## π PDF Conversion")
    gr.Markdown("Text cleaning and processing tools.")

    # Top section: file loading (left) and job information (right)
    with gr.Row():
        # Left column: file load section
        with gr.Column(scale=1):
            gr.Markdown("### π Load File Section")
            gr.Markdown("Upload your **.pdf** file below and specify the page range to extract text.")
            file_input = gr.File(label="Upload .pdf File")
            page_start_input_file = gr.Textbox(label="Page Start")
            page_end_input_file = gr.Textbox(label="Page End")
            load_button_file = gr.Button("Load File")
            sheet_output_file = gr.Textbox(label="Extracted Text", interactive=False)
        # Right column: job information section
        with gr.Column(scale=1):
            gr.Markdown("### π Job Information")
            gr.Markdown("View all submitted jobs, refresh the list, and check the status of individual jobs.")
            job_list_display = gr.Markdown(
                get_job_list(),
                elem_id="job-list-display",
                elem_classes=["scrollable-job-list"]
            )
            gr.HTML("""
""")
            refresh_button = gr.Button("Refresh Job List")
            gr.Markdown("#### π Check Job Status")
            job_id_input = gr.Textbox(label="Enter Job ID")
            check_status_button = gr.Button("Check Job Status")

    # Cleaning section: options on the left, cleaned text on the right
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Cleaning Options")
            remove_spaces_checkbox = gr.Checkbox(label="Remove extra spaces & newlines: Clean unnecessary whitespace.", value=True)
            remove_headers_checkbox = gr.Checkbox(label="Remove headers/footers: If repeated text appears on every page", value=False)
            lowercase_checkbox = gr.Checkbox(label="Convert text to lowercase: For uniformity in text analysis.", value=False)
            remove_special_checkbox = gr.Checkbox(label="Remove special characters: Useful for structured data extraction", value=False)
            clean_button = gr.Button("Clean")
        with gr.Column(scale=1):
            cleaned_output = gr.Textbox(label="Cleaned Text", interactive=False)

    # Query section: prompt and model on the left, job details on the right
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### π Submit Query")
            gr.Markdown("Enter your prompt below and choose a model. Your query will be processed in the background.")
            model_dropdown = gr.Dropdown(
                choices=["πΊπΈ Remote Meta-Llama-3", "πͺπΊ Mistral-API"],
                value="πͺπΊ Mistral-API",
                label="Select Model"
            )
            prompt_input = gr.Textbox(label="Enter your prompt", value="", lines=6)
            with gr.Row():
                auto_refresh_checkbox_query = gr.Checkbox(
                    label="Enable Auto Refresh",
                    value=False
                )
                submit_query_button = gr.Button("Submit Query")
            status_text = gr.Textbox(label="Response Text", visible=True)
            response_output = gr.Textbox(label="Response", interactive=False)
            token_info = gr.Textbox(label="Token Info", interactive=False)
        with gr.Column(scale=1):
            status_output = gr.HTML(label="Job Status", interactive=False)
            job_id_display = gr.Textbox(label="Job ID", interactive=False)
            input_tokens_display = gr.Textbox(label="Input Tokens", interactive=False)
            output_tokens_display = gr.Textbox(label="Output Tokens", interactive=False)
            job_query_display = gr.Textbox(label="Job Query", interactive=False)

    # ------------------------------
    # Set up interactions
    # ------------------------------
    # Load button: read the requested PDF pages into the extracted-text box.
    load_button_file.click(
        fn=load_file,
        inputs=[file_input, page_start_input_file, page_end_input_file],
        outputs=sheet_output_file
    )
    # Clean button: clean the extracted text using the selected options.
    clean_button.click(
        fn=execute_cleaning,
        inputs=[sheet_output_file, remove_spaces_checkbox, remove_headers_checkbox, lowercase_checkbox, remove_special_checkbox],
        outputs=cleaned_output
    )
    submit_query_button.click(
        fn=submit_query_async,
        inputs=[prompt_input, model_dropdown],
        outputs=[
            response_output, token_info,
            input_tokens_display, output_tokens_display,
            job_id_input, job_query_display, job_list_display
        ]
    )
    check_status_button.click(
        fn=check_job_status,
        inputs=[job_id_input],
        outputs=[status_output, job_id_display, input_tokens_display,
                 output_tokens_display, job_query_display]
    )
    refresh_button.click(
        fn=refresh_job_list,
        inputs=[],
        outputs=job_list_display
    )
    auto_refresh_checkbox_query.change(
        fn=periodic_update,
        inputs=[auto_refresh_checkbox_query],
        outputs=[job_list_display, status_output, status_text, job_id_display, input_tokens_display, output_tokens_display, job_query_display],
        every=3
    )
# Script entry point: enable request queuing, then start the local server.
if __name__ == "__main__":
    debug_print("Launching Gradio UI...")
    queued_app = app.queue()
    queued_app.launch(share=False)