|
import threading |
|
from flask import Flask, g, render_template, request, redirect, url_for, session |
|
import os |
|
import time |
|
|
|
from huggingface_hub import login, HfApi, hf_hub_download |
|
import os |
|
import logging |
|
|
|
import csv |
|
import random |
|
import shortuuid |
|
import json |
|
import pandas as pd |
|
from filelock import FileLock |
|
|
|
|
|
|
|
app = Flask(__name__) |
|
app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY') |
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
level=logging.DEBUG, |
|
format='%(asctime)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.StreamHandler() |
|
] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
HF_TOKEN = os.environ.get("HF_TOKEN") |
|
if HF_TOKEN: |
|
try: |
|
login(token=HF_TOKEN) |
|
logger.info("Logged into Hugging Face successfully.") |
|
except Exception as e: |
|
logger.exception(f"Failed to log into Hugging Face: {e}") |
|
else: |
|
logger.warning("HF_TOKEN not found in environment variables. Session data will not be uploaded.") |
|
|
|
|
|
hf_api = HfApi() |
|
|
|
|
|
HF_REPO_ID = "pooyanrg/resultsBlindTest" |
|
HF_REPO_PATH = "responses" |
|
|
|
|
|
QUESTIONS_FILE = "./data/dataset.csv" |
|
AVAILABLE_FILE = "/tmp/available.json" |
|
LOCK_FILE = "/tmp/available.lock" |
|
|
|
|
|
|
|
with open(QUESTIONS_FILE, newline="", encoding="utf-8") as f: |
|
reader = csv.DictReader(f) |
|
all_questions = list(reader) |
|
|
|
|
|
|
|
with open(AVAILABLE_FILE, "w") as f: |
|
json.dump([0, 1, 3, 6, 12, 14, 16, 17, 21, 31, 36, 42, 43, 47, 54, 58, 59, 62, 65, 68, 71, 75, 83, 90, 93, 97, 100, 105, 112, 120, 132, 137, 139, 143, 146, 153, 156, 160, 172, 173, 177, 181, 187, 188, 190, 191, 202, 206, 207, 209, 210, 214, 233, 235, 237, 240, 242, 243, 248, 260, 261, 262, 266, 271, 275, 276, 286, 287, 301, 308, 311, 316, 317, 322, 326, 327, 330, 331, 339, 346, 348, 356, 357, 363, 365, 374, 377, 378, 379, 381, 382, 386, 389, 392, 404, 407, 413, 416, 419, 423, 426, 449, 452, 453, 454, 482, 484, 487, 488, 491, 500, 509, 510, 512, 513, 516, 518, 519, 521, 522, 525, 526, 530, 536, 539, 550, 555], f) |
|
|
|
def get_questions(num=10): |
|
with FileLock(LOCK_FILE): |
|
with open(AVAILABLE_FILE, "r") as f: |
|
available = json.load(f) |
|
|
|
if len(available) == 0: |
|
return [] |
|
|
|
sample_size = min(num, len(available)) |
|
selected_ids = random.sample(available, sample_size) |
|
|
|
remaining = [qid for qid in available if qid not in selected_ids] |
|
|
|
with open(AVAILABLE_FILE, "w") as f: |
|
json.dump(remaining, f) |
|
|
|
return [all_questions[qid] | {"id": qid} for qid in selected_ids] |
|
|
|
def save_session_data(session_id, data): |
|
""" |
|
Saves session data to a JSON file in the SESSION_DIR. |
|
|
|
Args: |
|
session_id (str): Unique identifier for the session. |
|
data (dict): Session data to save. |
|
""" |
|
try: |
|
file_path = os.path.join('/tmp', f'{session_id}.json') |
|
with open(file_path, 'w') as f: |
|
json.dump(data, f) |
|
logger.info(f"Session data saved for session {session_id}") |
|
except Exception as e: |
|
logger.exception(f"Failed to save session data for session {session_id}: {e}") |
|
|
|
def load_session_data(session_id): |
|
""" |
|
Loads session data from a JSON file in the SESSION_DIR. |
|
|
|
Args: |
|
session_id (str): Unique identifier for the session. |
|
|
|
Returns: |
|
dict or None: Session data if file exists, else None. |
|
""" |
|
try: |
|
file_path = os.path.join("/tmp", f'{session_id}.json') |
|
if os.path.exists(file_path): |
|
with open(file_path, 'r') as f: |
|
data = json.load(f) |
|
logger.info(f"Session data loaded for session {session_id}") |
|
return data |
|
else: |
|
logger.warning(f"Session file not found for session {session_id}") |
|
return None |
|
except Exception as e: |
|
logger.exception(f"Failed to load session data for session {session_id}: {e}") |
|
return None |
|
|
|
|
|
@app.route("/", methods=['GET', 'POST']) |
|
def splash(): |
|
if request.method == 'POST': |
|
username = request.form.get('username') |
|
|
|
if not username: |
|
logger.warning("Username not provided by the user.") |
|
return render_template('splash.html', error="Please enter a username.") |
|
|
|
session_id = shortuuid.uuid() |
|
|
|
current_index = 0 |
|
answers = [] |
|
questions = [] |
|
|
|
session_data = {'questions': questions, |
|
'answers': answers} |
|
|
|
save_session_data(session_id, session_data) |
|
|
|
return redirect(url_for('instructions', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
|
|
logger.info("Splash page rendered.") |
|
return render_template('splash.html') |
|
|
|
|
|
@app.route('/instructions', methods=['GET', 'POST']) |
|
def instructions(): |
|
username = request.args.get('username') |
|
session_id = request.args.get('session_id') |
|
current_index = int(request.args.get('current_index')) |
|
|
|
if request.method == 'POST': |
|
|
|
start_time = time.time() |
|
session_data = load_session_data(session_id) |
|
session_data['time'] = start_time |
|
save_session_data(session_id, session_data) |
|
return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
|
|
questions = get_questions(num=10) |
|
|
|
session_data = load_session_data(session_id) |
|
session_data['questions'] = questions |
|
save_session_data(session_id, session_data) |
|
|
|
if len(questions) == 0: |
|
return render_template('thanks.html') |
|
|
|
|
|
return render_template('instructions.html', username=username) |
|
|
|
|
|
|
|
@app.route('/prep', methods=['GET', 'POST']) |
|
def prep(): |
|
username = request.args.get('username') |
|
current_index = int(request.args.get('current_index')) |
|
|
|
|
|
session_id = request.args.get('session_id') |
|
session_data = load_session_data(session_id) |
|
|
|
questions = session_data['questions'] |
|
|
|
|
|
if request.method == 'POST': |
|
|
|
|
|
return redirect(url_for('prompt', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
return render_template('prep.html', |
|
question_number=current_index + 1, |
|
total=len(questions)) |
|
|
|
@app.route('/prompt', methods=['GET', 'POST']) |
|
def prompt(): |
|
username = request.args.get('username') |
|
current_index = int(request.args.get('current_index')) |
|
|
|
|
|
session_id = request.args.get('session_id') |
|
session_data = load_session_data(session_id) |
|
|
|
questions = session_data['questions'] |
|
|
|
if request.method == 'POST': |
|
|
|
|
|
return redirect(url_for('image', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
question_raw = questions[current_index].get('question', '') |
|
if "count" in question_raw.lower(): |
|
idx = question_raw.index('.') |
|
else: |
|
idx = question_raw.index('?') |
|
|
|
question = question_raw[:idx+1] |
|
|
|
task = questions[current_index].get('task', '') |
|
if task == 'Counting Grid - Blank Grids' or task == 'Counting Grid - Word Grids': |
|
question = "Count the number of rows and columns." |
|
|
|
return render_template('prompt.html', |
|
question_text=question) |
|
|
|
@app.route('/image', methods=['GET', 'POST']) |
|
def image(): |
|
username = request.args.get('username') |
|
current_index = int(request.args.get('current_index')) |
|
|
|
|
|
session_id = request.args.get('session_id') |
|
session_data = load_session_data(session_id) |
|
|
|
questions = session_data['questions'] |
|
image_id = str(questions[current_index].get('image_id', 'default_image.png')) |
|
|
|
if request.method == 'POST': |
|
task = questions[current_index].get('task', '') |
|
|
|
|
|
if task == 'Counting Grid - Blank Grids' or task == 'Counting Grid - Word Grids': |
|
return redirect(url_for('respond_grid', username=username, current_index=current_index, session_id=session_id)) |
|
else: |
|
return redirect(url_for('respond', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
|
|
return render_template('image.html', |
|
image_name=image_id + '.jpg') |
|
|
|
@app.route('/respond_grid', methods=['GET', 'POST']) |
|
def respond_grid(): |
|
username = request.args.get('username') |
|
current_index = int(request.args.get('current_index')) |
|
session_id = request.args.get('session_id') |
|
session_data = load_session_data(session_id) |
|
|
|
questions = session_data['questions'] |
|
answers = session_data['answers'] |
|
|
|
if request.method == 'POST': |
|
response_r = request.form.get('response_r') |
|
response_c = request.form.get('response_c') |
|
question_id = questions[current_index]['image_id'] |
|
|
|
|
|
answers.append({"image_id": question_id, |
|
"answer": (response_r, response_c)}) |
|
|
|
session['answers'] = answers |
|
save_session_data(session_id, session_data) |
|
current_index += 1 |
|
|
|
if current_index >= len(questions): |
|
|
|
end_time = time.time() |
|
elapsed_time = end_time - session_data['time'] |
|
|
|
response_all = {'username': username, |
|
'time': elapsed_time, |
|
'responses':answers} |
|
|
|
json_data = json.dumps(response_all, indent=4) |
|
|
|
file_name = f"answers_{session_id}.json" |
|
temp_file_path = os.path.join("/tmp", file_name) |
|
|
|
with open(temp_file_path, 'w') as f: |
|
f.write(json_data) |
|
|
|
for file_path in os.listdir("/tmp/"): |
|
if "answers_" in file_path: |
|
hf_api.upload_file( |
|
path_or_fileobj=f'/tmp/{file_path}', |
|
path_in_repo=f"{HF_REPO_PATH}/{file_path}", |
|
repo_id=HF_REPO_ID, |
|
repo_type="space", |
|
) |
|
|
|
return render_template('thanks.html') |
|
|
|
return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
question_raw = questions[current_index].get('question', '') |
|
if "count" in question_raw.lower(): |
|
idx = question_raw.index('.') |
|
else: |
|
idx = question_raw.index('?') |
|
|
|
question = question_raw[:idx+1] |
|
form = "Enter a number" |
|
|
|
return render_template('respond_grid.html', |
|
question_text=question, |
|
instruction=form) |
|
|
|
|
|
@app.route('/respond', methods=['GET', 'POST']) |
|
def respond(): |
|
username = request.args.get('username') |
|
current_index = int(request.args.get('current_index')) |
|
session_id = request.args.get('session_id') |
|
session_data = load_session_data(session_id) |
|
|
|
questions = session_data['questions'] |
|
answers = session_data['answers'] |
|
|
|
if request.method == 'POST': |
|
response = request.form.get('response') |
|
question_id = questions[current_index]['image_id'] |
|
|
|
|
|
answers.append({"image_id": question_id, |
|
"answer": response}) |
|
|
|
session['answers'] = answers |
|
save_session_data(session_id, session_data) |
|
current_index += 1 |
|
|
|
if current_index >= len(questions): |
|
|
|
end_time = time.time() |
|
elapsed_time = end_time - session_data['time'] |
|
|
|
response_all = {'username': username, |
|
'time': elapsed_time, |
|
'responses':answers} |
|
|
|
json_data = json.dumps(response_all, indent=4) |
|
|
|
file_name = f"answers_{session_id}.json" |
|
temp_file_path = os.path.join("/tmp", file_name) |
|
|
|
with open(temp_file_path, 'w') as f: |
|
f.write(json_data) |
|
|
|
for file_path in os.listdir("/tmp/"): |
|
if "answers_" in file_path: |
|
hf_api.upload_file( |
|
path_or_fileobj=f'/tmp/{file_path}', |
|
path_in_repo=f"{HF_REPO_PATH}/{file_path}", |
|
repo_id=HF_REPO_ID, |
|
repo_type="space", |
|
) |
|
|
|
return render_template('thanks.html') |
|
|
|
return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id)) |
|
|
|
question_raw = questions[current_index].get('question', '') |
|
if "count" in question_raw.lower(): |
|
idx = question_raw.index('.') |
|
else: |
|
idx = question_raw.index('?') |
|
|
|
question = question_raw[:idx+1] |
|
|
|
task = questions[current_index].get('task', '') |
|
if task == 'Circled Letter': |
|
form = "Enter a letter" |
|
elif task == 'Touching Circles': |
|
form = "Enter Y/N" |
|
else: |
|
form = "Enter a number" |
|
|
|
return render_template('respond.html', |
|
question_text=question, |
|
instruction=form) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
|
|
|
|
|
|
app.run(host="0.0.0.0", port=7860, debug=False) |