# AISqlGeneratorApp / src / api_client.py
# Repository page header: uploaded by Vivek0912, commit "added new code" (cce43bc)
"""
API client for communicating with the backend service.
"""
import requests
import logging
from typing import Dict, Any, Tuple, Optional, List
class APIClient:
    """HTTP client for the backend service.

    Wraps the backend's REST endpoints (query processing, health, model
    listing/switching) and converts every response into a plain dictionary so
    callers never have to deal with ``requests`` objects or exceptions.

    Attributes:
        base_url: Backend root URL without a trailing slash.
        timeout: Timeout in seconds applied to the quick health check.
            Query processing intentionally runs without a timeout.
        endpoints: Mapping of logical endpoint names to URL paths.
    """

    def __init__(self, base_url: str, timeout: int = 60):
        """Store connection settings; no network traffic happens here.

        Args:
            base_url: Root URL of the backend, with or without trailing slash.
            timeout: Timeout in seconds used by :meth:`check_health`.
        """
        self.base_url = base_url.rstrip('/')  # Remove trailing slash if present
        self.timeout = timeout  # Used only for health checks, not for query processing
        self.logger = logging.getLogger(__name__)
        # Define endpoints
        self.endpoints = {
            'process_text': '/api/process-text',
            'health': '/api/health',
            'models': '/api/models',
            'change_model': '/api/change-model',
        }

    @staticmethod
    def _error_result(message: str, model_used: Optional[str] = None) -> Dict[str, Any]:
        """Build the standard error payload returned to callers."""
        result: Dict[str, Any] = {
            "message": message,
            "rows": [],
            "chart": None,
            "error": True,
        }
        if model_used is not None:
            result["model_used"] = model_used
        return result

    def _build_url(self, endpoint: str) -> str:
        """Return the absolute URL for *endpoint*, adding ``/api`` if missing.

        A missing leading slash is tolerated, and only a real ``/api`` path
        segment counts as "already prefixed" (``/apiary`` does not).
        """
        path = endpoint if endpoint.startswith('/') else f"/{endpoint}"
        if path != '/api' and not path.startswith('/api/'):
            path = f"/api{path}"
        return f"{self.base_url}{path}"

    def send_query(
        self,
        question: str,
        model: Optional[str] = None,
        agent: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Send a query to the backend with optional model specification.

        Args:
            question: The user's question.
            model: Optional AI model name to use for processing.
            agent: Agent type (legacy parameter, not used in current backend).

        Returns:
            The normalised response dictionary produced by
            :meth:`_process_response`, or an error dictionary (``"error":
            True``) when the request itself fails. This method never raises.
        """
        # Prepare payload with model_name for the new API
        payload = {"question": question}
        if model:
            payload["model_name"] = model
        self.logger.info("Sending API request with: %s", payload)

        full_url = f"{self.base_url}{self.endpoints['process_text']}"
        try:
            # Deliberately no timeout for query processing: complex queries
            # may take a long time and should not be interrupted.
            response = requests.post(full_url, json=payload, timeout=None)
            return self._process_response(response)
        except requests.exceptions.ConnectionError as e:
            self.logger.error("Connection error: %s", e)
            return self._error_result(
                "❌ Cannot connect to the server. Please check if the backend is running."
            )
        except requests.exceptions.RequestException as e:
            self.logger.error("Request error: %s", e)
            return self._error_result(f"❌ Request failed: {str(e)}")
        except Exception as e:
            # Last-resort boundary: the UI expects a dictionary, never an exception.
            self.logger.error("Unexpected error: %s", e)
            return self._error_result(f"⚠️ Exception: {str(e)}")

    def _process_response(self, response: "requests.Response") -> Dict[str, Any]:
        """Process the API response and extract relevant data.

        Args:
            response: The HTTP response returned by the backend.

        Returns:
            A dictionary that always contains ``message``, ``rows``, ``chart``,
            ``error`` and ``model_used``. Successful data responses also carry
            ``status``, ``sql``, ``heading`` and ``summary``; message-only
            responses also carry ``status``.
        """
        parsed_ok = True
        try:
            result = response.json()
        except ValueError as e:
            self.logger.error("Failed to parse JSON response: %s", e)
            parsed_ok = False
            result = {"detail": response.text}
        else:
            self.logger.info(
                "API response received: %s with keys: %s",
                type(result),
                list(result.keys()) if isinstance(result, dict) else 'Not a dict',
            )

        is_object = isinstance(result, dict)

        if response.status_code != 200:
            if is_object:
                error_msg = result.get('detail') or result.get('message') or response.text
                model_used = result.get('model_used', 'unknown')
            else:
                # Valid JSON but not an object (list, string, ...): fall back to raw text.
                error_msg = response.text
                model_used = 'unknown'
            self.logger.error(
                "API error response (status %s): %s", response.status_code, error_msg
            )
            return self._error_result(f"❌ Error: {error_msg}", model_used=model_used)

        if not parsed_ok or not is_object:
            # A 200 whose body is not a JSON object cannot be interpreted;
            # report it instead of pretending the query matched nothing.
            self.logger.error("Unexpected response format from server")
            return self._error_result(
                "❌ Error: Unexpected response format from server.",
                model_used='unknown',
            )

        # Handle message-only responses (chat responses, errors, etc.)
        if "message" in result and "rows" not in result and "chart" not in result:
            self.logger.info("Processing message-only response")
            return {
                "message": result["message"],
                "rows": [],
                "chart": None,
                "error": False,
                "model_used": result.get("model_used", "unknown"),
                "status": result.get("status", "message"),
            }

        # Handle data responses (successful SQL queries)
        rows = result.get("rows") or []  # treat an explicit null like "no rows"
        chart = result.get("chart", None)
        heading = result.get("summary", "")
        sql = result.get("sql", "")
        self.logger.info(
            "Processing data response - Rows: %s, Heading: %s, SQL length: %s",
            len(rows) if isinstance(rows, list) else 'Not a list',
            heading,
            len(sql) if sql else 0,
        )

        # Use heading from backend as-is (backend already parsed JSON and the
        # model includes the record count); otherwise build a fallback message.
        if heading and isinstance(heading, str) and heading.strip():
            message = heading.strip()
        elif rows:
            message = f"Here are the {len(rows)} results I found:"
        else:
            message = "I could not find matching records for your query."

        processed_response = {
            "message": message,
            "rows": rows,
            "chart": chart,
            "error": False,
            "model_used": result.get("model_used", "unknown"),
            "status": result.get("status", "success"),
            "sql": sql,
            "heading": heading,  # Clean heading from backend
            "summary": result.get("summary", ""),  # Summary from backend if available
        }
        self.logger.info(
            "Final processed response keys: %s", list(processed_response.keys())
        )
        return processed_response

    def _probe_port(self) -> Tuple[str, str, str]:
        """Fallback reachability check: try a raw TCP connection to the backend.

        Used when the HTTP health request fails. The port comes from
        ``base_url``; when it is omitted, the scheme's default port is used
        (80/443), falling back to 8000 when the scheme is not http/https.
        """
        import socket
        from urllib.parse import urlparse

        try:
            parsed = urlparse(self.base_url)
            host = parsed.hostname or "127.0.0.1"
            default_port = {"http": 80, "https": 443}.get(parsed.scheme, 8000)
            port = parsed.port or default_port
            with socket.create_connection((host, port), timeout=1):
                pass
        except Exception:
            # Best-effort probe: any failure simply means "offline".
            return "🔴 Offline", "Connection Failed", "error"
        return "🟡 Reachable", "Port Open", "warning"

    def check_health(self) -> Tuple[str, str, str]:
        """Return a quick health summary for display.

        Returns:
            A ``(indicator, label, level)`` tuple where ``level`` is one of
            ``"success"``, ``"warning"`` or ``"error"``. This method never
            raises.
        """
        try:
            health_url = f"{self.base_url}{self.endpoints['health']}"
            # Bound the wait so a hung backend cannot block the caller forever.
            response = requests.get(health_url, timeout=self.timeout)
            try:
                status = response.json().get("status", "")
            except Exception:
                # Non-JSON or non-object body: fall through to status-code checks.
                status = ""
            if status == "healthy":
                return "🟢 Active", "Online", "success"
            if response.status_code == 503:
                return "🟡 Degraded", "Some Issues", "warning"
            return "🟡 Limited", f"Status: {response.status_code}", "warning"
        except requests.exceptions.RequestException:
            # HTTP request failed; fall back to a raw socket check.
            return self._probe_port()
        except Exception:
            return "🟡 Unknown", "Check Required", "warning"

    def get_detailed_health(self) -> Dict[str, Any]:
        """Get detailed health information from the API.

        Returns:
            The JSON body of the health endpoint when it answers with 200 or
            503; otherwise a dictionary with ``status: "error"``, a
            ``message`` describing the failure and an empty ``checks`` map.
        """
        try:
            health_url = f"{self.base_url}{self.endpoints['health']}"
            response = requests.get(health_url, timeout=5)
            if response.status_code in (200, 503):
                return response.json()
            return {
                "status": "error",
                "message": f"Health endpoint returned status {response.status_code}",
                "checks": {},
            }
        except requests.exceptions.RequestException as e:
            return {
                "status": "error",
                "message": f"Connection failed: {str(e)}",
                "checks": {},
            }
        except Exception as e:
            return {
                "status": "error",
                "message": f"Health check failed: {str(e)}",
                "checks": {},
            }

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """Generic GET method for API endpoints.

        Args:
            endpoint: API endpoint path (e.g., "/models"); the ``/api`` prefix
                is added when missing.
            params: Optional query parameters.

        Returns:
            API response as dictionary or None if failed.
        """
        try:
            url = self._build_url(endpoint)
            response = requests.get(url, params=params, timeout=10)
            if response.status_code == 200:
                return response.json()
            self.logger.error(
                "GET %s failed with status %s", endpoint, response.status_code
            )
            return None
        except Exception as e:
            self.logger.error("GET %s error: %s", endpoint, str(e))
            return None

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """Generic POST method for API endpoints.

        Args:
            endpoint: API endpoint path; the ``/api`` prefix is added when
                missing.
            data: Optional POST data, sent as a JSON body.

        Returns:
            API response as dictionary or None if failed.
        """
        try:
            url = self._build_url(endpoint)
            response = requests.post(url, json=data, timeout=10)
            if response.status_code in (200, 201):
                return response.json()
            self.logger.error(
                "POST %s failed with status %s", endpoint, response.status_code
            )
            return None
        except Exception as e:
            self.logger.error("POST %s error: %s", endpoint, str(e))
            return None