|
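"""Flask entry point for the handwritten answer grading app.

Configures user-writable cache directories and environment variables before the
heavy ML imports, then exposes endpoints for correct-answer processing, marks
computation, and a server-sent-events notification stream.
"""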
import os |
|
import tempfile
import shutil
|
import gc |
|
import psutil |
|
import time |
|
import logging |
|
import queue |
|
import torch |
|
from all_models import ModelSingleton |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(levelname)s - %(message)s', |
|
handlers=[logging.StreamHandler()] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
notification_queue = queue.Queue() |
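# Log messages are mirrored onto this queue and streamed to clients by the /notifications SSE endpoint.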
|
|
|
def log_print(message, level="INFO"): |
|
"""Unified logging function""" |
|
if level == "ERROR": |
|
logger.error(message) |
|
elif level == "WARNING": |
|
logger.warning(message) |
|
else: |
|
logger.info(message) |
|
|
|
notification_queue.put({ |
|
"type": level.lower(), |
|
"message": message |
|
}) |
|
|
|
def get_user_cache_dir(): |
|
"""Get a user-accessible cache directory""" |
|
try: |
|
|
|
user_cache = os.path.join(os.path.expanduser('~'), '.cache', 'answer_grading_app') |
|
if not os.path.exists(user_cache): |
|
os.makedirs(user_cache, mode=0o755, exist_ok=True) |
|
return user_cache |
|
except Exception as e: |
|
log_print(f"Error creating user cache directory: {e}", "WARNING") |
|
|
|
temp_dir = os.path.join(tempfile.gettempdir(), 'answer_grading_app') |
|
os.makedirs(temp_dir, mode=0o755, exist_ok=True) |
|
return temp_dir |
|
|
|
|
|
BASE_DIR = get_user_cache_dir() |
|
log_print(f"Using base directory: {BASE_DIR}") |
|
|
|
|
|
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) |
|
os.environ['MODEL_ROOT'] = PROJECT_ROOT |
|
log_print(f"Set MODEL_ROOT to: {PROJECT_ROOT}") |
|
|
|
|
|
cache_dirs = { |
|
'root': BASE_DIR, |
|
'transformers': os.path.join(BASE_DIR, 'transformers'), |
|
'hf': os.path.join(BASE_DIR, 'huggingface'), |
|
'torch': os.path.join(BASE_DIR, 'torch'), |
|
'cache': os.path.join(BASE_DIR, 'cache'), |
|
'sentence_transformers': os.path.join(BASE_DIR, 'sentence_transformers'), |
|
'gensim': os.path.join(BASE_DIR, 'gensim'), |
|
'nltk': os.path.join(BASE_DIR, 'nltk_data'), |
|
'logs': os.path.join(BASE_DIR, 'logs'), |
|
'uploads': os.path.join(BASE_DIR, 'uploads'), |
|
'images': os.path.join(BASE_DIR, 'images'), |
|
'ans_image': os.path.join(BASE_DIR, 'ans_image'), |
|
'models': os.path.join(PROJECT_ROOT, 'models') |
|
} |
|
|
|
|
|
for name, path in cache_dirs.items(): |
|
try: |
|
os.makedirs(path, mode=0o755, exist_ok=True) |
|
log_print(f"Created directory: {path}") |
|
except Exception as e: |
|
log_print(f"Error creating directory {name}: {e}", "ERROR") |
|
|
|
|
|
os.environ['TRANSFORMERS_CACHE'] = cache_dirs['transformers'] |
|
os.environ['HF_HOME'] = cache_dirs['hf'] |
|
os.environ['TORCH_HOME'] = cache_dirs['torch'] |
|
os.environ['XDG_CACHE_HOME'] = cache_dirs['cache'] |
|
os.environ['SENTENCE_TRANSFORMERS_HOME'] = cache_dirs['sentence_transformers'] |
|
os.environ['GENSIM_DATA_DIR'] = cache_dirs['gensim'] |
|
os.environ['NLTK_DATA'] = cache_dirs['nltk'] |
|
|
|
|
|
import sys |
|
from pathlib import Path |
|
from flask import Flask, request, jsonify, render_template, send_file, Response |
|
from werkzeug.utils import secure_filename |
|
import cv2 |
|
import numpy as np |
|
from PIL import Image |
|
import io |
|
import base64 |
|
from datetime import datetime |
|
import json |
|
import threading |
|
from threading import Thread, Event |
|
import warnings |
|
from flask_cors import CORS |
|
from dotenv import load_dotenv |
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
import nltk |
|
import gensim |
|
from gensim.models import FastText |
|
from sentence_transformers import SentenceTransformer |
|
from transformers import pipeline |
|
|
|
|
|
def import_with_timeout(import_statement, timeout=30): |
|
"""Import a module with a timeout to prevent hanging""" |
|
result = {'success': False, 'module': None, 'error': None} |
|
|
|
def _import(): |
|
try: |
|
if isinstance(import_statement, str): |
|
result['module'] = __import__(import_statement) |
|
else: |
|
exec(import_statement) |
|
result['success'] = True |
|
except Exception as e: |
|
result['error'] = str(e) |
|
|
|
thread = Thread(target=_import) |
|
thread.daemon = True |
|
thread.start() |
|
thread.join(timeout=timeout) |
|
|
|
if thread.is_alive(): |
|
return None, f"Import timed out after {timeout} seconds" |
|
|
|
return result['module'], result['error'] |
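# Re-run the heavy imports through the watchdog above; if an import hangs past the
# timeout the name is set to None and a warning is logged instead of blocking startup.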
|
|
|
|
|
nltk, nltk_error = import_with_timeout('nltk') |
|
if nltk_error: |
|
log_print(f"Warning: NLTK import failed: {nltk_error}", "WARNING") |
|
|
|
gensim, gensim_error = import_with_timeout('gensim') |
|
if gensim_error: |
|
log_print(f"Warning: Gensim import failed: {gensim_error}", "WARNING") |
|
|
|
torch, torch_error = import_with_timeout('torch') |
|
if torch_error: |
|
log_print(f"Warning: PyTorch import failed: {torch_error}", "WARNING") |
|
|
|
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__))) |
|
|
|
|
|
global_models = {} |
|
initialization_complete = Event() |
|
|
|
|
|
models = ModelSingleton() |
|
|
|
def ensure_full_permissions(path): |
|
"""Grant full permissions to a file or directory""" |
|
try: |
|
if os.path.isdir(path): |
|
|
|
os.chmod(path, 0o777) |
|
|
|
for root, dirs, files in os.walk(path): |
|
for d in dirs: |
|
os.chmod(os.path.join(root, d), 0o777) |
|
for f in files: |
|
os.chmod(os.path.join(root, f), 0o666) |
|
else: |
|
|
|
os.chmod(path, 0o666) |
|
return True |
|
except Exception as e: |
|
log_print(f"Error setting permissions for {path}: {e}", "ERROR") |
|
return False |
|
|
|
def ensure_directory(path): |
|
"""Create directory and ensure full permissions""" |
|
try: |
|
if os.path.exists(path): |
|
ensure_full_permissions(path) |
|
return path |
|
|
|
|
|
os.makedirs(path, mode=0o777, exist_ok=True) |
|
ensure_full_permissions(path) |
|
return path |
|
except Exception as e: |
|
log_print(f"Error creating directory {path}: {e}", "ERROR") |
|
raise |
|
|
|
def get_or_load_model(model_name): |
|
"""Get a model from cache or load it if not present""" |
|
if model_name not in global_models: |
|
try: |
|
if model_name == 'fasttext': |
|
from gensim.models import KeyedVectors |
|
log_print(f"Loading {model_name} model...") |
|
model_path = os.path.join(cache_dirs['gensim'], 'fasttext-wiki-news-subwords-300', 'fasttext-wiki-news-subwords-300.gz') |
|
model_dir = os.path.dirname(model_path) |
|
|
|
try: |
|
|
|
os.makedirs(model_dir, exist_ok=True) |
|
|
|
if os.path.exists(model_path): |
|
log_print("Loading fasttext model from cache...") |
|
model = KeyedVectors.load_word2vec_format(model_path) |
|
else: |
|
|
|
from gensim.downloader import load |
|
log_print("Downloading fasttext model...") |
|
model = load('fasttext-wiki-news-subwords-300') |
|
|
|
|
|
global_models[model_name] = model |
|
log_print(f"Successfully loaded {model_name} model") |
|
except Exception as e: |
|
log_print(f"Error loading fasttext model: {str(e)}", "ERROR") |
|
return None |
|
elif model_name == 'vit': |
|
try: |
|
from transformers import ViTImageProcessor, ViTModel, ViTConfig |
|
log_print("Loading local ViT model...") |
|
|
|
|
|
model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models', 'vit-base-beans') |
|
|
|
if not os.path.exists(model_path): |
|
log_print(f"Error: Local ViT model not found at {model_path}", "ERROR") |
|
return None |
|
|
|
try: |
|
|
|
log_print("Creating default image processor...") |
|
processor = ViTImageProcessor( |
|
do_resize=True, |
|
size=224, |
|
do_normalize=True, |
|
image_mean=[0.5, 0.5, 0.5], |
|
image_std=[0.5, 0.5, 0.5] |
|
) |
|
|
|
|
|
model_file = os.path.join(model_path, 'model.safetensors') |
|
config_file = os.path.join(model_path, 'config.json') |
|
|
|
if not os.path.exists(model_file): |
|
raise FileNotFoundError(f"Model file not found: {model_file}") |
|
if not os.path.exists(config_file): |
|
raise FileNotFoundError(f"Config file not found: {config_file}") |
|
|
|
log_print(f"Found model files:") |
|
log_print(f"- Model weights: {model_file}") |
|
log_print(f"- Config file: {config_file}") |
|
|
|
|
|
log_print("Loading ViT model from safetensors file...") |
|
from transformers import ViTForImageClassification |
|
model = ViTForImageClassification.from_pretrained( |
|
model_path, |
|
local_files_only=True, |
|
use_safetensors=True, |
|
trust_remote_code=False, |
|
ignore_mismatched_sizes=True |
|
) |
|
|
|
|
|
model = model.to('cpu') |
|
|
|
                        global_models['vit_processor'] = processor
                        global_models['vit_model'] = model
                        # Register under the requested name as well so the
                        # "model_name not in global_models" check caches this load.
                        global_models[model_name] = model
|
log_print("Successfully loaded local ViT model and created image processor") |
|
|
|
except Exception as e: |
|
log_print(f"Error loading local ViT model: {str(e)}", "ERROR") |
|
return None |
|
|
|
except Exception as e: |
|
log_print(f"Error initializing ViT components: {str(e)}", "ERROR") |
|
return None |
|
elif model_name == 'llm': |
|
log_print("LLM model loading not implemented", "WARNING") |
|
return None |
|
except Exception as e: |
|
log_print(f"Error loading {model_name} model: {str(e)}", "ERROR") |
|
return None |
|
return global_models.get(model_name) |
|
|
|
def initialize_resources(): |
|
"""Initialize all required resources""" |
|
try: |
|
|
|
for directory in [cache_dirs['nltk']]: |
|
ensure_directory(directory) |
|
|
|
|
|
        # NLTK keeps tokenizers and corpora under different subdirectories, so look
        # each resource up in the right category before deciding to download it.
        required_nltk_data = {'punkt': 'tokenizers', 'stopwords': 'corpora', 'wordnet': 'corpora'}
        for data, category in required_nltk_data.items():
            try:
                nltk.data.find(f'{category}/{data}')
            except LookupError:
|
try: |
|
log_print(f"Downloading NLTK data: {data}") |
|
nltk.download(data, download_dir=cache_dirs['nltk'], quiet=True) |
|
except Exception as e: |
|
log_print(f"Error downloading NLTK data {data}: {e}", "WARNING") |
|
continue |
|
|
|
|
|
try: |
|
|
|
get_or_load_model('fasttext') |
|
|
|
|
|
get_or_load_model('vit') |
|
except Exception as e: |
|
log_print(f"Warning: Could not preload models: {e}", "WARNING") |
|
|
|
except Exception as e: |
|
log_print(f"Error during initialization: {e}", "ERROR") |
|
finally: |
|
|
|
initialization_complete.set() |
|
|
|
|
|
essential_dirs = [cache_dirs['root'], cache_dirs['uploads'], cache_dirs['images']] |
|
for directory in essential_dirs: |
|
ensure_directory(directory) |
|
|
|
|
|
os.environ['HF_HOME'] = cache_dirs['hf'] |
|
os.environ['GENSIM_DATA_DIR'] = cache_dirs['gensim'] |
|
|
|
|
|
nltk.data.path.insert(0, cache_dirs['nltk']) |
|
|
|
|
|
for cache_dir in cache_dirs.values(): |
|
ensure_full_permissions(cache_dir) |
|
|
|
|
|
initialization_thread = Thread(target=initialize_resources, daemon=True) |
|
initialization_thread.start() |
|
|
|
|
|
|
from HTR.app import extract_text_from_image |
|
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation |
|
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score |
|
from similarity_check.semantic_meaning_check.semantic import similarity_model_score, fasttext_similarity, question_vector_sentence, question_vector_word |
|
from similarity_check.llm_based_scoring.llm import llm_score |
|
|
|
app = Flask(__name__) |
|
app.config['JSON_SORT_KEYS'] = False |
|
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = False |
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 |
|
|
|
|
|
UPLOAD_FOLDER = tempfile.mkdtemp() |
|
ANS_IMAGE_FOLDER = tempfile.mkdtemp() |
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True) |
|
os.makedirs(ANS_IMAGE_FOLDER, exist_ok=True) |
|
|
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
|
app.config['ANS_IMAGE_FOLDER'] = ANS_IMAGE_FOLDER |
|
|
|
|
|
CORS(app, resources={ |
|
r"/*": { |
|
"origins": "*", |
|
"methods": ["GET", "POST", "OPTIONS"], |
|
"allow_headers": ["Content-Type", "Authorization", "Accept"], |
|
"expose_headers": ["Content-Type"] |
|
} |
|
}) |
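# Wide-open CORS ("*" origins) keeps local development simple; restrict it for production deployments.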
|
|
|
|
|
@app.errorhandler(Exception) |
|
def handle_exception(e): |
|
|
|
app.logger.error(f"Unhandled exception: {str(e)}") |
|
error_details = { |
|
"status": "error", |
|
"error": "Internal server error", |
|
"message": str(e), |
|
"type": type(e).__name__, |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
notification_queue.put({ |
|
"type": "error", |
|
"message": error_details |
|
}) |
|
return jsonify(error_details), 500 |
|
|
|
|
|
@app.errorhandler(404) |
|
def not_found_error(error): |
|
return jsonify({ |
|
"status": "error", |
|
"error": "Not found", |
|
"message": "The requested resource was not found" |
|
}), 404 |
|
|
|
|
|
@app.errorhandler(400) |
|
def bad_request_error(error): |
|
return jsonify({ |
|
"status": "error", |
|
"error": "Bad request", |
|
"message": str(error) |
|
}), 400 |
|
|
|
@app.route('/') |
|
def index(): |
|
return render_template('2.html') |
|
|
|
def new_value(value, old_min, old_max, new_min, new_max): |
|
"""Calculate new value with proper error handling""" |
|
try: |
|
if old_max == old_min: |
|
return new_min |
|
return new_min + ((value - old_min) * (new_max - new_min)) / (old_max - old_min) |
|
except Exception as e: |
|
log_print(f"Error in new_value calculation: {e}", "ERROR") |
|
return new_min |
|
|
|
@app.route('/compute_answers', methods=['POST']) |
|
def compute_answers(): |
|
try: |
|
file_type = request.form.get('file_type') |
|
log_print(f"Processing file type: {file_type}") |
|
|
|
if file_type != "csv": |
|
return jsonify({"error": "Only CSV file processing is supported"}), 400 |
|
|
|
ans_csv_file = request.files.get('ans_csv_file') |
|
if not ans_csv_file: |
|
return jsonify({"error": "Missing answer CSV file"}), 400 |
|
|
|
try: |
|
|
|
content = ans_csv_file.read().decode('utf-8') |
|
if not content.strip(): |
|
return jsonify({"error": "CSV file is empty"}), 400 |
|
|
|
|
|
c_answers = [] |
|
for line in content.splitlines(): |
|
if line.strip(): |
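                    # Note: multiple acceptable answers on one CSV line appear to be separated
                    # by a literal "\n" (backslash-n) sequence, hence the split on '\\n'.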
|
answers = [ans.strip() for ans in line.split('\\n') if ans.strip()] |
|
if answers: |
|
c_answers.append(answers) |
|
|
|
if not c_answers: |
|
return jsonify({"error": "No valid answers found in CSV file"}), 400 |
|
|
|
log_print(f"Successfully processed {len(c_answers)} answers from CSV") |
|
return jsonify({"answers": c_answers}), 200 |
|
|
|
except Exception as e: |
|
log_print(f"Error processing CSV file: {str(e)}", "ERROR") |
|
return jsonify({"error": f"Error processing CSV file: {str(e)}"}), 400 |
|
|
|
except Exception as e: |
|
error_msg = f"Error in compute_answers: {str(e)}" |
|
log_print(error_msg, "ERROR") |
|
return jsonify({"error": error_msg}), 500 |
|
|
|
def validate_folder_structure(files): |
|
"""Validate the folder structure of uploaded files""" |
|
try: |
|
|
|
student_folders = set() |
|
for file in files: |
|
if not file or not file.filename: |
|
continue |
|
path_parts = file.filename.split('/') |
|
if len(path_parts) >= 2: |
|
student_folders.add(path_parts[-2]) |
|
|
|
if not student_folders: |
|
return False, "No valid student folders found. Please create folders with student names." |
|
|
|
|
|
file_counts = {} |
|
for file in files: |
|
if not file or not file.filename: |
|
continue |
|
path_parts = file.filename.split('/') |
|
if len(path_parts) >= 2: |
|
student = path_parts[-2] |
|
file_counts[student] = file_counts.get(student, 0) + 1 |
|
|
|
if not file_counts: |
|
return False, "No valid files found in student folders. Please add image files." |
|
|
|
|
|
counts = list(file_counts.values()) |
|
if len(set(counts)) > 1: |
|
return False, "Inconsistent number of files across student folders. Each student must have the same number of images." |
|
|
|
|
|
for file in files: |
|
if not file or not file.filename: |
|
continue |
|
path_parts = file.filename.split('/') |
|
if len(path_parts) >= 2: |
|
filename = path_parts[-1] |
|
ext = os.path.splitext(filename)[1].lower() |
|
if ext not in ['.jpg', '.jpeg', '.png']: |
|
return False, f"Invalid file extension: {ext}. Only .jpg, .jpeg, and .png files are allowed." |
|
|
|
return True, f"Valid folder structure with {len(student_folders)} students and {counts[0]} files each" |
|
|
|
except Exception as e: |
|
return False, f"Error validating folder structure: {str(e)}" |
|
|
|
@app.route('/notifications') |
|
def notifications(): |
|
def generate(): |
|
error_count = 0 |
|
max_errors = 3 |
|
|
|
while True: |
|
try: |
|
|
|
try: |
|
notification = notification_queue.get_nowait() |
|
if notification: |
|
yield "data: " + json.dumps(notification) + "\n\n" |
|
error_count = 0 |
|
except queue.Empty: |
|
|
|
yield "data: " + json.dumps({"type": "ping"}) + "\n\n" |
|
time.sleep(0.5) |
|
|
|
except Exception as e: |
|
error_count += 1 |
|
error_msg = str(e).encode('ascii', 'ignore').decode('ascii') |
|
log_print(f"Error in notification stream: {error_msg}", "ERROR") |
|
|
|
yield "data: " + json.dumps({ |
|
"type": "error", |
|
"message": f"Server error: {error_msg}" |
|
}) + "\n\n" |
|
|
|
if error_count >= max_errors: |
|
break |
|
|
|
return Response(generate(), mimetype='text/event-stream') |
|
|
|
def get_memory_usage(): |
|
"""Get current memory usage""" |
|
process = psutil.Process(os.getpid()) |
|
return process.memory_info().rss / 1024 / 1024 |
|
|
|
def cleanup_memory(): |
|
"""Clean up memory by clearing caches and garbage collection""" |
|
try: |
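        # Free cached GPU memory first (a no-op on CPU-only machines), then force a GC pass.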
|
|
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
|
|
|
|
gc.collect() |
|
|
|
|
|
if models: |
|
models.cleanup() |
|
|
|
|
|
memory_usage = get_memory_usage() |
|
log_print(f"Memory usage after cleanup: {memory_usage:.2f} MB") |
|
|
|
except Exception as e: |
|
log_print(f"Error during memory cleanup: {e}", "ERROR") |
|
|
|
@app.route('/compute_marks', methods=['POST']) |
|
def compute_marks(): |
|
"""Compute marks for submitted answers""" |
|
try: |
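        # The expected answers arrive as a JSON-encoded list of lists of strings in the form field "answers".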
|
|
|
a = request.form.get('answers') |
|
if not a: |
|
error_msg = "Missing answers in the request" |
|
log_print(error_msg, "ERROR") |
|
return jsonify({"error": error_msg}), 400 |
|
|
|
try: |
|
answers = json.loads(a) |
|
|
|
if not isinstance(answers, list): |
|
raise ValueError("Answers must be a list") |
|
if not all(isinstance(ans, list) for ans in answers): |
|
raise ValueError("Each answer must be a list of strings") |
|
if not all(isinstance(text, str) for ans in answers for text in ans): |
|
raise ValueError("All answer texts must be strings") |
|
|
|
log_print(f"Received {len(answers)} sets of answers") |
|
log_print(f"Answer format: {[len(ans) for ans in answers]} answers per set") |
|
|
|
except json.JSONDecodeError as e: |
|
error_msg = f"Invalid JSON format for answers: {str(e)}" |
|
log_print(error_msg, "ERROR") |
|
return jsonify({"error": error_msg}), 400 |
|
except ValueError as e: |
|
error_msg = f"Invalid answer format: {str(e)}" |
|
log_print(error_msg, "ERROR") |
|
return jsonify({"error": error_msg}), 400 |
|
|
|
|
|
files = request.files.getlist('file') |
|
if not files: |
|
error_msg = "No files uploaded. Please upload student folders containing images." |
|
log_print(error_msg, "ERROR") |
|
return jsonify({"error": error_msg}), 400 |
|
|
|
|
|
is_valid, message = validate_folder_structure(files) |
|
if not is_valid: |
|
log_print(message, "ERROR") |
|
return jsonify({"error": message}), 400 |
|
|
|
|
|
data = {} |
|
parent_folder = app.config['ANS_IMAGE_FOLDER'] |
|
|
|
|
|
for file in files: |
|
            # Match extensions case-insensitively, consistent with validate_folder_structure.
            if file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
|
path_parts = file.filename.split('/') |
|
if len(path_parts) >= 2: |
|
student_folder = secure_filename(path_parts[-2]) |
|
student_path = os.path.join(parent_folder, student_folder) |
|
os.makedirs(student_path, exist_ok=True) |
|
|
|
|
|
filename = secure_filename(path_parts[-1]) |
|
filepath = os.path.join(student_path, filename) |
|
file.save(filepath) |
|
|
|
if student_folder in data: |
|
data[student_folder].append((filename, filepath)) |
|
else: |
|
data[student_folder] = [(filename, filepath)] |
|
|
|
log_print(f"Processed files structure: {data}") |
|
|
|
|
|
expected_files = len(answers) |
|
        # Iterate per-student file lists (renamed to avoid shadowing the uploaded `files` list above).
        for student, student_files in data.items():
            if len(student_files) != expected_files:
                error_msg = f"Student {student} has {len(student_files)} files but {expected_files} answers were provided"
|
log_print(error_msg, "ERROR") |
|
return jsonify({"error": error_msg}), 400 |
|
|
|
|
|
results = [] |
|
sen_vec_answers = [] |
|
word_vec_answers = [] |
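        # Pre-compute sentence and word embeddings for every expected answer so they are reused for all students.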
|
|
|
|
|
for i in answers: |
|
temp_v = [] |
|
temp_w = [] |
|
for j in i: |
|
temp_v.append(question_vector_sentence(j)) |
|
temp_w.append(question_vector_word(j)) |
|
sen_vec_answers.append(temp_v) |
|
word_vec_answers.append(temp_w) |
|
|
|
|
|
for student in data: |
|
|
|
sorted_images = sorted(data[student], key=lambda x: x[0]) |
|
count = 0 |
|
for filename, image_path in sorted_images: |
|
try: |
|
|
|
s_answer = extract_text_from_image(image_path) |
|
log_print(f"Processing student: {student}, image: {filename}") |
|
log_print(f"Extracted text: {s_answer}") |
|
|
|
|
|
if s_answer is None or s_answer.strip() == '': |
|
log_print(f"No text extracted from {image_path}", "WARNING") |
|
results.append({ |
|
"subfolder": student, |
|
"image": filename, |
|
"marks": 0, |
|
"extracted_text": "", |
|
"correct_answer": answers[count], |
|
"error": "No text could be extracted from image. Please check image quality." |
|
}) |
|
count += 1 |
|
continue |
|
|
|
|
|
tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count]) |
|
log_print(f"TF-IDF max value: {max_tfidf}") |
|
|
|
|
|
m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count], |
|
tf_idf_word_values, max_tfidf, answers[count]) |
|
|
|
if isinstance(m, torch.Tensor): |
|
m = m.item() |
|
|
|
|
|
results.append({ |
|
"subfolder": student, |
|
"image": filename, |
|
"marks": round(m, 2), |
|
"extracted_text": s_answer, |
|
"correct_answer": answers[count] |
|
}) |
|
count += 1 |
|
|
|
|
|
cleanup_memory() |
|
|
|
except Exception as e: |
|
log_print(f"Error processing {image_path}: {str(e)}", "ERROR") |
|
results.append({ |
|
"subfolder": student, |
|
"image": filename, |
|
"marks": 0, |
|
"extracted_text": "", |
|
"correct_answer": answers[count] if count < len(answers) else [], |
|
"error": f"Error processing image: {str(e)}" |
|
}) |
|
count += 1 |
|
continue |
|
|
|
log_print(f"Calculated results: {results}") |
|
|
|
|
|
try: |
|
shutil.rmtree(parent_folder) |
|
except Exception as e: |
|
log_print(f"Could not clean up temporary files: {e}", "WARNING") |
|
|
|
|
|
cleanup_memory() |
|
|
|
return jsonify({ |
|
"results": results, |
|
"debug_info": { |
|
"total_students": len(data), |
|
"total_answers": len(answers), |
|
"answers_processed": count, |
|
"successful_extractions": len([r for r in results if r.get('extracted_text')]) |
|
} |
|
}), 200 |
|
|
|
except Exception as e: |
|
error_msg = str(e) |
|
log_print(f"Error in compute_marks: {error_msg}", "ERROR") |
|
return jsonify({"error": error_msg}), 500 |
|
finally: |
|
|
|
cleanup_memory() |
|
|
|
def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers): |
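    """Score a student answer against the expected answers.

    Summary of the scheme as implemented below: TF-IDF overlap contributes up to
    5 marks, sentence-transformer similarity up to 3, and FastText word
    similarity up to 2 (a 0-10 heuristic total), which is then averaged 50/50
    with the best LLM-based score.
    """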
|
try: |
|
marks = 0 |
|
log_print(f"Starting marks calculation for answer: {answer}") |
|
log_print(f"Correct answers: {correct_answers}") |
|
|
|
|
|
marks1 = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10) |
|
log_print(f"Initial TF-IDF score: {marks1}") |
|
|
|
if marks1 > 3: |
|
tfidf_contribution = new_value(marks1, old_min=3, old_max=10, new_min=0, new_max=5) |
|
marks += tfidf_contribution |
|
log_print(f"TF-IDF contribution (>3): {tfidf_contribution}") |
|
|
|
if marks1 > 2: |
|
|
|
marks2 = similarity_model_score(sen_vec_answers, answer) |
|
log_print(f"Sentence transformer raw score: {marks2}") |
|
|
|
a = 0 |
|
if marks2 > 0.95: |
|
marks += 3 |
|
a = 3 |
|
log_print("High sentence similarity (>0.95): +3 marks") |
|
elif marks2 > 0.5: |
|
sentence_contribution = new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3) |
|
marks += sentence_contribution |
|
a = sentence_contribution |
|
log_print(f"Medium sentence similarity (>0.5): +{sentence_contribution} marks") |
|
|
|
|
|
marks3 = fasttext_similarity(word_vec_answers, answer) |
|
log_print(f"FastText similarity raw score: {marks3}") |
|
|
|
b = 0 |
|
            # Gate the word-similarity bonus on the FastText score (marks3), not the sentence score.
            if marks3 > 0.9:
|
marks += 2 |
|
b = 2 |
|
log_print("High word similarity (>0.9): +2 marks") |
|
elif marks3 > 0.4: |
|
word_contribution = new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2) |
|
marks += word_contribution |
|
b = word_contribution |
|
log_print(f"Medium word similarity (>0.4): +{word_contribution} marks") |
|
|
|
|
|
marks4 = llm_score(correct_answers, answer) |
|
log_print(f"Raw LLM scores: {marks4}") |
|
|
|
for i in range(len(marks4)): |
|
marks4[i] = float(marks4[i]) |
|
|
|
m = max(marks4) |
|
log_print(f"Max LLM score: {m}") |
|
|
|
|
|
final_score = marks/2 + m/2 |
|
log_print(f"Final score calculation: (marks={marks}/2 + llm={m}/2) = {final_score}") |
|
marks = final_score |
|
|
|
log_print(f"Final marks awarded: {marks}") |
|
return marks |
|
|
|
except Exception as e: |
|
log_print(f"Error in marks calculation: {str(e)}", "ERROR") |
|
return 0 |
|
|
|
@app.route('/check_logs') |
|
def check_logs(): |
|
try: |
|
|
|
ensure_directory(cache_dirs['logs']) |
|
|
|
|
|
log_file = os.path.join(cache_dirs['logs'], 'app.log') |
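        # Note: logging is configured with a StreamHandler only, so this file stays
        # empty unless a FileHandler pointed at it is added to the logging setup.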
|
if not os.path.exists(log_file): |
|
with open(log_file, 'w') as f: |
|
f.write("Log file created.\n") |
|
|
|
|
|
with open(log_file, 'r') as f: |
|
logs = f.readlines()[-1000:] |
|
return jsonify({ |
|
"status": "success", |
|
"logs": "".join(logs) |
|
}) |
|
except Exception as e: |
|
log_print(f"Error reading logs: {str(e)}", "ERROR") |
|
return jsonify({ |
|
"status": "error", |
|
"error": str(e) |
|
}), 500 |
|
|
|
def is_valid_image_file(filename): |
|
"""Validate image file extensions and basic format""" |
|
try: |
|
|
|
valid_extensions = {'.jpg', '.jpeg', '.png'} |
|
ext = os.path.splitext(filename)[1].lower() |
|
if ext not in valid_extensions: |
|
return False |
|
|
|
return True |
|
except Exception: |
|
return False |
|
|
|
def allowed_file(filename, allowed_extensions): |
|
return '.' in filename and \ |
|
filename.rsplit('.', 1)[1].lower() in allowed_extensions |
|
|
|
def wait_for_initialization(): |
|
"""Wait for initialization to complete""" |
|
initialization_complete.wait() |
|
return True |
|
|
|
@app.before_request |
|
def ensure_initialization(): |
|
"""Ensure all resources are initialized before processing requests""" |
|
if request.endpoint == 'compute_marks': |
|
wait_for_initialization() |
|
elif request.endpoint == 'compute_answers': |
|
|
|
if request.method == 'POST' and request.form.get('file_type') == 'pdf': |
|
wait_for_initialization() |
|
|
|
def cleanup_temp_files(): |
|
"""Clean up temporary files with proper error handling""" |
|
try: |
|
|
|
temp_processing_dir = os.path.join(BASE_DIR, 'temp_processing') |
|
if os.path.exists(temp_processing_dir): |
|
shutil.rmtree(temp_processing_dir, ignore_errors=True) |
|
|
|
|
|
if os.path.exists(cache_dirs['images']): |
|
for file in os.listdir(cache_dirs['images']): |
|
try: |
|
file_path = os.path.join(cache_dirs['images'], file) |
|
if os.path.isfile(file_path): |
|
os.unlink(file_path) |
|
except Exception as e: |
|
log_print(f"Warning: Could not delete file {file_path}: {e}", "WARNING") |
|
|
|
|
|
if os.path.exists(UPLOAD_FOLDER): |
|
try: |
|
shutil.rmtree(UPLOAD_FOLDER, ignore_errors=True) |
|
except Exception as e: |
|
log_print(f"Warning: Could not clean up upload folder: {e}", "WARNING") |
|
except Exception as e: |
|
log_print(f"Error cleaning up temporary files: {e}", "ERROR") |
|
|
|
@app.before_first_request |
|
def setup_temp_directories(): |
|
"""Set up temporary directories before first request""" |
|
try: |
|
|
|
global UPLOAD_FOLDER, ANS_IMAGE_FOLDER |
|
|
|
UPLOAD_FOLDER = tempfile.mkdtemp() |
|
ANS_IMAGE_FOLDER = tempfile.mkdtemp() |
|
|
|
|
|
ensure_directory(UPLOAD_FOLDER) |
|
ensure_directory(ANS_IMAGE_FOLDER) |
|
|
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
|
app.config['ANS_IMAGE_FOLDER'] = ANS_IMAGE_FOLDER |
|
|
|
log_print(f"Created temporary directories: {UPLOAD_FOLDER}, {ANS_IMAGE_FOLDER}") |
|
except Exception as e: |
|
log_print(f"Error setting up temporary directories: {e}", "ERROR") |
|
raise |
|
|
|
if __name__ == '__main__': |
|
try: |
|
|
|
for directory in essential_dirs: |
|
ensure_directory(directory) |
|
|
|
|
|
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 |
|
|
|
|
|
port = int(os.environ.get('PORT', 7860)) |
|
|
|
log_print(f"Starting server on port {port}") |
|
log_print("Server configuration:") |
|
log_print(f"- Threaded: True") |
|
log_print(f"- Debug mode: False") |
|
|
|
|
|
app.run( |
|
host='0.0.0.0', |
|
port=port, |
|
debug=False, |
|
use_reloader=False, |
|
threaded=True |
|
) |
|
except Exception as e: |
|
log_print(f"Fatal error starting server: {str(e)}", "ERROR") |
|
raise |
|
finally: |
|
log_print("Cleaning up temporary files...") |
|
cleanup_temp_files() |
|
log_print("Server shutdown complete") |