""" API client for communicating with the backend service. """ import requests import logging from typing import Dict, Any, Tuple, Optional, List class APIClient: def __init__(self, base_url: str, timeout: int = 60): self.base_url = base_url.rstrip('/') # Remove trailing slash if present self.timeout = timeout # Used only for health checks, not for query processing self.logger = logging.getLogger(__name__) # Define endpoints self.endpoints = { 'process_text': '/api/process-text', 'health': '/api/health', 'models': '/api/models', 'change_model': '/api/change-model' } def send_query(self, question: str, model: str = None, agent: str = None) -> Dict[str, Any]: """ Send a query to the backend with optional model specification Args: question: The user's question model: Optional AI model name to use for processing agent: Agent type (legacy parameter, not used in current backend) """ # Prepare payload with model_name for the new API payload = {"question": question} if model: payload["model_name"] = model self.logger.info(f"Sending API request with: {payload}") # Construct full URL full_url = f"{self.base_url}{self.endpoints['process_text']}" try: # Remove timeout for query processing - wait for actual result # Complex queries may take time and should not be interrupted response = requests.post(full_url, json=payload) return self._process_response(response) except requests.exceptions.ConnectionError as e: self.logger.error(f"Connection error: {e}") return { "message": "❌ Cannot connect to the server. Please check if the backend is running.", "rows": [], "chart": None, "error": True } except requests.exceptions.RequestException as e: self.logger.error(f"Request error: {e}") return { "message": f"❌ Request failed: {str(e)}", "rows": [], "chart": None, "error": True } except Exception as e: self.logger.error(f"Unexpected error: {e}") return { "message": f"⚠️ Exception: {str(e)}", "rows": [], "chart": None, "error": True } def _process_response(self, response: requests.Response) -> Dict[str, Any]: """Process the API response and extract relevant data.""" try: result = response.json() self.logger.info(f"API response received: {type(result)} with keys: {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}") except ValueError as e: self.logger.error(f"Failed to parse JSON response: {e}") result = {"detail": response.text} if response.status_code != 200: error_msg = result.get('detail') or result.get('message') or response.text self.logger.error(f"API error response (status {response.status_code}): {error_msg}") return { "message": f"❌ Error: {error_msg}", "rows": [], "chart": None, "error": True, "model_used": result.get('model_used', 'unknown') if isinstance(result, dict) else 'unknown' } # Handle message-only responses (chat responses, errors, etc.) if "message" in result and not ("rows" in result or "chart" in result): self.logger.info("Processing message-only response") return { "message": result["message"], "rows": [], "chart": None, "error": False, "model_used": result.get("model_used", "unknown"), "status": result.get("status", "message") } # Handle data responses (successful SQL queries) rows = result.get("rows", []) chart = result.get("chart", None) heading = result.get("summary", "") sql = result.get("sql", "") self.logger.info(f"Processing data response - Rows: {len(rows) if isinstance(rows, list) else 'Not a list'}, Heading: {heading}, SQL length: {len(sql) if sql else 0}") # Use heading from backend as-is (backend already parsed JSON and model includes record count) if heading and isinstance(heading, str) and heading.strip(): message = heading.strip() else: # Fallback message if no heading provided if rows and len(rows) > 0: message = f"Here are the {len(rows)} results I found:" else: message = "I could not find matching records for your query." processed_response = { "message": message, "rows": rows, "chart": chart, "error": False, "model_used": result.get("model_used", "unknown"), "status": result.get("status", "success"), "sql": sql, "heading": heading, # Clean heading from backend "summary": result.get("summary", "") # Summary from backend if available } self.logger.info(f"Final processed response keys: {list(processed_response.keys())}") return processed_response def check_health(self) -> Tuple[str, str, str]: try: # Construct health URL health_url = f"{self.base_url}{self.endpoints['health']}" response = requests.get(health_url) try: result = response.json() status = result.get("status", "") except Exception: result = {} status = "" if status == "healthy": return "🟢 Active", "Online", "success" elif response.status_code == 503: return "🟡 Degraded", "Some Issues", "warning" else: return "🟡 Limited", f"Status: {response.status_code}", "warning" except requests.exceptions.RequestException: # Fallback to socket check try: import socket # Extract host and port from base_url from urllib.parse import urlparse parsed = urlparse(self.base_url) host = parsed.hostname or "127.0.0.1" port = parsed.port or 8000 socket.create_connection((host, port), timeout=1).close() return "🟡 Reachable", "Port Open", "warning" except: return "🔴 Offline", "Connection Failed", "error" except Exception: return "🟡 Unknown", "Check Required", "warning" def get_detailed_health(self) -> Dict[str, Any]: """ Get detailed health information from the API. Returns: Dictionary containing detailed health status or error information """ try: health_url = f"{self.base_url}{self.endpoints['health']}" response = requests.get(health_url, timeout=5) if response.status_code in [200, 503]: return response.json() else: return { "status": "error", "message": f"Health endpoint returned status {response.status_code}", "checks": {} } except requests.exceptions.RequestException as e: return { "status": "error", "message": f"Connection failed: {str(e)}", "checks": {} } except Exception as e: return { "status": "error", "message": f"Health check failed: {str(e)}", "checks": {} } def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict[str, Any]]: """ Generic GET method for API endpoints Args: endpoint: API endpoint path (e.g., "/models") params: Optional query parameters Returns: API response as dictionary or None if failed """ try: url = f"{self.base_url}/api{endpoint}" if not endpoint.startswith('/api') else f"{self.base_url}{endpoint}" response = requests.get(url, params=params, timeout=10) if response.status_code == 200: return response.json() else: self.logger.error(f"GET {endpoint} failed with status {response.status_code}") return None except Exception as e: self.logger.error(f"GET {endpoint} error: {str(e)}") return None def post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict[str, Any]]: """ Generic POST method for API endpoints Args: endpoint: API endpoint path data: Optional POST data Returns: API response as dictionary or None if failed """ try: url = f"{self.base_url}/api{endpoint}" if not endpoint.startswith('/api') else f"{self.base_url}{endpoint}" response = requests.post(url, json=data, timeout=10) if response.status_code in [200, 201]: return response.json() else: self.logger.error(f"POST {endpoint} failed with status {response.status_code}") return None except Exception as e: self.logger.error(f"POST {endpoint} error: {str(e)}") return None