# Spaces:
# Sleeping
# Sleeping
""" | |
API client for communicating with the backend service. | |
""" | |
import logging
import socket
from typing import Dict, Any, Tuple, Optional, List
from urllib.parse import urlparse

import requests
class APIClient:
    """HTTP client for the backend's query, health and generic JSON endpoints.

    Every public method converts transport and parsing failures into a return
    value (an error dict, a status tuple, or ``None``) instead of raising, so
    callers can render the result directly.
    """

    # NOTE(review): the leading glyphs in the user-facing strings below
    # ("β", "π’", "π‘", "π΄") look like mis-encoded emoji. They are kept
    # verbatim because callers may match on them - confirm the intended
    # characters against the original file before changing them.

    def __init__(self, base_url: str, timeout: int = 60):
        """Store connection settings and the table of known endpoint paths.

        Args:
            base_url: Root URL of the backend; a trailing slash is removed.
            timeout: Seconds to wait in ``check_health``. Query processing in
                ``send_query`` deliberately runs without a timeout.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)
        self.endpoints = {
            'process_text': '/api/process-text',
            'health': '/api/health',
            'models': '/api/models',
            'change_model': '/api/change-model',
        }

    @staticmethod
    def _error_result(message: str, **extra: Any) -> Dict[str, Any]:
        """Build the error payload shape shared by all failure paths."""
        result: Dict[str, Any] = {
            "message": message,
            "rows": [],
            "chart": None,
            "error": True,
        }
        result.update(extra)
        return result

    def _build_url(self, endpoint: str) -> str:
        """Join ``endpoint`` onto the base URL, adding ``/api`` when missing.

        A missing leading slash is tolerated, and only a real ``/api`` path
        segment counts as "already prefixed" (so ``/apikeys`` still gets the
        prefix).
        """
        path = endpoint if endpoint.startswith('/') else f"/{endpoint}"
        already_prefixed = path == '/api' or path.startswith(
            ('/api/', '/api?', '/api#')
        )
        if not already_prefixed:
            path = f"/api{path}"
        return f"{self.base_url}{path}"

    def send_query(self, question: str, model: Optional[str] = None,
                   agent: Optional[str] = None) -> Dict[str, Any]:
        """Send a question to the text-processing endpoint.

        Args:
            question: The user's question.
            model: Optional AI model name; sent as ``model_name`` when truthy.
            agent: Legacy parameter, accepted but not sent to the backend.

        Returns:
            The dict produced by ``_process_response`` on any HTTP response,
            or an error dict (``error`` is True, ``rows`` empty, ``chart``
            None) when the request itself fails.
        """
        payload = {"question": question}
        if model:
            payload["model_name"] = model
        self.logger.info("Sending API request with: %s", payload)

        full_url = f"{self.base_url}{self.endpoints['process_text']}"
        try:
            # Intentionally no timeout: complex queries may take a long time
            # and should not be interrupted.
            response = requests.post(full_url, json=payload)
            return self._process_response(response)
        except requests.exceptions.ConnectionError as e:
            self.logger.error("Connection error: %s", e)
            return self._error_result(
                "β Cannot connect to the server. Please check if the backend is running."
            )
        except requests.exceptions.RequestException as e:
            self.logger.error("Request error: %s", e)
            return self._error_result(f"β Request failed: {e}")
        except Exception as e:
            # Last-resort boundary so the caller always receives a dict.
            self.logger.error("Unexpected error: %s", e)
            return self._error_result(f"β οΈ Exception: {e}")

    def _process_response(self, response: "requests.Response") -> Dict[str, Any]:
        """Normalize a backend response into the dict shape callers consume.

        Args:
            response: Object exposing ``status_code``, ``text`` and ``json()``.

        Returns:
            A dict that always has ``message``, ``rows``, ``chart``, ``error``
            and ``model_used``. Successful responses also carry ``status``;
            data responses additionally carry ``sql``, ``heading`` and
            ``summary``.
        """
        try:
            result = response.json()
        except ValueError as e:
            self.logger.error("Failed to parse JSON response: %s", e)
            result = {"detail": response.text}
        else:
            self.logger.info(
                "API response received: %s with keys: %s",
                type(result),
                list(result.keys()) if isinstance(result, dict) else 'Not a dict',
            )

        succeeded = response.status_code == 200

        # Valid JSON that is not an object (list, string, null, ...) has none
        # of the fields read below; report it as an error instead of failing
        # on ``.get``.
        if not isinstance(result, dict):
            if succeeded:
                error_msg = (
                    "Unexpected response format: expected a JSON object, "
                    f"got {type(result).__name__}"
                )
            else:
                error_msg = response.text
            self.logger.error(
                "API error response (status %s): %s",
                response.status_code, error_msg,
            )
            return self._error_result(
                f"β Error: {error_msg}", model_used='unknown'
            )

        if not succeeded:
            error_msg = result.get('detail') or result.get('message') or response.text
            self.logger.error(
                "API error response (status %s): %s",
                response.status_code, error_msg,
            )
            return self._error_result(
                f"β Error: {error_msg}",
                model_used=result.get('model_used', 'unknown'),
            )

        # Message-only responses: a "message" with neither "rows" nor "chart".
        if "message" in result and not ("rows" in result or "chart" in result):
            self.logger.info("Processing message-only response")
            return {
                "message": result["message"],
                "rows": [],
                "chart": None,
                "error": False,
                "model_used": result.get("model_used", "unknown"),
                "status": result.get("status", "message"),
            }

        # Data responses.
        rows = result.get("rows", [])
        chart = result.get("chart", None)
        heading = result.get("summary", "")
        sql = result.get("sql", "")
        self.logger.info(
            "Processing data response - Rows: %s, Heading: %s, SQL length: %s",
            len(rows) if isinstance(rows, list) else 'Not a list',
            heading,
            len(sql) if sql else 0,
        )

        # Prefer the backend's summary as the message; otherwise synthesize
        # one from the row count.
        if heading and isinstance(heading, str) and heading.strip():
            message = heading.strip()
        elif rows:
            message = f"Here are the {len(rows)} results I found:"
        else:
            message = "I could not find matching records for your query."

        processed_response = {
            "message": message,
            "rows": rows,
            "chart": chart,
            "error": False,
            "model_used": result.get("model_used", "unknown"),
            "status": result.get("status", "success"),
            "sql": sql,
            "heading": heading,
            "summary": result.get("summary", ""),
        }
        self.logger.info(
            "Final processed response keys: %s", list(processed_response.keys())
        )
        return processed_response

    def _probe_port(self) -> Tuple[str, str, str]:
        """Report whether the backend's TCP port accepts connections.

        Used by ``check_health`` when the HTTP request fails. Without an
        explicit port in ``base_url`` the scheme default is probed (80 for
        http, 443 for https); any other scheme falls back to 8000.
        """
        try:
            parsed = urlparse(self.base_url)
            host = parsed.hostname or "127.0.0.1"
            port = parsed.port or {"http": 80, "https": 443}.get(parsed.scheme, 8000)
            with socket.create_connection((host, port), timeout=1):
                pass
            return "π‘ Reachable", "Port Open", "warning"
        except (OSError, ValueError):
            # OSError: connection/DNS failure; ValueError: malformed URL/port.
            return "π΄ Offline", "Connection Failed", "error"

    def check_health(self) -> Tuple[str, str, str]:
        """Summarize backend availability for display.

        Returns:
            A ``(label, detail, level)`` tuple where ``level`` is one of
            ``"success"``, ``"warning"`` or ``"error"``.
        """
        try:
            health_url = f"{self.base_url}{self.endpoints['health']}"
            # Bounded by self.timeout so an unresponsive backend cannot hang
            # the caller.
            response = requests.get(health_url, timeout=self.timeout)
            try:
                body = response.json()
            except ValueError:
                body = None
            status = body.get("status", "") if isinstance(body, dict) else ""

            if status == "healthy":
                return "π’ Active", "Online", "success"
            if response.status_code == 503:
                return "π‘ Degraded", "Some Issues", "warning"
            return "π‘ Limited", f"Status: {response.status_code}", "warning"
        except requests.exceptions.RequestException:
            # HTTP failed; fall back to a raw TCP reachability check.
            return self._probe_port()
        except Exception:
            return "π‘ Unknown", "Check Required", "warning"

    def get_detailed_health(self) -> Dict[str, Any]:
        """Get detailed health information from the API.

        Returns:
            The parsed JSON body when the health endpoint answers 200 or 503;
            otherwise a dict with ``status`` "error", a ``message`` and an
            empty ``checks`` mapping.
        """
        def failure(message: str) -> Dict[str, Any]:
            return {"status": "error", "message": message, "checks": {}}

        try:
            health_url = f"{self.base_url}{self.endpoints['health']}"
            response = requests.get(health_url, timeout=5)
            if response.status_code not in (200, 503):
                return failure(
                    f"Health endpoint returned status {response.status_code}"
                )
            try:
                return response.json()
            except ValueError as e:
                # A body that is not JSON is not a connection problem.
                return failure(f"Health check failed: {e}")
        except requests.exceptions.RequestException as e:
            return failure(f"Connection failed: {e}")
        except Exception as e:
            return failure(f"Health check failed: {e}")

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """Generic GET method for API endpoints.

        Args:
            endpoint: API endpoint path (e.g. "/models"); "/api" is prepended
                when it is not already the leading path segment.
            params: Optional query parameters.

        Returns:
            Parsed JSON on HTTP 200, or None on any other status or error.
        """
        try:
            response = requests.get(self._build_url(endpoint), params=params, timeout=10)
            if response.status_code == 200:
                return response.json()
            self.logger.error(
                "GET %s failed with status %s", endpoint, response.status_code
            )
            return None
        except Exception as e:
            self.logger.error("GET %s error: %s", endpoint, e)
            return None

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """Generic POST method for API endpoints.

        Args:
            endpoint: API endpoint path; "/api" is prepended when it is not
                already the leading path segment.
            data: Optional payload, sent as the JSON body.

        Returns:
            Parsed JSON on HTTP 200/201, or None on any other status or error.
        """
        try:
            response = requests.post(self._build_url(endpoint), json=data, timeout=10)
            if response.status_code in (200, 201):
                return response.json()
            self.logger.error(
                "POST %s failed with status %s", endpoint, response.status_code
            )
            return None
        except Exception as e:
            self.logger.error("POST %s error: %s", endpoint, e)
            return None