import os
import datetime
import requests
import re
import pandas as pd
import gradio as gr
import threading
import uuid
import queue
import time
import fitz  # PyMuPDF for reading PDF files
from transformers import AutoTokenizer
from mistralai import Mistral
from huggingface_hub import InferenceClient

# ------------------------------
# Helper functions and globals
# ------------------------------
# Text of the most recently loaded PDF page range; None until a file is loaded.
sheet_data = None
# The most recently uploaded file object, as handed over by the upload widget.
file_name = None


def debug_print(message: str):
    """Print ``message`` to stdout, prefixed with the current ISO-8601 timestamp."""
    print(f"[{datetime.datetime.now().isoformat()}] {message}", flush=True)


def initialize_tokenizer():
    """Load the GPT-2 tokenizer used for token counting.

    Returns:
        The tokenizer, or ``None`` if loading fails (the failure is logged).
    """
    try:
        return AutoTokenizer.from_pretrained("gpt2")
    except Exception as e:
        debug_print("Failed to initialize tokenizer: " + str(e))
        return None


global_tokenizer = initialize_tokenizer()


def count_tokens(text: str) -> int:
    """Count the tokens in ``text``.

    Uses the global tokenizer when it is available; otherwise, or if encoding
    raises, falls back to a whitespace-separated word count.
    """
    if global_tokenizer:
        try:
            return len(global_tokenizer.encode(text))
        except Exception:
            return len(text.split())
    return len(text.split())


def generate_response(prompt: str, model_name: str, sheet_data: str) -> str:
    """Send ``prompt`` plus the loaded text to the selected remote model.

    Args:
        prompt: The user's prompt.
        model_name: Model label; must contain "Mistral" or "Meta-Llama".
        sheet_data: Loaded document text that is appended to the prompt.

    Returns:
        The text generated by the model.

    Raises:
        ValueError: If the API key/token environment variable for the chosen
            model is not set, or if ``model_name`` matches neither model.
    """
    full_prompt = f"{prompt}\n\nSheet Data:\n{sheet_data}"  # Append loaded text to prompt
    if "Mistral" in model_name:
        mistral_api_key = os.getenv("MISTRAL_API_KEY")
        if not mistral_api_key:
            raise ValueError("MISTRAL_API_KEY environment variable not set.")
        mistral_client = Mistral(api_key=mistral_api_key)
        response = mistral_client.chat.complete(
            model="mistral-small-latest",
            messages=[{"role": "user", "content": full_prompt}],
            temperature=0.7,
            top_p=0.95
        )
        return response.choices[0].message.content
    elif "Meta-Llama" in model_name:
        hf_api_token = os.getenv("HF_API_TOKEN")
        if not hf_api_token:
            raise ValueError("HF_API_TOKEN environment variable not set.")
        client = InferenceClient(token=hf_api_token)
        response = client.text_generation(
            full_prompt,
            model="meta-llama/Meta-Llama-3-8B-Instruct",
            temperature=0.7,
            top_p=0.95,
            max_new_tokens=512
        )
        return response
    else:
        raise ValueError("Invalid model selection. Please choose either 'Mistral-API' or 'Meta-Llama-3'.")


def process_query(prompt: str, model_name: str):
    """Run ``prompt`` against the chosen model and report token counts.

    If no text has been loaded yet, the global ``sheet_data`` is set to the
    placeholder returned by ``get_sheet_data()``.

    Returns:
        A tuple ``(response, "Input tokens: N", "Output tokens: M")``.
    """
    global sheet_data
    if sheet_data is None:
        sheet_data = get_sheet_data()
    full_prompt = f"{prompt}\n\nSheet Data:\n{sheet_data}"
    debug_print(f"Processing query with model {model_name}: {full_prompt}")
    response = generate_response(prompt, model_name, sheet_data)
    input_tokens = count_tokens(prompt + "\n\n" + sheet_data)
    output_tokens = count_tokens(response)
    return response, f"Input tokens: {input_tokens}", f"Output tokens: {output_tokens}"


def ui_process_query(prompt, model_name):
    """Thin wrapper that forwards to ``process_query``."""
    return process_query(prompt, model_name)


# ------------------------------
# Cleaning Functions
# ------------------------------
def clean_text(text: str, remove_spaces: bool, remove_headers_footers: bool, lowercase: bool, remove_special: bool) -> str:
    """
    Cleans the given text based on the provided options.

    Args:
        text: The text to clean.
        remove_spaces: Collapse every run of whitespace (including newlines)
            into a single space and trim the ends.
        remove_headers_footers: Drop every non-blank line whose stripped
            content occurs more than once in the text.
        lowercase: Convert the text to lowercase.
        remove_special: Remove every character that is not an ASCII letter,
            a digit, or whitespace.

    Returns:
        The cleaned text.
    """
    # Header/footer detection depends on the line structure, so it has to run
    # BEFORE whitespace collapsing; otherwise the text is a single line by the
    # time we look for repeated lines and nothing is ever removed.
    if remove_headers_footers:
        lines = text.split('\n')
        freq = {}
        for line in lines:
            line_stripped = line.strip()
            if line_stripped:
                # Key on the stripped line so repeats that differ only in
                # surrounding whitespace are still recognised.
                freq[line_stripped] = freq.get(line_stripped, 0) + 1
        # Blank lines are never counted, so they are always kept.
        lines = [line for line in lines if freq.get(line.strip(), 0) <= 1]
        text = "\n".join(lines)
    # Remove extra spaces & newlines
    if remove_spaces:
        text = re.sub(r'\s+', ' ', text).strip()
    if lowercase:
        text = text.lower()
    if remove_special:
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return text


def execute_cleaning(text: str, remove_spaces: bool, remove_headers: bool, lowercase: bool, remove_special: bool) -> str:
    """Clean ``text`` with the selected options.

    Returns:
        The cleaned text, or a fixed notice when ``text`` is empty or only
        whitespace.
    """
    if not text or text.strip() == "":
        return "No text available for cleaning."
    cleaned = clean_text(text, remove_spaces, remove_headers, lowercase, remove_special)
    return cleaned


# ------------------------------
# Global variables for background jobs
# ------------------------------
jobs = {}                      # job_id -> dict describing the job
results_queue = queue.Queue()  # (job_id, result) pairs produced by worker threads
last_job_id = None             # id of the most recently submitted job


# ------------------------------
# Job management functions
# ------------------------------
def get_job_list():
    """Render all known jobs as a Markdown list, newest first.

    Returns:
        A Markdown string with one bullet per job, or a fixed notice when no
        jobs exist.
    """
    if not jobs:
        return "No jobs found. Submit a query or load files to create jobs."
    job_list_md = "### 📊 Submitted Jobs\n\n"
    sorted_jobs = sorted(
        [(job_id, job_info) for job_id, job_info in jobs.items()],
        key=lambda x: x[1].get("start_time", 0),
        reverse=True
    )
    for job_id, job_info in sorted_jobs:
        status = job_info.get("status", "unknown")
        job_type = job_info.get("type", "unknown")
        query = job_info.get("query", "")
        start_time = job_info.get("start_time", 0)
        time_str = datetime.datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S")
        # Show at most the first 30 characters of the query.
        query_preview = query[:30] + "..." if query and len(query) > 30 else query or "N/A"
        if status == "processing":
            status_formatted = f"⏳ {status}"
        elif status == "completed":
            status_formatted = f"✅ {status}"
        else:
            status_formatted = f"❓ {status}"
        if job_type == "query":
            job_list_md += f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - Query: {query_preview}\n"
        else:
            job_list_md += f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - File Load Job\n"
    return job_list_md


def get_sheet_data():
    """Return the loaded text, or "No data loaded." when nothing is loaded."""
    global sheet_data
    return sheet_data if sheet_data else "No data loaded."
def process_in_background(job_id, func, args):
    """Run ``func(*args)`` and publish the outcome on ``results_queue``.

    If ``func`` raises, the error is logged and published as a 3-tuple shaped
    like a query result (message, input-token text, output-token text), so the
    job does not stay in the "processing" state forever.
    """
    try:
        result = func(*args)
    except Exception as exc:
        debug_print(f"Job {job_id} failed: {exc}")
        result = (f"Error: {exc}", "Input tokens: unavailable", "Output tokens: unavailable")
    results_queue.put((job_id, result))
    debug_print(f"Job {job_id} finished processing in background.")


def submit_query_async(query, model_choice=None):
    """Start a background job for ``query`` and return the initial UI values.

    Args:
        query: The user's prompt; an empty value is rejected.
        model_choice: Model label; "Mistral-API" is used when it is falsy.

    Returns:
        A 7-tuple: status message, job-id text, input-token text,
        output-token text, job id, the query, and the refreshed job list.
    """
    global last_job_id
    if not query:
        return ("Please enter a non-empty query", "", "Input tokens: 0", "Output tokens: 0", "", "", get_job_list())
    job_id = str(uuid.uuid4())
    model = model_choice or "Mistral-API"
    debug_print(f"Starting async job {job_id} for query: {query}")
    # Register the job BEFORE starting the worker: check_job_status discards
    # queued results whose id is not present in ``jobs``.
    jobs[job_id] = {
        "status": "processing",
        "type": "query",
        "start_time": time.time(),
        "query": query,
        "model": model
    }
    last_job_id = job_id
    threading.Thread(
        target=process_in_background,
        args=(job_id, process_query, [query, model])
    ).start()
    return (
        f"🚀 Query submitted and processing in the background (Job ID: {job_id}).\n\n"
        f"Use the 'Check Job Status' section to view results.",
        f"Job ID: {job_id}",
        f"Input tokens: {count_tokens(query)}",
        "Output tokens: pending",
        job_id,
        query,
        get_job_list()
    )


def job_selected(job_id):
    """Return ``(job_id, query)`` for a known job, else ``(job_id, "Job not found")``."""
    if job_id in jobs:
        return job_id, jobs[job_id].get("query", "No query for this job")
    return job_id, "Job not found"


def refresh_job_list():
    """Return the current job list as Markdown."""
    return get_job_list()


def sync_model_dropdown(value):
    """Return ``value`` unchanged."""
    return value


def check_job_status(job_id):
    """Collect finished results and describe the state of ``job_id``.

    Every result waiting on ``results_queue`` is moved into ``jobs`` first,
    whichever job is being asked about.

    Returns:
        A 5-tuple: status HTML, job-id text, input-token text,
        output-token text, and the job's query.
    """
    # Local import: ``html`` is not among the module-level imports.
    import html

    # NOTE(review): the markup of the HTML snippets below is a minimal
    # reconstruction; only their text content was recoverable.
    if not job_id:
        html_response = "<div><p>Please enter a job ID.</p></div>"
        return html_response, "", "", "", ""
    try:
        while not results_queue.empty():
            completed_id, result = results_queue.get_nowait()
            if completed_id in jobs:
                jobs[completed_id]["status"] = "completed"
                jobs[completed_id]["result"] = result
                jobs[completed_id]["end_time"] = time.time()
                debug_print(f"Job {completed_id} completed and stored in jobs dictionary")
    except queue.Empty:
        pass
    if job_id not in jobs:
        html_response = "<div><p>Job not found. Please check the ID and try again.</p></div>"
        return html_response, "", "", "", ""
    job = jobs[job_id]
    job_query = job.get("query", "No query available for this job")
    if job["status"] == "processing":
        elapsed_time = time.time() - job["start_time"]
        html_response = (
            f"<div>"
            f"<p>⏳ Query is still being processed (elapsed: {elapsed_time:.1f}s). Please check again shortly.</p>"
            f"</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            f"Input tokens: {count_tokens(job.get('query', ''))}",
            "Output tokens: pending",
            job_query
        )
    if job["status"] == "completed":
        result = job["result"]
        processing_time = job["end_time"] - job["start_time"]
        # The response is model-generated text, so escape it before embedding
        # it in HTML.
        html_response = (
            f"<div>"
            f"<p>✅ Response: {html.escape(str(result[0]))}</p>"
            f"<p>Processing time: {processing_time:.1f}s</p>"
            f"</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            result[1],
            result[2],
            job_query
        )
    html_response = f"<div><p>Job status: {html.escape(str(job['status']))}</p></div>"
    return html_response, "", "", "", job_query


def cleanup_old_jobs():
    """Delete completed jobs older than 24 hours and processing jobs older than 48 hours.

    Returns:
        A 3-tuple: a summary message followed by two empty strings.
    """
    current_time = time.time()
    to_delete = []
    for job_id, job in jobs.items():
        if job["status"] == "completed" and (current_time - job.get("end_time", 0)) > 86400:
            to_delete.append(job_id)
        elif job["status"] == "processing" and (current_time - job.get("start_time", 0)) > 172800:
            to_delete.append(job_id)
    for job_id in to_delete:
        del jobs[job_id]
    debug_print(f"Cleaned up {len(to_delete)} old jobs. {len(jobs)} jobs remaining.")
    return f"Cleaned up {len(to_delete)} old jobs", "", ""


# Function to run query (dummy function)
def run_query(max_value):
    """Return ``[[i, i**2], ...]`` for ``i`` from 1 to ``max_value`` inclusive."""
    return [[i, i**2] for i in range(1, max_value + 1)]


def periodic_update(is_checked):
    """Refresh the job list and the status of the last submitted job.

    Args:
        is_checked: State of the auto-refresh checkbox.

    Returns:
        A 7-tuple: job list, status HTML, status as plain text, job-id text,
        input-token text, output-token text, and the job query. All seven are
        empty strings when ``is_checked`` is false.
    """
    global last_job_id
    interval = 3 if is_checked else None
    debug_print(f"Auto-refresh checkbox is {'checked' if is_checked else 'unchecked'}, every={interval}")
    if is_checked:
        job_list_md = refresh_job_list()
        job_status = check_job_status(last_job_id) if last_job_id else ("No job ID available", "", "", "", "")
        from bs4 import BeautifulSoup
        html_content = job_status[0]
        plain_text = ""
        if html_content:
            # Strip the markup so the status can also be shown in a plain textbox.
            soup = BeautifulSoup(html_content, "html.parser")
            plain_text = soup.get_text()
        return job_list_md, job_status[0], plain_text, job_status[1], job_status[2], job_status[3], job_status[4]
    else:
        return "", "", "", "", "", "", ""


# ------------------------------
# Gradio UI Layout: Scouting AI App
# ------------------------------
with gr.Blocks() as app:
    # App Title and Description
    gr.Markdown("## 📖 PDF Conversion")
    gr.Markdown("Text cleaning and processing tools.")

    # Top section: File Load and Job Information (two columns)
    with gr.Row():
        # Left Column: File Load Section (50% width)
        with gr.Column(scale=1):
            gr.Markdown("### 📁 Load File Section")
            gr.Markdown("Upload your **.pdf** file below and specify the page range to extract text.")
            file_input = gr.File(label="Upload .pdf File")
            page_start_input_file = gr.Textbox(label="Page Start")
            page_end_input_file = gr.Textbox(label="Page End")
            load_button_file = gr.Button("Load File")
            sheet_output_file = gr.Textbox(label="Extracted Text", interactive=False)
        # Right Column: Job Information Section (50% width)
        with gr.Column(scale=1):
            gr.Markdown("### 📊 Job Information")
            gr.Markdown("View all submitted jobs, refresh the list, and check the status of individual jobs.")
            job_list_display = gr.Markdown(
                get_job_list(),
                elem_id="job-list-display",
                elem_classes=["scrollable-job-list"]
            )
            # NOTE(review): this HTML payload is blank; presumably it once held
            # styling for the "scrollable-job-list" class - confirm and restore.
            gr.HTML(""" """)
            refresh_button = gr.Button("Refresh Job List")
            gr.Markdown("#### 🔍 Check Job Status")
            job_id_input = gr.Textbox(label="Enter Job ID")
            check_status_button = gr.Button("Check Job Status")

    # New row: Cleaning Tasks placed in two equal columns under the load section
    with gr.Row():
        # Left half: Cleaning Tasks checkboxes and Clean button
        with gr.Column(scale=1):
            gr.Markdown("### Cleaning Options")
            remove_spaces_checkbox = gr.Checkbox(label="Remove extra spaces & newlines: Clean unnecessary whitespace.", value=True)
            remove_headers_checkbox = gr.Checkbox(label="Remove headers/footers: If repeated text appears on every page", value=False)
            lowercase_checkbox = gr.Checkbox(label="Convert text to lowercase: For uniformity in text analysis.", value=False)
            remove_special_checkbox = gr.Checkbox(label="Remove special characters: Useful for structured data extraction", value=False)
            clean_button = gr.Button("Clean")
        # Right half: Display Cleaned Text
        with gr.Column(scale=1):
            cleaned_output = gr.Textbox(label="Cleaned Text", interactive=False)

    # Submit Query Section remains unchanged
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 🚀 Submit Query")
            gr.Markdown("Enter your prompt below and choose a model. Your query will be processed in the background.")
            model_dropdown = gr.Dropdown(
                choices=["🇺🇸 Remote Meta-Llama-3", "🇪🇺 Mistral-API"],
                value="🇪🇺 Mistral-API",
                label="Select Model"
            )
            prompt_input = gr.Textbox(label="Enter your prompt", value="", lines=6)
            with gr.Row():
                auto_refresh_checkbox_query = gr.Checkbox(
                    label="Enable Auto Refresh",
                    value=False
                )
                submit_query_button = gr.Button("Submit Query")
            status_text = gr.Textbox(label="Response Text", visible=True)
            response_output = gr.Textbox(label="Response", interactive=False)
            token_info = gr.Textbox(label="Token Info", interactive=False)
        with gr.Column(scale=1):
            status_output = gr.HTML(label="Job Status", interactive=False)
            job_id_display = gr.Textbox(label="Job ID", interactive=False)
            input_tokens_display = gr.Textbox(label="Input Tokens", interactive=False)
            output_tokens_display = gr.Textbox(label="Output Tokens", interactive=False)
            job_query_display = gr.Textbox(label="Job Query", interactive=False)

    # ------------------------------
    # Set up interactions
    # ------------------------------
    # Updated Load file interaction: read PDF pages
    def load_file(file, page_start, page_end):
        """Extract the text of pages ``page_start``..``page_end`` (1-indexed, inclusive).

        On success the extracted text is also stored in the global
        ``sheet_data``.

        Returns:
            The extracted text, or a message describing why nothing was read.
        """
        global sheet_data, file_name
        file_name = file
        if file is None or str(page_start).strip() == "" or str(page_end).strip() == "":
            return "Please upload a file and enter valid page numbers."
        try:
            ps = int(page_start)
            pe = int(page_end)
            # The upload may arrive as an object exposing ``.name`` or as a
            # plain path string - presumably depending on the Gradio version.
            pdf_path = getattr(file, "name", file)
            # The context manager closes the document even when reading fails.
            with fitz.open(pdf_path) as doc:
                page_count = len(doc)
                # Reject out-of-range values explicitly: a start page of 0
                # would otherwise become index -1 and read the LAST page.
                if ps < 1 or pe < ps or pe > page_count:
                    return f"Please enter a page range between 1 and {page_count}."
                # Convert page numbers from 1-indexed to 0-indexed
                text = "".join(doc[page_num].get_text() + "\n" for page_num in range(ps - 1, pe))
            sheet_data = text
            return text
        except Exception as e:
            return f"Error reading PDF: {str(e)}"

    load_button_file.click(
        fn=load_file,
        inputs=[file_input, page_start_input_file, page_end_input_file],
        outputs=sheet_output_file
    )

    # Cleaning button interaction: clean the loaded text using selected options.
    clean_button.click(
        fn=execute_cleaning,
        inputs=[sheet_output_file, remove_spaces_checkbox, remove_headers_checkbox, lowercase_checkbox, remove_special_checkbox],
        outputs=cleaned_output
    )

    submit_query_button.click(
        fn=submit_query_async,
        inputs=[prompt_input, model_dropdown],
        outputs=[
            response_output,
            token_info,
            input_tokens_display,
            output_tokens_display,
            job_id_input,
            job_query_display,
            job_list_display
        ]
    )

    check_status_button.click(
        fn=check_job_status,
        inputs=[job_id_input],
        outputs=[status_output, job_id_display, input_tokens_display, output_tokens_display, job_query_display]
    )

    refresh_button.click(
        fn=refresh_job_list,
        inputs=[],
        outputs=job_list_display
    )

    auto_refresh_checkbox_query.change(
        fn=periodic_update,
        inputs=[auto_refresh_checkbox_query],
        outputs=[job_list_display, status_output, status_text, job_id_display, input_tokens_display, output_tokens_display, job_query_display],
        every=3
    )

if __name__ == "__main__":
    debug_print("Launching Gradio UI...")
    app.queue().launch(share=False)