# import os | |
# from flask import Flask, render_template, request, jsonify, send_from_directory | |
# from dotenv import load_dotenv | |
# from supabase import create_client, Client | |
# import openai | |
# app = Flask(__name__) | |
# # Load environment variables | |
# load_dotenv() | |
# SUPABASE_URL = os.getenv("SUPABASE_URL") | |
# SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
# # Initialize Supabase client | |
# supabase = create_client(SUPABASE_URL, SUPABASE_KEY) | |
# @app.route('/') | |
# def index(): | |
# return send_from_directory(os.path.join(app.root_path, 'static'), 'index.html') | |
# @app.route('/login', methods=['POST']) | |
# def login(): | |
# data = request.get_json() | |
# email = data['email'] | |
# password = data['password'] | |
# # Replace with actual login logic using Supabase or another method | |
# try: | |
# user = supabase.auth.sign_in(email=email, password=password) | |
# return jsonify({ | |
# "message": "Login successful", | |
# "user_id": user.id | |
# }) | |
# except Exception as e: | |
# return jsonify({"message": f"Login failed: {e}"}), 400 | |
# @app.route('/signup', methods=['POST']) | |
# def signup(): | |
# data = request.get_json() | |
# email = data['email'] | |
# password = data['password'] | |
# # Replace with actual signup logic using Supabase or another method | |
# try: | |
# user = supabase.auth.sign_up(email=email, password=password) | |
# return jsonify({"message": "Signup successful"}) | |
# except Exception as e: | |
# return jsonify({"message": f"Signup failed: {e}"}), 400 | |
# @app.route('/chat', methods=['POST']) | |
# def chat(): | |
# data = request.get_json() | |
# message = data['message'] | |
# chat_history = data.get('history', []) | |
# # Add your chatbot logic here (using OpenAI or another API) | |
# response = "This is a placeholder response" # Replace with chatbot response logic | |
# chat_history.append(f"User: {message}") | |
# chat_history.append(f"Bot: {response}") | |
# return jsonify({ | |
# "bot_response": response, | |
# "history": chat_history | |
# }) | |
# if __name__ == '__main__': | |
# app.run(debug=True, host='0.0.0.0', port=7860) | |
# import os | |
# from flask import Flask, send_from_directory, request, redirect, url_for, session | |
# from supabase import create_client | |
# import openai | |
# from dotenv import load_dotenv | |
# import json | |
# from json import jsonify | |
# # Load environment variables | |
# load_dotenv() | |
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
# SUPABASE_URL = os.getenv("SUPABASE_URL") | |
# SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
# # Initialize OpenAI and Supabase clients | |
# openai.api_key = OPENAI_API_KEY | |
# supabase = create_client(SUPABASE_URL, SUPABASE_KEY) | |
# # Flask app setup | |
# app = Flask(__name__) | |
# app.secret_key = os.getenv('FLASK_SECRET_KEY') | |
# # Global variables | |
# required_slots = { | |
# "patient_name": "May I know your full name?", | |
# "date_of_birth": "What is your date of birth?", | |
# "country_of_birth": "Where were you born?", | |
# "sex": "What is your biological sex?", | |
# "gender": "What gender do you identify as?", | |
# "race": "What is your racial background?", | |
# "school_behavior": "How was your behavior at school?", | |
# "school_grades": "How were your grades at school?", | |
# "school_failures": "Did you fail any class or year in school?", | |
# "school_behavior_issues": "Did you have problems at school due to your behavior?", | |
# "education_level": "What is your highest level of education?", | |
# "citizenship_status": "What is your citizenship or immigration status?", | |
# "marital_status": "What is your current relationship status?", | |
# "offsprings": "Do you have children? If yes, how many?", | |
# "living_situation": "What is your current living situation?", | |
# "employment_status": "What is your employment status?", | |
# "income_source": "What is your main source of income?", | |
# "legal_issues": "Do you have any history of legal issues?" | |
# } | |
# # Routes | |
# @app.route('/') | |
# def index(): | |
# return send_from_directory('static', 'index.html') | |
# # Signup route | |
# @app.route('/signup', methods=['POST']) | |
# def signup(): | |
# """Handle user signup.""" | |
# email = request.form.get('email') | |
# password = request.form.get('password') | |
# try: | |
# # Correctly pass email and password as part of the dictionary | |
# user = supabase.auth.sign_up({ | |
# "email": email, | |
# "password": password | |
# }) | |
# return redirect(url_for('login')) | |
# except Exception as e: | |
# return f"Signup failed: {e}" | |
# @app.route('/login', methods=['GET', 'POST']) | |
# def login(): | |
# if request.method == 'POST': | |
# email = request.form.get('email') | |
# password = request.form.get('password') | |
# try: | |
# # Attempt to log in with Supabase | |
# result = supabase.auth.sign_in_with_password({"email": email, "password": password}) | |
# # Store user_id in session after login | |
# session['user_id'] = result.user.id | |
# session['user_email'] = result.user.email | |
# # Redirect to the chat page after login | |
# return redirect(url_for('chat')) | |
# except Exception as e: | |
# # If login fails, show the error message | |
# return f"Login failed: {e}" | |
# # For GET request, show the login page | |
# return send_from_directory('static', 'login.html') | |
# @app.route('/chat', methods=['GET', 'POST']) | |
# def chat(): | |
# if 'user_id' not in session: # Check if the user is logged in | |
# return redirect(url_for('login')) # If not, redirect to login page | |
# # If user is logged in, show the chat page | |
# user_id = session.get('user_id') | |
# history = [] | |
# if request.method == 'POST': | |
# user_message = request.form.get('message') | |
# history.append({"role": "user", "content": user_message}) | |
# # OpenAI API call to get the response from the model | |
# response = openai.ChatCompletion.create( | |
# model="gpt-4o", | |
# messages=history, | |
# temperature=0.7 | |
# ) | |
# bot_response = response.choices[0].message['content'] | |
# history.append({"role": "assistant", "content": bot_response}) | |
# return send_from_directory('static', 'chat.html', | |
# mimetype='text/html', | |
# headers={'history': history, 'user_message': user_message, 'bot_response': bot_response}) | |
# # Render the chat page | |
# return send_from_directory('static', 'chat.html') | |
# @app.route('/test_session') | |
# def test_session(): | |
# return jsonify({"session": dict(session)}) | |
# # Visit `/test_session` in your browser to check session data | |
# @app.route('/logout') | |
# def logout(): | |
# session.clear() # Clear the session | |
# return redirect(url_for('login')) # Redirect to login after logout | |
# # Run the app | |
# if __name__ == "__main__": | |
# app.run(debug=True, host='0.0.0.0', port=7860) | |
# import os | |
# from flask import Flask, send_from_directory, request, redirect, url_for, session | |
# from flask_session import Session | |
# from supabase import create_client | |
# import openai | |
# from dotenv import load_dotenv | |
# # Load environment variables | |
# load_dotenv() | |
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
# SUPABASE_URL = os.getenv("SUPABASE_URL") | |
# SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
# # Initialize OpenAI and Supabase clients | |
# openai.api_key = OPENAI_API_KEY | |
# supabase = create_client(SUPABASE_URL, SUPABASE_KEY) | |
# # Flask app setup | |
# app = Flask(__name__) | |
# app.secret_key = os.getenv('FLASK_SECRET_KEY') # Set secret key | |
# app.config['SESSION_TYPE'] = 'filesystem' # Use filesystem to store session data | |
# app.config['SESSION_PERMANENT'] = False # Make the session non-permanent | |
# Session(app) | |
# # Global variables (slot filling) | |
# required_slots = { | |
# "patient_name": "May I know your full name?", | |
# "date_of_birth": "What is your date of birth?", | |
# "country_of_birth": "Where were you born?", | |
# "sex": "What is your biological sex?", | |
# "gender": "What gender do you identify as?", | |
# "race": "What is your racial background?", | |
# "school_behavior": "How was your behavior at school?", | |
# "school_grades": "How were your grades at school?", | |
# "school_failures": "Did you fail any class or year in school?", | |
# "school_behavior_issues": "Did you have problems at school due to your behavior?", | |
# "education_level": "What is your highest level of education?", | |
# "citizenship_status": "What is your citizenship or immigration status?", | |
# "marital_status": "What is your current relationship status?", | |
# "offsprings": "Do you have children? If yes, how many?", | |
# "living_situation": "What is your current living situation?", | |
# "employment_status": "What is your employment status?", | |
# "income_source": "What is your main source of income?", | |
# "legal_issues": "Do you have any history of legal issues?" | |
# } | |
# # Routes | |
# @app.route('/') | |
# def index(): | |
# # Welcome page with buttons to Log In and Sign Up | |
# return send_from_directory('static', 'index.html') | |
# @app.route('/signup', methods=['GET', 'POST']) | |
# def signup(): | |
# """Handle user signup.""" | |
# if request.method == 'POST': | |
# email = request.form.get('email') | |
# password = request.form.get('password') | |
# try: | |
# # Sign up logic with Supabase | |
# user = supabase.auth.sign_up({ | |
# "email": email, | |
# "password": password | |
# }) | |
# # After successful signup, log in the user and redirect to the chat page | |
# session['user_id'] = user.id | |
# session['user_email'] = user.email | |
# return redirect(url_for('chat')) | |
# except Exception as e: | |
# return f"Signup failed: {e}" | |
# # For GET request, show the signup page | |
# return send_from_directory('static', 'signup.html') | |
# @app.route('/login', methods=['GET', 'POST']) | |
# def login(): | |
# # """Handle user login.""" | |
# # if request.method == 'POST': | |
# # email = request.form.get('email') | |
# # password = request.form.get('password') | |
# # try: | |
# # # Attempt to log in with Supabase | |
# # result = supabase.auth.sign_in_with_password({"email": email, "password": password}) | |
# # # Store user_id in session after login | |
# # session['user_id'] = result.user.id | |
# # session['user_email'] = result.user.email | |
# # print(f"This is Login page User logged in: {session['user_email']}") | |
# # # Redirect to the chat page after successful login | |
# # return redirect(url_for('chat')) | |
# # except Exception as e: | |
# # # If login fails, show the error message | |
# # return f"Login failed: {e}" | |
# # # For GET request, show the login page | |
# # return send_from_directory('static', 'login.html') | |
# """Handle user login.""" | |
# if request.method == 'POST': | |
# email = request.form.get('email') | |
# password = request.form.get('password') | |
# try: | |
# # Attempt to log in with Supabase (using the correct method) | |
# result = supabase.auth.sign_in_with_password({"email": email, "password": password}) | |
# # Check if the login was successful | |
# if result.user: | |
# # Store user_id in session after login | |
# session['user_id'] = result.user.id | |
# session['user_email'] = result.user.email | |
# # Debugging: Print session data to check if it's set | |
# print(f"User logged in: {session['user_email']}") | |
# print(f"Session data in login page: {session}") | |
# # Redirect to the chat page after successful login | |
# return redirect(url_for('chat')) | |
# else: | |
# return "Invalid credentials, please try again." | |
# except Exception as e: | |
# return f"Login failed: {e}" | |
# # For GET request, show the login page | |
# return send_from_directory('static', 'login.html') | |
# # Chat route | |
# @app.route('/chat', methods=['GET', 'POST']) | |
# def chat(): | |
# """Handle chat interactions.""" | |
# # Check if user is logged in | |
# print(f"Checking session for user_id in chat page : {session.get('user_id')}") | |
# print(f"Session data in chat page: {session}") | |
# # if 'user_id' not in session: | |
# # return redirect(url_for('login')) # Redirect to login page if not logged in | |
# # If user is logged in, show the chat page | |
# print(f"User accessing chat: {session.get('user_email')}") | |
# user_id = session.get('user_id') | |
# history = [] | |
# if request.method == 'POST': | |
# user_message = request.form.get('message') | |
# history.append({"role": "user", "content": user_message}) | |
# # OpenAI API call to get the response from the model | |
# response = openai.ChatCompletion.create( | |
# model="gpt-4o", | |
# messages=history, | |
# temperature=0.7 | |
# ) | |
# bot_response = response.choices[0].message['content'] | |
# history.append({"role": "assistant", "content": bot_response}) | |
# return send_from_directory('static', 'chat.html', | |
# mimetype='text/html', | |
# headers={'history': history, 'user_message': user_message, 'bot_response': bot_response}) | |
# # Render the chat page | |
# return send_from_directory('static', 'chat.html') | |
# # Logout route | |
# @app.route('/logout') | |
# def logout(): | |
# session.clear() # Clear the session | |
# return redirect(url_for('login')) # Redirect to login after logout | |
# # Run the app | |
# if __name__ == "__main__": | |
# app.run(debug=True, host='0.0.0.0', port=7860) | |
# import os | |
# from flask import Flask, render_template, request, redirect, url_for, session | |
# from flask_session import Session | |
# from supabase import create_client | |
# import openai | |
# from dotenv import load_dotenv | |
# # Load environment variables | |
# load_dotenv() | |
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
# SUPABASE_URL = os.getenv("SUPABASE_URL") | |
# SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
# # Initialize OpenAI and Supabase clients | |
# openai.api_key = OPENAI_API_KEY | |
# supabase = create_client(SUPABASE_URL, SUPABASE_KEY) | |
# # Flask app setup | |
# app = Flask(__name__, static_folder="static", template_folder="templates") | |
# app.secret_key = os.getenv('FLASK_SECRET_KEY') # Set secret key | |
# app.config['SESSION_TYPE'] = 'filesystem' # Use filesystem to store session data | |
# app.config['SESSION_PERMANENT'] = False # Make the session non-permanent | |
# Session(app) # Initialize Flask-Session | |
# # Global variables (slot filling) | |
# required_slots = { | |
# "patient_name": "May I know your full name?", | |
# "date_of_birth": "What is your date of birth?", | |
# "country_of_birth": "Where were you born?", | |
# "sex": "What is your biological sex?", | |
# "gender": "What gender do you identify as?", | |
# "race": "What is your racial background?", | |
# "school_behavior": "How was your behavior at school?", | |
# "school_grades": "How were your grades at school?", | |
# "school_failures": "Did you fail any class or year in school?", | |
# "school_behavior_issues": "Did you have problems at school due to your behavior?", | |
# "education_level": "What is your highest level of education?", | |
# "citizenship_status": "What is your citizenship or immigration status?", | |
# "marital_status": "What is your current relationship status?", | |
# "offsprings": "Do you have children? If yes, how many?", | |
# "living_situation": "What is your current living situation?", | |
# "employment_status": "What is your employment status?", | |
# "income_source": "What is your main source of income?", | |
# "legal_issues": "Do you have any history of legal issues?" | |
# } | |
# # Routes | |
# @app.route("/") | |
# def index(): | |
# return render_template("index.html", title="Welcome") | |
# # Signup route | |
# @app.route('/signup', methods=['GET', 'POST']) | |
# def signup(): | |
# """Handle user signup.""" | |
# if request.method == 'POST': | |
# email = request.form.get('email') | |
# password = request.form.get('password') | |
# try: | |
# # Sign up logic with Supabase | |
# user = supabase.auth.sign_up({ | |
# "email": email, | |
# "password": password | |
# }) | |
# # After successful signup, log in the user and redirect to the chat page | |
# session['user_id'] = user.id | |
# session['user_email'] = user.email | |
# return redirect(url_for('chat')) | |
# except Exception as e: | |
# return f"Signup failed: {e}" | |
# # For GET request, show the signup page | |
# return render_template('signup.html') | |
# # Login route | |
# @app.route('/login', methods=['GET', 'POST']) | |
# def login(): | |
# """Handle user login.""" | |
# if request.method == 'POST': | |
# email = request.form.get('email') | |
# password = request.form.get('password') | |
# try: | |
# # Attempt to log in with Supabase | |
# result = supabase.auth.sign_in_with_password({"email": email, "password": password}) | |
# # Store user_id in session after login | |
# session['user_id'] = result.user.id | |
# session['user_email'] = result.user.email | |
# # Debugging: Print session data to check if it's set | |
# print(f"User logged in: {session['user_email']}") | |
# # Redirect to the chat page after successful login | |
# return redirect(url_for('chat')) | |
# except Exception as e: | |
# return f"Login failed: {e}" | |
# # For GET request, show the login page | |
# return render_template('login.html') | |
# # Chat route | |
# @app.route('/chat', methods=['GET', 'POST']) | |
# def chat(): | |
# """Handle chat interactions.""" | |
# # if 'user_id' not in session: | |
# # return redirect(url_for('login')) # Redirect to login page if not logged in | |
# user_id = session.get('user_id') | |
# history = session.get('chat_history', []) | |
# # Initial system prompt (context for the chatbot) | |
# system_prompt = { | |
# "role": "system", | |
# "content": """The following is a conversation with an AI assistant that can have meaningful conversations with users. | |
# You are a clinical assistant who assesses patients' mental health by asking relevant questions. | |
# You are helpful, empathetic, and friendly. Your objective is to make the user feel better by feeling heard. | |
# Focus solely on evaluating and supporting the user’s mental health: ask about their feelings, symptoms, coping strategies, and well-being.""" | |
# } | |
# if request.method == 'POST': | |
# user_message = request.form.get('message') | |
# history.append({"role": "user", "content": user_message}) | |
# # Combine history with system prompt | |
# messages = [system_prompt] + [{"role": msg['role'], "content": msg['content']} for msg in history] | |
# # OpenAI API call to get the response from the model | |
# response = openai.ChatCompletion.create( | |
# model="gpt-4o", | |
# messages=messages, | |
# temperature=0.7 | |
# ) | |
# bot_response = response.choices[0].message['content'] | |
# history.append({"role": "assistant", "content": bot_response}) | |
# # Store the updated history in the session | |
# session['chat_history'] = history | |
# return render_template('chat.html', history=history) | |
# # Render the chat page | |
# return render_template('chat.html', history=history) | |
# # Logout route | |
# @app.route('/logout') | |
# def logout(): | |
# session.clear() # Clear the session | |
# return redirect(url_for('login')) # Redirect to login after logout | |
# # Run the app | |
# if __name__ == "__main__": | |
# app.run(debug=True, host='0.0.0.0', port=7860) | |
# import os | |
# from flask import Flask, render_template, request, redirect, url_for, session, jsonify | |
# from flask_session import Session | |
# from dotenv import load_dotenv | |
# from supabase import create_client, Client | |
# from openai import OpenAI | |
# # =============== Env & Clients =============== | |
# load_dotenv() | |
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
# SUPABASE_URL = os.getenv("SUPABASE_URL") | |
# SUPABASE_KEY = os.getenv("SUPABASE_KEY") | |
# FLASK_SECRET_KEY = os.getenv("FLASK_SECRET_KEY", "dev-secret") | |
# if not OPENAI_API_KEY: | |
# raise RuntimeError("OPENAI_API_KEY is not set") | |
# if not SUPABASE_URL or not SUPABASE_KEY: | |
# raise RuntimeError("SUPABASE_URL or SUPABASE_KEY is not set") | |
# client = OpenAI(api_key=OPENAI_API_KEY) | |
# supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) | |
# # =============== Flask App =============== | |
# app = Flask(__name__, static_folder="static", template_folder="templates") | |
# app.secret_key = FLASK_SECRET_KEY | |
# app.config["SESSION_TYPE"] = "filesystem" | |
# app.config["SESSION_PERMANENT"] = False | |
# app.config["SESSION_FILE_DIR"] = os.path.join("/tmp", "flask_session") | |
# os.makedirs(app.config["SESSION_FILE_DIR"], exist_ok=True) | |
# Session(app) | |
# # =============== Chat Config =============== | |
# SYSTEM_PROMPT = { | |
# "role": "system", | |
# "content": ( | |
# "You are a clinical assistant who assesses patients' mental health by asking relevant questions. " | |
# "You are helpful, empathetic, and friendly. Focus solely on evaluating and supporting the user's mental health: " | |
# "ask about feelings, symptoms, coping strategies, and well-being. Keep responses under 120 words." | |
# ), | |
# } | |
# # =============== Helpers =============== | |
# def get_history() -> list[dict]: | |
# """Return chat history stored in session (list of {role, content}).""" | |
# if "chat_history" not in session: | |
# session["chat_history"] = [] | |
# return session["chat_history"] | |
# def set_history(history: list[dict]) -> None: | |
# session["chat_history"] = history | |
# # =============== Routes =============== | |
# @app.route("/") | |
# def index(): | |
# return render_template("index.html", title="Welcome") | |
# @app.route("/login", methods=["GET", "POST"]) | |
# def login(): | |
# if request.method == "POST": | |
# email = request.form.get("email", "").strip() | |
# password = request.form.get("password", "") | |
# try: | |
# resp = supabase.auth.sign_in_with_password({"email": email, "password": password}) | |
# # supabase-py v2 returns an object with .user | |
# if not resp.user: | |
# return render_template("login.html", title="Login", error="Invalid credentials.") | |
# session["user_id"] = resp.user.id | |
# session["user_email"] = resp.user.email | |
# return redirect(url_for("chat")) | |
# except Exception as e: | |
# return render_template("login.html", title="Login", error=str(e)) | |
# return render_template("login.html", title="Login") | |
# @app.route("/signup", methods=["GET", "POST"]) | |
# def signup(): | |
# if request.method == "POST": | |
# email = request.form.get("email", "").strip() | |
# password = request.form.get("password", "") | |
# try: | |
# resp = supabase.auth.sign_up({"email": email, "password": password}) | |
# if not resp.user: | |
# return render_template("signup.html", title="Sign Up", error="Signup failed.") | |
# session["user_id"] = resp.user.id | |
# session["user_email"] = resp.user.email | |
# return redirect(url_for("chat")) | |
# except Exception as e: | |
# return render_template("signup.html", title="Sign Up", error=str(e)) | |
# return render_template("signup.html", title="Sign Up") | |
# @app.route("/chat", methods=["GET"]) | |
# def chat(): | |
# # Optional gate: | |
# # if "user_id" not in session: return redirect(url_for("login")) | |
# history = get_history() | |
# return render_template("chat.html", history=history, title="Chat") | |
# @app.post("/message") | |
# def message(): | |
# """JSON endpoint consumed by static/script.js""" | |
# data = request.get_json(force=True) or {} | |
# user_text = (data.get("message") or "").strip() | |
# if not user_text: | |
# return jsonify({"error": "Empty message"}), 400 | |
# history = get_history() | |
# # Append user turn | |
# history.append({"role": "user", "content": user_text}) | |
# # Build messages window to keep token usage reasonable | |
# recent = history[-10:] | |
# messages = [SYSTEM_PROMPT] + recent | |
# try: | |
# resp = client.chat.completions.create( | |
# model="gpt-4o-mini", | |
# messages=messages, | |
# temperature=0.6, | |
# ) | |
# bot_reply = resp.choices[0].message.content | |
# except Exception as e: | |
# bot_reply = f"Oops, I couldn't reach the AI service: {e}" | |
# # Append assistant turn & persist in session | |
# history.append({"role": "assistant", "content": bot_reply}) | |
# set_history(history) | |
# return jsonify({"reply": bot_reply}) | |
# @app.route("/logout") | |
# def logout(): | |
# session.clear() | |
# return redirect(url_for("login")) | |
# # =============== Main =============== | |
# if __name__ == "__main__": | |
# # Hugging Face Spaces expects the app to listen on 7860 | |
# app.run(host="0.0.0.0", port=7860, debug=True) | |
import os | |
import re | |
import json | |
import logging | |
from datetime import datetime | |
import base64 | |
import uuid | |
from datetime import date | |
from flask import Flask, render_template, request, redirect, url_for, session, jsonify | |
from flask_session import Session | |
from dotenv import load_dotenv | |
from supabase import create_client, Client | |
from openai import OpenAI | |
from google.cloud import dialogflowcx_v3 as dfx | |
from google.oauth2 import service_account | |
from google.protobuf import struct_pb2 | |
from google.api_core.client_options import ClientOptions | |
# =============== Env & Clients =============== | |
# Read a local .env file (when present) before any os.getenv lookups below.
load_dotenv()

# --- Credentials that must be present (validated further down) ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# NOTE(review): falls back to the fixed value "dev-secret" when
# FLASK_SECRET_KEY is unset -- confirm every deployment sets a real secret.
FLASK_SECRET_KEY = os.getenv("FLASK_SECRET_KEY", "dev-secret")

# --- Dialogflow CX settings ---
DF_PROJECT = os.getenv("DIALOGFLOW_PROJECT_ID")
DF_LOCATION = os.getenv("DIALOGFLOW_LOCATION", "us-central1")
DF_AGENT_ID = os.getenv("DIALOGFLOW_AGENT_ID", "")
DF_BEARER = os.getenv("DF_WEBHOOK_BEARER", "")

# Whole-string, case-insensitive matchers for short affirmative / negative
# replies (an optional trailing period is allowed).
YES = re.compile(
    r'^(y|ya|yes|yep|yeah|sure|ok|okay|affirmative|i agree|consent)\.?$',
    re.IGNORECASE,
)
NO = re.compile(
    r'^(n|no|nope|nah|not now|decline|don\'t|do not)\.?$',
    re.IGNORECASE,
)

# Fail fast at import time when a required credential is missing.
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set")
if not (SUPABASE_URL and SUPABASE_KEY):
    raise RuntimeError("SUPABASE_URL or SUPABASE_KEY is not set")

client = OpenAI(api_key=OPENAI_API_KEY)
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# =============== Logging =============== | |
# Root logger at INFO with a timestamped format. No explicit stream is passed,
# so basicConfig installs its default stream handler (sys.stderr).
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Module-wide logger used by the helpers below.
log = logging.getLogger("app")
# =============== Flask App =============== | |
app = Flask(__name__, static_folder="static", template_folder="templates")
app.secret_key = FLASK_SECRET_KEY

# Server-side sessions stored as files under /tmp/flask_session.
app.config.update(
    SESSION_TYPE="filesystem",
    SESSION_PERMANENT=False,
    SESSION_FILE_DIR=os.path.join("/tmp", "flask_session"),
)
# The session directory must exist before Flask-Session is initialised.
os.makedirs(app.config["SESSION_FILE_DIR"], exist_ok=True)
Session(app)

# Session cookie attributes.
app.config["SESSION_COOKIE_NAME"] = "mh_session"
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "None"
app.config["SESSION_COOKIE_SECURE"] = True  # HTTPS on Spaces
def _log_request():
    """Log method, path, client address and user agent of the current request.

    When the request declares a JSON content type, the parsed body is logged
    too. Body logging is best-effort: a body that cannot be parsed is reported
    as a warning and never interrupts request handling.

    NOTE(review): no decorator is visible on this function here; presumably it
    is registered as a request hook elsewhere -- confirm.
    NOTE(review): JSON bodies are logged verbatim at INFO level; confirm that
    is acceptable for the data this service receives.
    """
    log.info(
        "REQ %s %s ip=%s ua=%s",
        request.method,
        request.path,
        # Prefer the forwarded client address when the header is present.
        request.headers.get("X-Forwarded-For", request.remote_addr),
        request.headers.get("User-Agent", "-"),
    )
    if not request.is_json:
        return
    try:
        body_text = json.dumps(request.get_json(force=False), ensure_ascii=False)
    except Exception as exc:
        # Previously swallowed silently; surface it without failing the request.
        log.warning("REQ JSON: <unparseable body> (%s)", exc)
        return
    log.info("REQ JSON: %s", body_text)
def _log_response(resp):
    """Log the request method, path and response status, then return *resp* unchanged.

    NOTE(review): no decorator is visible on this function here; presumably it
    is registered as a response hook elsewhere -- confirm.
    """
    method, path, status = request.method, request.path, resp.status_code
    log.info("RESP %s %s status=%s", method, path, status)
    return resp
# =============== Chat Config =============== | |
# System-role chat message describing the assistant's persona and limits
# (mental-health focus, replies under 120 words).
SYSTEM_PROMPT = dict(
    role="system",
    content=(
        "You are a clinical assistant who assesses patients' mental health by asking relevant questions. "
        "You are helpful, empathetic, and friendly. Focus solely on evaluating and supporting the user's mental health: "
        "ask about feelings, symptoms, coping strategies, and well-being. Keep responses under 120 words."
    ),
)
# =============== Helpers =============== | |
def _auth_ok(req):
    """Return True when *req* carries the expected webhook bearer token.

    Only the ``Authorization`` header is consulted; it must equal
    ``Bearer <DF_BEARER>`` exactly. (An ``X-DF-Token`` header is not checked.)

    An unset or empty ``DF_BEARER`` never authorizes a request: without this
    guard a bare ``Authorization: Bearer `` header would match the empty
    token and bypass authentication.

    Args:
        req: Request-like object exposing a ``headers`` mapping with ``get``.

    Returns:
        bool: True if the header matches the configured token, else False.
    """
    import hmac  # local import so this block does not depend on file-level imports

    if not DF_BEARER:
        # Fail closed when no token is configured.
        return False

    supplied = req.headers.get("Authorization", "") or ""
    expected = f"Bearer {DF_BEARER}"
    # Constant-time comparison; bytes are used so non-ASCII input cannot raise.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
def get_history() -> list[dict]:
    """Return the chat history list kept in the session.

    If the session holds no list under ``chat_history`` (missing or of another
    type), an empty list is stored there and returned.
    """
    stored = session.get("chat_history")
    if isinstance(stored, list):
        return stored

    fresh: list[dict] = []
    session["chat_history"] = fresh
    return fresh
def set_history(history: list[dict]) -> None:
    """Store *history* in the session under ``chat_history``, replacing any previous value."""
    session["chat_history"] = history
def safe_email() -> str:
    """Return the session's ``user_email``, or ``"anonymous@local"`` when it is missing or empty."""
    email = session.get("user_email")
    if email:
        return email
    return "anonymous@local"
def needs_socio_row(user_id: str) -> bool:
    """Report whether "Patient Table" has no row whose ``patient_id`` equals *user_id*.

    Returns:
        True when no matching row is found, and also True when the lookup
        raises (conservative choice: collect intake). False otherwise.
    """
    try:
        query = (
            supabase.table("Patient Table")
            .select("patient_id", count="exact")  # ask PostgREST to count
            .eq("patient_id", user_id)
            .limit(1)
        )
        result = query.execute()
        log.info("Supabase sociodemographic check for %s: %s", user_id, result)

        # Preferred signal: the count reported with the response.
        reported_count = None if result is None else getattr(result, "count", None)
        if reported_count is not None:
            return reported_count == 0

        # Fallback: inspect the returned rows directly.
        matched_rows = getattr(result, "data", None) or []
        return len(matched_rows) == 0
    except Exception as e:
        app.logger.exception("Supabase sociodemographic check failed: %s", e)
        return True  # be conservative: collect intake
def _cx_endpoint(location: str) -> str:
    """Return the Dialogflow CX API host for *location*.

    ``"global"`` maps to the unprefixed host; any other location is used as a
    regional prefix.
    """
    if location == "global":
        return "dialogflow.googleapis.com"
    return f"{location}-dialogflow.googleapis.com"
def _load_df_credentials():
    """Build service-account credentials from the ``DIALOGFLOW_KEY`` environment variable.

    The value is interpreted, in order, as:
      1. a JSON document,
      2. a base64-encoded JSON document,
      3. a path to a key file on disk.
    The first interpretation that yields credentials wins.

    Raises:
        RuntimeError: if the variable is unset/empty, or if none of the three
            interpretations applies.
    """
    key_material = os.getenv("DIALOGFLOW_KEY", "")
    if not key_material:
        raise RuntimeError("DIALOGFLOW_KEY not set")

    scopes = ["https://www.googleapis.com/auth/dialogflow"]

    def _from_plain_json(value):
        return json.loads(value)

    def _from_base64_json(value):
        return json.loads(base64.b64decode(value).decode("utf-8"))

    # Attempts 1 and 2: any failure simply moves on to the next interpretation.
    for parse in (_from_plain_json, _from_base64_json):
        try:
            return service_account.Credentials.from_service_account_info(
                parse(key_material),
                scopes=scopes,
            )
        except Exception:
            continue

    # Attempt 3: treat the value as a file path.
    if os.path.exists(key_material):
        return service_account.Credentials.from_service_account_file(
            key_material,
            scopes=scopes,
        )

    raise RuntimeError("DIALOGFLOW_KEY must be service account JSON (plain or base64) or a valid file path.")
# Credentials are resolved once, when this module is imported.
_creds = _load_df_credentials()

# ---- Build CX Sessions client and agent path
_df_sessions = dfx.SessionsClient(
    credentials=_creds,
    client_options=ClientOptions(
        api_endpoint=_cx_endpoint(DF_LOCATION),
    ),
)
# Resource name of the agent; session paths are built on top of it.
DF_AGENT_PATH = f"projects/{DF_PROJECT}/locations/{DF_LOCATION}/agents/{DF_AGENT_ID}"
def cx_texts(resp) -> str:
    """Join every non-empty text fragment of a Dialogflow CX response into one string.

    Fragments are separated by a blank line. ``"…"`` is returned when *resp*
    is falsy, has no ``query_result``, or yields no text after stripping.
    """
    placeholder = "…"
    if not resp:
        return placeholder
    query_result = getattr(resp, "query_result", None)
    if not query_result:
        return placeholder

    # Each message's text.text is a list of strings; keep the non-empty ones.
    fragments = [
        fragment
        for message in query_result.response_messages
        if getattr(message, "text", None) and message.text.text
        for fragment in message.text.text
        if fragment
    ]
    combined = "\n\n".join(fragments).strip()
    return combined if combined else placeholder
def _mk_query_params(params: dict | None): | |
if not params: | |
return None | |
s = struct_pb2.Struct() | |
# Only put JSON-serializable values (str/int/bool/float/list/dict); skip None | |
for k, v in params.items(): | |
if v is not None: | |
s.update({k: v}) | |
return dfx.QueryParameters(parameters=s) | |
def cx_detect_text(text: str, session_id: str, params: dict | None = None):
    """Send *text* to the Dialogflow CX agent and return the detect-intent response.

    Args:
        text: User utterance, sent with language code "en".
        session_id: Session identifier; "anon" is used when it is empty.
        params: Optional session parameters forwarded as query parameters.
    """
    target_session = f"{DF_AGENT_PATH}/sessions/{session_id or 'anon'}"
    query_params = _mk_query_params(params)

    text_query = dfx.QueryInput(
        text=dfx.TextInput(text=text),
        language_code="en",
    )
    detect_request = dfx.DetectIntentRequest(
        session=target_session,
        query_input=text_query,
        query_params=query_params,
    )
    return _df_sessions.detect_intent(request=detect_request)
def cx_trigger_event(event_name: str, session_id: str, params: dict | None = None):
    """Fire a named event at the Dialogflow CX agent.

    Args:
        event_name: Name of the CX event to trigger.
        session_id: CX session identifier; ``'anon'`` is used when it is falsy.
        params: Optional session parameters, passed through ``_mk_query_params``.

    Returns:
        Whatever ``_df_sessions.detect_intent`` returns for the request.
    """
    target_session = f"{DF_AGENT_PATH}/sessions/{session_id or 'anon'}"
    query_params = _mk_query_params(params)

    event_input = dfx.EventInput(event=event_name)
    query_input = dfx.QueryInput(event=event_input, language_code="en")
    detect_request = dfx.DetectIntentRequest(
        session=target_session,
        query_input=query_input,
        query_params=query_params,
    )
    return _df_sessions.detect_intent(request=detect_request)
def is_yes_no(s: str) -> bool:
    """Return True when the stripped input matches the ``YES`` or ``NO`` pattern.

    A falsy input (``None`` or ``""``) is treated as the empty string.
    """
    candidate = s.strip() if s else ""
    if YES.match(candidate):
        return True
    return bool(NO.match(candidate))
def ensure_cx_session():
    """Make sure the Flask session carries a Dialogflow CX session id.

    When no ``cx_session_id`` is stored yet, one is minted from the stored
    ``user_id`` (or ``"anon"``) plus a random hex suffix, and ``kicked_socio``
    is reset to ``False``. An existing id is left untouched.
    """
    if session.get("cx_session_id"):
        return

    owner = session.get("user_id", "anon")
    # NOTE(review): the Dialogflow CX reference documents a 36-character limit on
    # session IDs; if user_id is a UUID this value is longer than that — confirm.
    session["cx_session_id"] = f"{owner}-{uuid.uuid4().hex}"
    session["kicked_socio"] = False
def classify_with_openai(text: str) -> dict:
    """Classify a user message as on-topic or smalltalk via the OpenAI chat API.

    Args:
        text: The raw user message.

    Returns:
        A dict that always contains ``"kind"`` (``"on_topic"`` or
        ``"smalltalk"``) and ``"note"`` (a string, possibly empty). Any other
        keys the model returned are kept. If the API call or JSON parsing
        fails, or the payload is not a JSON object, the result is
        ``{"kind": "on_topic", "note": ""}``.
    """
    fallback = {"kind": "on_topic", "note": ""}
    try:
        r = client.chat.completions.create(
            model="gpt-4o-mini",
            temperature=0,
            response_format={"type": "json_object"},
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Return JSON with fields: kind ∈ {on_topic, smalltalk}, "
                        "and note (≤30 words, empathetic, no clinical advice). "
                        "If user asks casual or unrelated questions → smalltalk."
                    ),
                },
                {"role": "user", "content": text},
            ],
        )
        parsed = json.loads(r.choices[0].message.content)
    except Exception:
        # Best effort: classification must never break the chat turn, but the
        # failure should still be visible in the logs.
        log.exception("OpenAI classification failed; defaulting to on_topic")
        return fallback

    if not isinstance(parsed, dict):
        return fallback

    # Callers index "kind" and "note" directly, so guarantee both are present
    # and well-typed even when the model omits or mangles them.
    kind = parsed.get("kind")
    if kind not in ("on_topic", "smalltalk"):
        kind = "on_topic"
    note = parsed.get("note")
    parsed["kind"] = kind
    parsed["note"] = note if isinstance(note, str) else ""
    return parsed
# =============== Routes =============== | |
def health():
    """Liveness probe: return ``{"ok": true, "time": <UTC ISO-8601 timestamp>Z}``."""
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop tzinfo so the emitted "...Z" format is unchanged.
    from datetime import timezone

    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return jsonify({"ok": True, "time": now_utc.isoformat() + "Z"})
def index():
    """Render the landing page template with the title "Welcome"."""
    landing_page = render_template("index.html", title="Welcome")
    return landing_page
def dialogflow_webhook():
    """Handle a Dialogflow CX webhook call and dispatch on its fulfillment tag.

    Reads ``fulfillmentInfo.tag`` and ``sessionInfo.parameters`` from the JSON
    request body. Supported tags:

    * ``record_consent`` - insert a row into ``consent_log`` (best effort).
    * ``save_socio`` - normalize the collected parameters and upsert them into
      ``"Patient Table"`` keyed on ``patient_id``.

    Any other tag is a no-op that echoes the session parameters back.

    Returns:
        ``("Unauthorized", 401)`` when ``_auth_ok`` rejects the request,
        otherwise a dict with ``sessionInfo`` and, for ``save_socio``, a
        ``fulfillment_response`` carrying one text message.
    """
    if not _auth_ok(request):
        return ("Unauthorized", 401)
    log.info("Dialogflow webhook called with auth OK")

    body = request.get_json(force=True) or {}
    tag = ((body.get("fulfillmentInfo") or {}).get("tag")) or ""
    params = (body.get("sessionInfo") or {}).get("parameters") or {}
    user_id = params.get("user_id")
    log.info("Dialogflow webhook called with tag=%s, user_id=%s", tag, user_id)
    log.debug("Dialogflow webhook params: %s", json.dumps(params, ensure_ascii=False))

    # --- helpers ---
    def join_text(x):
        """Render a scalar or list parameter as one comma-separated string."""
        if x is None:
            return None
        if isinstance(x, list):
            return ", ".join([str(v) for v in x if v is not None])
        return str(x)

    def yes_no(v):
        """Map common affirmative/negative spellings to "yes"/"no", else None."""
        if isinstance(v, bool):
            return "yes" if v else "no"
        s = str(v or "").strip().lower()
        if s in {"y", "yes", "true", "1", "sure", "ok", "okay", "affirmative"}:
            return "yes"
        if s in {"n", "no", "false", "0", "nah", "nope", "negative"}:
            return "no"
        return None

    def only_digits(x):
        """Strip every non-digit and return the rest as an int (None if empty)."""
        if x is None:
            return None
        s = re.sub(r"\D+", "", str(x))
        return int(s) if s else None

    def iso_date(s):
        # keep as given; DB column is varchar. If you want strict ISO, coerce here.
        return str(s) if s else None

    def calc_age(dob_str):
        """Compute age in whole years from a ``YYYY-MM-DD...`` string, else None."""
        try:
            y, m, d = [int(t) for t in str(dob_str)[:10].split("-")]
            today = date.today()
            # Subtract one when this year's birthday has not happened yet.
            return today.year - y - ((today.month, today.day) < (m, d))
        except Exception:
            return None

    # --- consent logging (optional example) ---
    if tag == "record_consent":
        try:
            supabase.table("consent_log").insert({
                "user_id": user_id,
                "consent": (params.get("consent") == "yes"),
            }).execute()
        except Exception as e:
            log.exception("Failed to record consent: %s", e)
        return {"sessionInfo": {"parameters": params}}

    # --- main save ---
    if tag == "save_socio":
        if not user_id:
            # Without a patient_id we can't satisfy DB constraints, so tell the
            # user to log in again instead of attempting the upsert.
            return {
                "sessionInfo": {"parameters": params},
                "fulfillment_response": {"messages": [{"text": {"text": [
                    "I’m missing your account id. Please log in again."
                ]}}]}
            }

        # Collect inputs with tolerant keys (use whichever your CX params are using)
        full_name = params.get("patient_name") or params.get("full_name") or params.get("name")
        dob = params.get("date_of_birth")
        phone_in = params.get("patient_phone") or params.get("phone_number")
        country = params.get("country_of_birth")
        sex_val = params.get("sex") or params.get("sex_assigned_at_birth")
        gender_val = params.get("gender") or params.get("gender_identity")
        race_val = params.get("race") or params.get("race_ethnicity")
        sch_beh = params.get("school_performance_behavior") or params.get("school_behavior")
        grades = params.get("grades_overall") or params.get("school_grades")
        failed = params.get("failed_class_or_year") or params.get("school_failures")
        edu = params.get("highest_education") or params.get("education_level")
        citizen = params.get("citizenship_status")
        marital = params.get("marital_status")
        kids = params.get("offsprings") or params.get("offspring_count")
        living = params.get("living_situation")
        employment = params.get("employment_status")
        income = params.get("income_source")
        legal_hist = params.get("legal_issues_history") or params.get("legal_issues")
        # NOTE: school_behavior_issues and legal_issues_details are not
        # collected or stored at the moment.

        # Normalize
        dob_txt = iso_date(dob)
        age_val = params.get("age") or (calc_age(dob_txt) if dob_txt else None)
        phone_num = only_digits(phone_in)
        race_txt = join_text(race_val)
        income_txt = join_text(income)
        fail_txt = yes_no(failed)
        legal_txt = yes_no(legal_hist)

        row = {
            "patient_id": user_id,  # NOT NULL, PK
            "patient_name": full_name,
            "age": age_val,
            "date_of_birth": dob_txt,  # varchar in your schema
            "phone": phone_num,
            "country_of_birth": country,
            "sex": sex_val,
            "gender": gender_val,
            "race": race_txt,
            "school_behavior": sch_beh,
            "school_grades": grades,
            "school_failures": fail_txt,  # "yes"/"no"
            "education_level": edu,
            "citizenship_status": citizen,
            "marital_status": marital,
            "offsprings": str(kids) if kids is not None else None,
            "living_situation": living,
            "employment_status": employment,
            "income_source": income_txt,
            "legal_issues": legal_txt,
            "patient_phone": phone_num,  # also fill the duplicate column
        }

        # Upsert by patient_id
        try:
            supabase.table("Patient Table").upsert(row, on_conflict="patient_id").execute()
            msg = "Thanks — I’ve saved your information."
        except Exception as e:
            log.exception('Supabase upsert failed: %s', e)
            msg = "I had trouble saving your answers, but we can continue."

        # Optionally reflect normalized values back into the session.
        # "school_behavior_issues" is intentionally absent: its normalized value
        # is no longer computed, and referencing it here raised NameError on
        # every save_socio call.
        params.update({
            "age": age_val,
            "date_of_birth": dob_txt,
            "phone": phone_num,
            "patient_phone": phone_num,
            "race": race_txt,
            "income_source": income_txt,
            "school_failures": fail_txt,
            "legal_issues": legal_txt,
        })
        return {
            "sessionInfo": {"parameters": params},
            "fulfillment_response": {"messages": [{"text": {"text": [msg]}}]},
        }

    # default no-op
    log.info("DF Webhook called with tag=%s params=%s", tag, params)
    return {"sessionInfo": {"parameters": params}}
def login():
    """Show the login form and, on POST, sign the user in through Supabase.

    On success the Flask session is seeded with the user's id and email, a
    fresh CX session id, an empty chat history and the starting phase
    (``"intake_socio"`` when ``needs_socio_row`` says so, otherwise
    ``"place_holder"``), and the user is redirected to the chat view. On
    failure the login template is re-rendered with an error message.
    """
    if request.method != "POST":
        return render_template("login.html", title="Login")

    email = request.form.get("email", "").strip()
    password = request.form.get("password", "")
    try:
        log.info("Attempting login for %s", email)
        credentials = {"email": email, "password": password}
        resp = supabase.auth.sign_in_with_password(credentials)
        user = getattr(resp, "user", None)
        if not user:
            log.warning("Login failed for %s (no user in response)", email)
            return render_template("login.html", title="Login", error="Invalid credentials.")

        session["user_id"] = user.id
        session["user_email"] = user.email
        log.info("Login success user_id=%s email=%s", user.id, user.email)

        # Start every login with a brand-new CX conversation and empty history.
        session["cx_session_id"] = f'{session["user_id"]}-{uuid.uuid4().hex}'
        session["kicked_socio"] = False
        session["chat_history"] = []

        if needs_socio_row(user.id):
            session["phase"] = "intake_socio"
        else:
            session["phase"] = "place_holder"
        log.info("The Phase of the session %s", session["phase"])
        return redirect(url_for("chat"))
    except Exception as e:
        log.exception("Login error for %s", email)
        return render_template("login.html", title="Login", error=str(e))
def signup():
    """Show the signup form and, on POST, register the user through Supabase.

    On success the Flask session is seeded with the user's id and email, a
    fresh CX session id and an empty chat history. The phase is always set to
    ``"intake_socio"``, and the user is redirected to the chat view. On failure
    the signup template is re-rendered with an error message.
    """
    if request.method != "POST":
        return render_template("signup.html", title="Sign Up")

    email = request.form.get("email", "").strip()
    password = request.form.get("password", "")
    try:
        log.info("Attempting signup for %s", email)
        credentials = {"email": email, "password": password}
        resp = supabase.auth.sign_up(credentials)
        user = getattr(resp, "user", None)
        if not user:
            log.warning("Signup failed for %s (no user in response)", email)
            return render_template("signup.html", title="Sign Up", error="Signup failed.")

        session["user_id"] = user.id
        session["user_email"] = user.email
        log.info("Signup success user_id=%s email=%s", user.id, user.email)

        # New accounts start a brand-new CX conversation in the intake phase.
        session["cx_session_id"] = f'{session["user_id"]}-{uuid.uuid4().hex}'
        session["kicked_socio"] = False
        session["chat_history"] = []
        session["phase"] = "intake_socio"
        log.info("The Phase of the session %s", session["phase"])
        return redirect(url_for("chat"))
    except Exception as e:
        log.exception("Signup error for %s", email)
        return render_template("signup.html", title="Sign Up", error=str(e))
def chat():
    """Render the chat page, kicking off the CX intake flow on first visit.

    When the session phase is ``"intake_socio"`` and the kickoff has not run
    yet, the ``start_socio`` event is sent to Dialogflow CX. If that call
    fails, an empty-text detect-intent call is tried instead. The agent's
    reply, if one was obtained, is appended to the chat history. If both CX
    calls fail, the page is still rendered, without a kickoff message.
    """
    # Optional gate:
    # if "user_id" not in session: return redirect(url_for("login"))
    ensure_cx_session()
    history = get_history()
    log.info("Render chat for %s with %d history msgs", safe_email(), len(history))

    if session.get("phase") == "intake_socio" and not session.get("kicked_socio"):
        # The flag is set before the CX calls and stays set even if they fail,
        # so the kickoff is attempted at most once per session.
        session["kicked_socio"] = True
        session_id = session.get("cx_session_id") or session.get("user_id") or "anon"
        params = {"user_id": session.get("user_id"), "user_email": session.get("user_email")}

        df = None
        try:
            df = cx_trigger_event("start_socio", session_id, params)
        except Exception as e:
            log.warning("start_socio not handled; falling back: %s", e)
            try:
                df = cx_detect_text("", session_id, params)
            except Exception:
                # Previously this second failure escaped and turned the page
                # render into an HTTP 500; degrade to "no kickoff prompt".
                log.exception("CX kickoff fallback failed; rendering chat without a prompt")

        if df is not None:
            log.info(
                "CX triggered start_socio event, got %d response messages",
                len(df.query_result.response_messages),
            )
            msg = cx_texts(df)
            history.append({"role": "assistant", "content": msg})
            set_history(history)
            cr = df.query_result
            log.info("CX kickoff -> page=%s", getattr(cr.current_page, "display_name", None))

    return render_template("chat.html", history=history, title="Chat")
def message():
    """JSON endpoint consumed by static/script.js.

    Expects a JSON body with a ``message`` string. The user text is appended
    to the chat history, then answered in one of two ways:

    * phase ``"intake_socio"``: the text goes to Dialogflow CX. If CX answers
      with one of its known "didn't catch that" replies, the text is classified
      with OpenAI; for smalltalk, the reply becomes a short empathetic note
      followed by the current CX prompt. When CX reports a page other than
      ``"Socio form"``, the session phase switches to ``"place_holder"``.
    * any other phase: the last 10 history messages are sent to the OpenAI
      chat model together with ``SYSTEM_PROMPT``.

    Returns:
        ``{"reply": ...}`` on success, ``({"error": ...}, 400)`` for an empty
        message, or ``({"error": ...}, 500)`` on an unhandled error.
    """
    try:
        data = request.get_json(force=True) or {}
        user_text = (data.get("message") or "").strip()
        if not user_text:
            log.warning("Empty message received")
            return jsonify({"error": "Empty message"}), 400

        history = get_history()
        history.append({"role": "user", "content": user_text})
        set_history(history)
        log.info("USER: %s", user_text)

        phase = session.get("phase") or "assessment"
        session_params = {"user_id": session.get("user_id"), "user_email": session.get("user_email")}
        session_id = session.get("cx_session_id") or session.get("user_id") or "anon"

        # --- Intake phases use CX ---
        if phase == "intake_socio":
            # 1) normal CX turn with the user's text
            df = cx_detect_text(user_text, session_id, session_params)
            bot_reply = cx_texts(df)
            log.info("CX response: %s", bot_reply)

            no_match_replies = {"i missed that, say that again?", "sorry, could you repeat that?"}
            if bot_reply.strip().lower() in no_match_replies:
                # 2) CX did not understand: classify with OpenAI
                decision = classify_with_openai(user_text)
                if not isinstance(decision, dict):
                    decision = {}
                # .get() instead of indexing: the classifier output comes from a
                # model and may lack either key.
                if decision.get("kind") == "smalltalk":
                    # brief supportive reply, then nudge back with the current CX prompt
                    empathetic = decision.get("note") or ""
                    try:
                        # Query CX with an empty text to get the next prompt
                        df_peek = cx_detect_text("", session_id, session_params)
                        nudge = cx_texts(df_peek)
                    except Exception:
                        # If the peek is rejected, reuse CX's own re-prompt
                        # rather than failing the whole turn.
                        log.exception("CX peek for smalltalk nudge failed")
                        nudge = bot_reply
                    log.info("CX smalltalk nudge: %s", nudge)
                    bot_reply = f"{empathetic}\n\n{nudge}".strip()

            # 3) if the form just finished, CX will have moved off the socio page.
            #    Easiest: add a Page Route in CX that calls the webhook
            #    (tag=save_socio) and transitions. Flip the local phase here too.
            current_page = df.query_result.current_page
            if current_page and current_page.display_name != "Socio form":
                # assume we left SocioForm -> go next phase
                log.info("Current page is %s, display name is %s", current_page, current_page.display_name)
                session["phase"] = "place_holder"  # or "intake_clinical"
                log.info("CX moved to page %s, switching phase to %s",
                         current_page.display_name, session["phase"])
        else:
            try:
                recent = history[-10:]
                messages = [SYSTEM_PROMPT] + recent
                resp = client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages,
                    temperature=0.6,
                )
                bot_reply = resp.choices[0].message.content
                log.info("ASSISTANT: %s", bot_reply)
            except Exception as e:
                log.exception("OpenAI call failed")
                bot_reply = f"Oops, I couldn't reach the AI service: {e}"

        history.append({"role": "assistant", "content": bot_reply})
        set_history(history)
        # NOTE: If you want to persist to Supabase, you can insert here safely
        # supabase.table("conversations").insert({...})
        return jsonify({"reply": bot_reply})
    except Exception as e:
        log.exception("Unhandled error in /message")
        return jsonify({"error": f"Server error: {e}"}), 500
def logout():
    """Log the event, wipe the Flask session and redirect to the login view."""
    who = safe_email()
    log.info("Logout %s", who)
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
# =============== Main =============== | |
if __name__ == "__main__":
    # Hugging Face Spaces expects the app to listen on 7860
    port = int(os.getenv("PORT", "7860"))
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # on by default while binding to all interfaces. Opt in with FLASK_DEBUG=1.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=port, debug=debug_enabled)