import os import streamlit as st from OCR import OCR from Feedback import Grader from PDFFeedbackGenerator import PDFFeedbackGenerator import matplotlib from io import BytesIO from streamlit.web.server.websocket_headers import _get_websocket_headers import re import time from pdf2image import convert_from_path matplotlib.use("Agg") # Non-GUI backend for matplotlib # Constants LOGO_PATH = "cslogo.png" TEMP_DIR = "temp" # Changed from /tmp to relative path POPPLER_PATH = os.path.join(os.path.dirname(__file__), "poppler", "bin") # Create temp directory if it doesn't exist os.makedirs(TEMP_DIR, exist_ok=True) # Allow iframe embedding and add CORS headers def custom_get_websocket_headers(*args, **kwargs): headers = _get_websocket_headers(*args, **kwargs) headers["X-Frame-Options"] = "ALLOWALL" headers["Access-Control-Allow-Origin"] = "*" headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" headers["Access-Control-Allow-Headers"] = "Content-Type" return headers # Apply the override import streamlit.web.server.websocket_headers streamlit.web.server.websocket_headers._get_websocket_headers = custom_get_websocket_headers # Google Cloud credentials os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "css-edge-e347b0ed2b9e.json" # Initialize instances api_key = os.environ.get("OPENAI_API_KEY") if not api_key: raise RuntimeError("OPENAI_API_KEY environment variable not set") ocr = OCR() grader = Grader(api_key=api_key) # Main application logic def main(): st.sidebar.title("Navigation") choice = st.sidebar.radio("Steps", ["Upload File", "Generate Feedback"]) if choice == "Upload File": st.sidebar.markdown(""" ### Instructions: - Prepare your response - Save as PDF/PNG/JPG - Upload using the uploader - Verify extracted text """) st.title("Upload File for Processing") st.header("Step 1: Upload File") # Start timer for extraction if 'extraction_start_time' not in st.session_state: st.session_state['extraction_start_time'] = time.time() uploaded_files = st.file_uploader( "Upload up to 15 PDF or Image Files", type=["pdf", "png", "jpg", "jpeg", "bmp", "gif", "tiff"], accept_multiple_files=True ) if uploaded_files: if len(uploaded_files) > 15: st.error("You can upload a maximum of 15 files at once.") else: extracted_texts = [] for uploaded_file in uploaded_files: try: file_path = os.path.join(TEMP_DIR, uploaded_file.name) with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) st.success(f"File {uploaded_file.name} uploaded successfully!") is_handwritten = st.radio( f"File type for {uploaded_file.name}:", ("Computer-Written", "Handwritten"), index=0, key=uploaded_file.name ) if uploaded_file.name.lower().endswith(".pdf"): extracted_text, accuracy_metrics = ocr.process_pdf_file_with_vision(file_path) else: extracted_text, accuracy_metrics = ocr.process_image_with_vision(file_path) if accuracy_metrics.get("overall_accuracy", 0.0) < 0.6: st.warning(f"OCR accuracy for {uploaded_file.name} is below 60%. Please upload a clearer image or higher quality file.") continue if not extracted_text.strip(): st.warning(f"No text extracted from {uploaded_file.name}") else: extracted_texts.append(extracted_text) except Exception as e: st.error(f"Error processing file {uploaded_file.name}: {str(e)}") continue if not extracted_texts: st.error("No files with acceptable OCR accuracy. Please upload clearer images or higher quality files.") else: combined_text = "\n\n".join(extracted_texts) st.warning("Verify and edit the combined extracted text from all files below:") user_text = st.text_area( "Combined Extracted Text:", combined_text, height=400, key="combined_extracted_text" ) if st.button("Confirm All Text"): if user_text.strip(): st.session_state["extracted_text"] = user_text st.session_state['extraction_end_time'] = time.time() elapsed_extraction = st.session_state['extraction_end_time'] - st.session_state['extraction_start_time'] st.success(f"All text verified and ready for feedback! (Extraction Time: {elapsed_extraction:.2f} seconds)") else: st.error("Text cannot be empty") elif choice == "Generate Feedback": st.sidebar.markdown(""" ### Instructions: - Review extracted text - Enter your name - Download report """) st.title("Feedback and Grading Tool") st.header("Step 2: Generate Feedback") extracted_text = st.session_state.get("extracted_text", "") if not extracted_text.strip(): st.error("No text to process. Please go back and upload files with better quality or confirm the extracted text.") return try: st.write("Generating feedback...") feedback_start_time = time.time() structured_feedback = grader.grade_answer_with_gpt( extracted_text, "CSS FPSC Guidelines Context" ) feedback_end_time = time.time() elapsed_feedback = feedback_end_time - feedback_start_time st.success(f"Feedback generated! (Feedback Generation Time: {elapsed_feedback:.2f} seconds)") # Generate rephrased text rephrased_analysis = grader.rephrase_text_with_gpt(extracted_text) structured_feedback["rephrased_analysis"] = rephrased_analysis if not structured_feedback or "sections" not in structured_feedback: st.error("Error: Invalid feedback format received. Please try again.") return st.success("Feedback generated!") # Display feedback in web view st.write("### Detailed Feedback") # Add custom CSS for improved text alignment and presentation st.markdown(""" """, unsafe_allow_html=True) # Essay Structure feedback UI (with explanations for failed criteria) essay_structure_feedback = structured_feedback.get('essay_structure', {}) st.markdown("

Essay Structure

", unsafe_allow_html=True) if not isinstance(essay_structure_feedback, dict): st.warning(f"Essay structure feedback is not a dict: {essay_structure_feedback}") else: for section, criteria in essay_structure_feedback.items(): with st.expander(section, expanded=False): if not isinstance(criteria, dict): st.warning(f"Criteria for section '{section}' is not a dict: {criteria}") continue for crit, result in criteria.items(): if not isinstance(result, dict): st.warning(f"Result for criterion '{crit}' in section '{section}' is not a dict: {result}") continue passed = result.get('value', False) explanation = result.get('explanation', '') icon = '✅' if passed else '❌' color = '#27ae60' if passed else '#e74c3c' if not passed and explanation: st.markdown(f"
• {crit} {icon} {explanation}
", unsafe_allow_html=True) else: st.markdown(f"
• {crit} {icon}
", unsafe_allow_html=True) # Display AI Evaluation & Score Section st.write("### AI Evaluation & Score") for section in structured_feedback["sections"]: score = section.get("score", 0) issues = section.get("issues", []) num_issues = len(issues) section_name = section.get("name", "") color = { "Grammar & Punctuation": "#f8d7da", "Tone & Formality": "#ffe5b4", "Sentence Clarity & Structure": "#d6eaff", "Vocabulary Suggestions": "#d4f8e8" }.get(section_name, "#f0f0f0") with st.container(): st.markdown(f"
", unsafe_allow_html=True) cols = st.columns([0.7, 0.3]) with cols[0]: st.markdown(f"{section_name}", unsafe_allow_html=True) with cols[1]: st.markdown(f"
{score}%
", unsafe_allow_html=True) st.markdown(f"
{num_issues} Issue{'s' if num_issues!=1 else ''}
", unsafe_allow_html=True) with st.expander("Show Issues" if num_issues else "No Issues", expanded=False): if num_issues == 0: st.write("No issues found in this category.") else: for idx, issue in enumerate(issues, 1): before = issue.get("before", "") after = issue.get("after", "") st.markdown(f"
Before: {before}
After: {after}
", unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) st.write("---") # Display Overall Scoring overall_score = structured_feedback.get("overall_score", 40) st.markdown("

Overall Scoring

", unsafe_allow_html=True) st.markdown(f"""
{overall_score}%
Overall Essay Evaluation
""", unsafe_allow_html=True) # PDF Generation part user_name = st.text_input("Enter your name:") if user_name: try: pdf_buffer_feedback = BytesIO() pdf_buffer_rephrased = BytesIO() pdf_generator_feedback = PDFFeedbackGenerator( output_path=pdf_buffer_feedback, logo_path=LOGO_PATH ) pdf_generator_rephrased = PDFFeedbackGenerator( output_path=pdf_buffer_rephrased, logo_path=LOGO_PATH ) # Feedback PDF (no rephrased text) pdf_generator_feedback.create_feedback_pdf( user_name, structured_feedback ) pdf_buffer_feedback.seek(0) # Rephrased Text PDF pdf_generator_rephrased.create_rephrased_pdf( user_name, rephrased_analysis ) pdf_buffer_rephrased.seek(0) col1, col2 = st.columns(2) with col1: st.download_button( label="Download Feedback Report (PDF)", data=pdf_buffer_feedback, file_name="feedback_report.pdf", mime="application/pdf", on_click=lambda: st.session_state.update({"feedback_downloaded": True}), ) with col2: st.download_button( label="Download Rephrased Text Report (PDF)", data=pdf_buffer_rephrased, file_name="rephrased_text_report.pdf", mime="application/pdf", ) st.success("Reports ready for download!") except Exception as e: st.error(f"Error generating PDF: {str(e)}") else: st.info("👉 Enter your name to generate the detailed reports") except Exception as e: st.error(f"Error generating feedback: {str(e)}") print(f"Feedback Generation Error: {str(e)}") if __name__ == "__main__": main()