"""Flask app for a timed, blind visual quiz whose answers go to Hugging Face.

Flow per participant: splash -> instructions -> (prep -> prompt -> image ->
respond / respond_grid) repeated per question -> thanks.

Quiz state is carried between requests in the query string (``username``,
``current_index``, ``session_id``) and in a per-session JSON file stored in
``SESSION_DIR``.
"""

import csv
import json
import logging
import os
import random
import re
import threading
import time

import pandas as pd
import shortuuid
from filelock import FileLock
from flask import Flask, g, render_template, request, redirect, url_for, session
from huggingface_hub import login, HfApi, hf_hub_download  # For Hugging Face integration

# Flask application setup
app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY')  # required for sessions

# Cross-site cookie settings, currently disabled:
# app.config.update(
#     SESSION_COOKIE_SAMESITE="None",  # allow cross-site
#     SESSION_COOKIE_SECURE=True       # required for "None"
# )

logging.basicConfig(
    level=logging.DEBUG,  # Set to DEBUG for more granular logs
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN:
    try:
        login(token=HF_TOKEN)
        logger.info("Logged into Hugging Face successfully.")
    except Exception as e:
        logger.exception(f"Failed to log into Hugging Face: {e}")
else:
    logger.warning("HF_TOKEN not found in environment variables. Session data will not be uploaded.")

# Initialize Hugging Face API
hf_api = HfApi()

HF_REPO_ID = "pooyanrg/resultsBlindTest"  # Update as needed
HF_REPO_PATH = "responses"

QUESTIONS_FILE = "./data/dataset.csv"
AVAILABLE_FILE = "/tmp/available.json"
LOCK_FILE = "/tmp/available.lock"
SESSION_DIR = "/tmp"  # where per-session state and finished answer files are written

# Row indexes of QUESTIONS_FILE that may be handed out to participants.
INITIAL_AVAILABLE_IDS = [
    0, 1, 3, 6, 12, 14, 16, 17, 21, 31, 36, 42, 43, 47, 54, 58, 59, 62, 65,
    68, 71, 75, 83, 90, 93, 97, 100, 105, 112, 120, 132, 137, 139, 143, 146,
    153, 156, 160, 172, 173, 177, 181, 187, 188, 190, 191, 202, 206, 207,
    209, 210, 214, 233, 235, 237, 240, 242, 243, 248, 260, 261, 262, 266,
    271, 275, 276, 286, 287, 301, 308, 311, 316, 317, 322, 326, 327, 330,
    331, 339, 346, 348, 356, 357, 363, 365, 374, 377, 378, 379, 381, 382,
    386, 389, 392, 404, 407, 413, 416, 419, 423, 426, 449, 452, 453, 454,
    482, 484, 487, 488, 491, 500, 509, 510, 512, 513, 516, 518, 519, 521,
    522, 525, 526, 530, 536, 539, 550, 555,
]

# Tasks answered with a (rows, columns) pair instead of a single value.
_GRID_TASKS = ('Counting Grid - Blank Grids', 'Counting Grid - Word Grids')

# Answer-box hint per task; tasks not listed here fall back to "Enter a number".
_RESPONSE_HINTS = {
    'Circled Letter': "Enter a letter",
    'Touching Circles': "Enter Y/N",
}

# Session ids are generated by shortuuid (alphanumeric). Anything else coming
# from the query string is rejected before it can reach a file path.
_SESSION_ID_RE = re.compile(r"[A-Za-z0-9]+")

# Load questions into memory
with open(QUESTIONS_FILE, newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    all_questions = list(reader)

# Track which questions are available.
# NOTE: this rewrites the pool every time the module is imported.
with open(AVAILABLE_FILE, "w") as f:
    json.dump(INITIAL_AVAILABLE_IDS, f)


def get_questions(num=10):
    """Reserve up to ``num`` random questions from the shared pool.

    The selected ids are removed from AVAILABLE_FILE under LOCK_FILE so that
    concurrent callers never receive the same question.

    Args:
        num (int): Maximum number of questions to reserve.

    Returns:
        list[dict]: The selected CSV rows, each extended with an ``"id"`` key
        holding its row index. Empty when the pool is exhausted.
    """
    with FileLock(LOCK_FILE):
        with open(AVAILABLE_FILE, "r") as f:
            available = json.load(f)

        if not available:
            return []  # No questions left

        sample_size = max(0, min(num, len(available)))
        selected_ids = random.sample(available, sample_size)
        taken = set(selected_ids)
        remaining = [qid for qid in available if qid not in taken]

        with open(AVAILABLE_FILE, "w") as f:
            json.dump(remaining, f)

    return [all_questions[qid] | {"id": qid} for qid in selected_ids]


def _session_file_path(session_id):
    """Return the JSON path for ``session_id``; raise ValueError if it is unsafe."""
    if not isinstance(session_id, str) or not _SESSION_ID_RE.fullmatch(session_id):
        raise ValueError(f"Invalid session id: {session_id!r}")
    return os.path.join(SESSION_DIR, f'{session_id}.json')


def save_session_data(session_id, data):
    """
    Saves session data to a JSON file in the SESSION_DIR.

    Failures (including an invalid session id) are logged, not raised.

    Args:
        session_id (str): Unique identifier for the session.
        data (dict): Session data to save.
    """
    try:
        file_path = _session_file_path(session_id)
        with open(file_path, 'w', encoding="utf-8") as f:
            json.dump(data, f)
        logger.info("Session data saved for session %s", session_id)
    except Exception as e:
        logger.exception(f"Failed to save session data for session {session_id}: {e}")


def load_session_data(session_id):
    """
    Loads session data from a JSON file in the SESSION_DIR.

    Args:
        session_id (str): Unique identifier for the session.

    Returns:
        dict or None: Session data if the file exists and is readable, else
        None (also for an invalid session id).
    """
    try:
        file_path = _session_file_path(session_id)
        if os.path.exists(file_path):
            with open(file_path, 'r', encoding="utf-8") as f:
                data = json.load(f)
            logger.info("Session data loaded for session %s", session_id)
            return data
        logger.warning("Session file not found for session %s", session_id)
        return None
    except Exception as e:
        logger.exception(f"Failed to load session data for session {session_id}: {e}")
        return None


def _quiz_context():
    """Read the quiz state from the query string and load its session file.

    Returns:
        tuple: ``(username, current_index, session_id, session_data)``.
        ``current_index`` defaults to 0 when absent or not an integer;
        ``session_data`` is None when the session cannot be loaded.
    """
    username = request.args.get('username')
    current_index = request.args.get('current_index', default=0, type=int)
    session_id = request.args.get('session_id')
    session_data = load_session_data(session_id) if session_id else None
    return username, current_index, session_id, session_data


def _question_at(session_data, current_index):
    """Return the question dict at ``current_index``, or None if out of range."""
    questions = session_data.get('questions') or []
    if 0 <= current_index < len(questions):
        return questions[current_index]
    return None


def _question_prompt(question_raw):
    """Return the leading sentence of a raw question.

    Questions containing "count" are cut after the first '.', all others
    after the first '?'. When the delimiter is absent the text is returned
    unchanged instead of raising.
    """
    delimiter = '.' if "count" in question_raw.lower() else '?'
    head, found, _ = question_raw.partition(delimiter)
    return head + found


def _upload_answer_files():
    """Upload every ``answers_*`` file in SESSION_DIR to the Hugging Face repo.

    This also re-sends answer files from earlier sessions that are still on
    disk. Each upload is best-effort: a failure is logged and the remaining
    files are still attempted, so the participant always reaches the final
    page.
    """
    for file_name in os.listdir(SESSION_DIR):
        if "answers_" not in file_name:
            continue
        try:
            hf_api.upload_file(
                path_or_fileobj=os.path.join(SESSION_DIR, file_name),
                path_in_repo=f"{HF_REPO_PATH}/{file_name}",
                repo_id=HF_REPO_ID,
                repo_type="space",  # Use "dataset" or "space" based on your repo
            )
        except Exception:
            logger.exception("Failed to upload %s to Hugging Face", file_name)


def _finish_quiz(username, session_id, session_data):
    """Write the final answers file for the session and trigger the upload."""
    end_time = time.time()
    start_time = session_data.get('time')
    # 'time' is only set by the instructions POST; without it no duration exists.
    elapsed_time = end_time - start_time if start_time is not None else None

    response_all = {
        'username': username,
        'time': elapsed_time,
        'responses': session_data['answers'],
    }
    temp_file_path = os.path.join(SESSION_DIR, f"answers_{session_id}.json")
    with open(temp_file_path, 'w', encoding="utf-8") as f:
        json.dump(response_all, f, indent=4)

    _upload_answer_files()


def _record_answer_and_advance(username, current_index, session_id, session_data, answer):
    """Store ``answer`` for the current question and move to the next step.

    Returns the Flask response: a redirect to ``prep`` for the next question,
    or the rendered thanks page once the last question has been answered.
    """
    questions = session_data['questions']
    answers = session_data.setdefault('answers', [])
    answers.append({"image_id": questions[current_index]['image_id'], "answer": answer})

    # The cookie session can only be written when a secret key is configured.
    if app.config.get('SECRET_KEY'):
        session['answers'] = answers
    save_session_data(session_id, session_data)

    next_index = current_index + 1
    if next_index >= len(questions):
        _finish_quiz(username, session_id, session_data)
        return render_template('thanks.html')

    return redirect(url_for('prep', username=username, current_index=next_index, session_id=session_id))


@app.route("/", methods=['GET', 'POST'])
def splash():
    """Landing page; on POST, create a new session and go to the instructions."""
    if request.method == 'POST':
        username = request.form.get('username')

        if not username:
            logger.warning("Username not provided by the user.")
            return render_template('splash.html', error="Please enter a username.")

        session_id = shortuuid.uuid()
        save_session_data(session_id, {'questions': [], 'answers': []})

        return redirect(url_for('instructions', username=username, current_index=0, session_id=session_id))

    # GET request - show intro page
    logger.info("Splash page rendered.")
    return render_template('splash.html')


@app.route('/instructions', methods=['GET', 'POST'])
def instructions():
    """Show the instructions; on POST, start the timer and begin the quiz."""
    username, current_index, session_id, session_data = _quiz_context()
    if session_data is None:
        return redirect(url_for('splash'))

    if request.method == 'POST':
        # User clicked the "Begin Quiz" button
        session_data['time'] = time.time()
        save_session_data(session_id, session_data)
        return redirect(url_for('prep', username=username, current_index=current_index, session_id=session_id))

    # Reserve questions once per session so that reloading this page does not
    # take further questions out of the shared pool.
    if not session_data.get('questions'):
        session_data['questions'] = get_questions(num=10)
        save_session_data(session_id, session_data)

    if not session_data['questions']:
        return render_template('thanks.html')

    # If GET, render the final instructions page
    return render_template('instructions.html', username=username)


@app.route('/prep', methods=['GET', 'POST'])
def prep():
    """Show the "question N of M" page; on POST, continue to the prompt."""
    username, current_index, session_id, session_data = _quiz_context()
    if session_data is None:
        return redirect(url_for('splash'))

    if request.method == 'POST':
        # User clicked "Start" button: go to the actual question display
        return redirect(url_for('prompt', username=username, current_index=current_index, session_id=session_id))

    questions = session_data.get('questions') or []
    return render_template('prep.html', question_number=current_index + 1, total=len(questions))


@app.route('/prompt', methods=['GET', 'POST'])
def prompt():
    """Show the question text; on POST, continue to the image."""
    username, current_index, session_id, session_data = _quiz_context()
    if session_data is None:
        return redirect(url_for('splash'))
    current = _question_at(session_data, current_index)
    if current is None:
        return redirect(url_for('splash'))

    if request.method == 'POST':
        # User clicked "Start" button: go to the image display
        return redirect(url_for('image', username=username, current_index=current_index, session_id=session_id))

    if current.get('task', '') in _GRID_TASKS:
        question = "Count the number of rows and columns."
    else:
        question = _question_prompt(current.get('question', ''))

    return render_template('prompt.html', question_text=question)


@app.route('/image', methods=['GET', 'POST'])
def image():
    """Show the current image; on POST, continue to the matching answer form."""
    username, current_index, session_id, session_data = _quiz_context()
    if session_data is None:
        return redirect(url_for('splash'))
    current = _question_at(session_data, current_index)
    if current is None:
        return redirect(url_for('splash'))

    if request.method == 'POST':
        # Grid tasks collect two numbers, everything else a single response.
        target = 'respond_grid' if current.get('task', '') in _GRID_TASKS else 'respond'
        return redirect(url_for(target, username=username, current_index=current_index, session_id=session_id))

    # If GET, display the current image with a 10-seconds countdown
    image_id = str(current.get('image_id', 'default_image.png'))
    return render_template('image.html', image_name=image_id + '.jpg')


@app.route('/respond_grid', methods=['GET', 'POST'])
def respond_grid():
    """Answer form for grid tasks; records a (rows, columns) pair on POST."""
    username, current_index, session_id, session_data = _quiz_context()
    if session_data is None:
        return redirect(url_for('splash'))
    current = _question_at(session_data, current_index)
    if current is None:
        return redirect(url_for('splash'))

    if request.method == 'POST':
        answer = (request.form.get('response_r'), request.form.get('response_c'))
        return _record_answer_and_advance(username, current_index, session_id, session_data, answer)

    question = _question_prompt(current.get('question', ''))
    return render_template('respond_grid.html', question_text=question, instruction="Enter a number")


@app.route('/respond', methods=['GET', 'POST'])
def respond():
    """Answer form for single-value tasks; records the response on POST."""
    username, current_index, session_id, session_data = _quiz_context()
    if session_data is None:
        return redirect(url_for('splash'))
    current = _question_at(session_data, current_index)
    if current is None:
        return redirect(url_for('splash'))

    if request.method == 'POST':
        answer = request.form.get('response')
        return _record_answer_and_advance(username, current_index, session_id, session_data, answer)

    question = _question_prompt(current.get('question', ''))
    form = _RESPONSE_HINTS.get(current.get('task', ''), "Enter a number")
    return render_template('respond.html', question_text=question, instruction=form)


if __name__ == "__main__":
    # Run Flask app
    app.run(host="0.0.0.0", port=7860, debug=False)