import os
import tempfile
import nltk
import logging
import sys
import builtins
from datetime import datetime
from flask_cors import CORS
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Create directories in /tmp, which is writable on Hugging Face Spaces
BASE_DIR = '/tmp'  # Use the direct /tmp path for Hugging Face
log_dir = os.path.join(BASE_DIR, 'app_logs')
cache_dir = os.path.join(BASE_DIR, 'app_cache')

# Create the log directory
try:
    os.makedirs(log_dir, exist_ok=True)
except Exception as e:
    print(f"Warning: Could not create log directory: {e}")
    # Fall back to the system temp directory
    log_dir = tempfile.gettempdir()

# Create a log file with a timestamp
log_file = os.path.join(log_dir, f'app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')

# Set up logging to both file and console
try:
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout)
        ]
    )
except Exception as e:
    print(f"Warning: Could not set up file logging: {e}")
    # Fall back to console-only logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)]
    )

# Create a custom logger
logger = logging.getLogger(__name__)


# A print function that also logs
def log_print(message, level="INFO"):
    # Use the built-in print function to avoid recursion
    builtins.print(message, flush=True)
    if level == "INFO":
        logger.info(message)
    elif level == "ERROR":
        logger.error(message)
    elif level == "WARNING":
        logger.warning(message)


# Set up all cache and data directories in /tmp
try:
    nltk_data_dir = os.path.join(BASE_DIR, 'nltk_data')
    gensim_data_dir = os.path.join(BASE_DIR, 'gensim-data')
    upload_dir = os.path.join(BASE_DIR, 'uploads')
    ans_image_dir = os.path.join(BASE_DIR, 'ans_image')
    images_dir = os.path.join(BASE_DIR, 'images')  # Directly in /tmp for HTR

    # Create directories with the correct permissions
    for directory in [cache_dir, nltk_data_dir, gensim_data_dir, upload_dir, ans_image_dir, images_dir]:
        os.makedirs(directory, exist_ok=True)
except Exception as e:
    print(f"Warning: Could not create cache directories: {e}")
    # Don't use tempfile paths in the Hugging Face environment; recreate under /tmp
    cache_dir = os.path.join(BASE_DIR, 'cache')
    nltk_data_dir = os.path.join(BASE_DIR, 'nltk_data')
    gensim_data_dir = os.path.join(BASE_DIR, 'gensim-data')
    upload_dir = os.path.join(BASE_DIR, 'uploads')
    ans_image_dir = os.path.join(BASE_DIR, 'ans_image')
    images_dir = os.path.join(BASE_DIR, 'images')

    # Create directories
    for directory in [cache_dir, nltk_data_dir, gensim_data_dir, upload_dir, ans_image_dir, images_dir]:
        os.makedirs(directory, exist_ok=True)

# Set environment variables for the writable cache locations
os.environ['HF_HOME'] = cache_dir
os.environ['GENSIM_DATA_DIR'] = gensim_data_dir

# Add the custom directory to NLTK's search path
nltk.data.path.insert(0, nltk_data_dir)

# Download required NLTK data
required_nltk_data = ['stopwords', 'punkt', 'wordnet']
for data in required_nltk_data:
    try:
        log_print(f"Downloading NLTK data: {data}")
        nltk.download(data, download_dir=nltk_data_dir)
    except Exception as e:
        log_print(f"Error downloading NLTK data {data}: {e}", "ERROR")
        raise

from flask import Flask, request, jsonify, render_template
import json
import torch
from werkzeug.utils import secure_filename

from HTR.app import extract_text_from_image
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score
from similarity_check.semantic_meaning_check.semantic import (
    similarity_model_score,
    fasttext_similarity,
    question_vector_sentence,
    question_vector_word,
)
from similarity_check.llm_based_scoring.llm import llm_score

app = Flask(__name__)

# Use the new upload directory
UPLOAD_FOLDER = upload_dir

# Configure CORS for Hugging Face
CORS(app, resources={
    r"/*": {
        "origins": ["*"],
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type", "Authorization"]
    }
})


@app.route('/')
def index():
    return render_template('index.html')


def new_value(value, old_min, old_max, new_min, new_max):
    """Linearly rescale a value from [old_min, old_max] to [new_min, new_max]."""
    return new_min + ((value - old_min) * (new_max - new_min)) / (old_max - old_min)
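# Illustration: new_value(7, 3, 10, 0, 5) = 0 + (7 - 3) * (5 - 0) / (10 - 3) ≈ 2.86,
# i.e. a TF-IDF score of 7 out of 10 maps to roughly 2.86 out of 5.

# The route below expects a multipart form. An illustrative request for the PDF
# flow (file names are hypothetical) might look like:
#   curl -X POST http://localhost:7860/compute_answers \
#        -F "query_file=@queries.txt" \
#        -F "file_type=pdf" \
#        -F "pdf_files[]=@reference_notes.pdf"
# With file_type=csv, send the pre-written answers as "ans_csv_file" instead of PDFs.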
@app.route('/compute_answers', methods=['POST'])
def compute_answers():
    try:
        log_print("\n=== Starting Answer Computation ===")

        query_file = request.files.get('query_file')
        if not query_file:
            log_print("Missing query file", "ERROR")
            return jsonify({"error": "Missing query file"}), 400

        try:
            queries = query_file.read().decode('utf-8').splitlines()
            if not queries:
                log_print("No queries found in file", "ERROR")
                return jsonify({"error": "No queries found in file"}), 400
            log_print(f"Received queries: {queries}")
        except UnicodeDecodeError:
            log_print("Invalid file encoding", "ERROR")
            return jsonify({"error": "Invalid file encoding"}), 400

        file_type = request.form.get('file_type')
        if not file_type:
            log_print("Missing file type", "ERROR")
            return jsonify({"error": "Missing file type"}), 400

        ans_csv_file = request.files.get('ans_csv_file')
        if file_type == "csv":
            if not ans_csv_file:
                log_print("Missing answer CSV file", "ERROR")
                return jsonify({"error": "Missing answer CSV file"}), 400
            try:
                ans_csv_file = ans_csv_file.read().decode('utf-8').splitlines()
                c_answers = []
                for i in ans_csv_file:
                    # Each CSV line holds alternative answers separated by a literal "\n" marker
                    c_answers.append(i.split('\\n'))
                log_print(f"Processed CSV answers: {c_answers}")
                return jsonify({"answers": c_answers}), 200
            except UnicodeDecodeError:
                log_print("Invalid CSV file encoding", "ERROR")
                return jsonify({"error": "Invalid CSV file encoding"}), 400

        c_answers = []

        if file_type == 'pdf':
            # Create a temporary directory for PDF files
            pdf_dir = os.path.join(cache_dir, 'pdf_files')
            os.makedirs(pdf_dir, exist_ok=True)

            # Save uploaded PDF files
            pdf_files = []
            for file in request.files.getlist('pdf_files[]'):
                if file.filename.endswith('.pdf'):
                    filename = secure_filename(file.filename)
                    filepath = os.path.join(pdf_dir, filename)
                    file.save(filepath)
                    pdf_files.append(filepath)

            if not pdf_files:
                log_print("No PDF files uploaded", "ERROR")
                return jsonify({"error": "No PDF files uploaded"}), 400

            log_print(f"Processing {len(pdf_files)} PDF files")

            # Process PDFs
            for pdf_file in pdf_files:
                database_creation(pdf_file)

            # Generate answers
            for query in queries:
                ans = []
                for pdf_file in pdf_files:
                    ans.append(answer_generation(pdf_file, query))
                c_answers.append(ans)

            # Clean up the PDF directory
            try:
                import shutil
                shutil.rmtree(pdf_dir)
            except Exception as e:
                log_print(f"Warning: Could not clean up PDF directory: {e}", "WARNING")
        else:
            log_print(f"Unsupported file type: {file_type}", "ERROR")
            return jsonify({"error": "Unsupported file type"}), 400

        log_print(f"Generated answers: {c_answers}")
        return jsonify({"answers": c_answers}), 200

    except Exception as e:
        log_print(f"Error in compute_answers: {str(e)}", "ERROR")
        return jsonify({"error": str(e)}), 500
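# The route below grades handwritten answer sheets. It expects a multipart form with:
#   - "answers": a JSON-encoded list of reference answers, where alternative
#     phrasings within one answer are separated by a blank line ("\n\n");
#   - "files[]": answer-sheet images (.jpg/.jpeg/.png) whose relative paths include
#     the student folder, e.g. "student_01/q1.jpg" (hypothetical name); the folder
#     name is used to group a student's pages.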
@app.route('/compute_marks', methods=['POST'])
def compute_marks():
    try:
        log_print("\n=== Starting Marks Computation ===")

        # Get and process answers
        a = request.form.get('answers')
        if not a:
            log_print("No answers provided", "ERROR")
            return jsonify({"error": "No answers provided"}), 400

        try:
            log_print("=== Processing Answers ===")
            log_print(f"Received answers: {a}")
            a = json.loads(a)
            answers = []
            for i in a:
                ans = i.split('\n\n')
                answers.append(ans)
            log_print(f"Processed answers structure: {answers}")
        except json.JSONDecodeError:
            log_print("Invalid JSON format in answers", "ERROR")
            return jsonify({"error": "Invalid JSON format in answers"}), 400

        # Process answers to ensure a consistent format
        processed_answers = []
        for ans in answers:
            if isinstance(ans, str):
                processed_answers.append([ans])
            else:
                processed_answers.append(ans)
        answers = processed_answers

        # Ensure the images directory exists in the location HTR expects
        os.makedirs(images_dir, exist_ok=True)

        # Get files from the request
        files = request.files.getlist('files[]')
        if not files:
            log_print("No files were uploaded", "ERROR")
            return jsonify({"error": "No files were uploaded"}), 400

        # Group files by student folder and save them
        data = {}
        for file in files:
            if file and is_valid_image_file(file.filename):
                # Extract the student folder from the path
                path_parts = file.filename.split('/')
                if len(path_parts) >= 2:
                    student_folder = path_parts[-2]  # Parent folder name
                    filename = path_parts[-1]        # Actual filename

                    # Save directly to the images directory with a unique name
                    htr_filename = f"{student_folder}_{filename}"
                    htr_filepath = os.path.join(images_dir, htr_filename)
                    file.save(htr_filepath)
                    log_print(f"Saved file: {htr_filepath}")

                    # Add to the data structure
                    if student_folder not in data:
                        data[student_folder] = []
                    data[student_folder].append(htr_filepath)

        if not data:
            log_print("No valid image files were found in the upload", "ERROR")
            return jsonify({"error": "No valid image files were found"}), 400

        log_print(f"Processed files structure: {data}")

        # Precompute sentence and word vectors for the reference answers
        sen_vec_answers = []
        word_vec_answers = []
        for i in answers:
            temp_v = []
            temp_w = []
            for j in i:
                temp_v.append(question_vector_sentence(j))
                temp_w.append(question_vector_word(j))
            sen_vec_answers.append(temp_v)
            word_vec_answers.append(temp_w)

        # Calculate marks
        s_marks = {}
        for student_folder, file_paths in data.items():
            s_marks[student_folder] = []
            for count, image_path in enumerate(file_paths):
                try:
                    s_answer = extract_text_from_image(image_path)
                    log_print(f"\nProcessing {student_folder}/{os.path.basename(image_path)}:")
                    log_print(f"Extracted answer: {s_answer}")

                    if s_answer and count < len(answers):
                        log_print(f"Reference answer: {answers[count]}")
                        tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count])
                        m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count],
                                  tf_idf_word_values, max_tfidf, answers[count])
                        if isinstance(m, torch.Tensor):
                            m = m.item()
                        s_marks[student_folder].append(round(float(m), 2))
                        log_print(f"Marks awarded: {m}")
                    else:
                        s_marks[student_folder].append(0)
                        log_print(f"No text extracted or no reference answer for index {count}", "WARNING")
                except Exception as e:
                    log_print(f"Error processing {image_path}: {str(e)}", "ERROR")
                    s_marks[student_folder].append(0)

        if not s_marks:
            log_print("No marks were computed", "ERROR")
            return jsonify({"error": "No marks were computed. Please check your input files and answers."}), 400

        log_print("\nFinal Results:")
        for student, marks_list in s_marks.items():
            log_print(f"{student}: {marks_list}")

        return jsonify({"message": s_marks}), 200

    except Exception as e:
        log_print(f"Error in compute_marks: {str(e)}", "ERROR")
        return jsonify({"error": str(e)}), 500
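# Worked example for marks() below, using made-up intermediate scores: a TF-IDF
# score of 7/10 rescales to ~2.86 of 5, a sentence similarity of 0.80 adds ~2.00
# of 3, a word (fastText) similarity of 0.70 adds ~1.20 of 2, for a heuristic
# total of ~6.06; if the best LLM score is 8, the final mark is 6.06/2 + 8/2 ≈ 7.03.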
def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers):
    """Score a student answer by combining TF-IDF, semantic-similarity and LLM scores."""
    total_marks = 0
    log_print("\n=== Marks Calculation ===")
    log_print(f"Processing answer: {answer[:100]}...")

    # TF-IDF keyword overlap, scored out of 10
    marks1 = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10)
    log_print(f"TFIDF Score: {marks1}")

    if marks1 > 3:
        total_marks += new_value(marks1, old_min=3, old_max=10, new_min=0, new_max=5)
        log_print(f"After TFIDF adjustment: {total_marks}")

    if marks1 > 2:
        # Sentence-level semantic similarity, worth up to 3 marks
        marks2 = similarity_model_score(sen_vec_answers, answer)
        log_print(f"Sentence Similarity Score: {marks2}")
        if marks2 > 0.95:
            total_marks += 3
        elif marks2 > 0.5:
            total_marks += new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3)

        # Word-level (fastText) similarity, worth up to 2 marks
        marks3 = fasttext_similarity(word_vec_answers, answer)
        log_print(f"Word Similarity Score: {marks3}")
        if marks3 > 0.9:
            total_marks += 2
        elif marks3 > 0.4:
            total_marks += new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2)

        # LLM-based scoring against each reference answer; keep the best score
        marks4 = llm_score(correct_answers, answer)
        log_print(f"LLM Scores: {marks4}")
        marks4 = [float(x) for x in marks4]
        m = max(marks4)
        log_print(f"Max LLM Score: {m}")

        # Final mark averages the heuristic total and the best LLM score
        total_marks = total_marks / 2 + m / 2
        log_print(f"Final marks: {total_marks}")
    else:
        log_print("WARNING: TFIDF score too low, returning 0")

    return total_marks


@app.route('/check_logs')
def check_logs():
    try:
        with open(log_file, 'r') as f:
            logs = f.read()
        return jsonify({"logs": logs})
    except Exception as e:
        return jsonify({"error": str(e)})


# File type validation helpers
def is_valid_image_file(filename):
    valid_extensions = {'.jpg', '.jpeg', '.png'}
    return os.path.splitext(filename)[1].lower() in valid_extensions


def allowed_file(filename, allowed_extensions):
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in allowed_extensions


def cleanup_temp_files():
    try:
        # Clean up only the images directory
        if os.path.exists(images_dir):
            for file in os.listdir(images_dir):
                file_path = os.path.join(images_dir, file)
                try:
                    if os.path.isfile(file_path):
                        os.unlink(file_path)
                except Exception as e:
                    log_print(f"Error deleting file {file_path}: {e}", "ERROR")
        log_print("Successfully cleaned up temporary files")
    except Exception as e:
        log_print(f"Error cleaning up temporary files: {e}", "ERROR")


if __name__ == '__main__':
    try:
        port = int(os.environ.get('PORT', 7860))
        app.run(host='0.0.0.0', port=port, debug=False)
    finally:
        cleanup_temp_files()
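# Illustrative local run (module name assumed; PORT defaults to 7860, the
# Hugging Face Spaces convention):
#   PORT=7860 python app.py
# Recent log output can then be inspected via GET /check_logs.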