"""DocAnalyzer Pro: a Streamlit app for question answering and summarization
over uploaded PDF/DOCX documents, using distilled Hugging Face models on CPU."""

import os
from pathlib import Path
from tempfile import gettempdir

# Server settings must be in the environment before Streamlit starts.
os.environ["STREAMLIT_SERVER_PORT"] = "8501"
os.environ["STREAMLIT_SERVER_HEADLESS"] = "true"


def setup_environment():
    """Ensure cache directories exist in a safe, writable location."""
    cache_dir = Path(gettempdir()) / "models"
    cache_dir.mkdir(exist_ok=True, parents=True)
    # Point the Hugging Face cache at the writable directory (setdefault so an
    # existing configuration wins). This runs before transformers is imported
    # so the setting takes effect.
    os.environ.setdefault("HF_HOME", str(cache_dir))
    return cache_dir


cache_dir = setup_environment()

import streamlit as st

# set_page_config must be the first Streamlit command in the script.
st.set_page_config(page_title="DocAnalyzer Pro", layout="wide")

from transformers import pipeline
from PyPDF2 import PdfReader
import docx
import time
import psutil
import torch

@st.cache_resource(ttl=3600)
def load_models():
    """Load optimized models for Hugging Face Spaces."""
    try:
        with st.spinner("🔄 Loading AI models (this may take 1-2 minutes)..."):
            return {
                'qa': pipeline(
                    "question-answering",
                    model="distilbert-base-cased-distilled-squad",
                    device=-1  # -1 = run on CPU
                ),
                'summarizer': pipeline(
                    "summarization",
                    model="sshleifer/distilbart-cnn-12-6",
                    device=-1
                )
            }
    except Exception as e:
        st.error(f"❌ Failed to load models: {e}")
        st.stop()


models = load_models()

def extract_text(file):
    """Extract text from PDF/DOCX files with error handling."""
    if file is None:
        return ""

    try:
        if file.type == "application/pdf":
            reader = PdfReader(file)
            return " ".join(page.extract_text() for page in reader.pages if page.extract_text())
        elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
            doc = docx.Document(file)
            return "\n".join(para.text for para in doc.paragraphs if para.text)
        # Unsupported file type: return empty text rather than falling
        # through and implicitly returning None.
        return ""
    except Exception as e:
        st.error(f"⚠️ Error processing document: {e}")
        return ""

def generate_summary(text, max_length=150):
    """Generate summary with chunking for large documents."""
    if not text or len(text.strip()) == 0:
        return ""

    try:
        if len(text) > 10000:
            # Split long documents into fixed-size character chunks,
            # summarize each chunk, and join the partial summaries.
            chunk_size = 3000
            chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
            summaries = []
            for chunk in chunks:
                result = models['summarizer'](
                    chunk,
                    # Budget the requested length across chunks, with a floor.
                    max_length=max(max_length // len(chunks), 30),
                    min_length=30,
                    do_sample=False
                )
                summaries.append(result[0]['summary_text'])
            return " ".join(summaries)
        return models['summarizer'](text, max_length=max_length)[0]['summary_text']
    except Exception as e:
        st.error(f"❌ Summarization failed: {e}")
        return ""

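# Worked example of the chunk budget above: a 12,000-character document is
# split into 12000 / 3000 = 4 chunks; with the default max_length=150, each
# chunk's summary is capped at max(150 // 4, 30) = 37 tokens, and the four
# partial summaries are joined into one string.
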
st.title("📄 DocAnalyzer Pro")

with st.expander("📤 Upload Document", expanded=True):
    uploaded_file = st.file_uploader("Choose PDF/DOCX", type=["pdf", "docx"])
    manual_text = st.text_area("Or paste text here:", height=150)
    context = extract_text(uploaded_file) if uploaded_file else manual_text

tab1, tab2 = st.tabs(["🔍 Question Answering", "📝 Summarization"])

with tab1:
    if context and len(context.strip()) > 0:
        question = st.text_input("Ask about the document:")
        if question and len(question.strip()) > 0:
            with st.spinner("Analyzing..."):
                start_time = time.time()
                try:
                    result = models['qa'](
                        question=question,
                        context=context[:100000]  # cap context size to bound latency
                    )
                    st.success(f"Answered in {time.time() - start_time:.1f}s")
                    st.markdown(f"**Answer:** {result['answer']}")
                    st.progress(result['score'])
                    st.caption(f"Confidence: {result['score']:.0%}")
                except Exception as e:
                    st.error(f"❌ Question answering failed: {e}")

with tab2:
    if context and len(context.strip()) > 0:
        with st.form("summary_form"):
            length = st.slider("Summary Length", 50, 300, 150)
            if st.form_submit_button("Generate Summary"):
                with st.spinner("Summarizing..."):
                    start_time = time.time()
                    summary = generate_summary(context, length)
                    if summary:
                        st.success(f"Generated in {time.time() - start_time:.1f}s")
                        st.markdown(f"**Summary:**\n\n{summary}")

with st.expander("⚙️ System Status"):
    try:
        device_status = 'GPU ✅' if torch.cuda.is_available() else 'CPU ⚠️'
    except Exception:
        device_status = 'CPU (torch not configured)'

    st.code(f"""
Models loaded: {', '.join(models.keys())}
Device: {device_status}
Memory: {psutil.virtual_memory().percent}% used
CPU: {psutil.cpu_percent()}% used
Cache location: {cache_dir}
""")
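
# ---------------------------------------------------------------------------
# Deployment sketch: the dependency list below is inferred from the imports
# above; names are assumptions and versions are not pinned by this script.
#
#   requirements.txt
#     streamlit
#     transformers
#     torch
#     PyPDF2
#     python-docx   # provides the `docx` module imported above
#     psutil
#
# Run locally with, e.g.:
#   streamlit run app.py   # substitute this file's actual name
# ---------------------------------------------------------------------------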