# Flask service that OCRs handwritten answer sheets and grades them against
# reference answers.  This prelude prepares a writable workspace under /tmp
# (the only guaranteed-writable location on hosts such as Hugging Face
# Spaces), configures logging, downloads NLTK corpora, and only then imports
# the heavy project-local ML modules.

import builtins
import json
import logging
import os
import shutil
import sys
import tempfile
from datetime import datetime

import nltk
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template, make_response
from flask_cors import CORS
from werkzeug.utils import secure_filename

# Load configuration from a local .env file, if present.
load_dotenv()

# All writable state lives under /tmp.
BASE_DIR = '/tmp'
log_dir = os.path.join(BASE_DIR, 'app_logs')
cache_dir = os.path.join(BASE_DIR, 'app_cache')


def ensure_directory(path):
    """Create *path* (and parents) and force full rwx permissions.

    Failures are reported to stdout rather than raised: a missing permission
    bit is survivable, a crash at import time is not.
    """
    try:
        os.makedirs(path, mode=0o777, exist_ok=True)
        os.chmod(path, 0o777)  # full permissions so any worker UID can write
    except Exception as e:
        print(f"Warning: Could not set permissions for {path}: {e}")


ensure_directory(log_dir)
ensure_directory(cache_dir)

# Timestamped log file; fall back to a fixed name if creation fails.
log_file = os.path.join(log_dir, f'app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
try:
    with open(log_file, 'w'):
        pass
    os.chmod(log_file, 0o666)
except Exception as e:
    print(f"Warning: Could not create log file: {e}")
    log_file = os.path.join(BASE_DIR, 'app.log')
    try:
        with open(log_file, 'w'):
            pass
        os.chmod(log_file, 0o666)
    except Exception as e2:
        # BUGFIX: the fallback creation itself used to be unguarded, so a
        # second failure crashed the import.  Degrade to console-only logging.
        print(f"Warning: Could not create fallback log file: {e2}")

# Log to both the file and stdout; degrade to stdout-only on failure.
try:
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout),
        ],
    )
except Exception as e:
    print(f"Warning: Could not set up file logging: {e}")
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)],
    )

logger = logging.getLogger(__name__)


def log_print(message, level="INFO"):
    """Print *message* to stdout and mirror it to the logger at *level*."""
    # Use builtins.print explicitly to avoid recursion should print be shadowed.
    builtins.print(message, flush=True)
    if level == "INFO":
        logger.info(message)
    elif level == "ERROR":
        logger.error(message)
    elif level == "WARNING":
        logger.warning(message)


# Data/cache directories.  BUGFIX: assign the names unconditionally (the old
# try/except fallback recreated the directories but could leave the variables
# unbound, and later code references them without guards).
nltk_data_dir = os.path.join(BASE_DIR, 'nltk_data')
gensim_data_dir = os.path.join(BASE_DIR, 'gensim-data')
upload_dir = os.path.join(BASE_DIR, 'uploads')
ans_image_dir = os.path.join(BASE_DIR, 'ans_image')
images_dir = os.path.join(BASE_DIR, 'images')

for directory in [nltk_data_dir, gensim_data_dir, upload_dir, ans_image_dir, images_dir]:
    ensure_directory(directory)

# These must be set BEFORE the ML libraries below are imported -- they are
# read at import time by the downstream packages.
os.environ['HF_HOME'] = cache_dir
os.environ['GENSIM_DATA_DIR'] = gensim_data_dir

# Make our writable directory the first place NLTK looks.
nltk.data.path.insert(0, nltk_data_dir)

# Corpora required by the similarity pipeline; a download failure is fatal.
for data in ['stopwords', 'punkt', 'wordnet']:
    try:
        log_print(f"Downloading NLTK data: {data}")
        nltk.download(data, download_dir=nltk_data_dir)
    except Exception as e:
        log_print(f"Error downloading NLTK data {data}: {e}", "ERROR")
        raise

# Project-local ML imports are deliberately deferred until the environment
# above is fully prepared.  (The duplicate flask/json/werkzeug imports the
# original repeated here were dropped -- they are already in scope.)
import torch  # noqa: F401  -- imported for its side effects / downstream use
from HTR.app import extract_text_from_image
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score
from similarity_check.semantic_meaning_check.semantic import question_vector_sentence, question_vector_word, fasttext_similarity
from similarity_check.llm_based_scoring.llm import llm_score
# --- Flask application setup -------------------------------------------------

app = Flask(__name__)
app.config['JSON_SORT_KEYS'] = False
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = False
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload size

# Per-process scratch space for uploads.
UPLOAD_FOLDER = tempfile.mkdtemp()
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
logger.info(f"Using temporary upload folder: {UPLOAD_FOLDER}")

# Allow any origin: the API is consumed from arbitrary front-ends.
CORS(app, resources={
    r"/*": {
        "origins": "*",
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type", "Authorization", "Accept"],
        "expose_headers": ["Content-Type"],
    }
})


@app.errorhandler(Exception)
def handle_exception(e):
    """Catch-all handler: log and return a JSON 500 instead of an HTML page."""
    app.logger.error(f"Unhandled exception: {str(e)}")
    return jsonify({
        "error": "Internal server error",
        "message": str(e)
    }), 500


@app.errorhandler(404)
def not_found_error(error):
    """Return 404s as JSON for API consistency."""
    return jsonify({
        "error": "Not found",
        "message": "The requested resource was not found"
    }), 404


@app.errorhandler(400)
def bad_request_error(error):
    """Return 400s as JSON for API consistency."""
    return jsonify({
        "error": "Bad request",
        "message": str(error)
    }), 400


@app.route('/')
def index():
    """Serve the single-page front end."""
    try:
        response = make_response(render_template('index.html'))
        response.headers['Content-Type'] = 'text/html; charset=utf-8'
        return response
    except Exception as e:
        return jsonify({"error": str(e)}), 500


def new_value(value, old_min, old_max, new_min, new_max):
    """Linearly rescale *value* from [old_min, old_max] to [new_min, new_max]."""
    return new_min + ((value - old_min) * (new_max - new_min)) / (old_max - old_min)


@app.route('/compute_answers', methods=['POST'])
def compute_answers():
    """Generate reference answers for each query in the uploaded query file.

    Multipart form fields:
      * query_file   -- text file, one query per line
      * file_type    -- "csv" (pre-made answers) or "pdf" (generate from PDFs)
      * ans_csv_file -- answers file, required when file_type == "csv"
      * pdf_files[]  -- source PDFs, required when file_type == "pdf"

    Returns JSON {"answers": [...]} on success, or a JSON error with 400/500.
    """
    try:
        log_print("\n=== Starting Answer Computation ===")

        query_file = request.files.get('query_file')
        if not query_file:
            log_print("Missing query file", "ERROR")
            return jsonify({"error": "Missing query file"}), 400

        try:
            queries = query_file.read().decode('utf-8').splitlines()
            if not queries:
                log_print("No queries found in file", "ERROR")
                return jsonify({"error": "No queries found in file"}), 400
            log_print(f"Received queries: {queries}")
        except UnicodeDecodeError:
            log_print("Invalid file encoding", "ERROR")
            return jsonify({"error": "Invalid file encoding"}), 400

        file_type = request.form.get('file_type')
        if not file_type:
            log_print("Missing file type", "ERROR")
            return jsonify({"error": "Missing file type"}), 400

        ans_csv_file = request.files.get('ans_csv_file')
        if file_type == "csv":
            if not ans_csv_file:
                log_print("Missing answer CSV file", "ERROR")
                return jsonify({"error": "Missing answer CSV file"}), 400
            try:
                lines = ans_csv_file.read().decode('utf-8').splitlines()
                # NOTE(review): this splits on the literal two-character
                # sequence backslash+n, not on real newlines -- preserved
                # from the original; confirm the CSV producer escapes
                # embedded newlines this way.
                c_answers = [line.split('\\n') for line in lines]
                log_print(f"Processed CSV answers: {c_answers}")
                return jsonify({"answers": c_answers}), 200
            except UnicodeDecodeError:
                log_print("Invalid CSV file encoding", "ERROR")
                return jsonify({"error": "Invalid CSV file encoding"}), 400

        c_answers = []
        if file_type == 'pdf':
            # Scratch directory for this batch of PDFs.
            pdf_dir = os.path.join(cache_dir, 'pdf_files')
            os.makedirs(pdf_dir, exist_ok=True)

            # Persist the uploaded PDFs to disk for the generation pipeline.
            pdf_files = []
            for file in request.files.getlist('pdf_files[]'):
                # BUGFIX: extension check is now case-insensitive so
                # ".PDF" uploads are no longer silently dropped.
                if file.filename.lower().endswith('.pdf'):
                    filename = secure_filename(file.filename)
                    filepath = os.path.join(pdf_dir, filename)
                    file.save(filepath)
                    pdf_files.append(filepath)

            if not pdf_files:
                log_print("No PDF files uploaded", "ERROR")
                return jsonify({"error": "No PDF files uploaded"}), 400

            log_print(f"Processing {len(pdf_files)} PDF files")

            # Build the per-PDF answer database once, then answer every query
            # against every PDF.
            for pdf_file in pdf_files:
                database_creation(pdf_file)

            for query in queries:
                c_answers.append([answer_generation(pdf_file, query) for pdf_file in pdf_files])

            # Best-effort cleanup of the scratch directory.
            try:
                import shutil
                shutil.rmtree(pdf_dir)
            except Exception as e:
                log_print(f"Warning: Could not clean up PDF directory: {e}", "WARNING")
        else:
            log_print(f"Unsupported file type: {file_type}", "ERROR")
            return jsonify({"error": "Unsupported file type"}), 400

        log_print(f"Generated answers: {c_answers}")
        return jsonify({"answers": c_answers}), 200

    except Exception as e:
        log_print(f"Error in compute_answers: {str(e)}", "ERROR")
        error_msg = str(e).encode('ascii', 'ignore').decode('ascii')
        return jsonify({"error": error_msg}), 500
@app.route('/compute_marks', methods=['POST'])
def compute_marks():
    """Grade uploaded handwriting images against the supplied correct answers.

    Multipart form fields:
      * correct_answers[] -- reference answer strings
      * file              -- image files whose filenames encode
                             ".../<student_folder>/<image>" paths

    Returns JSON {"results": {student: [marks per answer]},
                  "failed_files": [{"file", "error"}, ...]}.
    """
    # BUGFIX: shutil was referenced in the cleanup below but never imported in
    # this scope; the resulting NameError was swallowed by the bare handler,
    # silently leaking one temp directory per request.
    import shutil

    try:
        correct_answers = request.form.getlist('correct_answers[]')
        if not correct_answers:
            return jsonify({
                "error": "Missing data",
                "message": "No correct answers provided"
            }), 400

        # Precompute TF-IDF statistics over the reference answers.
        try:
            max_tfidf = create_tfidf_values(correct_answers)
        except Exception as e:
            return jsonify({
                "error": "TFIDF error",
                "message": f"Error creating TFIDF values: {str(e)}"
            }), 400

        files = request.files.getlist('file')
        if not files:
            return jsonify({
                "error": "Missing data",
                "message": "No files uploaded"
            }), 400

        # Per-request scratch directory for the saved images.
        base_temp_dir = tempfile.mkdtemp()
        results = {}        # student folder -> list of marks (one slot per answer)
        failed_files = []   # files we could not process, with reasons

        try:
            for file in files:
                try:
                    if not file or not file.filename:
                        continue

                    # Filenames are expected to carry the student folder as the
                    # second-to-last path component; skip anything flatter.
                    path_parts = file.filename.split('/')
                    if len(path_parts) < 2:
                        continue
                    student_folder = path_parts[-2]
                    filename = path_parts[-1]

                    if student_folder not in results:
                        results[student_folder] = [0] * len(correct_answers)

                    # Save the image, then OCR it.
                    student_dir = os.path.join(base_temp_dir, student_folder)
                    os.makedirs(student_dir, exist_ok=True)
                    filepath = os.path.join(student_dir, filename)
                    file.save(filepath)

                    extracted_text = extract_text_from_image(filepath)
                    if not extracted_text:
                        failed_files.append({
                            "file": file.filename,
                            "error": "No text could be extracted"
                        })
                        continue

                    # Strip non-ASCII so the text survives JSON round-trips.
                    extracted_text = extracted_text.encode('ascii', 'ignore').decode('ascii')

                    # Find the reference answer this sheet most resembles.
                    best_score = 0
                    best_answer_index = 0
                    for i, correct_answer in enumerate(correct_answers):
                        try:
                            clean_correct_answer = correct_answer.encode('ascii', 'ignore').decode('ascii')

                            semantic_score = question_vector_sentence(extracted_text, clean_correct_answer)
                            word_score = question_vector_word(extracted_text, clean_correct_answer)
                            tfidf_score = tfidf_answer_score(extracted_text, clean_correct_answer, max_tfidf)
                            ft_score = fasttext_similarity(extracted_text, clean_correct_answer)
                            llm_marks = llm_score(extracted_text, clean_correct_answer)

                            # Fixed weighting of the five similarity signals.
                            combined_score = (
                                semantic_score * 0.3
                                + word_score * 0.2
                                + tfidf_score * 0.2
                                + ft_score * 0.2
                                + llm_marks * 0.1
                            )
                            if combined_score > best_score:
                                best_score = combined_score
                                best_answer_index = i
                        except Exception as score_error:
                            error_msg = str(score_error).encode('ascii', 'ignore').decode('ascii')
                            failed_files.append({
                                "file": file.filename,
                                "error": f"Error calculating scores: {error_msg}"
                            })
                            continue

                    # Rescale the best [0, 1] score to marks out of 5.
                    marks = new_value(best_score, 0, 1, 0, 5)
                    results[student_folder][best_answer_index] = round(marks, 2)

                except Exception as e:
                    error_msg = str(e).encode('ascii', 'ignore').decode('ascii')
                    failed_files.append({
                        "file": file.filename,
                        "error": error_msg
                    })
                    continue
        finally:
            # Best-effort removal of the scratch directory.
            try:
                shutil.rmtree(base_temp_dir)
            except Exception:
                pass

        if not results:
            return jsonify({
                "error": "Processing error",
                "message": "No results computed"
            }), 400

        # ASCII-clean everything going into the JSON response.
        clean_results = {
            student.encode('ascii', 'ignore').decode('ascii'): scores
            for student, scores in results.items()
        }

        response = jsonify({
            "results": clean_results,
            "failed_files": [{
                "file": f["file"].encode('ascii', 'ignore').decode('ascii'),
                "error": f["error"].encode('ascii', 'ignore').decode('ascii'),
            } for f in failed_files],
        })
        response.headers['Content-Type'] = 'application/json'
        return response

    except Exception as e:
        error_msg = str(e).encode('ascii', 'ignore').decode('ascii')
        return jsonify({
            "error": "Server error",
            "message": f"Error computing marks: {error_msg}"
        }), 500


@app.route('/check_logs')
def check_logs():
    """Return the current log file's contents as JSON (debugging aid)."""
    try:
        with open(log_file, 'r') as f:
            logs = f.read()
        return jsonify({"logs": logs})
    except Exception as e:
        return jsonify({"error": str(e)})


def is_valid_image_file(filename):
    """Return True if *filename* has a supported image extension (.jpg/.jpeg/.png)."""
    try:
        valid_extensions = {'.jpg', '.jpeg', '.png'}
        ext = os.path.splitext(filename)[1].lower()
        return ext in valid_extensions
    except Exception:
        return False


def allowed_file(filename, allowed_extensions):
    """Return True if *filename* has an extension listed in *allowed_extensions*."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in allowed_extensions


def cleanup_temp_files():
    """Delete everything inside images_dir and recreate it; never raises."""
    try:
        if os.path.exists(images_dir):
            # Ensure we can write into the directory before deleting files.
            try:
                os.chmod(images_dir, 0o777)
            except Exception as e:
                log_print(f"Warning: Could not set directory permissions: {e}", "WARNING")

            for file in os.listdir(images_dir):
                file_path = os.path.join(images_dir, file)
                try:
                    if os.path.isfile(file_path):
                        try:
                            os.chmod(file_path, 0o666)  # make sure we may delete it
                        except Exception:
                            pass
                        os.unlink(file_path)
                except Exception as e:
                    log_print(f"Warning: Could not delete file {file_path}: {e}", "WARNING")
                    continue

        # Recreate with full permissions (idempotent).
        ensure_directory(images_dir)
        log_print("Successfully cleaned up temporary files")
    except Exception as e:
        log_print(f"Error cleaning up temporary files: {e}", "ERROR")


if __name__ == '__main__':
    try:
        # Re-assert directory existence and permissions before serving.
        for directory in [log_dir, cache_dir, nltk_data_dir, gensim_data_dir,
                          upload_dir, ans_image_dir, images_dir]:
            ensure_directory(directory)

        port = int(os.environ.get('PORT', 7860))
        app.run(host='0.0.0.0', port=port, debug=False)
    finally:
        cleanup_temp_files()