# NOTE: the lines "Spaces: / Sleeping / Sleeping" that preceded the imports were web-page residue, not program text.
import html
import os
import re
import time
from io import BytesIO

import matplotlib
import streamlit as st
from pdf2image import convert_from_path
from streamlit.web.server.websocket_headers import _get_websocket_headers

from OCR import OCR
from Feedback import Grader
from PDFFeedbackGenerator import PDFFeedbackGenerator
# Select the "Agg" backend, matplotlib's non-GUI renderer.
matplotlib.use("Agg")  # Non-GUI backend for matplotlib
# --- Filesystem locations ---------------------------------------------------
# Logo image path; relative, so it is resolved against the working directory.
LOGO_PATH = "cslogo.png"

# Scratch folder for uploaded files (relative path, changed from /tmp).
TEMP_DIR = "temp"

# <directory of this file>/poppler/bin
_APP_DIR = os.path.dirname(__file__)
POPPLER_PATH = os.path.join(_APP_DIR, "poppler", "bin")

# Create the scratch folder up front; exist_ok makes repeated runs harmless.
os.makedirs(TEMP_DIR, exist_ok=True)
# Allow iframe embedding and add CORS headers
def custom_get_websocket_headers(*args, **kwargs):
    """Return Streamlit's websocket headers with framing/CORS entries added.

    Delegates to ``_get_websocket_headers`` (forwarding all arguments) and
    then sets ``X-Frame-Options`` plus three ``Access-Control-Allow-*``
    entries on the mapping it returned.

    Returns:
        The delegate's mapping with the extra entries set, or ``None`` when
        the delegate returned ``None``.
    """
    headers = _get_websocket_headers(*args, **kwargs)
    if headers is None:
        # NOTE(review): Streamlit's helper appears to return None when there
        # is no active session; confirm against the pinned Streamlit version.
        # Without this guard the assignments below raise TypeError.
        return None

    # NOTE(review): confirm that editing this mapping actually influences the
    # HTTP response; it may only describe the incoming request.
    headers.update(
        {
            "X-Frame-Options": "ALLOWALL",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
            "Access-Control-Allow-Headers": "Content-Type",
        }
    )
    return headers
# Apply the override
# The module is imported by its full dotted name because `import streamlit as st`
# binds only `st`; the attribute assignment below needs the name `streamlit`.
import streamlit.web.server.websocket_headers

streamlit.web.server.websocket_headers._get_websocket_headers = (
    custom_get_websocket_headers
)

# Google Cloud credentials
# Only fall back to the key file named here when the environment does not
# already provide a credentials path, so a deployment-supplied value is kept.
if "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "css-edge-e347b0ed2b9e.json"

# Initialize instances
# Fail fast at import time: the grader cannot be built without the API key.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("OPENAI_API_KEY environment variable not set")
ocr = OCR()
grader = Grader(api_key=api_key)
# Main application logic
#
# Glyphs are spelled as escapes so they survive any re-encoding of this file.
_ICON_PASS = "\u2705"  # check mark shown for a satisfied criterion
_ICON_FAIL = "\u274c"  # cross mark shown for a failed criterion
_BULLET = "\u2022"
_POINT_UP = "\U0001F446"  # NOTE(review): original glyph was garbled; best guess

_MAX_UPLOADS = 15
_MIN_OCR_ACCURACY = 0.6
_GRADING_CONTEXT = "CSS FPSC Guidelines Context"
_DEFAULT_OVERALL_SCORE = 40
_RING_DASH_LENGTH = 213.6  # stroke-dasharray of the r=34 progress ring

# Session-state keys used to avoid repeating OCR / GPT calls on reruns.
_OCR_CACHE_KEY = "ocr_results"
_FEEDBACK_CACHE_KEY = "feedback_cache"

# Background colour of each score card, keyed by section name.
_SECTION_COLORS = {
    "Grammar & Punctuation": "#f8d7da",
    "Tone & Formality": "#ffe5b4",
    "Sentence Clarity & Structure": "#d6eaff",
    "Vocabulary Suggestions": "#d4f8e8",
}
_DEFAULT_SECTION_COLOR = "#f0f0f0"

_UPLOAD_INSTRUCTIONS = """
### Instructions:
- Prepare your response
- Save as PDF/PNG/JPG
- Upload using the uploader
- Verify extracted text
"""

_FEEDBACK_INSTRUCTIONS = """
### Instructions:
- Review extracted text
- Enter your name
- Download report
"""

_FEEDBACK_CSS = """
<style>
.highlight {
    background-color: rgba(255, 255, 0, 0.3);
    padding: 0 2px;
}
.feedback-section {
    margin: 20px 0;
    padding: 18px 20px;
    border-radius: 10px;
    background-color: #f8f9fa;
    border: 1.5px solid #e0e0e0;
    box-shadow: 0 2px 8px rgba(44,62,80,0.06);
}
.feedback-header {
    font-size: 1.1em;
    font-weight: bold;
    margin: 15px 0 8px 0;
    color: #2c3e50;
    padding-bottom: 3px;
    border-bottom: 1px solid #e0e0e0;
}
.feedback-content {
    margin-left: 20px;
    line-height: 1.6;
    text-align: justify;
}
.feedback-item {
    margin: 8px 0;
    padding: 5px 0;
}
.quote-text {
    font-style: italic;
    color: #34495e;
    margin: 10px 0;
    padding: 10px;
    border-left: 3px solid #3498db;
    background-color: #f1f8ff;
}
.section-title {
    font-size: 1.4em;
    color: #2c3e50;
    margin: 15px 0 18px 0;
    padding-bottom: 5px;
    border-bottom: 2px solid #3498db;
}
.error-type {
    color: #e74c3c;
    font-weight: bold;
}
.correction {
    color: #27ae60;
    font-weight: bold;
}
.explanation {
    color: #7f8c8d;
    font-style: italic;
}
.critical-area {
    color: #e67e22;
    font-weight: bold;
}
.error-frequency {
    margin: 10px 0;
    padding: 10px;
    background-color: #fff;
    border-radius: 5px;
    border: 1px solid #e0e0e0;
}
.score-impact {
    margin: 10px 0;
    padding: 10px;
    background-color: #f8f9fa;
    border-radius: 5px;
    border-left: 3px solid #3498db;
}
</style>
"""


def _esc(value):
    """Return *value* as text that is safe to embed in an HTML fragment."""
    # NOTE(review): if Grader is meant to return ready-made HTML (the unused
    # .highlight CSS class hints it once might have), that markup will now be
    # shown literally; whitelist it explicitly rather than removing this.
    return html.escape(str(value))


def _as_percent(value, default):
    """Coerce *value* to a float clamped to 0-100; use *default* if not numeric."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        number = float(default)
    return max(0.0, min(100.0, number))


def _save_upload(uploaded_file):
    """Write one upload into TEMP_DIR and return the path it was saved to."""
    # basename() drops directory components from the client-supplied name so
    # the file cannot be written outside TEMP_DIR.
    file_path = os.path.join(TEMP_DIR, os.path.basename(uploaded_file.name))
    with open(file_path, "wb") as handle:
        handle.write(uploaded_file.getbuffer())
    return file_path


def _ocr_upload(uploaded_file, file_path):
    """OCR a saved upload, reusing this session's earlier result if present.

    Streamlit re-executes the script on every widget interaction; without the
    cache each click would send every file through OCR again.

    Returns:
        The ``(extracted_text, accuracy_metrics)`` pair produced by ``ocr``.
    """
    if _OCR_CACHE_KEY not in st.session_state:
        st.session_state[_OCR_CACHE_KEY] = {}
    cache = st.session_state[_OCR_CACHE_KEY]

    # Prefer Streamlit's per-upload id when the object exposes one; otherwise
    # fall back to name + byte length.
    cache_key = getattr(uploaded_file, "file_id", None) or (
        uploaded_file.name,
        len(uploaded_file.getbuffer()),
    )
    if cache_key not in cache:
        if uploaded_file.name.lower().endswith(".pdf"):
            text, metrics = ocr.process_pdf_file_with_vision(file_path)
        else:
            text, metrics = ocr.process_image_with_vision(file_path)
        cache[cache_key] = (text, metrics)
    return cache[cache_key]


def _render_upload_step():
    """Step 1: collect uploads, OCR them and let the user confirm the text."""
    st.sidebar.markdown(_UPLOAD_INSTRUCTIONS)
    st.title("Upload File for Processing")
    st.header("Step 1: Upload File")

    # Start timer for extraction (recorded once per session, on first render).
    if 'extraction_start_time' not in st.session_state:
        st.session_state['extraction_start_time'] = time.time()

    uploaded_files = st.file_uploader(
        f"Upload up to {_MAX_UPLOADS} PDF or Image Files",
        type=["pdf", "png", "jpg", "jpeg", "bmp", "gif", "tiff"],
        accept_multiple_files=True,
    )
    if not uploaded_files:
        return
    if len(uploaded_files) > _MAX_UPLOADS:
        st.error(f"You can upload a maximum of {_MAX_UPLOADS} files at once.")
        return

    extracted_texts = []
    for index, uploaded_file in enumerate(uploaded_files):
        name = uploaded_file.name
        try:
            file_path = _save_upload(uploaded_file)
            st.success(f"File {name} uploaded successfully!")
            # The selection is shown to the user but is not passed to the OCR
            # calls below. The index keeps the widget key unique when two
            # uploads share a file name.
            st.radio(
                f"File type for {name}:",
                ("Computer-Written", "Handwritten"),
                index=0,
                key=f"file_type_{index}_{name}",
            )
            extracted_text, accuracy_metrics = _ocr_upload(uploaded_file, file_path)
            if accuracy_metrics.get("overall_accuracy", 0.0) < _MIN_OCR_ACCURACY:
                st.warning(
                    f"OCR accuracy for {name} is below {_MIN_OCR_ACCURACY:.0%}. "
                    "Please upload a clearer image or higher quality file."
                )
                continue
            if extracted_text.strip():
                extracted_texts.append(extracted_text)
            else:
                st.warning(f"No text extracted from {name}")
        except Exception as e:
            # Per-file boundary: report the failure and move on to the next file.
            st.error(f"Error processing file {name}: {str(e)}")

    if not extracted_texts:
        st.error("No files with acceptable OCR accuracy. Please upload clearer images or higher quality files.")
        return

    combined_text = "\n\n".join(extracted_texts)
    st.warning("Verify and edit the combined extracted text from all files below:")
    user_text = st.text_area(
        "Combined Extracted Text:",
        combined_text,
        height=400,
        key="combined_extracted_text",
    )
    if not st.button("Confirm All Text"):
        return
    if not user_text.strip():
        st.error("Text cannot be empty")
        return

    st.session_state["extracted_text"] = user_text
    st.session_state['extraction_end_time'] = time.time()
    elapsed_extraction = (
        st.session_state['extraction_end_time']
        - st.session_state['extraction_start_time']
    )
    st.success(f"All text verified and ready for feedback! (Extraction Time: {elapsed_extraction:.2f} seconds)")


def _load_feedback(extracted_text):
    """Return ``(structured_feedback, rephrased_analysis)`` for *extracted_text*.

    The pair is cached in session state keyed by the text, so reruns caused by
    typing a name or clicking a download button do not call the grader again.
    Returns ``(None, None)`` when the grader result is not a dict containing
    a ``"sections"`` key.
    """
    cached = st.session_state.get(_FEEDBACK_CACHE_KEY)
    if cached and cached.get("source_text") == extracted_text:
        return cached["structured_feedback"], cached["rephrased_analysis"]

    st.write("Generating feedback...")
    feedback_start_time = time.time()
    structured_feedback = grader.grade_answer_with_gpt(extracted_text, _GRADING_CONTEXT)
    elapsed_feedback = time.time() - feedback_start_time

    # Validate before touching the result: previously the dict was written to
    # first, so an invalid result raised instead of reaching this check.
    if not isinstance(structured_feedback, dict) or "sections" not in structured_feedback:
        return None, None
    st.success(f"Feedback generated! (Feedback Generation Time: {elapsed_feedback:.2f} seconds)")

    # Generate rephrased text
    rephrased_analysis = grader.rephrase_text_with_gpt(extracted_text)
    structured_feedback["rephrased_analysis"] = rephrased_analysis

    st.session_state[_FEEDBACK_CACHE_KEY] = {
        "source_text": extracted_text,
        "structured_feedback": structured_feedback,
        "rephrased_analysis": rephrased_analysis,
    }
    return structured_feedback, rephrased_analysis


def _render_essay_structure(essay_structure_feedback):
    """Show one expander per structure section with pass/fail per criterion."""
    st.markdown("<h4 style='margin-bottom:0.5em;'>Essay Structure</h4>", unsafe_allow_html=True)
    if not isinstance(essay_structure_feedback, dict):
        st.warning(f"Essay structure feedback is not a dict: {essay_structure_feedback}")
        return

    for section, criteria in essay_structure_feedback.items():
        with st.expander(section, expanded=False):
            if not isinstance(criteria, dict):
                st.warning(f"Criteria for section '{section}' is not a dict: {criteria}")
                continue
            for crit, result in criteria.items():
                if not isinstance(result, dict):
                    st.warning(f"Result for criterion '{crit}' in section '{section}' is not a dict: {result}")
                    continue
                passed = result.get('value', False)
                explanation = result.get('explanation', '')
                icon = _ICON_PASS if passed else _ICON_FAIL
                color = '#27ae60' if passed else '#e74c3c'

                row = (
                    f"<div style='margin-bottom:8px;'><b>{_BULLET} {_esc(crit)}</b> "
                    f"<span style='color:{color};font-size:1.2em;'>{icon}</span>"
                )
                # The explanation badge is only shown for failed criteria.
                if not passed and explanation:
                    row += (
                        " <span style='background:#f8d7da;color:#c0392b;padding:4px 10px;"
                        f"border-radius:8px;margin-left:8px;'>{_esc(explanation)}</span>"
                    )
                st.markdown(row + "</div>", unsafe_allow_html=True)


def _render_section_scores(sections):
    """Show a score card with an issue list for each graded section."""
    st.write("### AI Evaluation & Score")
    for section in sections:
        if not isinstance(section, dict):
            continue
        score = section.get("score", 0)
        issues = section.get("issues") or []
        num_issues = len(issues)
        section_name = section.get("name", "")
        color = _SECTION_COLORS.get(section_name, _DEFAULT_SECTION_COLOR)

        with st.container():
            st.markdown(f"<div style='background:{color};border-radius:12px;padding:18px 20px;margin-bottom:18px;box-shadow:0 2px 8px rgba(44,62,80,0.06);'>", unsafe_allow_html=True)
            cols = st.columns([0.7, 0.3])
            with cols[0]:
                st.markdown(f"<b style='font-size:1.1em'>{_esc(section_name)}</b>", unsafe_allow_html=True)
            with cols[1]:
                st.markdown(f"<div style='float:right;'><span style='font-size:1.2em;font-weight:bold;'>{_esc(score)}%</span></div>", unsafe_allow_html=True)
            st.markdown(f"<div style='margin-top:8px;margin-bottom:8px;'><span style='background:#fff3f3;border-radius:8px;padding:4px 12px;color:#c0392b;font-weight:500;'>{num_issues} Issue{'s' if num_issues!=1 else ''}</span></div>", unsafe_allow_html=True)
            with st.expander("Show Issues" if num_issues else "No Issues", expanded=False):
                if num_issues == 0:
                    st.write("No issues found in this category.")
                else:
                    for issue in issues:
                        if not isinstance(issue, dict):
                            continue
                        before = _esc(issue.get("before", ""))
                        after = _esc(issue.get("after", ""))
                        st.markdown(f"<div style='margin-bottom:12px;'><span style='color:#e74c3c;font-weight:bold;'>Before:</span> {before}<br><span style='color:#27ae60;font-weight:bold;'>After:</span> {after}</div>", unsafe_allow_html=True)
            st.markdown("</div>", unsafe_allow_html=True)
        # NOTE(review): divider placement (one per section) is inferred; the
        # source this was recovered from had lost its indentation.
        st.write("---")


def _render_overall_score(raw_score):
    """Draw the circular overall-score gauge for *raw_score* (0-100)."""
    percent = _as_percent(raw_score, _DEFAULT_OVERALL_SCORE)
    # The ring is a dashed circle; the offset hides the unfilled remainder.
    dash_offset = _RING_DASH_LENGTH - (percent / 100) * _RING_DASH_LENGTH

    st.markdown("<h4 style='margin-bottom:0.5em;'>Overall Scoring</h4>", unsafe_allow_html=True)
    # Built from flush-left fragments so Markdown never sees indented HTML.
    gauge_html = (
        "<div style='background:#fff;border:2px solid #2986f5;border-radius:12px;padding:18px 0 18px 0;margin-bottom:18px;display:flex;align-items:center;justify-content:center;width:340px;'>"
        "<div style='display:flex;align-items:center;justify-content:center;width:100%;'>"
        "<div style='position:relative;width:80px;height:80px;'>"
        "<svg width='80' height='80'>"
        "<circle cx='40' cy='40' r='34' stroke='#e0e0e0' stroke-width='8' fill='none'/>"
        f"<circle cx='40' cy='40' r='34' stroke='#2986f5' stroke-width='8' fill='none' stroke-dasharray='{_RING_DASH_LENGTH}' stroke-dashoffset='{dash_offset:.1f}' stroke-linecap='round' transform='rotate(-90 40 40)'/>"
        "</svg>"
        f"<div style='position:absolute;top:0;left:0;width:80px;height:80px;display:flex;align-items:center;justify-content:center;font-size:1.4em;font-weight:bold;color:#2986f5;'>{percent:g}%</div>"
        "</div>"
        "<div style='margin-left:24px;font-size:1.1em;font-weight:500;color:#222;'>Overall Essay Evaluation</div>"
        "</div>"
        "</div>"
    )
    st.markdown(gauge_html, unsafe_allow_html=True)


def _render_pdf_downloads(structured_feedback, rephrased_analysis):
    """Ask for the user's name and offer the two PDF reports for download."""
    user_name = st.text_input("Enter your name:")
    if not user_name:
        st.info(f"{_POINT_UP} Enter your name to generate the detailed reports")
        return

    try:
        pdf_buffer_feedback = BytesIO()
        pdf_buffer_rephrased = BytesIO()

        # Feedback PDF (no rephrased text)
        PDFFeedbackGenerator(
            output_path=pdf_buffer_feedback,
            logo_path=LOGO_PATH,
        ).create_feedback_pdf(user_name, structured_feedback)
        pdf_buffer_feedback.seek(0)  # rewind so the download reads from the start

        # Rephrased Text PDF
        PDFFeedbackGenerator(
            output_path=pdf_buffer_rephrased,
            logo_path=LOGO_PATH,
        ).create_rephrased_pdf(user_name, rephrased_analysis)
        pdf_buffer_rephrased.seek(0)

        col1, col2 = st.columns(2)
        with col1:
            st.download_button(
                label="Download Feedback Report (PDF)",
                data=pdf_buffer_feedback,
                file_name="feedback_report.pdf",
                mime="application/pdf",
                on_click=lambda: st.session_state.update({"feedback_downloaded": True}),
            )
        with col2:
            st.download_button(
                label="Download Rephrased Text Report (PDF)",
                data=pdf_buffer_rephrased,
                file_name="rephrased_text_report.pdf",
                mime="application/pdf",
            )
        st.success("Reports ready for download!")
    except Exception as e:
        st.error(f"Error generating PDF: {str(e)}")


def _render_feedback_step():
    """Step 2: grade the confirmed text, display the results, offer PDFs."""
    st.sidebar.markdown(_FEEDBACK_INSTRUCTIONS)
    st.title("Feedback and Grading Tool")
    st.header("Step 2: Generate Feedback")

    extracted_text = st.session_state.get("extracted_text", "")
    if not extracted_text.strip():
        st.error("No text to process. Please go back and upload files with better quality or confirm the extracted text.")
        return

    try:
        structured_feedback, rephrased_analysis = _load_feedback(extracted_text)
        if structured_feedback is None:
            st.error("Error: Invalid feedback format received. Please try again.")
            return
        st.success("Feedback generated!")

        # Display feedback in web view
        st.write("### Detailed Feedback")
        st.markdown(_FEEDBACK_CSS, unsafe_allow_html=True)
        _render_essay_structure(structured_feedback.get('essay_structure', {}))
        _render_section_scores(structured_feedback["sections"])
        _render_overall_score(
            structured_feedback.get("overall_score", _DEFAULT_OVERALL_SCORE)
        )
        _render_pdf_downloads(structured_feedback, rephrased_analysis)
    except Exception as e:
        # Top-level boundary for this page: show the failure and log it.
        st.error(f"Error generating feedback: {str(e)}")
        print(f"Feedback Generation Error: {str(e)}")


def main():
    """Render the sidebar navigation and dispatch to the selected step."""
    st.sidebar.title("Navigation")
    choice = st.sidebar.radio("Steps", ["Upload File", "Generate Feedback"])
    if choice == "Upload File":
        _render_upload_step()
    elif choice == "Generate Feedback":
        _render_feedback_step()
# Script entry point: build the UI only when this file is run directly.
if __name__ == "__main__":
    main()