BlindTest / app.py
pooyanrg's picture
new ver
13acbbd
import threading
from flask import Flask, g, render_template, request, redirect, url_for, session
import os
import time
from huggingface_hub import login, HfApi, hf_hub_download # For Hugging Face integration
import os
import logging
import csv
import random
import shortuuid
import json
import pandas as pd
from filelock import FileLock
## new version
# Flask application setup
app = Flask(__name__)
# The secret key signs Flask's cookie-based sessions; it is read from the
# FLASK_SECRET_KEY environment variable (None when the variable is unset).
app.config.update(SECRET_KEY=os.environ.get('FLASK_SECRET_KEY'))
# Cross-site cookie settings, currently disabled:
# app.config.update(
#     SESSION_COOKIE_SAMESITE="None",  # allow cross-site
#     SESSION_COOKIE_SECURE=True       # required for "None"
# )

# Root logging configuration: DEBUG level, timestamped records, one stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,  # DEBUG gives the most granular logs
)
logger = logging.getLogger(__name__)
# Hugging Face authentication: log in only when a token is configured.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    logger.warning("HF_TOKEN not found in environment variables. Session data will not be uploaded.")
else:
    try:
        login(token=HF_TOKEN)
    except Exception as e:
        # A failed login is logged but does not stop the application from starting.
        logger.exception(f"Failed to log into Hugging Face: {e}")
    else:
        logger.info("Logged into Hugging Face successfully.")

# Hub client and the repository location that receives the answer files.
hf_api = HfApi()
HF_REPO_ID = "pooyanrg/resultsBlindTest"  # Update as needed
HF_REPO_PATH = "responses"

# Local files: the question dataset, the pool of still-available question
# indices, and the lock file guarding that pool.
QUESTIONS_FILE = "./data/dataset.csv"
AVAILABLE_FILE = "/tmp/available.json"
LOCK_FILE = "/tmp/available.lock"
# Load questions into memory (one dict per CSV row, keyed by the header names).
with open(QUESTIONS_FILE, newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    all_questions = list(reader)

# Row indices into all_questions that form the initial pool of questions
# handed out to participants by get_questions().
INITIAL_AVAILABLE_IDS = [
    0, 1, 3, 6, 12, 14, 16, 17, 21, 31, 36, 42, 43, 47, 54, 58, 59, 62, 65, 68,
    71, 75, 83, 90, 93, 97, 100, 105, 112, 120, 132, 137, 139, 143, 146, 153,
    156, 160, 172, 173, 177, 181, 187, 188, 190, 191, 202, 206, 207, 209, 210,
    214, 233, 235, 237, 240, 242, 243, 248, 260, 261, 262, 266, 271, 275, 276,
    286, 287, 301, 308, 311, 316, 317, 322, 326, 327, 330, 331, 339, 346, 348,
    356, 357, 363, 365, 374, 377, 378, 379, 381, 382, 386, 389, 392, 404, 407,
    413, 416, 419, 423, 426, 449, 452, 453, 454, 482, 484, 487, 488, 491, 500,
    509, 510, 512, 513, 516, 518, 519, 521, 522, 525, 526, 530, 536, 539, 550,
    555,
]

# Track which questions are available.
# Ids beyond the end of the dataset are dropped up front: get_questions()
# indexes all_questions with these ids and would otherwise raise IndexError.
_usable_ids = [qid for qid in INITIAL_AVAILABLE_IDS if qid < len(all_questions)]
if len(_usable_ids) != len(INITIAL_AVAILABLE_IDS):
    logger.warning(
        f"{len(INITIAL_AVAILABLE_IDS) - len(_usable_ids)} question ids exceed the "
        f"dataset size ({len(all_questions)} rows) and were dropped from the pool."
    )
# Write under the same lock get_questions() holds, so a concurrent reader
# never observes a half-written pool file.
with FileLock(LOCK_FILE):
    with open(AVAILABLE_FILE, "w") as f:
        json.dump(_usable_ids, f)
def get_questions(num=10):
    """
    Draw up to ``num`` random questions and remove them from the shared pool.

    The pool of available question indices lives in AVAILABLE_FILE and is
    read and rewritten while holding the LOCK_FILE file lock.

    Args:
        num (int): Maximum number of questions to draw. Values <= 0 yield
            an empty list.

    Returns:
        list[dict]: The selected rows of ``all_questions``, each extended
        with an ``"id"`` key holding its row index. Empty when the pool is
        exhausted.
    """
    if num <= 0:
        # random.sample rejects negative sizes; nothing to draw either way.
        return []

    with FileLock(LOCK_FILE):
        with open(AVAILABLE_FILE, "r") as f:
            available = json.load(f)

        if not available:
            return []  # Not enough questions left

        selected_ids = random.sample(available, min(num, len(available)))

        # Resolve the rows before touching the pool file: if an id cannot be
        # looked up, the pool is left unchanged instead of losing the ids.
        selected = [all_questions[qid] | {"id": qid} for qid in selected_ids]

        taken = set(selected_ids)  # O(1) membership for the filter below
        remaining = [qid for qid in available if qid not in taken]
        with open(AVAILABLE_FILE, "w") as f:
            json.dump(remaining, f)

    return selected
def save_session_data(session_id, data):
    """
    Saves session data to the JSON file /tmp/<session_id>.json.

    Failures (invalid id, unserializable data, I/O errors) are logged and
    swallowed; nothing is raised to the caller.

    Args:
        session_id (str): Unique identifier for the session. Must be a plain
            file name component (no path separators).
        data (dict): Session data to save.
    """
    try:
        name = str(session_id)
        # The id is taken from the request query string by the routes, so it
        # must not be able to point outside /tmp (e.g. "../../etc/x").
        if not name or os.path.basename(name) != name:
            raise ValueError(f"Invalid session id: {session_id!r}")

        # Serialize first so a serialization error cannot leave a truncated file.
        payload = json.dumps(data)

        file_path = os.path.join('/tmp', f'{name}.json')
        with open(file_path, 'w') as f:
            f.write(payload)
        logger.info(f"Session data saved for session {session_id}")
    except Exception as e:
        logger.exception(f"Failed to save session data for session {session_id}: {e}")
def load_session_data(session_id):
    """
    Loads session data from the JSON file /tmp/<session_id>.json.

    Args:
        session_id (str): Unique identifier for the session. Must be a plain
            file name component (no path separators).

    Returns:
        dict or None: Session data if the file exists and parses, else None.
        Invalid ids and read/parse errors are logged and also yield None.
    """
    try:
        name = str(session_id)
        # The id is taken from the request query string by the routes, so it
        # must not be able to point outside /tmp (e.g. "../../etc/x").
        if not name or os.path.basename(name) != name:
            raise ValueError(f"Invalid session id: {session_id!r}")

        file_path = os.path.join("/tmp", f'{name}.json')
        try:
            # Open directly instead of checking existence first, so the file
            # cannot disappear between the check and the read.
            with open(file_path, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            logger.warning(f"Session file not found for session {session_id}")
            return None

        logger.info(f"Session data loaded for session {session_id}")
        return data
    except Exception as e:
        logger.exception(f"Failed to load session data for session {session_id}: {e}")
        return None
@app.route("/", methods=['GET', 'POST'])
def splash():
    """
    Landing page: asks for a username and opens a new quiz session.

    GET renders the splash page. POST validates the submitted username,
    creates a session file with empty question and answer lists, and
    redirects to the instructions page.
    """
    if request.method != 'POST':
        # GET request - show intro page
        logger.info("Splash page rendered.")
        return render_template('splash.html')

    username = request.form.get('username')
    if not username:
        logger.warning("Username not provided by the user.")
        return render_template('splash.html', error="Please enter a username.")

    # Fresh session: nothing assigned and nothing answered yet.
    session_id = shortuuid.uuid()
    save_session_data(session_id, {'questions': [], 'answers': []})

    return redirect(url_for('instructions', username=username, current_index=0, session_id=session_id))
@app.route('/instructions', methods=['GET', 'POST'])
def instructions():
    """
    Instructions page shown before the quiz starts.

    GET assigns questions to the session (once) and renders the
    instructions, or the thanks page when no questions are left.
    POST records the quiz start time and redirects to the prep page.

    Query parameters: ``username``, ``session_id``, ``current_index``.
    """
    username = request.args.get('username')
    session_id = request.args.get('session_id')
    # A missing or non-numeric index falls back to 0 instead of raising.
    current_index = request.args.get('current_index', default=0, type=int)

    session_data = load_session_data(session_id)
    if session_data is None:
        # Unknown or unreadable session: start over rather than crash.
        logger.warning(f"No session data for session {session_id}; redirecting to splash.")
        return redirect(url_for('splash'))

    if request.method == 'POST':
        # User clicked the "Begin Quiz" button
        session_data['time'] = time.time()
        save_session_data(session_id, session_data)
        return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id))

    # Draw questions only if this session has none yet. Drawing removes the
    # ids from the shared pool, so re-drawing on every page refresh would
    # deplete the pool and change the questions under the participant.
    questions = session_data.get('questions') or get_questions(num=10)
    session_data['questions'] = questions
    save_session_data(session_id, session_data)

    if not questions:
        # Pool exhausted: nothing to ask.
        return render_template('thanks.html')

    # If GET, render the final instructions page
    return render_template('instructions.html', username=username)
@app.route('/prep', methods=['GET', 'POST'])
def prep():
    """
    Interstitial page shown before each question.

    GET renders the page with the 1-based question number and the total
    number of questions in the session. POST (the "Start" button)
    redirects to the question prompt.

    Query parameters: ``username``, ``current_index``, ``session_id``.
    """
    args = request.args
    username = args.get('username')
    current_index = int(args.get('current_index'))
    session_id = args.get('session_id')

    # Questions live in the per-session file, not in the cookie session.
    questions = load_session_data(session_id)['questions']

    if request.method != 'POST':
        return render_template(
            'prep.html',
            question_number=current_index + 1,
            total=len(questions),
        )

    # "Start" was clicked: move on to the actual question display.
    return redirect(url_for('prompt', username=username, current_index=current_index, session_id=session_id))
@app.route('/prompt', methods=['GET', 'POST'])
def prompt():
    """
    Shows the text of the current question.

    GET renders the question text: a fixed sentence for the two grid
    counting tasks, otherwise the stored question truncated after its first
    '.' (when the text mentions "count") or first '?'. POST redirects to
    the image page.

    Query parameters: ``username``, ``current_index``, ``session_id``.
    """
    username = request.args.get('username')
    current_index = int(request.args.get('current_index'))
    session_id = request.args.get('session_id')
    session_data = load_session_data(session_id)
    questions = session_data['questions']

    if request.method == 'POST':
        # User clicked "Start" button
        # Redirect to the image display
        return redirect(url_for('image', username=username, current_index=current_index, session_id=session_id))

    entry = questions[current_index]
    task = entry.get('task', '')
    if task == 'Counting Grid - Blank Grids' or task == 'Counting Grid - Word Grids':
        # Grid tasks use a fixed prompt, so the stored text is not parsed at all.
        question = "Count the number of rows and columns."
    else:
        question_raw = entry.get('question') or ''
        delimiter = '.' if "count" in question_raw.lower() else '?'
        # find() instead of index(): a question without the delimiter is
        # shown in full rather than raising ValueError.
        cut = question_raw.find(delimiter)
        question = question_raw[:cut + 1] if cut != -1 else question_raw

    return render_template('prompt.html',
                           question_text=question)
@app.route('/image', methods=['GET', 'POST'])
def image():
    """
    Displays the image belonging to the current question.

    GET renders the image page for ``<image_id>.jpg``. POST redirects to the
    grid response form for the two grid counting tasks and to the regular
    response form for every other task.

    Query parameters: ``username``, ``current_index``, ``session_id``.
    """
    args = request.args
    username = args.get('username')
    current_index = int(args.get('current_index'))
    session_id = args.get('session_id')

    entry = load_session_data(session_id)['questions'][current_index]
    image_id = str(entry.get('image_id', 'default_image.png'))

    if request.method != 'POST':
        # If GET, display the current image with a 10-seconds countdown
        return render_template('image.html', image_name=f"{image_id}.jpg")

    # Pick the response form that matches the task type.
    grid_tasks = ('Counting Grid - Blank Grids', 'Counting Grid - Word Grids')
    endpoint = 'respond_grid' if entry.get('task', '') in grid_tasks else 'respond'
    return redirect(url_for(endpoint, username=username, current_index=current_index, session_id=session_id))
@app.route('/respond_grid', methods=['GET', 'POST'])
def respond_grid():
    """
    Response form for grid counting questions (rows and columns).

    GET renders the form with the question text. POST stores the
    (rows, columns) answer in the session file and either redirects to the
    next question or, after the last one, writes the answers file, uploads
    pending answers files to the Hub and renders the thanks page.

    Query parameters: ``username``, ``current_index``, ``session_id``.
    """
    username = request.args.get('username')
    current_index = int(request.args.get('current_index'))
    session_id = request.args.get('session_id')
    session_data = load_session_data(session_id)
    questions = session_data['questions']
    answers = session_data['answers']

    if request.method == 'POST':
        response_r = request.form.get('response_r')
        response_c = request.form.get('response_c')
        question_id = questions[current_index]['image_id']

        # Submit the answer
        answers.append({"image_id": question_id,
                        "answer": (response_r, response_c)})
        # NOTE(review): this mirrors the answers into the cookie session;
        # nothing in this file reads it back - confirm it is still needed.
        session['answers'] = answers
        save_session_data(session_id, session_data)

        current_index += 1
        if current_index < len(questions):
            return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id))

        # Last question answered: store the elapsed time. The start time is
        # only set by the instructions POST, so tolerate its absence.
        started_at = session_data.get('time')
        elapsed_time = time.time() - started_at if started_at is not None else None

        response_all = {'username': username,
                        'time': elapsed_time,
                        'responses': answers}
        file_name = f"answers_{session_id}.json"
        temp_file_path = os.path.join("/tmp", file_name)
        with open(temp_file_path, 'w') as f:
            json.dump(response_all, f, indent=4)

        # Upload every answers file found in /tmp. A failed upload is logged
        # and skipped so the participant still reaches the thanks page; the
        # file stays in /tmp and is picked up by the next completed session.
        for pending in os.listdir("/tmp/"):
            if "answers_" not in pending:
                continue
            try:
                hf_api.upload_file(
                    path_or_fileobj=f'/tmp/{pending}',
                    path_in_repo=f"{HF_REPO_PATH}/{pending}",
                    repo_id=HF_REPO_ID,
                    repo_type="space",  # Use "dataset" or "space" based on your repo
                )
            except Exception as e:
                logger.exception(f"Failed to upload {pending}: {e}")

        return render_template('thanks.html')

    question_raw = questions[current_index].get('question') or ''
    delimiter = '.' if "count" in question_raw.lower() else '?'
    # find() instead of index(): a question without the delimiter is shown
    # in full rather than raising ValueError.
    cut = question_raw.find(delimiter)
    question = question_raw[:cut + 1] if cut != -1 else question_raw

    form = "Enter a number"

    return render_template('respond_grid.html',
                           question_text=question,
                           instruction=form)
@app.route('/respond', methods=['GET', 'POST'])
def respond():
    """
    Response form for single-answer questions.

    GET renders the form with the question text and an input hint chosen by
    task type. POST stores the answer in the session file and either
    redirects to the next question or, after the last one, writes the
    answers file, uploads pending answers files to the Hub and renders the
    thanks page.

    Query parameters: ``username``, ``current_index``, ``session_id``.
    """
    username = request.args.get('username')
    current_index = int(request.args.get('current_index'))
    session_id = request.args.get('session_id')
    session_data = load_session_data(session_id)
    questions = session_data['questions']
    answers = session_data['answers']

    if request.method == 'POST':
        response = request.form.get('response')
        question_id = questions[current_index]['image_id']

        # Submit the answer
        answers.append({"image_id": question_id,
                        "answer": response})
        # NOTE(review): this mirrors the answers into the cookie session;
        # nothing in this file reads it back - confirm it is still needed.
        session['answers'] = answers
        save_session_data(session_id, session_data)

        current_index += 1
        if current_index < len(questions):
            return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id))

        # Last question answered: store the elapsed time. The start time is
        # only set by the instructions POST, so tolerate its absence.
        started_at = session_data.get('time')
        elapsed_time = time.time() - started_at if started_at is not None else None

        response_all = {'username': username,
                        'time': elapsed_time,
                        'responses': answers}
        file_name = f"answers_{session_id}.json"
        temp_file_path = os.path.join("/tmp", file_name)
        with open(temp_file_path, 'w') as f:
            json.dump(response_all, f, indent=4)

        # Upload every answers file found in /tmp. A failed upload is logged
        # and skipped so the participant still reaches the thanks page; the
        # file stays in /tmp and is picked up by the next completed session.
        for pending in os.listdir("/tmp/"):
            if "answers_" not in pending:
                continue
            try:
                hf_api.upload_file(
                    path_or_fileobj=f'/tmp/{pending}',
                    path_in_repo=f"{HF_REPO_PATH}/{pending}",
                    repo_id=HF_REPO_ID,
                    repo_type="space",  # Use "dataset" or "space" based on your repo
                )
            except Exception as e:
                logger.exception(f"Failed to upload {pending}: {e}")

        return render_template('thanks.html')

    entry = questions[current_index]
    question_raw = entry.get('question') or ''
    delimiter = '.' if "count" in question_raw.lower() else '?'
    # find() instead of index(): a question without the delimiter is shown
    # in full rather than raising ValueError.
    cut = question_raw.find(delimiter)
    question = question_raw[:cut + 1] if cut != -1 else question_raw

    # The input hint depends on the kind of answer the task expects.
    task = entry.get('task', '')
    if task == 'Circled Letter':
        form = "Enter a letter"
    elif task == 'Touching Circles':
        form = "Enter Y/N"
    else:
        form = "Enter a number"

    return render_template('respond.html',
                           question_text=question,
                           instruction=form)
if __name__ == "__main__":
    # Script entry point: serve the Flask app on every network interface,
    # port 7860, with the debugger switched off.
    app.run(
        debug=False,
        port=7860,
        host="0.0.0.0",
    )