import os
import tempfile
# Set up Hugging Face cache directory
os.environ['TRANSFORMERS_CACHE'] = os.path.join(tempfile.gettempdir(), 'huggingface_cache')
os.makedirs(os.environ['TRANSFORMERS_CACHE'], exist_ok=True)
# Now import the rest of the dependencies
import sys
import logging
from pathlib import Path
from flask import Flask, request, jsonify, render_template, send_file, Response
from werkzeug.utils import secure_filename
import cv2
import numpy as np
from PIL import Image
import io
import base64
from datetime import datetime
import json
import queue
import threading
from threading import Thread, Event
import time
import nltk
import gensim
from gensim.models import FastText
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import torch
import torch.nn.functional as F
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
import shutil
import string
import unicodedata
import warnings
from flask_cors import CORS
from dotenv import load_dotenv
warnings.filterwarnings('ignore')
# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Set up the Hugging Face home directory (TRANSFORMERS_CACHE was already configured above)
os.environ['HF_HOME'] = os.path.join(tempfile.gettempdir(), 'huggingface')
os.makedirs(os.environ['HF_HOME'], exist_ok=True)
# Import HTR modules
from HTR.app import extract_text_from_image
from HTR.word import convert_image
from HTR.strike import struck_images
from HTR.hcr import text
from HTR.spell_and_gramer_check import spell_grammer
# Import utils
from utils import notification_queue, log_print
# Load environment variables
load_dotenv()
# Create directories in /tmp which is writable
BASE_DIR = '/tmp' # Use direct /tmp path for Hugging Face
log_dir = os.path.join(BASE_DIR, 'app_logs')
cache_dir = os.path.join(BASE_DIR, 'app_cache')
nltk_data_dir = os.path.join(BASE_DIR, 'nltk_data')
gensim_data_dir = os.path.join(BASE_DIR, 'gensim-data')
upload_dir = os.path.join(BASE_DIR, 'uploads')
ans_image_dir = os.path.join(BASE_DIR, 'ans_image')
images_dir = os.path.join(BASE_DIR, 'images')
log_file = os.path.join(log_dir, 'app.log') # Add log file path
# Global variables for model caching and initialization status
global_models = {}
initialization_complete = Event()
def ensure_directory(path):
"""Create directory and ensure full permissions with better error handling"""
if os.path.exists(path):
try:
# Test write permissions
test_file = os.path.join(path, '.test')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
return path
except Exception as e:
log_print(f"Warning: Directory exists but not writable: {path}", "WARNING")
try:
# Try to fix permissions
os.chmod(path, 0o777)
return path
except Exception as chmod_e:
log_print(f"Error fixing permissions for {path}: {chmod_e}", "ERROR")
raise
try:
# Create directory with full permissions
os.makedirs(path, mode=0o777, exist_ok=True)
return path
except Exception as e:
try:
# Try with more restricted permissions
os.makedirs(path, mode=0o755, exist_ok=True)
return path
except Exception as nested_e:
log_print(f"Error creating directory {path}: {nested_e}", "ERROR")
raise
# Simplified logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
def initialize_resources():
"""Initialize all required resources"""
try:
# Create essential directories first
for directory in [nltk_data_dir, gensim_data_dir]:
ensure_directory(directory)
        # Initialize NLTK data ('punkt' is a tokenizer; 'stopwords' and
        # 'wordnet' are corpora, so look each resource up under its own category)
        required_nltk_data = {'stopwords': 'corpora', 'punkt': 'tokenizers', 'wordnet': 'corpora'}
        for data, category in required_nltk_data.items():
            try:
                nltk.data.find(os.path.join(category, data))
            except LookupError:
                try:
                    log_print(f"Downloading NLTK data: {data}")
                    nltk.download(data, download_dir=nltk_data_dir, quiet=True)
except Exception as e:
log_print(f"Error downloading NLTK data {data}: {e}", "ERROR")
# Initialize models
try:
get_or_load_model('fasttext')
except Exception as e:
log_print(f"Warning: Could not preload models: {e}", "WARNING")
except Exception as e:
log_print(f"Error during initialization: {e}", "ERROR")
finally:
# Signal that initialization is complete
initialization_complete.set()
# Create essential directories
essential_dirs = [cache_dir, upload_dir, images_dir]
for directory in essential_dirs:
ensure_directory(directory)
# Point model caches at the writable cache directory (this supersedes the
# temporary HF_HOME path set during import)
os.environ['HF_HOME'] = cache_dir
os.environ['GENSIM_DATA_DIR'] = gensim_data_dir
# Add the custom directory to NLTK's search path
nltk.data.path.insert(0, nltk_data_dir)
# Start initialization in background
initialization_thread = Thread(target=initialize_resources, daemon=True)
initialization_thread.start()
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score
from similarity_check.semantic_meaning_check.semantic import similarity_model_score, fasttext_similarity, question_vector_sentence, question_vector_word
from similarity_check.llm_based_scoring.llm import llm_score
app = Flask(__name__)
app.config['JSON_SORT_KEYS'] = False
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = False
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
# Create temporary directories for Hugging Face Spaces
# (tempfile.mkdtemp() creates each directory with a unique name)
UPLOAD_FOLDER = tempfile.mkdtemp()
ANS_IMAGE_FOLDER = tempfile.mkdtemp()
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['ANS_IMAGE_FOLDER'] = ANS_IMAGE_FOLDER
# Configure CORS for all origins
CORS(app, resources={
r"/*": {
"origins": "*",
"methods": ["GET", "POST", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization", "Accept"],
"expose_headers": ["Content-Type"]
}
})
# Global error handler for all exceptions
@app.errorhandler(Exception)
def handle_exception(e):
# Log the error for debugging
app.logger.error(f"Unhandled exception: {str(e)}")
error_details = {
"status": "error",
"error": "Internal server error",
"message": str(e),
"type": type(e).__name__,
"timestamp": datetime.now().isoformat()
}
notification_queue.put({
"type": "error",
"message": error_details
})
return jsonify(error_details), 500
# Handle 404 errors
@app.errorhandler(404)
def not_found_error(error):
return jsonify({
"status": "error",
"error": "Not found",
"message": "The requested resource was not found"
}), 404
# Handle 400 Bad Request
@app.errorhandler(400)
def bad_request_error(error):
return jsonify({
"status": "error",
"error": "Bad request",
"message": str(error)
}), 400
@app.route('/')
def index():
return render_template('2.html')
def new_value(value, old_min, old_max, new_min, new_max):
"""Calculate new value with proper error handling"""
try:
if old_max == old_min:
return new_min # Return minimum value if range is zero
return new_min + ((value - old_min) * (new_max - new_min)) / (old_max - old_min)
except Exception as e:
log_print(f"Error in new_value calculation: {e}", "ERROR")
return new_min # Return minimum value on error
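# Worked example (illustrative values): a TF-IDF score of 6.5 on the 3..10
# scale maps onto the 0..5 contribution scale as
#   new_value(6.5, old_min=3, old_max=10, new_min=0, new_max=5) == 2.5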
@app.route('/compute_answers', methods=['POST'])
def compute_answers():
try:
query_file = request.files.get('query_file')
if not query_file:
error_msg = "Missing query file"
notification_queue.put({
"type": "error",
"message": error_msg
})
return jsonify({"error": error_msg}), 400
queries = query_file.read().decode('utf-8').splitlines()
logger.info(f"Received queries: {queries}")
file_type = request.form.get('file_type')
        ans_csv_file = request.files.get('ans_csv_file')
        if file_type == "csv":
            if not ans_csv_file:
                return jsonify({"error": "Missing answers CSV file"}), 400
            # Each line may hold several answers joined by a literal
            # two-character "\n" marker, so split on that sequence
            ans_csv_file = ans_csv_file.read().decode('utf-8').splitlines()
            c_answers = []
            for i in ans_csv_file:
                c_answers.append(i.split('\\n'))
            logger.info(f"Processed CSV answers: {c_answers}")
            return jsonify({"answers": c_answers}), 200
try:
c_answers = []
if file_type == 'pdf':
                # Create a temporary directory for the uploaded PDF files
                # (tempfile.mkdtemp() already creates it on disk)
                pdf_dir = tempfile.mkdtemp()
# Save uploaded PDF files
pdf_files = []
for file in request.files.getlist('pdf_files[]'):
                    if file.filename.lower().endswith('.pdf'):
filename = secure_filename(file.filename)
filepath = os.path.join(pdf_dir, filename)
file.save(filepath)
pdf_files.append(filepath)
if not pdf_files:
return jsonify({"error": "No PDF files uploaded"}), 400
logger.info(f"Processing {len(pdf_files)} PDF files")
# Process PDFs
for pdf_file in pdf_files:
database_creation(pdf_file)
# Generate answers
for query in queries:
ans = []
for pdf_file in pdf_files:
ans.append(answer_generation(pdf_file, query))
c_answers.append(ans)
# Clean up PDF directory
try:
shutil.rmtree(pdf_dir)
except Exception as e:
logger.warning(f"Could not clean up PDF directory: {e}")
else:
return jsonify({"error": "Unsupported file type"}), 400
logger.info(f"Generated answers: {c_answers}")
return jsonify({"answers": c_answers}), 200
except Exception as e:
logger.error(f"Error processing files: {str(e)}")
error_msg = str(e)
notification_queue.put({
"type": "error",
"message": error_msg
})
return jsonify({"error": error_msg}), 500
except Exception as e:
logger.error(f"Error in compute_answers: {str(e)}")
error_msg = str(e)
notification_queue.put({
"type": "error",
"message": error_msg
})
return jsonify({"error": error_msg}), 500
def validate_folder_structure(files):
"""Validate the folder structure of uploaded files"""
try:
# Get unique student folders
student_folders = set()
for file in files:
if not file or not file.filename:
continue
path_parts = file.filename.split('/')
if len(path_parts) >= 2:
student_folders.add(path_parts[-2])
if not student_folders:
return False, "No valid student folders found. Please create folders with student names."
# Check if each student folder has the same number of files
file_counts = {}
for file in files:
if not file or not file.filename:
continue
path_parts = file.filename.split('/')
if len(path_parts) >= 2:
student = path_parts[-2]
file_counts[student] = file_counts.get(student, 0) + 1
if not file_counts:
return False, "No valid files found in student folders. Please add image files."
# Check if all students have the same number of files
counts = list(file_counts.values())
if len(set(counts)) > 1:
return False, "Inconsistent number of files across student folders. Each student must have the same number of images."
# Validate file extensions
for file in files:
if not file or not file.filename:
continue
path_parts = file.filename.split('/')
if len(path_parts) >= 2:
filename = path_parts[-1]
ext = os.path.splitext(filename)[1].lower()
if ext not in ['.jpg', '.jpeg', '.png']:
return False, f"Invalid file extension: {ext}. Only .jpg, .jpeg, and .png files are allowed."
return True, f"Valid folder structure with {len(student_folders)} students and {counts[0]} files each"
except Exception as e:
return False, f"Error validating folder structure: {str(e)}"
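# Expected upload layout (illustrative student and file names):
#   student_A/answer1.jpg
#   student_A/answer2.jpg
#   student_B/answer1.jpg
#   student_B/answer2.jpg
# Every student folder must contain the same number of .jpg/.jpeg/.png files.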
@app.route('/notifications')
def notifications():
def generate():
error_count = 0
max_errors = 3
while True:
try:
# Get notification from queue (non-blocking)
try:
notification = notification_queue.get_nowait()
if notification:
yield "data: " + json.dumps(notification) + "\n\n"
error_count = 0 # Reset error count on successful notification
except queue.Empty:
# If no notification, yield empty to keep connection alive
yield "data: " + json.dumps({"type": "ping"}) + "\n\n"
time.sleep(0.5) # Keep the connection alive
except Exception as e:
error_count += 1
error_msg = str(e).encode('ascii', 'ignore').decode('ascii')
log_print(f"Error in notification stream: {error_msg}", "ERROR")
yield "data: " + json.dumps({
"type": "error",
"message": f"Server error: {error_msg}"
}) + "\n\n"
if error_count >= max_errors:
break
return Response(generate(), mimetype='text/event-stream')
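# Client sketch (illustrative; any SSE-capable client works):
#   import json, requests
#   with requests.get("http://localhost:7860/notifications", stream=True) as r:
#       for line in r.iter_lines():
#           if line.startswith(b"data: "):
#               event = json.loads(line[len(b"data: "):])
#               # event["type"] is "ping" for keep-alives; other events carry notifications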
@app.route('/compute_marks', methods=['POST'])
def compute_marks():
try:
# Get answers from request
a = request.form.get('answers')
if not a:
error_msg = "Missing answers in the request"
notification_queue.put({
"type": "error",
"message": error_msg
})
return jsonify({"error": error_msg}), 400
a = json.loads(a)
answers = []
for i in a:
ans = i.split('\n\n')
answers.append(ans)
logger.info(f"Processing answers: {answers}")
# Process uploaded files
files = request.files.getlist('file')
if not files:
error_msg = "No files uploaded. Please upload student folders containing images."
notification_queue.put({
"type": "error",
"message": error_msg
})
return jsonify({"error": error_msg}), 400
# Validate folder structure
is_valid, message = validate_folder_structure(files)
if not is_valid:
notification_queue.put({
"type": "error",
"message": message
})
return jsonify({"error": message}), 400
# Create student folders structure
data = {}
parent_folder = app.config['ANS_IMAGE_FOLDER']
# Create student folders and save files
        for file in files:
            if file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
# Extract student folder from filename
path_parts = file.filename.split('/')
if len(path_parts) >= 2:
student_folder = secure_filename(path_parts[-2])
student_path = os.path.join(parent_folder, student_folder)
os.makedirs(student_path, exist_ok=True)
# Save the file
filename = secure_filename(path_parts[-1])
filepath = os.path.join(student_path, filename)
file.save(filepath)
if student_folder in data:
data[student_folder].append((filename, filepath))
else:
data[student_folder] = [(filename, filepath)]
logger.info(f"Processed files structure: {data}")
# Calculate marks
results = []
sen_vec_answers = []
word_vec_answers = []
# Process correct answers
for i in answers:
temp_v = []
temp_w = []
for j in i:
temp_v.append(question_vector_sentence(j))
temp_w.append(question_vector_word(j))
sen_vec_answers.append(temp_v)
word_vec_answers.append(temp_w)
# Calculate marks for each student
for student in data:
# Sort the image paths by filename
sorted_images = sorted(data[student], key=lambda x: x[0])
count = 0
for filename, image_path in sorted_images:
try:
s_answer = extract_text_from_image(image_path)
logger.info(f"Extracted text from {image_path}: {s_answer}")
if not s_answer:
logger.warning(f"No text extracted from {image_path}")
results.append({
"subfolder": student,
"image": filename,
"marks": 0
})
count += 1
continue
tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count])
m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count],
tf_idf_word_values, max_tfidf, answers[count])
if isinstance(m, torch.Tensor):
m = m.item()
results.append({
"subfolder": student,
"image": filename,
"marks": round(m, 2) # Round marks to 2 decimal places
})
count += 1
except Exception as e:
logger.error(f"Error processing {image_path}: {str(e)}")
results.append({
"subfolder": student,
"image": filename,
"marks": 0
})
count += 1
continue
logger.info(f"Calculated results: {results}")
# Clean up temporary files
try:
shutil.rmtree(parent_folder)
except Exception as e:
logger.warning(f"Could not clean up temporary files: {e}")
return jsonify({"results": results}), 200
except Exception as e:
error_msg = str(e)
notification_queue.put({
"type": "error",
"message": error_msg
})
return jsonify({"error": error_msg}), 500
def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers):
try:
marks = 0
marks1 = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10)
if marks1 > 3:
marks += new_value(marks1, old_min=3, old_max=10, new_min=0, new_max=5)
logger.info(f"TFIDF Score: {float(marks)}")
if marks1 > 2:
marks2 = similarity_model_score(sen_vec_answers, answer)
a = 0
if marks2 > 0.95:
marks += 3
a = a + 3
elif marks2 > 0.5:
marks += new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3)
a = a + new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3)
logger.info(f"Sentence transformers score: {a}")
            marks3 = fasttext_similarity(word_vec_answers, answer)
            b = 0
            if marks3 > 0.9:
                marks += 2
                b = b + 2
            elif marks3 > 0.4:
                marks += new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2)
                b = b + new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2)
            logger.info(f"Fasttext score: {b}")
            marks4 = llm_score(correct_answers, answer)
            marks4 = [float(x) for x in marks4]
            m = max(marks4)
            logger.info(f"LLM score: {m/2}")
marks = marks/2 + m/2
return marks
except Exception as e:
logger.error(f"Error in marks calculation: {str(e)}")
return 0
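# Scoring summary (as implemented above): TF-IDF contributes up to 5 marks,
# sentence-transformer similarity up to 3, and FastText similarity up to 2,
# for a rule-based total out of 10; the final mark averages that total with
# the best LLM score:
#   final = rule_based_total / 2 + max(llm_scores) / 2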
@app.route('/check_logs')
def check_logs():
try:
# Ensure log directory exists
ensure_directory(log_dir)
# If log file doesn't exist, create it
if not os.path.exists(log_file):
with open(log_file, 'w') as f:
f.write("Log file created.\n")
# Read last 1000 lines of logs
with open(log_file, 'r') as f:
logs = f.readlines()[-1000:]
return jsonify({
"status": "success",
"logs": "".join(logs)
})
except Exception as e:
log_print(f"Error reading logs: {str(e)}", "ERROR")
return jsonify({
"status": "error",
"error": str(e)
}), 500
def is_valid_image_file(filename):
"""Validate image file extensions and basic format"""
try:
# Check file extension
valid_extensions = {'.jpg', '.jpeg', '.png'}
ext = os.path.splitext(filename)[1].lower()
if ext not in valid_extensions:
return False
return True
except Exception:
return False
def allowed_file(filename, allowed_extensions):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in allowed_extensions
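# Example: allowed_file('scan.PNG', {'png', 'jpg'}) returns True, while
# allowed_file('notes', {'png'}) returns False (no extension present).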
def get_or_load_model(model_name):
"""Get a model from cache or load it if not present"""
if model_name not in global_models:
try:
if model_name == 'fasttext':
from gensim.models import KeyedVectors
log_print(f"Loading {model_name} model...")
model_path = os.path.join(gensim_data_dir, 'fasttext-wiki-news-subwords-300', 'fasttext-wiki-news-subwords-300.gz')
if not os.path.exists(model_path):
from gensim.downloader import load
log_print("Downloading fasttext model...")
global_models[model_name] = load('fasttext-wiki-news-subwords-300')
else:
global_models[model_name] = KeyedVectors.load_word2vec_format(model_path)
log_print(f"Successfully loaded {model_name} model")
elif model_name == 'llm':
# Implement LLM model loading here
log_print("LLM model loading not implemented", "WARNING")
return None
except Exception as e:
log_print(f"Error loading {model_name} model: {e}", "ERROR")
raise
return global_models.get(model_name)
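# Usage sketch: fetch the cached FastText vectors (loads on first call;
# 'fasttext-wiki-news-subwords-300' provides 300-dimensional word vectors):
#   ft = get_or_load_model('fasttext')
#   if ft is not None:
#       vec = ft['university']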
def wait_for_initialization():
"""Wait for initialization to complete"""
initialization_complete.wait()
return True
# Add this to the compute_marks route before processing files
@app.before_request
def ensure_initialization():
"""Ensure all resources are initialized before processing requests"""
if request.endpoint in ['compute_marks', 'compute_answers']:
wait_for_initialization()
def cleanup_temp_files():
"""Clean up temporary files with proper error handling"""
try:
# Clean up the temporary processing directory
temp_processing_dir = os.path.join(BASE_DIR, 'temp_processing')
if os.path.exists(temp_processing_dir):
shutil.rmtree(temp_processing_dir, ignore_errors=True)
# Clean up the images directory
if os.path.exists(images_dir):
for file in os.listdir(images_dir):
try:
file_path = os.path.join(images_dir, file)
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
log_print(f"Warning: Could not delete file {file_path}: {e}", "WARNING")
# Clean up the upload folder
if os.path.exists(UPLOAD_FOLDER):
try:
shutil.rmtree(UPLOAD_FOLDER, ignore_errors=True)
except Exception as e:
log_print(f"Warning: Could not clean up upload folder: {e}", "WARNING")
except Exception as e:
log_print(f"Error cleaning up temporary files: {e}", "ERROR")
# Note: before_first_request was deprecated in Flask 2.2 and removed in 2.3,
# so this hook assumes an older Flask release
@app.before_first_request
def setup_temp_directories():
"""Set up temporary directories before first request"""
try:
# Create temporary directories with proper permissions
global UPLOAD_FOLDER, ANS_IMAGE_FOLDER
UPLOAD_FOLDER = tempfile.mkdtemp()
ANS_IMAGE_FOLDER = tempfile.mkdtemp()
# Ensure directories have proper permissions
ensure_directory(UPLOAD_FOLDER)
ensure_directory(ANS_IMAGE_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['ANS_IMAGE_FOLDER'] = ANS_IMAGE_FOLDER
log_print(f"Created temporary directories: {UPLOAD_FOLDER}, {ANS_IMAGE_FOLDER}")
except Exception as e:
log_print(f"Error setting up temporary directories: {e}", "ERROR")
raise
if __name__ == '__main__':
try:
# Create essential directories
for directory in essential_dirs:
ensure_directory(directory)
# Configure server
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
# Start the Flask app
port = int(os.environ.get('PORT', 7860))
log_print(f"Starting server on port {port}")
log_print("Server configuration:")
log_print(f"- Threaded: True")
log_print(f"- Debug mode: False")
# Run the server with proper configuration
app.run(
host='0.0.0.0',
port=port,
debug=False,
use_reloader=False,
threaded=True
)
except Exception as e:
log_print(f"Fatal error starting server: {str(e)}", "ERROR")
raise
finally:
log_print("Cleaning up temporary files...")
cleanup_temp_files()
log_print("Server shutdown complete")