import datetime
import html
import os
import queue
import re
import threading
import time
import uuid
from collections import Counter

import fitz
import gradio as gr
import pandas as pd
import requests
from huggingface_hub import InferenceClient
from mistralai import Mistral
from transformers import AutoTokenizer


# Text extracted from the most recently loaded PDF; stays None until load_file succeeds.
sheet_data = None

# The uploaded file object last handed to load_file (not a path string); None until then.
file_name = None


def debug_print(message: str):
    """Print *message* to stdout, prefixed with the current local time in ISO-8601 form.

    The stream is flushed on every call so the line shows up immediately.
    """
    timestamp = datetime.datetime.now().isoformat()
    print(f"[{timestamp}] {message}", flush=True)


def initialize_tokenizer():
    """Load the "gpt2" tokenizer used for token counting.

    Returns:
        The tokenizer returned by ``AutoTokenizer.from_pretrained("gpt2")``, or
        ``None`` when loading fails for any reason (the failure is logged).
    """
    try:
        return AutoTokenizer.from_pretrained("gpt2")
    except Exception as e:
        # Best effort: callers fall back to a whitespace word count when this is None.
        debug_print(f"Failed to initialize tokenizer: {e}")
        return None


# Loaded once at import time; None when the tokenizer could not be initialised.
global_tokenizer = initialize_tokenizer()


def count_tokens(text: str) -> int:
    """Return the number of tokens in *text*.

    Uses the module-level tokenizer when one is available; if there is none, or
    encoding raises, falls back to counting whitespace-separated words.
    """
    if global_tokenizer:
        try:
            return len(global_tokenizer.encode(text))
        except Exception:
            # Deliberate best effort: any encoding failure uses the word count below.
            pass
    return len(text.split())


def generate_response(prompt: str, model_name: str, sheet_data: str) -> str:
    """Send *prompt* together with *sheet_data* to the selected model and return its reply.

    The backend is chosen by substring: a *model_name* containing "Mistral" uses the
    Mistral chat API, one containing "Meta-Llama" uses the Hugging Face inference
    client.

    Args:
        prompt: The user's question or instruction.
        model_name: Display name of the model; only the substrings above matter.
        sheet_data: Text appended to the prompt under a "Sheet Data:" heading.

    Returns:
        The generated text.

    Raises:
        ValueError: If the required API key environment variable is missing, or
            *model_name* matches neither backend.
    """
    full_prompt = f"{prompt}\n\nSheet Data:\n{sheet_data}"

    if "Mistral" in model_name:
        mistral_api_key = os.getenv("MISTRAL_API_KEY")
        if not mistral_api_key:
            raise ValueError("MISTRAL_API_KEY environment variable not set.")
        mistral_client = Mistral(api_key=mistral_api_key)
        response = mistral_client.chat.complete(
            model="mistral-small-latest",
            messages=[{"role": "user", "content": full_prompt}],
            temperature=0.7,
            top_p=0.95,
        )
        return response.choices[0].message.content

    if "Meta-Llama" in model_name:
        hf_api_token = os.getenv("HF_API_TOKEN")
        if not hf_api_token:
            raise ValueError("HF_API_TOKEN environment variable not set.")
        client = InferenceClient(token=hf_api_token)
        return client.text_generation(
            full_prompt,
            model="meta-llama/Meta-Llama-3-8B-Instruct",
            temperature=0.7,
            top_p=0.95,
            max_new_tokens=512,
        )

    raise ValueError("Invalid model selection. Please choose either 'Mistral-API' or 'Meta-Llama-3'.")


def process_query(prompt: str, model_name: str):
    """Run *prompt* against the loaded sheet data and report token usage.

    Args:
        prompt: The user's query.
        model_name: Model display name, forwarded to generate_response.

    Returns:
        A 3-tuple ``(response, "Input tokens: N", "Output tokens: M")``.

    Raises:
        ValueError: Propagated from generate_response (missing API key or
            unknown model).
    """
    # Use a local copy: the placeholder returned by get_sheet_data() must not be
    # written back into the shared global, where it would pass for loaded data.
    data = sheet_data if sheet_data is not None else get_sheet_data()

    # Same layout generate_response sends, so the log and the input-token count
    # describe the real prompt (including the "Sheet Data:" framing).
    full_prompt = f"{prompt}\n\nSheet Data:\n{data}"
    debug_print(f"Processing query with model {model_name}: {full_prompt}")

    response = generate_response(prompt, model_name, data)
    input_tokens = count_tokens(full_prompt)
    output_tokens = count_tokens(response)

    return response, f"Input tokens: {input_tokens}", f"Output tokens: {output_tokens}"


def ui_process_query(prompt, model_name):
    """Synchronous UI entry point: forward to process_query and return its 3-tuple."""
    return process_query(prompt, model_name)


def clean_text(text: str, remove_spaces: bool, remove_headers_footers: bool, lowercase: bool, remove_special: bool) -> str:
    """Clean *text* according to the selected options.

    Args:
        text: The text to clean.
        remove_spaces: Collapse every whitespace run (newlines included) into a
            single space and trim both ends.
        remove_headers_footers: Drop every non-blank line whose stripped content
            occurs more than once in *text*.
        lowercase: Convert the text to lowercase.
        remove_special: Remove every character that is not an ASCII letter, a
            digit or whitespace.

    Returns:
        The cleaned text.
    """
    # Repeated-line removal needs the original line breaks, so it must run before
    # whitespace collapsing, which would leave a single line to compare.
    if remove_headers_footers:
        lines = text.split("\n")
        # Count stripped content so lines differing only in surrounding whitespace
        # are still recognised as repeats.
        occurrences = Counter(stripped for stripped in map(str.strip, lines) if stripped)
        # Blank lines are never counted (Counter yields 0), so they are kept.
        text = "\n".join(line for line in lines if occurrences[line.strip()] <= 1)

    if remove_spaces:
        text = re.sub(r'\s+', ' ', text).strip()

    if lowercase:
        text = text.lower()

    if remove_special:
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)

    return text


def execute_cleaning(text: str, remove_spaces: bool, remove_headers: bool, lowercase: bool, remove_special: bool) -> str:
    """Clean *text* with the chosen options, or explain that there is nothing to clean.

    Returns:
        "No text available for cleaning." when *text* is None, empty or only
        whitespace; otherwise the result of clean_text with the same options.
    """
    if not text or not text.strip():
        return "No text available for cleaning."
    return clean_text(text, remove_spaces, remove_headers, lowercase, remove_special)


# Registry of submitted jobs: job id -> dict with "status", "type", "start_time",
# "query", "model" and, once finished, "result" and "end_time".
jobs = {}

# Worker threads put (job_id, result) pairs here; check_job_status moves them into jobs.
results_queue = queue.Queue()

# Id of the most recently submitted job, used by the auto-refresh handler.
last_job_id = None


def get_job_list():
    """Render the job registry as a Markdown list, newest job first.

    Returns:
        A plain notice when no jobs exist; otherwise a Markdown heading followed by
        one bullet per job with its id, local start time, colour-coded status and,
        for jobs of type "query", a preview of the query (at most 30 characters).
    """
    if not jobs:
        return "No jobs found. Submit a query or load files to create jobs."

    newest_first = sorted(
        jobs.items(),
        key=lambda item: item[1].get("start_time", 0),
        reverse=True,
    )

    # NOTE(review): the lone "π" here and the lone "β" in the fallback status below
    # are remnants of mis-encoded emoji. The original glyphs cannot be determined
    # from this file, so they are kept unchanged - please restore them.
    parts = ["### π Submitted Jobs\n\n"]

    for job_id, job_info in newest_first:
        status = job_info.get("status", "unknown")
        job_type = job_info.get("type", "unknown")
        query = job_info.get("query", "")
        start_time = job_info.get("start_time", 0)
        time_str = datetime.datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S")

        if query and len(query) > 30:
            query_preview = query[:30] + "..."
        else:
            query_preview = query or "N/A"

        if status == "processing":
            status_formatted = f"<span style='color: red'>⏳ {status}</span>"
        elif status == "completed":
            # The mis-encoded check mark used to split this literal across two
            # lines, leaving an unterminated string.
            status_formatted = f"<span style='color: green'>✅ {status}</span>"
        else:
            status_formatted = f"<span style='color: orange'>β {status}</span>"

        if job_type == "query":
            parts.append(f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - Query: {query_preview}\n")
        else:
            parts.append(f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - File Load Job\n")

    return "".join(parts)


def get_sheet_data():
    """Return the loaded sheet text, or "No data loaded." when it is None or empty."""
    return sheet_data if sheet_data else "No data loaded."


def process_in_background(job_id, func, args):
    """Run ``func(*args)`` for a job and publish the outcome on results_queue.

    Intended as a worker-thread target. Whatever happens, exactly one
    ``(job_id, result)`` pair is queued, so the job can never stay in the
    "processing" state because of an unhandled exception.

    Args:
        job_id: Identifier of the job this work belongs to.
        func: Callable that does the work.
        args: Positional arguments for *func*.
    """
    try:
        result = func(*args)
    except Exception as exc:
        # Thread boundary: an uncaught exception would end the thread silently.
        # Report it in the (response, input-token text, output-token text) shape
        # that check_job_status reads from a finished job.
        debug_print(f"Job {job_id} failed: {exc}")
        result = (f"Error: {exc}", "Input tokens: N/A", "Output tokens: N/A")

    results_queue.put((job_id, result))
    debug_print(f"Job {job_id} finished processing in background.")


def submit_query_async(query, model_choice=None):
    """Register a query job, start it on a background thread and return at once.

    Args:
        query: The user's prompt. Empty or whitespace-only input is rejected.
        model_choice: Model display name; defaults to "Mistral-API" when falsy.

    Returns:
        A 7-tuple for the UI: status message, job id text, input token text,
        output token text, job id, query, and the refreshed job list Markdown.
    """
    global last_job_id

    if not query or not query.strip():
        return ("Please enter a non-empty query", "", "Input tokens: 0", "Output tokens: 0", "", "", get_job_list())

    model = model_choice or "Mistral-API"
    job_id = str(uuid.uuid4())
    debug_print(f"Starting async job {job_id} for query: {query}")

    # Register the job before the worker starts, so a result can never reach the
    # queue for an id the registry does not know yet (it would be discarded).
    jobs[job_id] = {
        "status": "processing",
        "type": "query",
        "start_time": time.time(),
        "query": query,
        "model": model,
    }
    last_job_id = job_id

    threading.Thread(
        target=process_in_background,
        args=(job_id, process_query, [query, model]),
    ).start()

    # NOTE(review): the leading "π" is a mis-encoded emoji whose original glyph
    # cannot be determined from this file; kept unchanged.
    return (
        f"π Query submitted and processing in the background (Job ID: {job_id}).\n\n"
        f"Use the 'Check Job Status' section to view results.",
        f"Job ID: {job_id}",
        f"Input tokens: {count_tokens(query)}",
        "Output tokens: pending",
        job_id,
        query,
        get_job_list(),
    )


def job_selected(job_id):
    """Return ``(job_id, query text)`` for a job, or ``(job_id, "Job not found")``."""
    job = jobs.get(job_id)
    if job is None:
        return job_id, "Job not found"
    return job_id, job.get("query", "No query for this job")


def refresh_job_list():
    """Return the current job list Markdown (handler for the refresh button)."""
    return get_job_list()


def sync_model_dropdown(value):
    """Return *value* unchanged, so one dropdown's selection can be copied to another."""
    return value


def _drain_results_queue():
    """Move every finished result waiting in results_queue into the jobs registry."""
    while True:
        try:
            completed_id, result = results_queue.get_nowait()
        except queue.Empty:
            return
        # Results for ids missing from the registry (e.g. cleaned up) are dropped.
        if completed_id in jobs:
            jobs[completed_id]["status"] = "completed"
            jobs[completed_id]["result"] = result
            jobs[completed_id]["end_time"] = time.time()
            debug_print(f"Job {completed_id} completed and stored in jobs dictionary")


def check_job_status(job_id):
    """Report the state of a job, first collecting any results finished meanwhile.

    Args:
        job_id: Id of the job to look up; falsy values produce a prompt message.

    Returns:
        A 5-tuple ``(status HTML, job id text, input token text, output token
        text, job query)``. Fields that do not apply are empty strings.
    """
    if not job_id:
        html_response = "<div style='font-family: monospace;'><p>Please enter a job ID.</p></div>"
        return html_response, "", "", "", ""

    _drain_results_queue()

    if job_id not in jobs:
        html_response = "<div style='font-family: monospace;'><p>Job not found. Please check the ID and try again.</p></div>"
        return html_response, "", "", "", ""

    job = jobs[job_id]
    job_query = job.get("query", "No query available for this job")

    if job["status"] == "processing":
        elapsed_time = time.time() - job["start_time"]
        html_response = (
            f"<div style='font-family: monospace;'>"
            f"<p><strong>⏳ Query is still being processed</strong> (elapsed: {elapsed_time:.1f}s). Please check again shortly.</p>"
            f"</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            f"Input tokens: {count_tokens(job.get('query', ''))}",
            "Output tokens: pending",
            job_query,
        )

    if job["status"] == "completed":
        result = job["result"]
        processing_time = job["end_time"] - job["start_time"]
        # The response is model-generated text: escape it so "<", ">" and "&" are
        # displayed literally instead of being interpreted as markup.
        safe_response = html.escape(str(result[0]))
        html_response = (
            f"<div style='font-family: monospace;'>"
            f"<p><strong>✅ Response:</strong> {safe_response}</p>"
            f"<p>Processing time: {processing_time:.1f}s</p>"
            f"</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            result[1],
            result[2],
            job_query,
        )

    html_response = f"<div style='font-family: monospace;'><p>Job status: {job['status']}</p></div>"
    return html_response, "", "", "", job_query


def cleanup_old_jobs():
    """Delete stale jobs from the registry.

    A completed job is stale once more than 24 hours have passed since it ended;
    a job still marked as processing is stale more than 48 hours after it started.

    Returns:
        A 3-tuple ``(summary message, "", "")``.
    """
    completed_max_age = 86400  # seconds: 24 hours
    processing_max_age = 172800  # seconds: 48 hours
    current_time = time.time()

    # Collect ids first; deleting while iterating the dict would raise.
    to_delete = [
        job_id
        for job_id, job in jobs.items()
        if (job["status"] == "completed" and (current_time - job.get("end_time", 0)) > completed_max_age)
        or (job["status"] == "processing" and (current_time - job.get("start_time", 0)) > processing_max_age)
    ]

    for job_id in to_delete:
        del jobs[job_id]

    debug_print(f"Cleaned up {len(to_delete)} old jobs. {len(jobs)} jobs remaining.")
    return f"Cleaned up {len(to_delete)} old jobs", "", ""


def run_query(max_value):
    """Return ``[i, i**2]`` rows for every integer i from 1 to *max_value* inclusive.

    The list is empty when *max_value* is less than 1.
    """
    return [[i, i**2] for i in range(1, max_value + 1)]


def periodic_update(is_checked):
    """Auto-refresh handler: rebuild the job list and the status of the latest job.

    Args:
        is_checked: State of the "Enable Auto Refresh" checkbox.

    Returns:
        A 7-tuple ``(job list Markdown, status HTML, status as plain text, job id
        text, input token text, output token text, job query)``. Every field is an
        empty string when *is_checked* is false.
    """
    interval = 3 if is_checked else None
    debug_print(f"Auto-refresh checkbox is {'checked' if is_checked else 'unchecked'}, every={interval}")

    if not is_checked:
        return "", "", "", "", "", "", ""

    job_list_md = refresh_job_list()
    if last_job_id:
        job_status = check_job_status(last_job_id)
    else:
        job_status = ("No job ID available", "", "", "", "")

    # Imported here, as in the original, so bs4 is only needed when auto refresh runs.
    from bs4 import BeautifulSoup

    html_content = job_status[0]
    plain_text = ""
    if html_content:
        # Strip the markup to get a text-only copy for the plain textbox.
        plain_text = BeautifulSoup(html_content, "html.parser").get_text()

    return job_list_md, html_content, plain_text, job_status[1], job_status[2], job_status[3], job_status[4]


# Gradio UI: layout, the PDF loading handler and the event wiring.
# NOTE(review): the source lost all indentation, so the widget nesting below is
# reconstructed from statement order - confirm it against the intended layout.
# NOTE(review): a lone "π" in the headings is a mis-encoded emoji whose original
# glyph cannot be determined from this file; those strings are kept unchanged.
with gr.Blocks() as app:

    gr.Markdown("## π PDF Conversion")
    gr.Markdown("Text cleaning and processing tools.")

    with gr.Row():

        with gr.Column(scale=1):
            gr.Markdown("### π Load File Section")
            gr.Markdown("Upload your **.pdf** file below and specify the page range to extract text.")
            file_input = gr.File(label="Upload .pdf File")
            page_start_input_file = gr.Textbox(label="Page Start")
            page_end_input_file = gr.Textbox(label="Page End")
            load_button_file = gr.Button("Load File")
            sheet_output_file = gr.Textbox(label="Extracted Text", interactive=False)

        with gr.Column(scale=1):
            gr.Markdown("### π Job Information")
            gr.Markdown("View all submitted jobs, refresh the list, and check the status of individual jobs.")
            job_list_display = gr.Markdown(
                get_job_list(),
                elem_id="job-list-display",
                elem_classes=["scrollable-job-list"]
            )
            # Fixed-height scrollable box for the job list.
            gr.HTML("""
            <style>
            .scrollable-job-list {
                height: 220px;
                overflow-y: auto;
                border: 1px solid #ccc;
                padding: 10px;
                margin-bottom: 10px;
            }
            </style>
            """)
            refresh_button = gr.Button("Refresh Job List")
            gr.Markdown("#### π Check Job Status")
            job_id_input = gr.Textbox(label="Enter Job ID")
            check_status_button = gr.Button("Check Job Status")

    with gr.Row():

        with gr.Column(scale=1):
            gr.Markdown("### Cleaning Options")
            remove_spaces_checkbox = gr.Checkbox(label="Remove extra spaces & newlines: Clean unnecessary whitespace.", value=True)
            remove_headers_checkbox = gr.Checkbox(label="Remove headers/footers: If repeated text appears on every page", value=False)
            lowercase_checkbox = gr.Checkbox(label="Convert text to lowercase: For uniformity in text analysis.", value=False)
            remove_special_checkbox = gr.Checkbox(label="Remove special characters: Useful for structured data extraction", value=False)
            clean_button = gr.Button("Clean")

        with gr.Column(scale=1):
            cleaned_output = gr.Textbox(label="Cleaned Text", interactive=False)

    with gr.Row():

        with gr.Column(scale=1):
            gr.Markdown("### π Submit Query")
            gr.Markdown("Enter your prompt below and choose a model. Your query will be processed in the background.")
            # The flag emoji were mis-encoded in the source; restored here. The
            # backend is selected by substring, so the prefix does not affect routing.
            model_dropdown = gr.Dropdown(
                choices=["🇺🇸 Remote Meta-Llama-3", "🇪🇺 Mistral-API"],
                value="🇪🇺 Mistral-API",
                label="Select Model"
            )
            prompt_input = gr.Textbox(label="Enter your prompt", value="", lines=6)
            with gr.Row():
                auto_refresh_checkbox_query = gr.Checkbox(
                    label="Enable Auto Refresh",
                    value=False
                )
                submit_query_button = gr.Button("Submit Query")
            status_text = gr.Textbox(label="Response Text", visible=True)
            response_output = gr.Textbox(label="Response", interactive=False)
            token_info = gr.Textbox(label="Token Info", interactive=False)

        with gr.Column(scale=1):
            status_output = gr.HTML(label="Job Status", interactive=False)
            job_id_display = gr.Textbox(label="Job ID", interactive=False)
            input_tokens_display = gr.Textbox(label="Input Tokens", interactive=False)
            output_tokens_display = gr.Textbox(label="Output Tokens", interactive=False)
            job_query_display = gr.Textbox(label="Job Query", interactive=False)

    def load_file(file, page_start, page_end):
        """Extract the text of pages *page_start*..*page_end* (1-based, inclusive) of a PDF.

        On success the text is also stored in the module-level ``sheet_data`` so
        later queries can use it.

        Args:
            file: The uploaded file from the gr.File component (an object with a
                ``name`` path attribute, or a plain path), or None.
            page_start: First page to read, as entered in the textbox.
            page_end: Last page to read, as entered in the textbox.

        Returns:
            The extracted text, or a human-readable error message.
        """
        global sheet_data, file_name
        file_name = file
        if file is None or str(page_start).strip() == "" or str(page_end).strip() == "":
            return "Please upload a file and enter valid page numbers."
        try:
            ps = int(page_start)
            pe = int(page_end)
            # Accept both an upload wrapper exposing .name and a plain path string.
            pdf_path = getattr(file, "name", file)
            # The context manager closes the document even when extraction fails.
            with fitz.open(pdf_path) as doc:
                page_count = len(doc)
                # Reject ranges that would otherwise wrap around (0 or negative
                # start) or run past the end of the document.
                if ps < 1 or pe < ps or pe > page_count:
                    return f"Invalid page range {ps}-{pe}: the document has {page_count} page(s)."
                text = "".join(doc[page_num].get_text() + "\n" for page_num in range(ps - 1, pe))
        except Exception as e:
            return f"Error reading PDF: {str(e)}"
        sheet_data = text
        return text

    load_button_file.click(
        fn=load_file,
        inputs=[file_input, page_start_input_file, page_end_input_file],
        outputs=sheet_output_file
    )

    clean_button.click(
        fn=execute_cleaning,
        inputs=[sheet_output_file, remove_spaces_checkbox, remove_headers_checkbox, lowercase_checkbox, remove_special_checkbox],
        outputs=cleaned_output
    )

    submit_query_button.click(
        fn=submit_query_async,
        inputs=[prompt_input, model_dropdown],
        outputs=[
            response_output, token_info,
            input_tokens_display, output_tokens_display,
            job_id_input, job_query_display, job_list_display
        ]
    )

    check_status_button.click(
        fn=check_job_status,
        inputs=[job_id_input],
        outputs=[status_output, job_id_display, input_tokens_display,
                 output_tokens_display, job_query_display]
    )

    refresh_button.click(
        fn=refresh_job_list,
        inputs=[],
        outputs=job_list_display
    )

    auto_refresh_checkbox_query.change(
        fn=periodic_update,
        inputs=[auto_refresh_checkbox_query],
        outputs=[job_list_display, status_output, status_text, job_id_display, input_tokens_display, output_tokens_display, job_query_display],
        every=3
    )


# Script entry point: enable Gradio's request queue and serve the app without a
# public share link.
if __name__ == "__main__":
    debug_print("Launching Gradio UI...")
    app.queue().launch(share=False)