|
import os |
|
import tempfile |
|
import nltk |
|
import logging |
|
import sys |
|
import builtins |
|
from datetime import datetime |
|
from flask_cors import CORS |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Pull settings from a local .env file into the process environment.
load_dotenv()

# Everything the app writes lives under the system temp directory.
BASE_DIR = tempfile.gettempdir()
log_dir = os.path.join(BASE_DIR, 'app_logs')
cache_dir = os.path.join(BASE_DIR, 'app_cache')

# Prefer the fixed log directory; fall back to a fresh private temp
# directory when it cannot be created.
try:
    os.makedirs(log_dir, exist_ok=True)
except Exception as err:
    print(f"Warning: Could not create log directory: {err}")
    log_dir = tempfile.mkdtemp(prefix='app_logs_')

# One log file per process start, named after the start time.
log_file = os.path.join(log_dir, f'app_{datetime.now():%Y%m%d_%H%M%S}.log')

_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Log to file + stdout; if the file handler cannot be set up, log to
# stdout only.
try:
    _log_handlers = [
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format=_LOG_FORMAT,
        handlers=_log_handlers,
    )
except Exception as err:
    print(f"Warning: Could not set up file logging: {err}")
    logging.basicConfig(
        level=logging.INFO,
        format=_LOG_FORMAT,
        handlers=[logging.StreamHandler(sys.stdout)],
    )

logger = logging.getLogger(__name__)
|
|
|
|
|
def log_print(message, level="INFO"):
    """Print *message* to stdout (flushed) and record it via the module logger.

    Args:
        message: Text to emit.
        level: One of ``"INFO"``, ``"ERROR"`` or ``"WARNING"``. Any other
            value is still printed but is not passed to the logger.
    """
    builtins.print(message, flush=True)

    emitters = {
        "INFO": logger.info,
        "ERROR": logger.error,
        "WARNING": logger.warning,
    }
    emit = emitters.get(level)
    if emit is not None:
        emit(message)
|
|
|
|
|
# Create the cache tree (NLTK data, gensim data, uploads, answer images).
# If any part of it cannot be created, switch every location to its own
# freshly made temp directory instead.
try:
    nltk_data_dir, gensim_data_dir, upload_dir, ans_image_dir = (
        os.path.join(cache_dir, leaf)
        for leaf in ('nltk_data', 'gensim-data', 'uploads', 'ans_image')
    )

    for needed_dir in (cache_dir, nltk_data_dir, gensim_data_dir, upload_dir, ans_image_dir):
        os.makedirs(needed_dir, exist_ok=True)
except Exception as err:
    print(f"Warning: Could not create cache directories: {err}")

    cache_dir = tempfile.mkdtemp(prefix='app_cache_')
    nltk_data_dir = tempfile.mkdtemp(prefix='nltk_data_')
    gensim_data_dir = tempfile.mkdtemp(prefix='gensim_data_')
    upload_dir = tempfile.mkdtemp(prefix='uploads_')
    ans_image_dir = tempfile.mkdtemp(prefix='ans_image_')

# Export the cache locations through environment variables
# (presumably read by the Hugging Face and gensim libraries - confirm).
os.environ.update({
    'HF_HOME': cache_dir,
    'GENSIM_DATA_DIR': gensim_data_dir,
})

# Make NLTK search the private data directory first.
nltk.data.path.insert(0, nltk_data_dir)
|
|
|
|
|
# NLTK resources fetched into the private data directory at startup.
required_nltk_data = ['stopwords', 'punkt', 'wordnet']

for data in required_nltk_data:
    try:
        log_print(f"Downloading NLTK data: {data}")
        # nltk.download() reports most failures (e.g. no network) by
        # returning False instead of raising, so check the result; a
        # failure is logged but does not abort startup, matching the
        # previous effective behaviour.
        if not nltk.download(data, download_dir=nltk_data_dir):
            log_print(f"Error downloading NLTK data {data}: download reported failure", "ERROR")
    except Exception as e:
        log_print(f"Error downloading NLTK data {data}: {e}", "ERROR")
        raise
|
|
|
from flask import Flask, request, jsonify, render_template |
|
import json |
|
import torch |
|
from werkzeug.utils import secure_filename |
|
|
|
from HTR.app import extract_text_from_image |
|
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation |
|
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score |
|
from similarity_check.semantic_meaning_check.semantic import similarity_model_score, fasttext_similarity,question_vector_sentence,question_vector_word |
|
from similarity_check.llm_based_scoring.llm import llm_score |
|
|
|
app = Flask(__name__)

UPLOAD_FOLDER = upload_dir

# Cross-origin policy for every route: any origin, GET/POST/OPTIONS only,
# and only the Content-Type and Authorization request headers.
CORS(
    app,
    resources={
        r"/*": dict(
            origins=["*"],
            methods=["GET", "POST", "OPTIONS"],
            allow_headers=["Content-Type", "Authorization"],
        ),
    },
)
|
|
|
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
|
|
|
def new_value(value, old_min, old_max, new_min, new_max):
    """Linearly rescale *value* from ``[old_min, old_max]`` to ``[new_min, new_max]``.

    No clamping is applied: inputs outside the old range map outside the
    new range. Raises ``ZeroDivisionError`` when ``old_max == old_min``.
    """
    offset = value - old_min
    target_span = new_max - new_min
    source_span = old_max - old_min
    return new_min + (offset * target_span) / source_span
|
|
|
@app.route('/compute_answers', methods=['POST'])
def compute_answers():
    """Build the reference answers for an uploaded list of questions.

    Inputs, all read from the current Flask request:
      * ``query_file``   - UTF-8 text file, one question per line (required).
      * ``file_type``    - form field, ``"csv"`` or ``"pdf"`` (required).
      * ``ans_csv_file`` - UTF-8 text file; required when ``file_type`` is
        ``"csv"``. Each line becomes one answer list, split on the literal
        two-character sequence backslash + ``n``.
      * ``pdf_files[]``  - uploaded PDFs; required when ``file_type`` is
        ``"pdf"``. ``answer_generation`` is called once per (question, PDF).

    Returns:
        ``(json, status)``: ``{"answers": [...]}`` with 200 on success,
        ``{"error": msg}`` with 400 for bad input or 500 for any
        unexpected exception.
    """
    try:
        log_print("\n=== Starting Answer Computation ===")

        query_file = request.files.get('query_file')
        if not query_file:
            log_print("Missing query file", "ERROR")
            return jsonify({"error": "Missing query file"}), 400

        try:
            queries = query_file.read().decode('utf-8').splitlines()
        except UnicodeDecodeError:
            log_print("Invalid file encoding", "ERROR")
            return jsonify({"error": "Invalid file encoding"}), 400
        if not queries:
            log_print("No queries found in file", "ERROR")
            return jsonify({"error": "No queries found in file"}), 400
        log_print(f"Received queries: {queries}")

        file_type = request.form.get('file_type')
        if not file_type:
            log_print("Missing file type", "ERROR")
            return jsonify({"error": "Missing file type"}), 400

        ans_csv_file = request.files.get('ans_csv_file')

        if file_type == "csv":
            if not ans_csv_file:
                log_print("Missing answer CSV file", "ERROR")
                return jsonify({"error": "Missing answer CSV file"}), 400

            try:
                csv_lines = ans_csv_file.read().decode('utf-8').splitlines()
            except UnicodeDecodeError:
                log_print("Invalid CSV file encoding", "ERROR")
                return jsonify({"error": "Invalid CSV file encoding"}), 400

            # '\\n' is the literal backslash-n separator used inside a line,
            # not a newline character.
            c_answers = [line.split('\\n') for line in csv_lines]
            log_print(f"Processed CSV answers: {c_answers}")
            return jsonify({"answers": c_answers}), 200

        if file_type != 'pdf':
            log_print(f"Unsupported file type: {file_type}", "ERROR")
            return jsonify({"error": "Unsupported file type"}), 400

        import shutil

        pdf_dir = os.path.join(cache_dir, 'pdf_files')
        os.makedirs(pdf_dir, exist_ok=True)
        try:
            pdf_files = []
            for file in request.files.getlist('pdf_files[]'):
                original_name = file.filename or ''
                # Accept ".PDF" as well as ".pdf".
                if not original_name.lower().endswith('.pdf'):
                    continue
                filename = secure_filename(original_name)
                if not filename:
                    continue
                filepath = os.path.join(pdf_dir, filename)
                file.save(filepath)
                pdf_files.append(filepath)

            if not pdf_files:
                log_print("No PDF files uploaded", "ERROR")
                return jsonify({"error": "No PDF files uploaded"}), 400

            log_print(f"Processing {len(pdf_files)} PDF files")

            # Every PDF is registered before any question is answered.
            for pdf_file in pdf_files:
                database_creation(pdf_file)

            # c_answers[i][j] is the answer to question i from PDF j.
            c_answers = [
                [answer_generation(pdf_file, query) for pdf_file in pdf_files]
                for query in queries
            ]
        finally:
            # Remove the uploaded PDFs even when processing failed, so a
            # crashed request does not leave files behind.
            try:
                shutil.rmtree(pdf_dir)
            except Exception as e:
                log_print(f"Warning: Could not clean up PDF directory: {e}", "WARNING")

        log_print(f"Generated answers: {c_answers}")
        return jsonify({"answers": c_answers}), 200

    except Exception as e:
        log_print(f"Error in compute_answers: {str(e)}", "ERROR")
        return jsonify({"error": str(e)}), 500
|
|
|
def _normalize_reference_answers(parsed):
    """Return *parsed* as a list of answer-variant lists, or ``None`` if malformed.

    A string entry is split on blank lines (``'\\n\\n'``) into its variants;
    an entry that is already a list of strings is kept as is. Anything else
    (non-list top level, non-string items) makes the whole payload invalid.
    """
    if not isinstance(parsed, list):
        return None
    normalized = []
    for entry in parsed:
        if isinstance(entry, str):
            normalized.append(entry.split('\n\n'))
        elif isinstance(entry, list) and all(isinstance(v, str) for v in entry):
            normalized.append(entry)
        else:
            return None
    return normalized


def _is_safe_path_component(part):
    """Return True when *part* is a plain name that cannot escape its parent directory."""
    return (
        part not in ('', '.', '..')
        and '\\' not in part
        and os.path.basename(part) == part
    )


@app.route('/compute_marks', methods=['POST'])
def compute_marks():
    """Grade uploaded answer images against the supplied reference answers.

    Inputs, all read from the current Flask request:
      * ``answers`` - form field holding a JSON list. Each item is either a
        string (variants separated by a blank line) or a list of strings.
        Item *i* is the reference for the *i*-th image of every student.
      * ``files[]`` - image uploads (.jpg/.jpeg/.png) whose filename has the
        form ``.../<student_folder>/<image_name>``.

    Returns:
        ``(json, status)``: ``{"message": {student_folder: [mark, ...]}}``
        with 200 on success; ``{"error": msg}`` with 400 for bad input or
        500 for any unexpected exception. An image that fails to process,
        yields no text, or has no matching reference answer scores 0.
    """
    try:
        log_print("\n=== Starting Marks Computation ===")

        a = request.form.get('answers')
        if not a:
            log_print("No answers provided", "ERROR")
            return jsonify({"error": "No answers provided"}), 400

        log_print("=== Processing Answers ===")
        log_print(f"Received answers: {a}")
        try:
            parsed = json.loads(a)
        except json.JSONDecodeError:
            log_print("Invalid JSON format in answers", "ERROR")
            return jsonify({"error": "Invalid JSON format in answers"}), 400

        # Validate the shape before using it, so malformed (but valid) JSON
        # is reported as a client error rather than surfacing as a 500.
        answers = _normalize_reference_answers(parsed)
        if answers is None:
            log_print("Invalid answer format", "ERROR")
            log_print(f"Received answer structure: {parsed}", "ERROR")
            return jsonify({"error": "Invalid answer format"}), 400
        log_print(f"Processed answers structure: {answers}")

        files = request.files.getlist('files[]')
        if not files:
            log_print("No files were uploaded", "ERROR")
            return jsonify({"error": "No files were uploaded"}), 400

        # student_folder -> saved image paths, in upload order.
        data = {}
        for file in files:
            if not (file and is_valid_image_file(file.filename)):
                continue

            path_parts = file.filename.split('/')
            if len(path_parts) < 2:
                continue
            student_folder, filename = path_parts[-2], path_parts[-1]

            # Both names come from the client; refuse anything that could
            # place the file outside ans_image_dir (e.g. "..").
            if not (_is_safe_path_component(student_folder)
                    and _is_safe_path_component(filename)):
                log_print(f"Skipping upload with unsafe path: {file.filename}", "WARNING")
                continue

            student_dir = os.path.join(ans_image_dir, student_folder)
            os.makedirs(student_dir, exist_ok=True)

            filepath = os.path.join(student_dir, filename)
            file.save(filepath)

            data.setdefault(student_folder, []).append(filepath.replace("\\", "/"))
            log_print(f"Saved file: {filepath}")

        if not data:
            log_print("No valid image files were found in the upload", "ERROR")
            return jsonify({"error": "No valid image files were found"}), 400

        log_print(f"Processed files structure: {data}")

        # Vectorise every reference variant once, up front.
        sen_vec_answers = [
            [question_vector_sentence(variant) for variant in variants]
            for variants in answers
        ]
        word_vec_answers = [
            [question_vector_word(variant) for variant in variants]
            for variants in answers
        ]

        s_marks = {}
        for student_folder, file_paths in data.items():
            student_marks = s_marks[student_folder] = []
            # The question index is the image's position in the upload, so
            # a failure on one image cannot shift later images onto the
            # wrong reference answer.
            for count, image_path in enumerate(file_paths):
                try:
                    s_answer = extract_text_from_image(image_path)
                    log_print(f"\nProcessing {student_folder}/{os.path.basename(image_path)}:")
                    log_print(f"Extracted answer: {s_answer}")

                    if s_answer and count < len(answers):
                        log_print(f"Reference answer: {answers[count]}")
                        tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count])
                        m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count],
                                  tf_idf_word_values, max_tfidf, answers[count])

                        if isinstance(m, torch.Tensor):
                            m = m.item()
                        student_marks.append(round(float(m), 2))
                        log_print(f"Marks awarded: {m}")
                    else:
                        student_marks.append(0)
                        log_print(f"No text extracted or no reference answer for index {count}", "WARNING")

                except Exception as e:
                    log_print(f"Error processing {image_path}: {str(e)}", "ERROR")
                    student_marks.append(0)

        if not s_marks:
            log_print("No marks were computed", "ERROR")
            return jsonify({"error": "No marks were computed. Please check your input files and answers."}), 400

        log_print("\nFinal Results:")
        for student, marks_list in s_marks.items():
            log_print(f"{student}: {marks_list}")

        return jsonify({"message": s_marks}), 200

    except Exception as e:
        log_print(f"Error in compute_marks: {str(e)}", "ERROR")
        return jsonify({"error": str(e)}), 500
|
|
|
def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers):
    """Score one extracted student answer against its reference answers.

    Scoring steps:
      1. TF-IDF score from ``tfidf_answer_score(..., marks=10)``. At 2 or
         below the answer scores 0 and nothing else is evaluated. Above 3 it
         contributes 0-5 (rescaled from 3..10).
      2. Sentence similarity adds 3 above 0.95, or 0-3 rescaled from
         0.5..0.95.
      3. Word similarity adds 2 above 0.9, or 0-2 rescaled from 0.4..0.9.
      4. The result is the mean of that sum and the highest LLM score.

    Returns:
        The computed mark (``0`` when the TF-IDF score is too low).
    """
    total = 0
    log_print("\n=== Marks Calculation ===")
    log_print(f"Processing answer: {answer[:100]}...")

    tfidf_score = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10)
    log_print(f"TFIDF Score: {tfidf_score}")

    # Gate: too little keyword overlap means no further scoring.
    if not (tfidf_score > 2):
        log_print("WARNING: TFIDF score too low, returning 0")
        return total

    if tfidf_score > 3:
        total += new_value(tfidf_score, old_min=3, old_max=10, new_min=0, new_max=5)
        log_print(f"After TFIDF adjustment: {total}")

    sentence_score = similarity_model_score(sen_vec_answers, answer)
    log_print(f"Sentence Similarity Score: {sentence_score}")
    if sentence_score > 0.95:
        total += 3
    elif sentence_score > 0.5:
        total += new_value(sentence_score, old_min=0.5, old_max=0.95, new_min=0, new_max=3)

    word_score = fasttext_similarity(word_vec_answers, answer)
    log_print(f"Word Similarity Score: {word_score}")
    if word_score > 0.9:
        total += 2
    elif word_score > 0.4:
        total += new_value(word_score, old_min=0.4, old_max=0.9, new_min=0, new_max=2)

    llm_scores = llm_score(correct_answers, answer)
    log_print(f"LLM Scores: {llm_scores}")

    # Convert each score to float in place before taking the maximum.
    for position, raw_score in enumerate(llm_scores):
        llm_scores[position] = float(raw_score)

    best_llm = max(llm_scores)
    log_print(f"Max LLM Score: {best_llm}")

    total = total/2 + best_llm/2
    log_print(f"Final marks: {total}")

    return total
|
|
|
@app.route('/check_logs')
def check_logs():
    """Return the contents of the current log file as JSON.

    Returns:
        ``{"logs": <file text>}`` (HTTP 200) on success, or
        ``({"error": msg}, 500)`` when the file cannot be read, so that a
        failure is no longer reported with a success status and matches
        the other routes' error convention.
    """
    # NOTE(review): this endpoint exposes the full application log
    # (including logged request payloads) to any caller - confirm that is
    # intended before deploying publicly.
    try:
        with open(log_file, 'r') as f:
            logs = f.read()
        return jsonify({"logs": logs})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
def is_valid_image_file(filename):
    """Return True when *filename* ends in .jpg, .jpeg or .png (case-insensitive)."""
    _, extension = os.path.splitext(filename)
    return extension.lower() in ('.jpg', '.jpeg', '.png')
|
|
|
def allowed_file(filename, allowed_extensions):
    """Return True when *filename* has an extension listed in *allowed_extensions*.

    The extension is the text after the last dot, compared in lower case;
    a name with no dot is never allowed.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in allowed_extensions
|
|
|
def cleanup_temp_files():
    """Empty the answer-image and upload directories, leaving them in place.

    Each directory is handled independently, so a failure on one no longer
    prevents the other from being cleaned. Errors are logged, never raised.
    """
    import shutil

    had_error = False
    for directory in (ans_image_dir, upload_dir):
        try:
            if os.path.exists(directory):
                shutil.rmtree(directory)
            # Recreate it so later uploads still have a place to go.
            os.makedirs(directory, exist_ok=True)
        except Exception as e:
            had_error = True
            log_print(f"Error cleaning up temporary files: {e}", "ERROR")

    if not had_error:
        log_print("Successfully cleaned up temporary files")
|
|
|
if __name__ == '__main__':
    # Run the built-in server on all interfaces; the port comes from $PORT
    # (7860 when unset). Temp files are cleared whenever the server exits.
    try:
        listen_port = int(os.getenv('PORT', 7860))
        app.run(host='0.0.0.0', port=listen_port, debug=False)
    finally:
        cleanup_temp_files()