#!/usr/bin/env python3 """ PocketFlow-based Gradio GUI for MBTI Personality Questionnaire - V2 with Auto-save and LLM """ import sys import os import json import gradio as gr from datetime import datetime sys.path.append(os.path.join(os.path.dirname(__file__), '..')) from flow import create_mbti_flow, create_shared_store from utils.questionnaire import load_questionnaire, save_questionnaire class MBTIPocketFlowApp: def __init__(self): self.questions = load_questionnaire() self.responses = {} self.shared = None self.last_report_path = None self.questionnaire_length = 20 def get_question_text(self, question_idx): """Get current question text""" if 0 <= question_idx < len(self.questions): q = self.questions[question_idx] return f"Question {question_idx + 1} of {len(self.questions)}: {q['text']}" return "All questions completed!" def get_current_response(self, question_idx): """Get current response for question""" if 0 <= question_idx < len(self.questions): q_id = self.questions[question_idx]['id'] return self.responses.get(q_id, 3) return 3 def navigate_question(self, question_idx, direction, current_response): """Navigate to previous/next question and auto-save current response""" # Auto-save current response before navigating if 0 <= question_idx < len(self.questions): q_id = self.questions[question_idx]['id'] self.responses[q_id] = current_response print(f"DEBUG: Saved Q{q_id} = {current_response}") # Navigate if direction == "prev": new_idx = max(0, question_idx - 1) else: # next new_idx = min(len(self.questions) - 1, question_idx + 1) question_text = self.get_question_text(new_idx) new_response = self.get_current_response(new_idx) # Update button states prev_disabled = new_idx == 0 next_disabled = new_idx == len(self.questions) - 1 # Check if all questions answered (after saving current response) all_answered = len(self.responses) == len(self.questions) print(f"DEBUG: {len(self.responses)}/{len(self.questions)} answered, all_answered={all_answered}") return new_idx, question_text, new_response, gr.update(interactive=not prev_disabled), gr.update( interactive=not next_disabled), gr.update(visible=all_answered) def change_questionnaire_length(self, length): """Change questionnaire length and reset""" from utils.questionnaire import get_questionnaire_by_length self.questionnaire_length = length self.questions = get_questionnaire_by_length(length) self.responses = {} # Reset responses # Return to first question question_text = self.get_question_text(0) return 0, question_text, 3, gr.update(visible=False) def save_slider_response(self, question_idx, current_response): """Save response when slider changes""" if 0 <= question_idx < len(self.questions): q_id = self.questions[question_idx]['id'] self.responses[q_id] = current_response print(f"DEBUG: Slider saved Q{q_id} = {current_response}") # Check if all questions answered all_answered = len(self.responses) == len(self.questions) return gr.update(visible=all_answered) def run_pocketflow_analysis_with_save(self, question_idx, current_response): """Save current response then run analysis""" # Save current response before analysis if 0 <= question_idx < len(self.questions): q_id = self.questions[question_idx]['id'] self.responses[q_id] = current_response # Run the analysis return self.run_pocketflow_analysis() def save_current_questionnaire(self, question_idx=None, current_response=None): """Save current questionnaire state (even if incomplete)""" # Save current response if provided if question_idx is not None and current_response is not None and 0 <= question_idx < len(self.questions): q_id = self.questions[question_idx]['id'] self.responses[q_id] = current_response if not self.responses: return None questionnaire_data = { "questionnaire": { "questions": self.questions, "responses": self.responses }, "metadata": { "version": "1.0", "created_at": datetime.now().isoformat(), "completed": len(self.responses) == len(self.questions) } } timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') answered_count = len(self.responses) json_filename = f"mbti_questionnaire_pf_partial_{answered_count}q_{timestamp}.json" saved_path = save_questionnaire(questionnaire_data, json_filename) if saved_path: return saved_path return None def run_pocketflow_analysis(self): """Run complete PocketFlow analysis with LLM""" if len(self.responses) != len(self.questions): return "Please answer all questions before analyzing.", "", gr.update(visible=False) try: # Create flow and shared store flow = create_mbti_flow() config = { "ui_mode": "gradio", "output_format": "html", "analysis_method": "both" # Use both traditional and LLM } self.shared = create_shared_store(config) # Pre-populate responses and current questions self.shared["questionnaire"]["responses"] = self.responses self.shared["questionnaire"]["questions"] = self.questions # Run partial flow (skip question loading/presentation) from pocketflow import Flow from nodes import AnalyzeResponsesBatchNode, TraditionalScoringNode, LLMAnalysisNode, DetermineMBTITypeNode, \ GenerateReportNode, ExportDataNode analyze_responses = AnalyzeResponsesBatchNode() traditional_scoring = TraditionalScoringNode() llm_analysis = LLMAnalysisNode() determine_type = DetermineMBTITypeNode() generate_report = GenerateReportNode() export_data = ExportDataNode() # Connect partial flow analyze_responses >> traditional_scoring >> llm_analysis >> determine_type >> generate_report >> export_data analysis_flow = Flow(start=analyze_responses) # Run the flow print("Running PocketFlow analysis with LLM...") analysis_flow.run(self.shared) # Extract results mbti_type = self.shared["results"]["mbti_type"] scores = self.shared["analysis"]["traditional_scores"] llm_analysis_text = self.shared["analysis"]["llm_analysis"] report_path = self.shared["exports"]["report_path"] self.last_report_path = os.path.abspath(report_path) # Read report HTML with open(report_path, 'r', encoding='utf-8') as f: report_html = f.read() # Extract type info from HTML report for summary from bs4 import BeautifulSoup soup = BeautifulSoup(report_html, 'html.parser') # Get type badge and description type_badge = soup.find('div', class_='type-badge') type_desc = soup.find('p').find('em') if soup.find('p') else None # Get strengths and weaknesses sections sections = soup.find_all('div', class_='section') strengths_html = "" weaknesses_html = "" careers_html = "" for section in sections: h2 = section.find('h2') if h2: if 'Strengths' in h2.text: strengths_html = str(section) elif 'Growth' in h2.text or 'Areas' in h2.text: weaknesses_html = str(section) elif 'Career' in h2.text: careers_html = str(section) # Get responses data for the table responses_data = self.shared["analysis"].get("responses_data", []) # Generate responses table HTML responses_table_html = """
Question | Dimension | Your Response |
---|---|---|
Q{resp['id']}: {resp['text']} | {resp['dimension']} | {resp['response']} |
{type_desc.text}
' if type_desc else ''}