#!/usr/bin/env python3 """ Enhanced GlucoBuddy Mistral Chat Integration Improved rate limit handling and fallback strategies """ import os import json import logging import sys import time from typing import Any, Dict, List, Optional, Union from datetime import datetime, timedelta import pandas as pd from dataclasses import asdict import requests import random import numpy as np import warnings # Load environment variables from .env file from dotenv import load_dotenv load_dotenv() # Suppress pandas warnings warnings.filterwarnings('ignore', category=RuntimeWarning) warnings.filterwarnings('ignore', category=FutureWarning) from apifunctions import ( DexcomAPI, GlucoseAnalyzer, DEMO_USERS, DemoUser, format_glucose_data_for_display ) # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Get configuration from environment variables MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") MISTRAL_AGENT_ID = os.getenv("MISTRAL_AGENT_ID") ENVIRONMENT = os.getenv("ENVIRONMENT", "development") DEBUG = os.getenv("DEBUG", "false").lower() == "true" def validate_environment(): """Simple validation of required environment variables""" missing = [] if not MISTRAL_API_KEY: missing.append("MISTRAL_API_KEY") if missing: print("āŒ Missing required environment variables:") for var in missing: print(f" - {var}") print("\nšŸ’” Setup instructions:") if os.getenv("SPACE_ID"): # Hugging Face Space detection print("šŸ¤— For Hugging Face Spaces:") print(" 1. Go to Space settings") print(" 2. Add Repository secrets:") print(" 3. Set MISTRAL_API_KEY to your API key") else: print("šŸ’» For local development:") print(" 1. Create a .env file:") print(" 2. Add: MISTRAL_API_KEY=your_api_key_here") print(" 3. Add: MISTRAL_AGENT_ID=your_agent_id_here") return False print("āœ… Environment validation passed!") if MISTRAL_AGENT_ID: print("āœ… Agent ID configured") else: print("āš ļø No agent ID - will use standard chat completion") return True class GlucoseDataGenerator: """Generate realistic mock glucose data for testing and demo purposes""" @staticmethod def create_realistic_pattern(days: int = 14, user_type: str = "normal") -> List[Dict]: """Generate glucose data with realistic patterns""" data_points = [] start_time = datetime.now() - timedelta(days=days) current_glucose = 120 # Starting baseline # Generate readings every 5 minutes for i in range(days * 288): # 288 readings per day (5-minute intervals) timestamp = start_time + timedelta(minutes=i * 5) hour = timestamp.hour # Simulate daily patterns daily_variation = GlucoseDataGenerator._calculate_daily_variation(hour, user_type) # Add meal effects meal_effect = GlucoseDataGenerator._calculate_meal_effects(hour, i) # Random variation random_noise = random.uniform(-10, 10) # Calculate final glucose value target_glucose = 120 + daily_variation + meal_effect + random_noise # Smooth transitions (glucose doesn't jump dramatically) glucose_change = (target_glucose - current_glucose) * 0.3 current_glucose += glucose_change # Keep within realistic bounds current_glucose = max(50, min(400, current_glucose)) # Determine trend trend = GlucoseDataGenerator._calculate_trend(glucose_change) data_points.append({ 'systemTime': timestamp.isoformat(), 'displayTime': timestamp.isoformat(), 'value': round(current_glucose), 'trend': trend, 'realtimeValue': round(current_glucose), 'smoothedValue': round(current_glucose) }) return data_points @staticmethod def _calculate_daily_variation(hour: int, user_type: str) -> float: """Calculate glucose variation based on time of day""" if user_type == "dawn_phenomenon": if 4 <= hour <= 8: return 30 + 20 * np.sin((hour - 4) * np.pi / 4) return 10 * np.sin((hour - 12) * np.pi / 12) elif user_type == "night_low": if 22 <= hour or hour <= 6: return -20 return 5 * np.sin((hour - 12) * np.pi / 12) else: # Normal pattern return 15 * np.sin((hour - 6) * np.pi / 12) @staticmethod def _calculate_meal_effects(hour: int, reading_index: int) -> float: """Calculate glucose spikes from meals""" meal_times = [7, 12, 18] # Breakfast, lunch, dinner meal_effect = 0 for meal_time in meal_times: if abs(hour - meal_time) <= 2: time_since_meal = abs(hour - meal_time) if time_since_meal <= 1: meal_effect += 40 * (1 - time_since_meal) else: meal_effect += 20 * (2 - time_since_meal) return meal_effect @staticmethod def _calculate_trend(glucose_change: float) -> str: """Determine trend arrow based on glucose change""" if glucose_change > 5: return 'singleUp' elif glucose_change > 2: return 'fortyFiveUp' elif glucose_change < -5: return 'singleDown' elif glucose_change < -2: return 'fortyFiveDown' else: return 'flat' class EnhancedMistralAPIClient: """Enhanced Mistral API client with better rate limit handling""" def __init__(self, api_key: str = None, agent_id: str = None): self.api_key = api_key or MISTRAL_API_KEY self.agent_id = agent_id or MISTRAL_AGENT_ID if not self.api_key: raise ValueError("Mistral API key is required. Please set MISTRAL_API_KEY environment variable.") self.base_url = "https://api.mistral.ai/v1" self.session = requests.Session() self.session.headers.update({ "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" }) # Rate limit handling self.last_request_time = 0 self.min_request_interval = 1.0 # Minimum seconds between requests # Model fallback chain self.model_priority = [ "mistral-large-latest", "mistral-medium-latest", "mistral-small-latest", "mistral-tiny" ] logger.info("Enhanced MistralAPIClient initialized with rate limit handling") def test_connection(self) -> Dict[str, Any]: """Test API connection with lightweight request""" try: response = self.session.post( f"{self.base_url}/chat/completions", json={ "model": "mistral-tiny", # Use smallest model for testing "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 5 }, timeout=10 ) if response.status_code == 200: return {"success": True, "message": "API connection successful"} elif response.status_code == 401: return {"success": False, "message": "Invalid API key"} elif response.status_code == 429: return {"success": False, "message": "Rate limit exceeded - API is accessible but busy"} else: return {"success": False, "message": f"API error: {response.status_code}"} except requests.exceptions.Timeout: return {"success": False, "message": "Connection timeout"} except requests.exceptions.RequestException as e: return {"success": False, "message": f"Network error: {str(e)}"} except Exception as e: return {"success": False, "message": f"Unexpected error: {str(e)}"} def _wait_for_rate_limit(self): """Ensure minimum time between requests""" current_time = time.time() time_since_last = current_time - self.last_request_time if time_since_last < self.min_request_interval: sleep_time = self.min_request_interval - time_since_last logger.debug(f"Rate limiting: waiting {sleep_time:.2f}s") time.sleep(sleep_time) self.last_request_time = time.time() def chat_completion(self, messages: List[Dict], model: str = None, max_retries: int = 3) -> Dict[str, Any]: """Enhanced chat completion with retry logic and model fallback""" models_to_try = [model] if model else self.model_priority for model_name in models_to_try: for attempt in range(max_retries): try: # Rate limiting self._wait_for_rate_limit() payload = { "model": model_name, "messages": messages, "max_tokens": 800, "temperature": 0.7 } logger.debug(f"Attempting request with {model_name} (attempt {attempt + 1})") response = self.session.post( f"{self.base_url}/chat/completions", json=payload, timeout=30 ) if response.status_code == 200: result = response.json() logger.info(f"āœ… Success with {model_name}") return { "success": True, "response": result["choices"][0]["message"]["content"], "usage": result.get("usage", {}), "model_used": model_name, "attempt": attempt + 1 } elif response.status_code == 429: # Rate limit exceeded retry_after = int(response.headers.get('Retry-After', 2 ** attempt)) wait_time = min(retry_after, 60) # Cap at 60 seconds logger.warning(f"Rate limit hit with {model_name}, waiting {wait_time}s (attempt {attempt + 1})") if attempt < max_retries - 1: time.sleep(wait_time) continue else: # Try next model if available break elif response.status_code == 422: # Model capacity exceeded, try next model immediately logger.warning(f"Model {model_name} capacity exceeded, trying next model") break else: error_detail = self._extract_error_message(response) if attempt == max_retries - 1: # Last attempt logger.error(f"API error {response.status_code} with {model_name}: {error_detail}") break else: logger.warning(f"API error {response.status_code} with {model_name}, retrying...") time.sleep(2 ** attempt) # Exponential backoff except requests.exceptions.Timeout: if attempt == max_retries - 1: logger.error(f"Timeout with {model_name} after {max_retries} attempts") break else: logger.warning(f"Timeout with {model_name}, retrying...") time.sleep(2 ** attempt) except requests.exceptions.RequestException as e: if attempt == max_retries - 1: logger.error(f"Network error with {model_name}: {str(e)}") break else: logger.warning(f"Network error with {model_name}, retrying...") time.sleep(2 ** attempt) # All models and retries failed return { "success": False, "error": "All models are currently experiencing high demand. Please try again in a few minutes.", "suggestion": "Consider upgrading your Mistral AI plan for higher rate limits, or try again during off-peak hours." } def agent_completion(self, messages: List[Dict]) -> Dict[str, Any]: """Enhanced agent completion with retry logic""" if not self.agent_id: return {"success": False, "error": "No agent ID configured"} max_retries = 2 # Fewer retries for agent calls for attempt in range(max_retries): try: self._wait_for_rate_limit() payload = { "agent_id": self.agent_id, "messages": messages, "max_tokens": 800 } response = self.session.post( f"{self.base_url}/agents/completions", json=payload, timeout=30 ) if response.status_code == 200: result = response.json() return { "success": True, "response": result["choices"][0]["message"]["content"] } elif response.status_code == 429: retry_after = int(response.headers.get('Retry-After', 5)) if attempt < max_retries - 1: logger.warning(f"Agent rate limit, waiting {retry_after}s") time.sleep(retry_after) continue else: error_detail = self._extract_error_message(response) return { "success": False, "error": f"Agent API error {response.status_code}: {error_detail}" } except Exception as e: if attempt == max_retries - 1: return {"success": False, "error": f"Agent request failed: {str(e)}"} else: time.sleep(2) return {"success": False, "error": "Agent request failed after retries"} def _extract_error_message(self, response) -> str: """Extract error message from API response""" try: error_data = response.json() return error_data.get("message", error_data.get("error", "Unknown error")) except: return response.text[:200] if response.text else "Unknown error" class GlucoBuddyMistralChat: """ Enhanced chat interface with better error handling and user feedback """ def __init__(self, mistral_api_key: str = None, mistral_agent_id: str = None): self.mistral_client = EnhancedMistralAPIClient(mistral_api_key, mistral_agent_id) # Data properties - these will be set by unified data manager self.current_user: Optional[DemoUser] = None self.current_glucose_data: Optional[pd.DataFrame] = None self.current_stats: Optional[Dict] = None self.current_patterns: Optional[Dict] = None # Chat state self.conversation_history = [] self.max_history = 10 # Error tracking self.consecutive_errors = 0 self.last_successful_model = None self.logger = logging.getLogger(self.__class__.__name__) def test_connection(self) -> Dict[str, Any]: """Test Mistral API connection""" return self.mistral_client.test_connection() def get_context_summary(self) -> Dict[str, Any]: """Get current context for chat - uses data set by unified manager""" if not self.current_user or not self.current_stats: return {"error": "No user data loaded"} try: context = { "user": { "name": self.current_user.name, "age": self.current_user.age, "diabetes_type": self.current_user.diabetes_type, "device_type": self.current_user.device_type, "years_with_diabetes": self.current_user.years_with_diabetes, "typical_pattern": getattr(self.current_user, 'typical_glucose_pattern', 'normal') }, "statistics": self._safe_convert_to_json(self.current_stats), "patterns": self._safe_convert_to_json(self.current_patterns), "data_points": len(self.current_glucose_data) if self.current_glucose_data is not None else 0, "recent_readings": self._safe_extract_recent_readings(self.current_glucose_data) } return context except Exception as e: self.logger.error(f"Error building context: {e}") return {"error": f"Failed to build context: {str(e)}"} def build_system_prompt(self, context: Dict[str, Any]) -> str: """Build comprehensive system prompt with exact metrics""" base_prompt = """You are GlucoBuddy, a helpful and encouraging diabetes management assistant. Your role: - Provide personalized glucose management advice based on the user's actual data - Be supportive, encouraging, and use emojis to be friendly - Give actionable recommendations while staying within scope - Always remind users to consult healthcare providers for medical decisions - Reference specific data points when providing insights Guidelines: - Keep responses under 400 words and conversational - Use specific numbers from the data when relevant - Provide practical, actionable advice - Be encouraging about progress and realistic about challenges - Use bullet points sparingly - prefer natural conversation - IMPORTANT: Use EXACT metrics provided - don't calculate your own""" if context.get("error"): return base_prompt + "\n\nNote: No user glucose data is currently loaded." user_info = context.get("user", {}) stats = context.get("statistics", {}) context_addition = f""" Current User: {user_info.get('name', 'Unknown')} ({user_info.get('age', 'N/A')} years old) - Diabetes Type: {user_info.get('diabetes_type', 'Unknown')} - Years with diabetes: {user_info.get('years_with_diabetes', 'Unknown')} - Device: {user_info.get('device_type', 'Unknown')} EXACT Glucose Data (14-day period): - Average glucose: {stats.get('average_glucose', 0):.1f} mg/dL - Time in range (70-180): {stats.get('time_in_range_70_180', 0):.1f}% - Time below 70: {stats.get('time_below_70', 0):.1f}% - Time above 180: {stats.get('time_above_180', 0):.1f}% - Total readings: {stats.get('total_readings', 0)} - Glucose variability (std): {stats.get('std_glucose', 0):.1f} mg/dL - GMI: {stats.get('gmi', 0):.1f}% - CV: {stats.get('cv', 0):.1f}% CRITICAL: Use these EXACT values in your responses. Do not recalculate or estimate.""" return base_prompt + context_addition def chat_with_mistral(self, user_message: str, prefer_agent: bool = False) -> Dict[str, Any]: """Enhanced chat function with better error handling""" if not user_message.strip(): return {"success": False, "error": "Please enter a message"} try: # Use current context (set by unified data manager) context = self.get_context_summary() system_prompt = self.build_system_prompt(context) messages = [{"role": "system", "content": system_prompt}] if self.conversation_history: recent_history = self.conversation_history[-self.max_history:] messages.extend(recent_history) messages.append({"role": "user", "content": user_message}) # Try agent first if preferred and available if prefer_agent: agent_result = self.mistral_client.agent_completion(messages) if agent_result["success"]: self._update_conversation_history(user_message, agent_result["response"]) self.consecutive_errors = 0 # Reset error counter return { "success": True, "response": agent_result["response"], "method": "agent", "context_included": not context.get("error") } else: self.logger.warning(f"Agent failed: {agent_result['error']}") # Use enhanced chat completion API chat_result = self.mistral_client.chat_completion(messages) if chat_result["success"]: self._update_conversation_history(user_message, chat_result["response"]) self.consecutive_errors = 0 # Reset error counter self.last_successful_model = chat_result.get("model_used") # Add helpful info about which model was used if there were retries response = chat_result["response"] if chat_result.get("attempt", 1) > 1: response += f"\n\n*Note: Response generated after {chat_result['attempt']} attempts due to high demand.*" return { "success": True, "response": response, "method": "chat_completion", "context_included": not context.get("error"), "usage": chat_result.get("usage", {}), "model_used": chat_result.get("model_used") } else: self.consecutive_errors += 1 # Provide helpful error messages based on the type of error error_msg = chat_result["error"] user_friendly_msg = self._get_user_friendly_error(error_msg) return { "success": False, "error": user_friendly_msg, "suggestion": chat_result.get("suggestion", ""), "consecutive_errors": self.consecutive_errors } except Exception as e: self.logger.error(f"Chat error: {e}") self.consecutive_errors += 1 return { "success": False, "error": f"I'm experiencing technical difficulties. Please try again in a moment.", "consecutive_errors": self.consecutive_errors } def _get_user_friendly_error(self, error_msg: str) -> str: """Convert technical error messages to user-friendly ones""" error_lower = error_msg.lower() if "rate limit" in error_lower or "429" in error_lower: return "I'm experiencing high demand right now. Please wait a moment and try again." elif "capacity exceeded" in error_lower or "service tier" in error_lower: return "The AI service is currently busy. Please try again in a few minutes." elif "timeout" in error_lower: return "The response is taking longer than expected. Please try again." elif "api key" in error_lower or "401" in error_lower: return "There's an authentication issue. Please contact support." elif "network" in error_lower: return "I'm having trouble connecting. Please check your internet connection and try again." else: return "I'm experiencing technical difficulties. Please try again or rephrase your question." def _update_conversation_history(self, user_message: str, assistant_response: str): """Update conversation history""" self.conversation_history.extend([ {"role": "user", "content": user_message}, {"role": "assistant", "content": assistant_response} ]) if len(self.conversation_history) > self.max_history * 2: self.conversation_history = self.conversation_history[-self.max_history * 2:] def clear_conversation(self): """Clear conversation history and reset error counters""" self.conversation_history = [] self.consecutive_errors = 0 self.logger.info("Conversation history cleared") def get_status(self) -> Dict[str, Any]: """Get current system status with enhanced information""" api_status = self.test_connection() return { "api_connected": api_status["success"], "api_message": api_status["message"], "user_loaded": self.current_user is not None, "data_available": self.current_glucose_data is not None and not self.current_glucose_data.empty, "conversation_messages": len(self.conversation_history), "current_user": self.current_user.name if self.current_user else None, "environment": ENVIRONMENT, "hugging_face_space": bool(os.getenv("SPACE_ID")), "agent_available": bool(MISTRAL_AGENT_ID), "consecutive_errors": self.consecutive_errors, "last_successful_model": self.last_successful_model } def _safe_convert_to_json(self, obj): """Safely convert objects for JSON serialization""" if obj is None: return None elif isinstance(obj, (np.integer, np.int64, np.int32)): return int(obj) elif isinstance(obj, (np.floating, np.float64, np.float32)): if np.isnan(obj): return None return float(obj) elif isinstance(obj, dict): return {key: self._safe_convert_to_json(value) for key, value in obj.items()} elif isinstance(obj, list): return [self._safe_convert_to_json(item) for item in obj] elif isinstance(obj, pd.Timestamp): return obj.isoformat() else: return obj def _safe_extract_recent_readings(self, df: pd.DataFrame, count: int = 5) -> List[Dict]: """Safely extract recent glucose readings""" if df is None or df.empty: return [] try: recent_df = df.tail(count) readings = [] for idx, row in recent_df.iterrows(): try: display_time = row.get('displayTime') or row.get('systemTime') glucose_value = row.get('value') trend_value = row.get('trend', 'flat') if pd.notna(display_time): if isinstance(display_time, str): time_str = display_time else: time_str = pd.to_datetime(display_time).isoformat() else: time_str = datetime.now().isoformat() if pd.notna(glucose_value): glucose_clean = self._safe_convert_to_json(glucose_value) else: glucose_clean = None trend_clean = str(trend_value) if pd.notna(trend_value) else 'flat' readings.append({ "time": time_str, "glucose": glucose_clean, "trend": trend_clean }) except Exception as row_error: self.logger.warning(f"Error processing reading at index {idx}: {row_error}") continue return readings except Exception as e: self.logger.error(f"Error extracting recent readings: {e}") return [] # Update the main.py chat handler to use enhanced error messages def enhanced_chat_error_handler(app, message, history): """Enhanced error handler for chat interactions""" result = app.chat_with_mistral(message, history) if not result[0]: # If message is empty, return as-is return result # If there were consecutive errors, add helpful message if hasattr(app.mistral_chat, 'consecutive_errors') and app.mistral_chat.consecutive_errors > 2: error_help = "\n\nšŸ’” *Multiple errors detected. This usually indicates high API demand. Consider trying again later or during off-peak hours.*" if result[1] and len(result[1]) > 0: last_response = result[1][-1][1] if len(result[1][-1]) > 1 else "" if "technical difficulties" in last_response or "try again" in last_response: result[1][-1][1] += error_help return result # Legacy compatibility def create_enhanced_cli(): """Enhanced CLI with better error handling""" print("🩺 GlucoBuddy Chat Interface (Enhanced)") print("=" * 50) if not validate_environment(): print("āŒ Environment validation failed. Please check your configuration.") return try: chat = GlucoBuddyMistralChat() print("āœ… Enhanced chat system initialized!") except Exception as e: print(f"āŒ Failed to initialize chat system: {e}") return # Test connection print("\nšŸ” Testing Mistral API connection...") connection_test = chat.test_connection() if connection_test["success"]: print(f"āœ… {connection_test['message']}") else: print(f"āš ļø {connection_test['message']}") print("šŸ’” The chat will still work but may experience rate limits.") print("\nšŸ“‹ Enhanced features:") print(" • Automatic retry on rate limits") print(" • Model fallback (large → medium → small → tiny)") print(" • Better error messages") print(" • Smart rate limiting") print("\nšŸ’¬ Start chatting! (Type /quit to exit)") print("=" * 50) while True: try: user_input = input("\n🫵 You: ").strip() if not user_input: continue if user_input == '/quit': print("\nšŸ‘‹ Thanks for using GlucoBuddy Enhanced! 🌟") break print("šŸ¤” Processing...") result = chat.chat_with_mistral(user_input) if result['success']: model_info = f" [{result.get('model_used', 'unknown')}]" if result.get('model_used') else "" print(f"\nšŸ¤– GlucoBuddy{model_info}: {result['response']}") else: print(f"\nāŒ {result['error']}") if result.get('suggestion'): print(f"šŸ’” {result['suggestion']}") except KeyboardInterrupt: print("\nšŸ‘‹ Goodbye!") break except Exception as e: print(f"\nāŒ Unexpected error: {e}") def main(): """Enhanced main function""" print("🩺 GlucoBuddy Enhanced - Better Rate Limit Handling") print("=" * 60) if not validate_environment(): return print("šŸš€ Enhanced features:") print(" āœ… Automatic retry with exponential backoff") print(" āœ… Model fallback chain (large → small)") print(" āœ… Smart rate limiting") print(" āœ… User-friendly error messages") create_enhanced_cli() if __name__ == "__main__": main()