yamanavijayavardhan's picture
update_
8efbae1
raw
history blame
17.1 kB
import os
import tempfile
import nltk
import logging
import sys
import builtins
from datetime import datetime
from flask_cors import CORS
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Create directories in /tmp which is writable
BASE_DIR = tempfile.gettempdir()
log_dir = os.path.join(BASE_DIR, 'app_logs')
cache_dir = os.path.join(BASE_DIR, 'app_cache')
# Create necessary directories
try:
os.makedirs(log_dir, exist_ok=True)
except Exception as e:
print(f"Warning: Could not create log directory: {e}")
# Fallback to temp directory
log_dir = tempfile.mkdtemp(prefix='app_logs_')
# Create a log file with timestamp
log_file = os.path.join(log_dir, f'app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
# Set up logging to both file and console
try:
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stdout)
]
)
except Exception as e:
print(f"Warning: Could not set up file logging: {e}")
# Fallback to console-only logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
# Create a custom logger
logger = logging.getLogger(__name__)
# Add a print function that also logs
def log_print(message, level="INFO"):
# Use the original print function to avoid recursion
builtins.print(message, flush=True)
if level == "INFO":
logger.info(message)
elif level == "ERROR":
logger.error(message)
elif level == "WARNING":
logger.warning(message)
# Set up all cache and data directories in /tmp
try:
nltk_data_dir = os.path.join(cache_dir, 'nltk_data')
gensim_data_dir = os.path.join(cache_dir, 'gensim-data')
upload_dir = os.path.join(cache_dir, 'uploads')
ans_image_dir = os.path.join(cache_dir, 'ans_image')
# Create directories with correct permissions
for directory in [cache_dir, nltk_data_dir, gensim_data_dir, upload_dir, ans_image_dir]:
os.makedirs(directory, exist_ok=True)
except Exception as e:
print(f"Warning: Could not create cache directories: {e}")
# Fallback to temporary directories
cache_dir = tempfile.mkdtemp(prefix='app_cache_')
nltk_data_dir = tempfile.mkdtemp(prefix='nltk_data_')
gensim_data_dir = tempfile.mkdtemp(prefix='gensim_data_')
upload_dir = tempfile.mkdtemp(prefix='uploads_')
ans_image_dir = tempfile.mkdtemp(prefix='ans_image_')
# Set environment variables
os.environ['HF_HOME'] = cache_dir
os.environ['GENSIM_DATA_DIR'] = gensim_data_dir
# Add the custom directory to NLTK's search path
nltk.data.path.insert(0, nltk_data_dir)
# Download required NLTK data
required_nltk_data = ['stopwords', 'punkt', 'wordnet']
for data in required_nltk_data:
try:
log_print(f"Downloading NLTK data: {data}")
nltk.download(data, download_dir=nltk_data_dir)
except Exception as e:
log_print(f"Error downloading NLTK data {data}: {e}", "ERROR")
raise
from flask import Flask, request, jsonify, render_template
import json
import torch
from werkzeug.utils import secure_filename
from HTR.app import extract_text_from_image
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score
from similarity_check.semantic_meaning_check.semantic import similarity_model_score, fasttext_similarity,question_vector_sentence,question_vector_word
from similarity_check.llm_based_scoring.llm import llm_score
app = Flask(__name__)
# Use the new upload directory
UPLOAD_FOLDER = upload_dir
# Configure CORS for Hugging Face
CORS(app, resources={
r"/*": {
"origins": ["*"],
"methods": ["GET", "POST", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization"]
}
})
@app.route('/')
def index():
return render_template('index.html')
def new_value(value, old_min, old_max, new_min, new_max):
new_value = new_min + ((value - old_min) * (new_max - new_min)) / (old_max - old_min)
return new_value
@app.route('/compute_answers', methods=['POST'])
def compute_answers():
try:
log_print("\n=== Starting Answer Computation ===")
query_file = request.files.get('query_file')
if not query_file:
log_print("Missing query file", "ERROR")
return jsonify({"error": "Missing query file"}), 400
try:
queries = query_file.read().decode('utf-8').splitlines()
if not queries:
log_print("No queries found in file", "ERROR")
return jsonify({"error": "No queries found in file"}), 400
log_print(f"Received queries: {queries}")
except UnicodeDecodeError:
log_print("Invalid file encoding", "ERROR")
return jsonify({"error": "Invalid file encoding"}), 400
file_type = request.form.get('file_type')
if not file_type:
log_print("Missing file type", "ERROR")
return jsonify({"error": "Missing file type"}), 400
ans_csv_file = request.files.get('ans_csv_file')
if file_type == "csv":
if not ans_csv_file:
log_print("Missing answer CSV file", "ERROR")
return jsonify({"error": "Missing answer CSV file"}), 400
try:
ans_csv_file = ans_csv_file.read().decode('utf-8').splitlines()
c_answers = []
for i in ans_csv_file:
c_answers.append(i.split('\\n'))
log_print(f"Processed CSV answers: {c_answers}")
return jsonify({"answers": c_answers}), 200
except UnicodeDecodeError:
log_print("Invalid CSV file encoding", "ERROR")
return jsonify({"error": "Invalid CSV file encoding"}), 400
c_answers = []
if file_type == 'pdf':
# Create a temporary directory for PDF files
pdf_dir = os.path.join(cache_dir, 'pdf_files')
os.makedirs(pdf_dir, exist_ok=True)
# Save uploaded PDF files
pdf_files = []
for file in request.files.getlist('pdf_files[]'):
if file.filename.endswith('.pdf'):
filename = secure_filename(file.filename)
filepath = os.path.join(pdf_dir, filename)
file.save(filepath)
pdf_files.append(filepath)
if not pdf_files:
log_print("No PDF files uploaded", "ERROR")
return jsonify({"error": "No PDF files uploaded"}), 400
log_print(f"Processing {len(pdf_files)} PDF files")
# Process PDFs
for pdf_file in pdf_files:
database_creation(pdf_file)
# Generate answers
for query in queries:
ans = []
for pdf_file in pdf_files:
ans.append(answer_generation(pdf_file, query))
c_answers.append(ans)
# Clean up PDF directory
try:
import shutil
shutil.rmtree(pdf_dir)
except Exception as e:
log_print(f"Warning: Could not clean up PDF directory: {e}", "WARNING")
else:
log_print(f"Unsupported file type: {file_type}", "ERROR")
return jsonify({"error": "Unsupported file type"}), 400
log_print(f"Generated answers: {c_answers}")
return jsonify({"answers": c_answers}), 200
except Exception as e:
log_print(f"Error in compute_answers: {str(e)}", "ERROR")
return jsonify({"error": str(e)}), 500
@app.route('/compute_marks', methods=['POST'])
def compute_marks():
try:
log_print("\n=== Starting Marks Computation ===")
# Get and process answers
a = request.form.get('answers')
if not a:
log_print("No answers provided", "ERROR")
return jsonify({"error": "No answers provided"}), 400
try:
log_print("=== Processing Answers ===")
log_print(f"Received answers: {a}")
a = json.loads(a)
answers = []
for i in a:
ans = i.split('\n\n')
answers.append(ans)
log_print(f"Processed answers structure: {answers}")
except json.JSONDecodeError:
log_print("Invalid JSON format in answers", "ERROR")
return jsonify({"error": "Invalid JSON format in answers"}), 400
# Add validation for answers
def validate_answers(answers):
try:
if not isinstance(answers, list):
return False
# Check if each answer is a string or a list of strings
for ans in answers:
if isinstance(ans, str):
continue
elif isinstance(ans, list):
if not all(isinstance(a, str) for a in ans):
return False
else:
return False
return True
except Exception as e:
log_print(f"Validation error: {str(e)}", "ERROR")
return False
if not validate_answers(answers):
log_print("Invalid answer format", "ERROR")
log_print(f"Received answer structure: {answers}", "ERROR")
return jsonify({"error": "Invalid answer format"}), 400
# Process answers to ensure consistent format
processed_answers = []
for ans in answers:
if isinstance(ans, str):
processed_answers.append([ans])
else:
processed_answers.append(ans)
answers = processed_answers
# Get files from the request
files = request.files.getlist('files[]')
if not files:
log_print("No files were uploaded", "ERROR")
return jsonify({"error": "No files were uploaded"}), 400
# Create student folders and save files
data = {}
for file in files:
if file and is_valid_image_file(file.filename):
# Extract student folder from the path
path_parts = file.filename.split('/')
if len(path_parts) >= 2:
student_folder = path_parts[-2] # Get the parent folder name
filename = path_parts[-1] # Get the actual filename
# Create student directory if it doesn't exist
student_dir = os.path.join(ans_image_dir, student_folder)
os.makedirs(student_dir, exist_ok=True)
# Save the file
filepath = os.path.join(student_dir, filename)
file.save(filepath)
# Add to data structure
if student_folder not in data:
data[student_folder] = []
data[student_folder].append(filepath.replace("\\", "/"))
log_print(f"Saved file: {filepath}")
if not data:
log_print("No valid image files were found in the upload", "ERROR")
return jsonify({"error": "No valid image files were found"}), 400
log_print(f"Processed files structure: {data}")
# Initialize vectors for answers
sen_vec_answers = []
word_vec_answers = []
for i in answers:
temp_v = []
temp_w = []
for j in i:
temp_v.append(question_vector_sentence(j))
temp_w.append(question_vector_word(j))
sen_vec_answers.append(temp_v)
word_vec_answers.append(temp_w)
# Calculate marks
s_marks = {}
for student_folder, file_paths in data.items():
s_marks[student_folder] = []
count = 0
for image_path in file_paths:
try:
s_answer = extract_text_from_image(image_path)
log_print(f"\nProcessing {student_folder}/{os.path.basename(image_path)}:")
log_print(f"Extracted answer: {s_answer}")
if s_answer and count < len(answers):
log_print(f"Reference answer: {answers[count]}")
tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count])
m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count],
tf_idf_word_values, max_tfidf, answers[count])
if isinstance(m, torch.Tensor):
m = m.item()
s_marks[student_folder].append(round(float(m), 2))
log_print(f"Marks awarded: {m}")
else:
s_marks[student_folder].append(0)
log_print(f"No text extracted or no reference answer for index {count}", "WARNING")
count += 1
except Exception as e:
log_print(f"Error processing {image_path}: {str(e)}", "ERROR")
s_marks[student_folder].append(0)
if not s_marks:
log_print("No marks were computed", "ERROR")
return jsonify({"error": "No marks were computed. Please check your input files and answers."}), 400
log_print("\nFinal Results:")
for student, marks_list in s_marks.items():
log_print(f"{student}: {marks_list}")
return jsonify({"message": s_marks}), 200
except Exception as e:
log_print(f"Error in compute_marks: {str(e)}", "ERROR")
return jsonify({"error": str(e)}), 500
def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers):
marks = 0
log_print("\n=== Marks Calculation ===")
log_print(f"Processing answer: {answer[:100]}...")
marks1 = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10)
log_print(f"TFIDF Score: {marks1}")
if marks1 > 3:
marks += new_value(marks1, old_min=3, old_max=10, new_min=0, new_max=5)
log_print(f"After TFIDF adjustment: {marks}")
if marks1 > 2:
marks2 = similarity_model_score(sen_vec_answers, answer)
log_print(f"Sentence Similarity Score: {marks2}")
if marks2 > 0.95:
marks += 3
elif marks2 > 0.5:
marks += new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3)
marks3 = fasttext_similarity(word_vec_answers, answer)
log_print(f"Word Similarity Score: {marks3}")
if marks3 > 0.9:
marks += 2
elif marks3 > 0.4:
marks += new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2)
marks4 = llm_score(correct_answers, answer)
log_print(f"LLM Scores: {marks4}")
for i in range(len(marks4)):
marks4[i] = float(marks4[i])
m = max(marks4)
log_print(f"Max LLM Score: {m}")
marks = marks/2 + m/2
log_print(f"Final marks: {marks}")
else:
log_print("WARNING: TFIDF score too low, returning 0")
return marks
@app.route('/check_logs')
def check_logs():
try:
with open(log_file, 'r') as f:
logs = f.read()
return jsonify({"logs": logs})
except Exception as e:
return jsonify({"error": str(e)})
# Add file type validation
def is_valid_image_file(filename):
valid_extensions = {'.jpg', '.jpeg', '.png'}
return os.path.splitext(filename)[1].lower() in valid_extensions
def allowed_file(filename, allowed_extensions):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in allowed_extensions
def cleanup_temp_files():
try:
import shutil
temp_dirs = [ans_image_dir, upload_dir]
for directory in temp_dirs:
if os.path.exists(directory):
shutil.rmtree(directory)
os.makedirs(directory, exist_ok=True)
log_print("Successfully cleaned up temporary files")
except Exception as e:
log_print(f"Error cleaning up temporary files: {e}", "ERROR")
if __name__ == '__main__':
try:
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port, debug=False)
finally:
cleanup_temp_files()