|
from flask import Flask, request, jsonify, render_template |
|
import os |
|
import json |
|
import torch |
|
from werkzeug.utils import secure_filename |
|
|
|
from HTR.app import extract_text_from_image |
|
|
|
from correct_answer_generation.answer_generation_database_creation import database_creation, answer_generation |
|
|
|
|
|
from similarity_check.tf_idf.tf_idf_score import create_tfidf_values, tfidf_answer_score |
|
from similarity_check.semantic_meaning_check.semantic import similarity_model_score, fasttext_similarity,question_vector_sentence,question_vector_word |
|
from similarity_check.llm_based_scoring.llm import llm_score |
|
|
|
# Flask application instance; routes below are registered against it.
app = Flask(__name__)

# Scratch directory for uploaded files, created eagerly at import time so
# request handlers may assume it exists.
# NOTE(review): compute_marks builds its temp path from the literal string
# 'uploads' instead of this constant — keep the two in sync.
UPLOAD_FOLDER = 'uploads'

os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
@app.route('/')
def index():
    """Serve the application's single-page UI."""
    page = render_template('index.html')
    return page
|
|
|
|
|
|
|
def new_value(value, old_min, old_max, new_min, new_max):
    """Linearly rescale *value* from [old_min, old_max] to [new_min, new_max].

    Used to map the raw similarity / tf-idf scores onto mark ranges.

    Raises:
        ZeroDivisionError: if old_min == old_max (degenerate source range).
    """
    # Return directly — the original bound the result to a local that
    # shadowed the function's own name.
    return new_min + (value - old_min) * (new_max - new_min) / (old_max - old_min)
|
|
|
@app.route('/compute_answers', methods=['POST'])
def compute_answers():
    """Produce reference answers for the uploaded queries.

    Form fields:
        query_file   -- required file; one query per line.
        file_type    -- 'csv' (answers supplied directly) or 'pdf'
                        (answers generated from the knowledge-base PDFs).
        ans_csv_file -- the answers file, used only when file_type == 'csv'.

    Returns JSON {"answers": [...]}, or {"error": ...} with 400/500.
    """
    query_file = request.files.get('query_file')
    if not query_file:
        return jsonify({"error": "Missing query file"}), 400

    queries = query_file.read().decode('utf-8').splitlines()

    file_type = request.form.get('file_type')
    ans_csv_file = request.files.get('ans_csv_file')

    if file_type == "csv":
        rows = ans_csv_file.read().decode('utf-8').splitlines()
        # NOTE(review): '\\n' splits on the literal two characters
        # backslash + n, not on a newline — presumably the client escapes
        # embedded newlines this way; confirm against the uploader.
        c_answers = [row.split('\\n') for row in rows]
        return jsonify({"answers": c_answers}), 200

    try:
        c_answers = []

        if file_type == 'pdf':
            folder_path = 'Knowledge_Retriever_pdf'
            pdf_files = [f"{folder_path}/{file}"
                         for file in os.listdir(folder_path)
                         if file.endswith('.pdf')]

            # Build each PDF's database exactly once. (The original wrapped
            # this — and the answer loop below — in an extra `for query in
            # queries:` loop, rebuilding every database once per query and
            # appending len(queries) copies of the full answer set.)
            for pdf in pdf_files:
                database_creation(pdf)

            # One list of per-PDF answers per query, in query order.
            for query in queries:
                c_answers.append([answer_generation(pdf, query)
                                  for pdf in pdf_files])
        else:
            # The duplicate csv branch that used to live here was
            # unreachable: the csv case already returned above.
            return jsonify({"error": "Unsupported file type"}), 400

        return jsonify({"answers": c_answers}), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
|
@app.route('/compute_marks', methods=['POST'])
def compute_marks():
    """Grade uploaded answer-sheet images against the provided answers.

    Form fields:
        answers   -- JSON list of strings; each string holds one question's
                     acceptable answers separated by blank lines ('\n\n').
        files[]   -- images named '<student_folder>/<page>.(jpg|jpeg|png)'.

    Returns JSON {"message": {student: [mark, ...]}} on success,
    or {"error": ...} with status 500.
    """
    temp_folder = os.path.join('uploads', 'temp_answers')
    try:
        raw = request.form.get('answers')
        parsed = json.loads(raw)
        # One list of acceptable answers per question.
        answers = [entry.split('\n\n') for entry in parsed]

        os.makedirs(temp_folder, exist_ok=True)

        files = request.files.getlist('files[]')
        data = {}  # student_folder -> [saved image paths]

        for file in files:
            if not file.filename.endswith(('.jpg', '.jpeg', '.png')):
                continue

            relative_path = file.filename
            path_parts = relative_path.split('/')

            # The filename is client-controlled; reject traversal attempts
            # before joining it into a filesystem path. (secure_filename
            # was imported for this purpose but never applied; we validate
            # instead of rewriting so the response keys keep the original
            # student folder names.)
            if '..' in path_parts or relative_path.startswith('/'):
                continue

            if len(path_parts) >= 2:
                student_folder = path_parts[0]
                save_path = os.path.join(temp_folder, relative_path)
                os.makedirs(os.path.dirname(save_path), exist_ok=True)
                file.save(save_path)
                data.setdefault(student_folder, []).append(save_path)

        # Deterministic page order per student.
        for student in data:
            data[student].sort()

        # Pre-compute embedding vectors once per question.
        sen_vec_answers = []
        word_vec_answers = []
        for question_answers in answers:
            sen_vec_answers.append(
                [question_vector_sentence(a) for a in question_answers])
            word_vec_answers.append(
                [question_vector_word(a) for a in question_answers])

        s_marks = {}
        for student in data:
            s_marks[student] = []
            # assumes each student submits exactly one image per question,
            # in sorted-filename order — TODO confirm with the uploader;
            # extra images would raise IndexError on answers[count].
            for count, image_path in enumerate(data[student]):
                s_answer = extract_text_from_image(image_path)
                tf_idf_word_values, max_tfidf = create_tfidf_values(answers[count])
                m = marks(s_answer, sen_vec_answers[count], word_vec_answers[count],
                          tf_idf_word_values, max_tfidf, answers[count])
                if isinstance(m, torch.Tensor):
                    m = m.item()
                s_marks[student].append(m)

        return jsonify({"message": s_marks}), 200

    except Exception as e:
        return jsonify({"error": str(e)}), 500

    finally:
        # Clean the scratch folder even when grading fails — the original
        # only removed it on the success path, leaking files on error.
        import shutil
        shutil.rmtree(temp_folder, ignore_errors=True)
|
|
|
|
|
|
|
def marks(answer, sen_vec_answers, word_vec_answers, tf_idf_word_values, max_tfidf, correct_answers):
    """Score a student's extracted answer text against the reference answers.

    Combines four signals: tf-idf keyword overlap (up to 5 marks),
    sentence-embedding similarity (up to 3), word-embedding similarity
    (up to 2), and an LLM score; the final mark is the mean of the
    heuristic total and the best LLM score.

    Args:
        answer: student's answer text (from OCR).
        sen_vec_answers / word_vec_answers: precomputed vectors for the
            reference answers of this question.
        tf_idf_word_values, max_tfidf: tf-idf table for this question.
        correct_answers: the reference answer strings themselves.

    Returns:
        Numeric mark; 0 if the tf-idf score is too low to proceed.
    """
    total = 0

    # Keyword-overlap score on a 0..10 scale.
    marks1 = tfidf_answer_score(answer, tf_idf_word_values, max_tfidf, marks=10)

    # Map tf-idf 3..10 onto 0..5 marks.
    if marks1 > 3:
        total += new_value(marks1, old_min=3, old_max=10, new_min=0, new_max=5)

    # Below this threshold the answer is considered off-topic: no further
    # (expensive) similarity or LLM scoring is done.
    if marks1 > 2:
        # Sentence-embedding similarity: up to 3 marks.
        marks2 = similarity_model_score(sen_vec_answers, answer)
        if marks2 > 0.95:
            total += 3
        elif marks2 > 0.5:
            total += new_value(marks2, old_min=0.5, old_max=0.95, new_min=0, new_max=3)

        # Word-embedding (fastText) similarity: up to 2 marks.
        # BUG FIX: the original tested `marks2 > 0.9` here, so the full
        # 2 marks were granted based on the *sentence* score while the
        # elif fell through to the word score — compare marks3 instead.
        marks3 = fasttext_similarity(word_vec_answers, answer)
        if marks3 > 0.9:
            total += 2
        elif marks3 > 0.4:
            total += new_value(marks3, old_min=0.4, old_max=0.9, new_min=0, new_max=2)

        # LLM scores one value per reference answer; keep the best.
        # (The original also kept dead accumulators `a`/`b` that were
        # never read — removed.)
        marks4 = [float(s) for s in llm_score(correct_answers, answer)]
        best_llm = max(marks4)

        # Final mark: average of heuristic total and best LLM score.
        total = total / 2 + best_llm / 2

    return total
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Listen on all interfaces; port 7860 is the convention used by
    # Hugging Face Spaces deployments.
    app.run(host='0.0.0.0', port=7860)
|
|