"""
Conversation Service for Advanced AI Chatbot
Implements memory management, context awareness, and natural language processing
"""

import logging
import json
import re
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, and_, func
from sqlalchemy.orm import selectinload

from models import ConversationMemory, CrossModuleMemory, ConversationMessage, GPTModeSession
from services.ai_service_manager import AIServiceManager
from services.rag_service import RAGService

logger = logging.getLogger(__name__)


class ConversationService:
    """Advanced conversation service with memory management and context awareness.

    Each user turn is stored as a ``ConversationMessage``, classified with a
    small regex-based intent detector and routed to a handler that produces
    the assistant reply (usually through ``AIServiceManager``).
    """

    # A context summary is regenerated whenever the number of stored messages
    # crosses a multiple of this value.
    SUMMARY_INTERVAL = 5

    def __init__(self):
        self.ai_service = AIServiceManager()
        self.rag_service = RAGService()

        # Intent patterns for natural language understanding
        self.intent_patterns = {
            "greeting": [
                r"\b(hi|hello|hey|good morning|good afternoon|good evening)\b",
                r"\b(start|begin|ready|let's go|let's begin)\b"
            ],
            "question": [
                r"\b(what|how|why|when|where|who|which|can you|could you|would you)\b",
                r"\b(explain|tell me|describe|clarify|help)\b"
            ],
            "answer": [
                r"\b(it's|it is|this is|that is|my|our|we|i)\b",
                r"\b(about|regarding|concerning|related to)\b"
            ],
            "clarification": [
                r"\b(what do you mean|i don't understand|can you explain|clarify)\b",
                r"\b(rephrase|repeat|say that again)\b"
            ],
            "edit_request": [
                r"\b(edit|change|modify|update|revise|correct)\b",
                r"\b(wrong|incorrect|not right|different)\b"
            ],
            "skip": [
                r"\b(skip|pass|next|move on|continue|not applicable)\b",
                r"\b(don't know|not sure|no idea)\b"
            ]
        }

    async def create_conversation_memory(
        self,
        db: AsyncSession,
        project_id: str,
        session_id: str,
        module_id: str,
        user_id: str
    ) -> ConversationMemory:
        """Return the conversation memory for a session, creating it if missing.

        The lookup is scoped to (project, session, module, user). On any
        failure the transaction is rolled back and the exception re-raised.
        """
        try:
            # Check if memory already exists for this user
            result = await db.execute(
                select(ConversationMemory).where(
                    and_(
                        ConversationMemory.project_id == project_id,
                        ConversationMemory.session_id == session_id,
                        ConversationMemory.module_id == module_id,
                        ConversationMemory.user_id == user_id
                    )
                )
            )
            existing_memory = result.scalar_one_or_none()
            if existing_memory:
                return existing_memory

            # Create new memory with an empty history and the initial state
            memory = ConversationMemory(
                project_id=project_id,
                session_id=session_id,
                module_id=module_id,
                user_id=user_id,
                conversation_history=[],
                context_summary="",
                user_profile={},
                conversation_state={
                    "current_question": 0,
                    "questions_answered": 0,
                    "total_questions": 0,
                    "conversation_flow": "welcome",
                    "last_intent": None
                }
            )
            db.add(memory)
            await db.commit()
            await db.refresh(memory)
            return memory
        except Exception as e:
            logger.exception("Error creating conversation memory: %s", e)
            await db.rollback()
            raise

    async def get_or_create_cross_module_memory(
        self,
        db: AsyncSession,
        project_id: str,
        user_id: str
    ) -> CrossModuleMemory:
        """Get or create the cross-module memory for a (project, user) pair.

        On any failure the transaction is rolled back and the exception
        re-raised.
        """
        try:
            # Check if cross-module memory exists for this user
            result = await db.execute(
                select(CrossModuleMemory).where(
                    and_(
                        CrossModuleMemory.project_id == project_id,
                        CrossModuleMemory.user_id == user_id
                    )
                )
            )
            existing_memory = result.scalar_one_or_none()
            if existing_memory:
                return existing_memory

            # Create new cross-module memory
            memory = CrossModuleMemory(
                project_id=project_id,
                user_id=user_id,
                business_context={},
                user_preferences={},
                project_goals={},
                key_insights=[],
                completed_modules=[],
                module_outputs={},
                context_embeddings=[]
            )
            db.add(memory)
            await db.commit()
            await db.refresh(memory)
            return memory
        except Exception as e:
            logger.exception("Error getting or creating cross-module memory: %s", e)
            await db.rollback()
            raise

    async def add_message_to_memory(
        self,
        db: AsyncSession,
        memory_id: str,
        role: str,
        content: str,
        message_type: str = "text",
        context_data: Optional[dict] = None
    ) -> ConversationMessage:
        """Persist one message, tagged with its detected intent and a token estimate.

        On any failure the transaction is rolled back and the exception
        re-raised.
        """
        try:
            # Detect intent
            intent, confidence = self._detect_intent(content)

            # Rough token estimation: ~1.3 tokens per whitespace-separated word
            tokens_used = len(content.split()) * 1.3

            message = ConversationMessage(
                conversation_memory_id=memory_id,
                role=role,
                content=content,
                message_type=message_type,
                context_data=context_data or {},
                intent=intent,
                confidence=confidence,
                tokens_used=int(tokens_used)
            )
            db.add(message)
            await db.commit()
            await db.refresh(message)
            return message
        except Exception as e:
            logger.exception("Error adding message to memory: %s", e)
            await db.rollback()
            raise

    def _detect_intent(self, text: str) -> Tuple[str, float]:
        """Detect the intent of a message.

        Every pattern in ``self.intent_patterns`` is matched against the
        lower-cased text; a pattern's confidence is its match count divided by
        the number of whitespace-separated words. The intent with the highest
        confidence wins (first one on ties).

        Returns:
            ``(intent, confidence)``; ``("general", 0.0)`` when nothing
            matches or the text is empty.
        """
        if not text or not text.strip():
            return "general", 0.0

        text_lower = text.lower()
        word_count = len(text_lower.split())
        max_confidence = 0.0
        detected_intent = "general"

        for intent, patterns in self.intent_patterns.items():
            for pattern in patterns:
                matches = re.findall(pattern, text_lower)
                if not matches:
                    continue
                # Clamp: punctuation-joined words ("what/how") form a single
                # whitespace token but several matches, which would otherwise
                # push the ratio above 1.
                confidence = min(1.0, len(matches) / word_count)
                if confidence > max_confidence:
                    max_confidence = confidence
                    detected_intent = intent

        return detected_intent, max_confidence

    async def get_conversation_context(
        self,
        db: AsyncSession,
        memory_id: str,
        max_messages: int = 10
    ) -> Dict[str, Any]:
        """Get conversation context for AI processing.

        Returns:
            A dict with ``messages`` (the latest ``max_messages`` messages in
            chronological order), ``context`` (stored summary), ``state`` and
            ``user_profile``. If the memory does not exist or the lookup
            fails, the same keys are returned with empty values.
        """
        empty_context = {"messages": [], "context": "", "state": {}, "user_profile": {}}
        try:
            # Get recent messages (newest first, limited)
            result = await db.execute(
                select(ConversationMessage)
                .where(ConversationMessage.conversation_memory_id == memory_id)
                .order_by(ConversationMessage.created_at.desc())
                .limit(max_messages)
            )
            messages = result.scalars().all()

            # Get memory object
            result = await db.execute(
                select(ConversationMemory).where(ConversationMemory.id == memory_id)
            )
            memory = result.scalar_one_or_none()
            if not memory:
                return empty_context

            # Reverse so the AI sees the messages in chronological order
            formatted_messages = [
                {
                    "role": msg.role,
                    "content": msg.content,
                    "intent": msg.intent,
                    "timestamp": msg.created_at.isoformat()
                }
                for msg in reversed(messages)
            ]

            return {
                "messages": formatted_messages,
                "context": memory.context_summary,
                "state": memory.conversation_state,
                "user_profile": memory.user_profile
            }
        except Exception as e:
            logger.exception("Error getting conversation context: %s", e)
            return empty_context

    async def update_conversation_state(
        self,
        db: AsyncSession,
        memory_id: str,
        state_updates: dict
    ):
        """Merge ``state_updates`` into the stored conversation state.

        Does nothing if the memory does not exist. Errors are logged and
        rolled back, not raised.
        """
        try:
            result = await db.execute(
                select(ConversationMemory).where(ConversationMemory.id == memory_id)
            )
            memory = result.scalar_one_or_none()
            if memory:
                # Assign a new dict rather than calling .update() on the
                # loaded one: SQLAlchemy does not detect in-place mutation of
                # a plain JSON column, so such a change could be dropped at
                # commit. (The model is not visible here; reassignment is
                # also correct if the column uses MutableDict.)
                memory.conversation_state = {
                    **(memory.conversation_state or {}),
                    **state_updates,
                }
                memory.last_updated = datetime.now(timezone.utc)
                await db.commit()
        except Exception as e:
            logger.exception("Error updating conversation state: %s", e)
            await db.rollback()

    async def generate_context_summary(
        self,
        db: AsyncSession,
        memory_id: str
    ) -> str:
        """Generate and store an AI summary of the recent conversation.

        Returns:
            The summary text, or ``""`` when there are no messages or when
            generation or persistence fails (failures are logged, not raised).
        """
        # Phase 1: build the summary. No writes happen here.
        try:
            context = await self.get_conversation_context(db, memory_id, max_messages=20)
            if not context["messages"]:
                return ""

            # Create summary prompt from the last 10 messages
            messages_text = "\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in context["messages"][-10:]
            ])

            summary_prompt = f"""
            Based on the following conversation, create a concise summary of the key points,
            user preferences, and important information that should be remembered for future context.

            Conversation:
            {messages_text}

            Summary:
            """

            summary = await self.ai_service.generate_content(
                prompt=summary_prompt,
                temperature=0.3,
                max_tokens=200
            )
        except Exception as e:
            logger.exception("Error generating context summary: %s", e)
            return ""

        # Phase 2: persist it. Roll back on failure so the session is not left
        # in a failed transaction state.
        try:
            result = await db.execute(
                select(ConversationMemory).where(ConversationMemory.id == memory_id)
            )
            memory = result.scalar_one_or_none()
            if memory:
                memory.context_summary = summary
                memory.last_updated = datetime.now(timezone.utc)
                await db.commit()
            return summary
        except Exception as e:
            logger.exception("Error generating context summary: %s", e)
            await db.rollback()
            return ""

    async def _count_messages(self, db: AsyncSession, memory_id: str) -> int:
        """Return the total number of messages stored for a conversation memory."""
        result = await db.execute(
            select(func.count())
            .select_from(ConversationMessage)
            .where(ConversationMessage.conversation_memory_id == memory_id)
        )
        return result.scalar_one()

    async def _maybe_refresh_summary(
        self,
        db: AsyncSession,
        memory_id: str,
        messages_added: int
    ) -> None:
        """Regenerate the summary if this turn crossed a SUMMARY_INTERVAL multiple.

        Best effort: failures are logged and never propagate, so a reply that
        has already been produced is not discarded.
        """
        try:
            total = await self._count_messages(db, memory_id)
            before = max(total - messages_added, 0)
            if total // self.SUMMARY_INTERVAL > before // self.SUMMARY_INTERVAL:
                await self.generate_context_summary(db, memory_id)
        except Exception as e:
            logger.exception("Error refreshing context summary: %s", e)

    async def process_natural_message(
        self,
        db: AsyncSession,
        project_id: str,
        session_id: str,
        module_id: str,
        user_id: str,
        user_message: str,
        module_questions: List[str]
    ) -> Dict[str, Any]:
        """Process a natural language message with full context awareness.

        Stores the user message, routes it by detected intent to a handler,
        stores the assistant reply, updates the conversation state and
        periodically refreshes the context summary.

        Returns:
            The handler's response dict (always has ``message`` and ``flow``).
            On any error a generic "please rephrase" response with
            ``flow == "error"`` is returned instead of raising.
        """
        try:
            # Get or create conversation memory
            memory = await self.create_conversation_memory(
                db, project_id, session_id, module_id, user_id
            )

            # Get cross-module memory
            cross_memory = await self.get_or_create_cross_module_memory(db, project_id, user_id)

            # Add user message to memory
            await self.add_message_to_memory(
                db, memory.id, "user", user_message, "text"
            )

            # Get conversation context
            context = await self.get_conversation_context(db, memory.id)

            # Detect intent
            intent, _ = self._detect_intent(user_message)

            current_state = memory.conversation_state or {}
            current_question = current_state.get("current_question", 0)

            # Process based on intent and context
            if intent == "greeting" and current_question == 0:
                # Welcome message and first question
                response = await self._handle_greeting(
                    db, memory, module_questions, context, cross_memory
                )
            elif intent == "question":
                # User is asking a question. Pass the questions along so the
                # handler can steer back to the current one.
                response = await self._handle_user_question(
                    db, memory, user_message, context, cross_memory, module_questions
                )
            elif intent == "edit_request":
                # User wants to edit something
                response = await self._handle_edit_request(db, memory, user_message, context)
            elif intent == "skip":
                # User wants to skip current question
                response = await self._handle_skip(db, memory, module_questions, context)
            else:
                # Treat as potential answer to current question
                response = await self._handle_potential_answer(
                    db, memory, user_message, module_questions, context, cross_memory
                )

            # Add assistant response to memory
            await self.add_message_to_memory(
                db, memory.id, "assistant", response["message"], "response"
            )

            # Update conversation state
            await self.update_conversation_state(db, memory.id, {
                "current_question": response.get("current_question", current_question),
                "last_intent": intent,
                "conversation_flow": response.get("flow", "normal")
            })

            # Refresh the summary roughly every SUMMARY_INTERVAL messages.
            # The count comes from the database because the context fetched
            # above is capped at max_messages and would otherwise stay at 10
            # forever, triggering a summary on every turn.
            await self._maybe_refresh_summary(db, memory.id, messages_added=2)

            return response
        except Exception as e:
            logger.exception("Error processing natural message: %s", e)
            # NOTE(review): "current_question" holds the question text here,
            # while every other path returns an int index. Left unchanged
            # because callers are not visible; confirm which one they expect.
            return {
                "message": "I'm having trouble processing that. Could you please rephrase?",
                "is_question": True,
                "current_question": module_questions[0] if module_questions else "",
                "flow": "error"
            }

    async def _handle_greeting(
        self,
        db: AsyncSession,
        memory: ConversationMemory,
        module_questions: List[str],
        context: dict,
        cross_memory: CrossModuleMemory
    ) -> Dict[str, Any]:
        """Handle greeting and start conversation with the first question."""
        # Generate personalized welcome message
        welcome_prompt = f"""
        You are a helpful business consultant. Generate a warm, personalized welcome message for starting a conversation about {memory.module_id}.

        Context from previous modules: {cross_memory.business_context}
        User preferences: {cross_memory.user_preferences}

        Make it conversational and encouraging. Then ask the first question naturally.
        """

        welcome_message = await self.ai_service.generate_content(
            prompt=welcome_prompt,
            temperature=0.7
        )

        first_question = module_questions[0] if module_questions else ""

        return {
            "message": f"{welcome_message}\n\n{first_question}",
            "is_question": True,
            "current_question": 0,
            "flow": "welcome"
        }

    async def _handle_user_question(
        self,
        db: AsyncSession,
        memory: ConversationMemory,
        user_message: str,
        context: dict,
        cross_memory: CrossModuleMemory,
        module_questions: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Answer a question from the user, then steer back to the current question.

        Args:
            module_questions: The module's questions. When omitted, falls back
                to ``_get_module_questions`` (currently a stub that returns an
                empty list).
        """
        # Generate contextual answer
        answer_prompt = f"""
        You are a helpful business consultant. The user is asking: "{user_message}"

        Conversation context: {context['context']}
        Business context: {cross_memory.business_context}

        Provide a helpful, contextual answer. If the question is about the current process, guide them back to the current question naturally.
        """

        answer = await self.ai_service.generate_content(
            prompt=answer_prompt,
            temperature=0.7
        )

        # Get current question
        current_state = memory.conversation_state or {}
        current_question = current_state.get("current_question", 0)
        if module_questions is None:
            module_questions = self._get_module_questions(memory.module_id)

        if current_question < len(module_questions):
            current_q = module_questions[current_question]
            return {
                "message": f"{answer}\n\nNow, back to our current question: {current_q}",
                "is_question": True,
                "current_question": current_question,
                "flow": "clarification"
            }

        return {
            "message": answer,
            "is_question": False,
            "flow": "answer"
        }

    async def _handle_edit_request(
        self,
        db: AsyncSession,
        memory: ConversationMemory,
        user_message: str,
        context: dict
    ) -> Dict[str, Any]:
        """Handle edit requests with a fixed prompt asking what to change."""
        return {
            "message": "I'd be happy to help you edit that! What would you like to change?",
            "is_question": False,
            "flow": "edit"
        }

    async def _handle_skip(
        self,
        db: AsyncSession,
        memory: ConversationMemory,
        module_questions: List[str],
        context: dict
    ) -> Dict[str, Any]:
        """Handle skip requests: advance to the next question or finish the module."""
        current_state = memory.conversation_state or {}
        current_question = current_state.get("current_question", 0)

        if current_question + 1 < len(module_questions):
            next_question = module_questions[current_question + 1]
            return {
                "message": f"Alright, let's move on to the next question: {next_question}",
                "is_question": True,
                "current_question": current_question + 1,
                "flow": "skip"
            }

        return {
            "message": "Great! We've covered all the questions. Let me create a summary of what we've discussed.",
            "is_question": False,
            "module_complete": True,
            "flow": "complete"
        }

    @staticmethod
    def _is_valid_verdict(validation_result: Optional[str]) -> bool:
        """Return True only if the first VALID/INVALID token in the reply is VALID.

        A plain substring test for "VALID" is wrong because "INVALID" contains
        it; matching on word boundaries keeps the two verdicts apart. A reply
        with neither token counts as not valid.
        """
        verdict = re.search(r"\b(INVALID|VALID)\b", (validation_result or "").upper())
        return verdict is not None and verdict.group(1) == "VALID"

    async def _handle_potential_answer(
        self,
        db: AsyncSession,
        memory: ConversationMemory,
        user_message: str,
        module_questions: List[str],
        context: dict,
        cross_memory: CrossModuleMemory
    ) -> Dict[str, Any]:
        """Handle a potential answer to the current question.

        The AI service judges whether the message answers the question. A
        valid answer moves on to the next question (or to the completion
        summary after the last one); an invalid one produces a clarification
        request and keeps the current question.
        """
        current_state = memory.conversation_state or {}
        current_question = current_state.get("current_question", 0)

        if current_question >= len(module_questions):
            # All questions answered, generate summary
            return await self._generate_completion_summary(db, memory, context, cross_memory)

        # Validate answer using AI
        validation_prompt = f"""
        Current question: "{module_questions[current_question]}"
        User's response: "{user_message}"

        Determine if this is a valid answer to the question. Consider:
        1. Does it address the question?
        2. Is it relevant and meaningful?
        3. Does it provide useful information?

        Respond with "VALID" or "INVALID" and a brief explanation.
        """

        validation_result = await self.ai_service.generate_content(
            prompt=validation_prompt,
            temperature=0.3
        )

        if self._is_valid_verdict(validation_result):
            # Valid answer, move to next question
            next_question_idx = current_question + 1
            if next_question_idx >= len(module_questions):
                # Last question answered
                return await self._generate_completion_summary(db, memory, context, cross_memory)

            # Generate natural transition
            transition_prompt = f"""
            The user just answered: "{user_message}"
            Next question: "{module_questions[next_question_idx]}"

            Create a natural, conversational transition to the next question. Acknowledge their answer briefly and smoothly introduce the next question.
            """

            transition = await self.ai_service.generate_content(
                prompt=transition_prompt,
                temperature=0.7
            )

            return {
                "message": transition,
                "is_question": True,
                "current_question": next_question_idx,
                "flow": "transition"
            }

        # Invalid answer, ask for clarification
        clarification_prompt = f"""
        The user's response "{user_message}" doesn't seem to fully answer the question: "{module_questions[current_question]}"

        Create a friendly, helpful clarification request that:
        1. Acknowledges their response
        2. Explains what kind of information is needed
        3. Encourages them to provide more details
        """

        clarification = await self.ai_service.generate_content(
            prompt=clarification_prompt,
            temperature=0.7
        )

        return {
            "message": clarification,
            "is_question": True,
            "current_question": current_question,
            "flow": "clarification"
        }

    async def _generate_completion_summary(
        self,
        db: AsyncSession,
        memory: ConversationMemory,
        context: dict,
        cross_memory: CrossModuleMemory
    ) -> Dict[str, Any]:
        """Generate the end-of-module summary response (``module_complete`` is True)."""
        summary_prompt = f"""
        Generate a comprehensive summary of the conversation for {memory.module_id}.

        Conversation context: {context['context']}
        Business context: {cross_memory.business_context}

        Create a detailed, professional summary that captures all key points discussed.
        """

        summary = await self.ai_service.generate_content(
            prompt=summary_prompt,
            temperature=0.5
        )

        return {
            "message": f"Perfect! Here's a summary of what we've discussed:\n\n{summary}",
            "is_question": False,
            "module_complete": True,
            "summary": summary,
            "flow": "complete"
        }

    def _get_module_questions(self, module_id: str) -> List[str]:
        """Get questions for a module from the chatbot service.

        Placeholder: always returns an empty list. Callers are expected to
        pass the questions in explicitly.
        """
        # This should be implemented to get questions from your module configuration
        # For now, returning empty list - this will be passed from the chatbot service
        return []