import os
import tempfile
import nltk
import logging
import sys
from datetime import datetime
from flask_cors import CORS
from flask import Flask, request, jsonify, render_template, Response
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import json
import shutil
from threading import Thread, Event
import queue
import time
import torch

from utils import notification_queue, log_print

# Load environment variables
load_dotenv()

# Create directories in /tmp, which is writable on Hugging Face Spaces
BASE_DIR = '/tmp'
log_dir = os.path.join(BASE_DIR, 'app_logs')
cache_dir = os.path.join(BASE_DIR, 'app_cache')
nltk_data_dir = os.path.join(BASE_DIR, 'nltk_data')
gensim_data_dir = os.path.join(BASE_DIR, 'gensim-data')
upload_dir = os.path.join(BASE_DIR, 'uploads')
ans_image_dir = os.path.join(BASE_DIR, 'ans_image')
images_dir = os.path.join(BASE_DIR, 'images')
log_file = os.path.join(log_dir, 'app.log')

# Global model cache and initialization status
global_models = {}
initialization_complete = Event()


def ensure_directory(path):
    """Create a directory and verify it is writable, fixing permissions if needed."""
    if os.path.exists(path):
        try:
            # Test write permissions with a throwaway file
            test_file = os.path.join(path, '.test')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
            return path
        except Exception:
            log_print(f"Warning: Directory exists but is not writable: {path}", "WARNING")
            try:
                # Try to fix permissions
                os.chmod(path, 0o777)
                return path
            except Exception as chmod_e:
                log_print(f"Error fixing permissions for {path}: {chmod_e}", "ERROR")
                raise
    try:
        # Create the directory with full permissions
        os.makedirs(path, mode=0o777, exist_ok=True)
        return path
    except Exception:
        try:
            # Fall back to more restrictive permissions
            os.makedirs(path, mode=0o755, exist_ok=True)
            return path
        except Exception as nested_e:
            log_print(f"Error creating directory {path}: {nested_e}", "ERROR")
            raise


# Simplified logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)


def initialize_resources():
    """Download NLTK data and preload models in a background thread."""
    try:
        # Create essential directories first
        for directory in [nltk_data_dir, gensim_data_dir]:
            ensure_directory(directory)

        # Initialize NLTK. Note: 'punkt' lives under tokenizers/, while
        # 'stopwords' and 'wordnet' live under corpora/, so each resource
        # is checked with its own lookup path.
        required_nltk_data = {
            'punkt': 'tokenizers/punkt',
            'stopwords': 'corpora/stopwords',
            'wordnet': 'corpora/wordnet',
        }
        for data, resource_path in required_nltk_data.items():
            try:
                nltk.data.find(resource_path)
            except LookupError:
                try:
                    log_print(f"Downloading NLTK data: {data}")
                    nltk.download(data, download_dir=nltk_data_dir, quiet=True)
                except Exception as e:
                    log_print(f"Error downloading NLTK data {data}: {e}", "ERROR")

        # Preload models
        try:
            get_or_load_model('fasttext')
        except Exception as e:
            log_print(f"Warning: Could not preload models: {e}", "WARNING")
    except Exception as e:
        log_print(f"Error during initialization: {e}", "ERROR")
    finally:
        # Signal that initialization is complete
        initialization_complete.set()


# Create essential directories
essential_dirs = [cache_dir, upload_dir, images_dir]
for directory in essential_dirs:
    ensure_directory(directory)

# Point model caches at the writable directories
os.environ['HF_HOME'] = cache_dir
os.environ['GENSIM_DATA_DIR'] = gensim_data_dir

# Add the custom directory to NLTK's search path
nltk.data.path.insert(0, nltk_data_dir)

# Start initialization in the background
initialization_thread = Thread(target=initialize_resources, daemon=True)
initialization_thread.start()
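
# Illustrative sketch (hypothetical, never called): how callers are expected
# to use ensure_directory. The 'example_cache' subdirectory name is made up.
def _example_ensure_directory():
    path = ensure_directory(os.path.join(BASE_DIR, 'example_cache'))
    # On success the (writable) path is returned; on failure the exception
    # propagates so the caller can decide whether startup should abort.
    return path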
from HTR.app import extract_text_from_image
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score
from similarity_check.semantic_meaning_check.semantic import similarity_model_score, fasttext_similarity, question_vector_sentence, question_vector_word
from similarity_check.llm_based_scoring.llm import llm_score

app = Flask(__name__)
app.config['JSON_SORT_KEYS'] = False
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = False
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB max upload size

# Create temporary directories for Hugging Face Spaces
# (tempfile.mkdtemp already creates the directories)
UPLOAD_FOLDER = tempfile.mkdtemp()
ANS_IMAGE_FOLDER = tempfile.mkdtemp()
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['ANS_IMAGE_FOLDER'] = ANS_IMAGE_FOLDER

# Configure CORS for all origins
CORS(app, resources={
    r"/*": {
        "origins": "*",
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type", "Authorization", "Accept"],
        "expose_headers": ["Content-Type"]
    }
})


# Global error handler for all unhandled exceptions
@app.errorhandler(Exception)
def handle_exception(e):
    # Log the error for debugging
    app.logger.error(f"Unhandled exception: {str(e)}")
    error_details = {
        "status": "error",
        "error": "Internal server error",
        "message": str(e),
        "type": type(e).__name__,
        "timestamp": datetime.now().isoformat()
    }
    notification_queue.put({
        "type": "error",
        "message": error_details
    })
    return jsonify(error_details), 500


# Handle 404 errors
@app.errorhandler(404)
def not_found_error(error):
    return jsonify({
        "status": "error",
        "error": "Not found",
        "message": "The requested resource was not found"
    }), 404


# Handle 400 Bad Request
@app.errorhandler(400)
def bad_request_error(error):
    return jsonify({
        "status": "error",
        "error": "Bad request",
        "message": str(error)
    }), 400


@app.route('/')
def index():
    return render_template('2.html')


def new_value(value, old_min, old_max, new_min, new_max):
    """Linearly rescale value from [old_min, old_max] to [new_min, new_max]."""
    try:
        if old_max == old_min:
            return new_min  # Return the new minimum if the source range is empty
        return new_min + ((value - old_min) * (new_max - new_min)) / (old_max - old_min)
    except Exception as e:
        log_print(f"Error in new_value calculation: {e}", "ERROR")
        return new_min  # Fall back to the new minimum on error
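
# Worked example of the linear rescaling above (values are illustrative and
# this helper is never called): mapping a TF-IDF score of 6.5 from [3, 10]
# onto [0, 5] gives 0 + (6.5 - 3) * (5 - 0) / (10 - 3) = 2.5.
def _example_new_value():
    assert abs(new_value(6.5, old_min=3, old_max=10, new_min=0, new_max=5) - 2.5) < 1e-9
    assert new_value(3, old_min=3, old_max=10, new_min=0, new_max=5) == 0
    assert new_value(5, old_min=5, old_max=5, new_min=0, new_max=2) == 0  # empty source range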
@app.route('/compute_answers', methods=['POST'])
def compute_answers():
    try:
        query_file = request.files.get('query_file')
        if not query_file:
            error_msg = "Missing query file"
            notification_queue.put({
                "type": "error",
                "message": error_msg
            })
            return jsonify({"error": error_msg}), 400

        queries = query_file.read().decode('utf-8').splitlines()
        logger.info(f"Received queries: {queries}")

        file_type = request.form.get('file_type')
        ans_csv_file = request.files.get('ans_csv_file')

        if file_type == "csv":
            if not ans_csv_file:
                return jsonify({"error": "Missing answer CSV file"}), 400
            lines = ans_csv_file.read().decode('utf-8').splitlines()
            c_answers = []
            for line in lines:
                # Each CSV line holds alternative answers separated by a
                # literal backslash-n marker, hence the '\\n' split
                c_answers.append(line.split('\\n'))
            logger.info(f"Processed CSV answers: {c_answers}")
            return jsonify({"answers": c_answers}), 200

        try:
            c_answers = []
            if file_type == 'pdf':
                # Create a temporary directory for PDF files
                pdf_dir = tempfile.mkdtemp()

                # Save uploaded PDF files
                pdf_files = []
                for file in request.files.getlist('pdf_files[]'):
                    if file.filename.endswith('.pdf'):
                        filename = secure_filename(file.filename)
                        filepath = os.path.join(pdf_dir, filename)
                        file.save(filepath)
                        pdf_files.append(filepath)

                if not pdf_files:
                    return jsonify({"error": "No PDF files uploaded"}), 400

                logger.info(f"Processing {len(pdf_files)} PDF files")

                # Build the answer database for each PDF
                for pdf_file in pdf_files:
                    database_creation(pdf_file)

                # Generate answers: one list per query, one answer per PDF
                for query in queries:
                    ans = []
                    for pdf_file in pdf_files:
                        ans.append(answer_generation(pdf_file, query))
                    c_answers.append(ans)

                # Clean up the PDF directory
                try:
                    shutil.rmtree(pdf_dir)
                except Exception as e:
                    logger.warning(f"Could not clean up PDF directory: {e}")
            else:
                return jsonify({"error": "Unsupported file type"}), 400

            logger.info(f"Generated answers: {c_answers}")
            return jsonify({"answers": c_answers}), 200
        except Exception as e:
            logger.error(f"Error processing files: {str(e)}")
            error_msg = str(e)
            notification_queue.put({
                "type": "error",
                "message": error_msg
            })
            return jsonify({"error": error_msg}), 500
    except Exception as e:
        logger.error(f"Error in compute_answers: {str(e)}")
        error_msg = str(e)
        notification_queue.put({
            "type": "error",
            "message": error_msg
        })
        return jsonify({"error": error_msg}), 500
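
# Illustrative client call for the route above (field names match the
# handler; the file names are hypothetical):
#
#   curl -X POST http://localhost:7860/compute_answers \
#        -F "query_file=@queries.txt" \
#        -F "file_type=pdf" \
#        -F "pdf_files[]=@notes1.pdf" \
#        -F "pdf_files[]=@notes2.pdf"
#
# The response is {"answers": [...]}, one inner list per query with one
# generated answer per uploaded PDF.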
def validate_folder_structure(files):
    """Validate the student-folder structure of uploaded files."""
    try:
        # Collect the unique student folder names
        student_folders = set()
        for file in files:
            if not file or not file.filename:
                continue
            path_parts = file.filename.split('/')
            if len(path_parts) >= 2:
                student_folders.add(path_parts[-2])

        if not student_folders:
            return False, "No valid student folders found. Please create folders with student names."

        # Count the files in each student folder
        file_counts = {}
        for file in files:
            if not file or not file.filename:
                continue
            path_parts = file.filename.split('/')
            if len(path_parts) >= 2:
                student = path_parts[-2]
                file_counts[student] = file_counts.get(student, 0) + 1

        if not file_counts:
            return False, "No valid files found in student folders. Please add image files."

        # Every student must have the same number of files
        counts = list(file_counts.values())
        if len(set(counts)) > 1:
            return False, "Inconsistent number of files across student folders. Each student must have the same number of images."

        # Validate file extensions
        for file in files:
            if not file or not file.filename:
                continue
            path_parts = file.filename.split('/')
            if len(path_parts) >= 2:
                filename = path_parts[-1]
                ext = os.path.splitext(filename)[1].lower()
                if ext not in ['.jpg', '.jpeg', '.png']:
                    return False, f"Invalid file extension: {ext}. Only .jpg, .jpeg, and .png files are allowed."

        return True, f"Valid folder structure with {len(student_folders)} students and {counts[0]} files each"
    except Exception as e:
        return False, f"Error validating folder structure: {str(e)}"


@app.route('/notifications')
def notifications():
    def generate():
        error_count = 0
        max_errors = 3
        while True:
            try:
                # Get a notification from the queue (non-blocking)
                try:
                    notification = notification_queue.get_nowait()
                    if notification:
                        yield "data: " + json.dumps(notification) + "\n\n"
                        error_count = 0  # Reset the error count on success
                except queue.Empty:
                    # No notification pending: send a ping to keep the connection alive
                    yield "data: " + json.dumps({"type": "ping"}) + "\n\n"
                    time.sleep(0.5)
            except Exception as e:
                error_count += 1
                error_msg = str(e).encode('ascii', 'ignore').decode('ascii')
                log_print(f"Error in notification stream: {error_msg}", "ERROR")
                yield "data: " + json.dumps({
                    "type": "error",
                    "message": f"Server error: {error_msg}"
                }) + "\n\n"
                if error_count >= max_errors:
                    break

    return Response(generate(), mimetype='text/event-stream')
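
# Illustrative upload layout accepted by /compute_marks below (student and
# file names are hypothetical). Every student folder must contain the same
# number of images, one per question, graded in filename order:
#
#   answers/
#     alice/1.jpg   alice/2.jpg
#     bob/1.jpg     bob/2.jpg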
@app.route('/compute_marks', methods=['POST'])
def compute_marks():
    try:
        # Get the correct answers from the request
        a = request.form.get('answers')
        if not a:
            error_msg = "Missing answers in the request"
            notification_queue.put({
                "type": "error",
                "message": error_msg
            })
            return jsonify({"error": error_msg}), 400

        a = json.loads(a)
        answers = []
        for i in a:
            ans = i.split('\n\n')
            answers.append(ans)
        logger.info(f"Processing answers: {answers}")

        # Process uploaded files
        files = request.files.getlist('file')
        if not files:
            error_msg = "No files uploaded. Please upload student folders containing images."
            notification_queue.put({
                "type": "error",
                "message": error_msg
            })
            return jsonify({"error": error_msg}), 400

        # Validate the folder structure
        is_valid, message = validate_folder_structure(files)
        if not is_valid:
            notification_queue.put({
                "type": "error",
                "message": message
            })
            return jsonify({"error": message}), 400

        # Save files into per-student folders
        data = {}
        parent_folder = app.config['ANS_IMAGE_FOLDER']

        for file in files:
            if file.filename.endswith(('.jpg', '.jpeg', '.png')):
                # Extract the student folder from the relative path
                path_parts = file.filename.split('/')
                if len(path_parts) >= 2:
                    student_folder = secure_filename(path_parts[-2])
                    student_path = os.path.join(parent_folder, student_folder)
                    os.makedirs(student_path, exist_ok=True)

                    # Save the file
                    filename = secure_filename(path_parts[-1])
                    filepath = os.path.join(student_path, filename)
                    file.save(filepath)

                    data.setdefault(student_folder, []).append((filename, filepath))

        logger.info(f"Processed files structure: {data}")

        # Precompute vectors for the correct answers
        results = []
        sen_vec_answers = []
        word_vec_answers = []
        for i in answers:
            temp_v = []
            temp_w = []
            for j in i:
                temp_v.append(question_vector_sentence(j))
                temp_w.append(question_vector_word(j))
            sen_vec_answers.append(temp_v)
            word_vec_answers.append(temp_w)

        # Calculate marks for each student
        for student in data:
            # Sort the images by filename so they line up with the questions
            sorted_images = sorted(data[student], key=lambda x: x[0])
            count = 0
            for filename, image_path in sorted_images:
                try:
                    s_answer = extract_text_from_image(image_path)
                    logger.info(f"Extracted text from {image_path}: {s_answer}")

                    if not s_answer:
                        logger.warning(f"No text extracted from {image_path}")
                        results.append({
                            "subfolder": student,
                            "image": filename,
                            "marks": 0
                        })
                        count += 1
                        continue

                    tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count])
                    m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count],
                              tf_idf_word_values, max_tfidf, answers[count])
                    if isinstance(m, torch.Tensor):
                        m = m.item()

                    results.append({
                        "subfolder": student,
                        "image": filename,
                        "marks": round(m, 2)  # Round marks to 2 decimal places
                    })
                    count += 1
                except Exception as e:
                    logger.error(f"Error processing {image_path}: {str(e)}")
                    results.append({
                        "subfolder": student,
                        "image": filename,
                        "marks": 0
                    })
                    count += 1
                    continue

        logger.info(f"Calculated results: {results}")

        # Clean up temporary files
        try:
            shutil.rmtree(parent_folder)
        except Exception as e:
            logger.warning(f"Could not clean up temporary files: {e}")

        return jsonify({"results": results}), 200
    except Exception as e:
        error_msg = str(e)
        notification_queue.put({
            "type": "error",
            "message": error_msg
        })
        return jsonify({"error": error_msg}), 500


def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers):
    """Score an answer: classical similarity total halved plus half the best LLM score."""
    try:
        total = 0

        # TF-IDF keyword overlap: up to 5 points
        marks1 = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10)
        if marks1 > 3:
            total += new_value(marks1, old_min=3, old_max=10, new_min=0, new_max=5)
        logger.info(f"TFIDF Score: {float(total)}")

        if marks1 > 2:
            # Sentence-transformer similarity: up to 3 points
            marks2 = similarity_model_score(sen_vec_answers, answer)
            a = 0
            if marks2 > 0.95:
                a = 3
            elif marks2 > 0.5:
                a = new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3)
            total += a
            logger.info(f"Sentence transformers score: {a}")

            # FastText word-vector similarity: up to 2 points
            # (fixed: this threshold checked marks2 instead of marks3)
            marks3 = fasttext_similarity(word_vec_answers, answer)
            b = 0
            if marks3 > 0.9:
                b = 2
            elif marks3 > 0.4:
                b = new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2)
            total += b
            logger.info(f"Fasttext score: {b}")

        # LLM-based scoring: take the best score across the correct answers
        marks4 = [float(s) for s in llm_score(correct_answers, answer)]
        m = max(marks4)
        logger.info(f"LLM score: {m / 2}")

        return total / 2 + m / 2
    except Exception as e:
        logger.error(f"Error in marks calculation: {str(e)}")
        return 0


@app.route('/check_logs')
def check_logs():
    try:
        # Ensure the log directory exists
        ensure_directory(log_dir)

        # Create the log file if it doesn't exist
        if not os.path.exists(log_file):
            with open(log_file, 'w') as f:
                f.write("Log file created.\n")

        # Read the last 1000 lines of logs
        with open(log_file, 'r') as f:
            logs = f.readlines()[-1000:]

        return jsonify({
            "status": "success",
            "logs": "".join(logs)
        })
    except Exception as e:
        log_print(f"Error reading logs: {str(e)}", "ERROR")
        return jsonify({
            "status": "error",
            "error": str(e)
        }), 500


def is_valid_image_file(filename):
    """Check that a filename has an allowed image extension."""
    try:
        valid_extensions = {'.jpg', '.jpeg', '.png'}
        ext = os.path.splitext(filename)[1].lower()
        return ext in valid_extensions
    except Exception:
        return False
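
# Worked example of the scoring in marks() above (all numbers illustrative):
# a TF-IDF score of 6.5 rescales to 2.5 of 5; sentence similarity 0.97 earns
# 3 of 3; fasttext similarity 0.65 rescales to (0.65 - 0.4) * 2 / 0.5 = 1.0
# of 2. The classical total is then 6.5, and with a best LLM score of 8.0 the
# final mark is 6.5 / 2 + 8.0 / 2 = 7.25.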
def allowed_file(filename, allowed_extensions):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in allowed_extensions


def get_or_load_model(model_name):
    """Get a model from the cache, loading it on first use."""
    if model_name not in global_models:
        try:
            if model_name == 'fasttext':
                from gensim.models import KeyedVectors
                log_print(f"Loading {model_name} model...")
                model_path = os.path.join(gensim_data_dir, 'fasttext-wiki-news-subwords-300',
                                          'fasttext-wiki-news-subwords-300.gz')
                if not os.path.exists(model_path):
                    from gensim.downloader import load
                    log_print("Downloading fasttext model...")
                    global_models[model_name] = load('fasttext-wiki-news-subwords-300')
                else:
                    global_models[model_name] = KeyedVectors.load_word2vec_format(model_path)
                log_print(f"Successfully loaded {model_name} model")
            elif model_name == 'llm':
                # LLM model loading is not implemented here
                log_print("LLM model loading not implemented", "WARNING")
                return None
        except Exception as e:
            log_print(f"Error loading {model_name} model: {e}", "ERROR")
            raise
    return global_models.get(model_name)


def wait_for_initialization():
    """Block until background initialization has completed."""
    initialization_complete.wait()
    return True


# Gate the heavy endpoints until background initialization has finished
@app.before_request
def ensure_initialization():
    """Ensure all resources are initialized before processing requests."""
    if request.endpoint in ['compute_marks', 'compute_answers']:
        wait_for_initialization()


def cleanup_temp_files():
    """Clean up temporary files, logging (but not raising) individual failures."""
    try:
        # Clean up the temporary processing directory
        temp_processing_dir = os.path.join(BASE_DIR, 'temp_processing')
        if os.path.exists(temp_processing_dir):
            shutil.rmtree(temp_processing_dir, ignore_errors=True)

        # Clean up the images directory
        if os.path.exists(images_dir):
            for file in os.listdir(images_dir):
                file_path = os.path.join(images_dir, file)
                try:
                    if os.path.isfile(file_path):
                        os.unlink(file_path)
                except Exception as e:
                    log_print(f"Warning: Could not delete file {file_path}: {e}", "WARNING")

        # Clean up the upload folder
        if os.path.exists(UPLOAD_FOLDER):
            try:
                shutil.rmtree(UPLOAD_FOLDER, ignore_errors=True)
            except Exception as e:
                log_print(f"Warning: Could not clean up upload folder: {e}", "WARNING")
    except Exception as e:
        log_print(f"Error cleaning up temporary files: {e}", "ERROR")


# Note: before_first_request was removed in Flask 2.3, so this handler
# requires an older Flask release.
@app.before_first_request
def setup_temp_directories():
    """Set up temporary directories before the first request."""
    try:
        global UPLOAD_FOLDER, ANS_IMAGE_FOLDER
        UPLOAD_FOLDER = tempfile.mkdtemp()
        ANS_IMAGE_FOLDER = tempfile.mkdtemp()

        # Ensure the directories are writable
        ensure_directory(UPLOAD_FOLDER)
        ensure_directory(ANS_IMAGE_FOLDER)

        app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
        app.config['ANS_IMAGE_FOLDER'] = ANS_IMAGE_FOLDER

        log_print(f"Created temporary directories: {UPLOAD_FOLDER}, {ANS_IMAGE_FOLDER}")
    except Exception as e:
        log_print(f"Error setting up temporary directories: {e}", "ERROR")
        raise
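
# Illustrative sketch (hypothetical, never called): how a request handler is
# expected to obtain the cached fasttext vectors after startup.
def _example_model_access():
    wait_for_initialization()                # blocks until the background thread signals
    vectors = get_or_load_model('fasttext')  # loaded once, then served from global_models
    return vectors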
if __name__ == '__main__':
    try:
        # Create essential directories
        for directory in essential_dirs:
            ensure_directory(directory)

        # Configure the server
        app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0

        # Start the Flask app
        port = int(os.environ.get('PORT', 7860))
        log_print(f"Starting server on port {port}")
        log_print("Server configuration:")
        log_print("- Threaded: True")
        log_print("- Debug mode: False")

        # Run the server without the reloader so the background thread is not duplicated
        app.run(
            host='0.0.0.0',
            port=port,
            debug=False,
            use_reloader=False,
            threaded=True
        )
    except Exception as e:
        log_print(f"Fatal error starting server: {str(e)}", "ERROR")
        raise
    finally:
        log_print("Cleaning up temporary files...")
        cleanup_temp_files()
        log_print("Server shutdown complete")
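
# Deployment note (an assumption, not part of this app's tooling): behind a
# WSGI server the __main__ block above is skipped, e.g.:
#   gunicorn --bind 0.0.0.0:7860 --workers 1 --threads 4 app:app
# A single worker keeps the in-process model cache and notification_queue
# shared across requests; multiple workers would each load their own copy.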