File size: 10,130 Bytes
cce43bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
"""
API client for communicating with the backend service.
"""
import requests
import logging
from typing import Dict, Any, Tuple, Optional, List


class APIClient:
    
    def __init__(self, base_url: str, timeout: int = 60):
        self.base_url = base_url.rstrip('/')  # Remove trailing slash if present
        self.timeout = timeout  # Used only for health checks, not for query processing
        self.logger = logging.getLogger(__name__)
        
        # Define endpoints
        self.endpoints = {
            'process_text': '/api/process-text',
            'health': '/api/health',
            'models': '/api/models',
            'change_model': '/api/change-model'
        }
    
    def send_query(self, question: str, model: str = None, agent: str = None) -> Dict[str, Any]:
        """
        Send a query to the backend with optional model specification
        
        Args:
            question: The user's question
            model: Optional AI model name to use for processing
            agent: Agent type (legacy parameter, not used in current backend)
        """
        # Prepare payload with model_name for the new API
        payload = {"question": question}
        if model:
            payload["model_name"] = model
            
        self.logger.info(f"Sending API request with: {payload}")
        
        # Construct full URL
        full_url = f"{self.base_url}{self.endpoints['process_text']}"
        
        try:
            # Remove timeout for query processing - wait for actual result
            # Complex queries may take time and should not be interrupted
            response = requests.post(full_url, json=payload)
            return self._process_response(response)
        except requests.exceptions.ConnectionError as e:
            self.logger.error(f"Connection error: {e}")
            return {
                "message": "❌ Cannot connect to the server. Please check if the backend is running.",
                "rows": [],
                "chart": None,
                "error": True
            }
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request error: {e}")
            return {
                "message": f"❌ Request failed: {str(e)}",
                "rows": [],
                "chart": None,
                "error": True
            }
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}")
            return {
                "message": f"⚠️ Exception: {str(e)}",
                "rows": [],
                "chart": None,
                "error": True
            }
    
    def _process_response(self, response: requests.Response) -> Dict[str, Any]:
        """Process the API response and extract relevant data."""
        try:
            result = response.json()
            self.logger.info(f"API response received: {type(result)} with keys: {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}")
        except ValueError as e:
            self.logger.error(f"Failed to parse JSON response: {e}")
            result = {"detail": response.text}
        
        if response.status_code != 200:
            error_msg = result.get('detail') or result.get('message') or response.text
            self.logger.error(f"API error response (status {response.status_code}): {error_msg}")
            return {
                "message": f"❌ Error: {error_msg}",
                "rows": [],
                "chart": None,
                "error": True,
                "model_used": result.get('model_used', 'unknown') if isinstance(result, dict) else 'unknown'
            }
        
        # Handle message-only responses (chat responses, errors, etc.)
        if "message" in result and not ("rows" in result or "chart" in result):
            self.logger.info("Processing message-only response")
            return {
                "message": result["message"],
                "rows": [],
                "chart": None,
                "error": False,
                "model_used": result.get("model_used", "unknown"),
                "status": result.get("status", "message")
            }
        
        # Handle data responses (successful SQL queries)
        rows = result.get("rows", [])
        chart = result.get("chart", None)
        heading = result.get("summary", "")
        sql = result.get("sql", "")
        
        self.logger.info(f"Processing data response - Rows: {len(rows) if isinstance(rows, list) else 'Not a list'}, Heading: {heading}, SQL length: {len(sql) if sql else 0}")
        
        # Use heading from backend as-is (backend already parsed JSON and model includes record count)
        if heading and isinstance(heading, str) and heading.strip():
            message = heading.strip()
        else:
            # Fallback message if no heading provided
            if rows and len(rows) > 0:
                message = f"Here are the {len(rows)} results I found:"
            else:
                message = "I could not find matching records for your query."
        
        processed_response = {
            "message": message,
            "rows": rows,
            "chart": chart,
            "error": False,
            "model_used": result.get("model_used", "unknown"),
            "status": result.get("status", "success"),
            "sql": sql,
            "heading": heading,  # Clean heading from backend
            "summary": result.get("summary", "")  # Summary from backend if available
        }
        
        self.logger.info(f"Final processed response keys: {list(processed_response.keys())}")
        return processed_response

    def check_health(self) -> Tuple[str, str, str]:
      
        try:
            # Construct health URL
            health_url = f"{self.base_url}{self.endpoints['health']}"
            response = requests.get(health_url)
            try:
                result = response.json()
                status = result.get("status", "")
            except Exception:
                result = {}
                status = ""
            
            if status == "healthy":
                return "🟒 Active", "Online", "success"
            elif response.status_code == 503:
                return "🟑 Degraded", "Some Issues", "warning"
            else:
                return "🟑 Limited", f"Status: {response.status_code}", "warning"
        except requests.exceptions.RequestException:
            # Fallback to socket check
            try:
                import socket
                # Extract host and port from base_url
                from urllib.parse import urlparse
                parsed = urlparse(self.base_url)
                host = parsed.hostname or "127.0.0.1"
                port = parsed.port or 8000
                socket.create_connection((host, port), timeout=1).close()
                return "🟑 Reachable", "Port Open", "warning"
            except:
                return "πŸ”΄ Offline", "Connection Failed", "error"
        except Exception:
            return "🟑 Unknown", "Check Required", "warning"

    def get_detailed_health(self) -> Dict[str, Any]:
        """
        Get detailed health information from the API.
        
        Returns:
            Dictionary containing detailed health status or error information
        """
        try:
            health_url = f"{self.base_url}{self.endpoints['health']}"
            response = requests.get(health_url, timeout=5)
            
            if response.status_code in [200, 503]:
                return response.json()
            else:
                return {
                    "status": "error",
                    "message": f"Health endpoint returned status {response.status_code}",
                    "checks": {}
                }
                
        except requests.exceptions.RequestException as e:
            return {
                "status": "error", 
                "message": f"Connection failed: {str(e)}",
                "checks": {}
            }
        except Exception as e:
            return {
                "status": "error",
                "message": f"Health check failed: {str(e)}",
                "checks": {}
            }
    
    def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """
        Generic GET method for API endpoints
        
        Args:
            endpoint: API endpoint path (e.g., "/models")
            params: Optional query parameters
            
        Returns:
            API response as dictionary or None if failed
        """
        try:
            url = f"{self.base_url}/api{endpoint}" if not endpoint.startswith('/api') else f"{self.base_url}{endpoint}"
            response = requests.get(url, params=params, timeout=10)
            
            if response.status_code == 200:
                return response.json()
            else:
                self.logger.error(f"GET {endpoint} failed with status {response.status_code}")
                return None
                
        except Exception as e:
            self.logger.error(f"GET {endpoint} error: {str(e)}")
            return None
    
    def post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """
        Generic POST method for API endpoints
        
        Args:
            endpoint: API endpoint path
            data: Optional POST data
            
        Returns:
            API response as dictionary or None if failed
        """
        try:
            url = f"{self.base_url}/api{endpoint}" if not endpoint.startswith('/api') else f"{self.base_url}{endpoint}"
            response = requests.post(url, json=data, timeout=10)
            
            if response.status_code in [200, 201]:
                return response.json()
            else:
                self.logger.error(f"POST {endpoint} failed with status {response.status_code}")
                return None
                
        except Exception as e:
            self.logger.error(f"POST {endpoint} error: {str(e)}")
            return None