# models/parallel_processor.py
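"""Parallel processing utilities for property verification analyses.

Runs image, PDF, and text-based analyses concurrently using thread and
process pools together with asyncio.
"""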

import multiprocessing as mp
import concurrent.futures
import asyncio

from .logging_config import logger

class ParallelProcessor:
    """Handles parallel processing of property verification analyses"""
    
    def __init__(self, max_workers=None):
        self.max_workers = max_workers or min(mp.cpu_count(), 8)
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=min(4, mp.cpu_count()))
        
    def __del__(self):
        # Best-effort cleanup; the pools may already be gone at interpreter shutdown.
        try:
            self.thread_pool.shutdown(wait=False)
            self.process_pool.shutdown(wait=False)
        except Exception:
            pass
    
    def process_images_parallel(self, image_files):
        """Process multiple images in parallel"""
        try:
            max_workers = min(8, mp.cpu_count(), len(image_files)) if image_files else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for img_file in image_files:
                    future = executor.submit(self._process_single_image, img_file)
                    futures.append(future)
                results = []
                for future in concurrent.futures.as_completed(futures):
                    try:
                        result = future.result(timeout=30)
                        if isinstance(result, dict):
                            result['parallelization_info'] = {'worker_count': max_workers}
                        results.append(result)
                    except Exception as e:
                        logger.error(f"Error processing image: {str(e)}")
                        results.append({'error': str(e), 'is_property_related': False, 'parallelization_info': {'worker_count': max_workers}})
                return results
        except Exception as e:
            logger.error(f"Error in parallel image processing: {str(e)}")
            return []
    
    def _process_single_image(self, img_file):
        """Process a single image"""
        try:
            from PIL import Image
            import base64
            import io
            from .image_analysis import analyze_image
            
            img = Image.open(img_file)
            # JPEG cannot encode alpha or palette images, so normalize to RGB first
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG")
            img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
            
            analysis = analyze_image(img)
            return {
                'image_data': img_str,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing image {img_file.filename}: {str(e)}")
            return {'error': str(e), 'is_property_related': False}
    
    def process_pdfs_parallel(self, pdf_files):
        """Process multiple PDFs in parallel"""
        try:
            max_workers = min(8, mp.cpu_count(), len(pdf_files)) if pdf_files else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for pdf_file in pdf_files:
                    future = executor.submit(self._process_single_pdf, pdf_file)
                    futures.append(future)
                results = []
                for future in concurrent.futures.as_completed(futures):
                    try:
                        result = future.result(timeout=60)
                        if isinstance(result, dict):
                            result['parallelization_info'] = {'worker_count': max_workers}
                        results.append(result)
                    except Exception as e:
                        logger.error(f"Error processing PDF: {str(e)}")
                        results.append({'error': str(e), 'parallelization_info': {'worker_count': max_workers}})
                return results
        except Exception as e:
            logger.error(f"Error in parallel PDF processing: {str(e)}")
            return []
    
    def _process_single_pdf(self, pdf_file):
        """Process a single PDF"""
        try:
            from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content
            
            # Ensure pdf_file is a file object, not a dict
            if hasattr(pdf_file, 'read'):
                pdf_text = extract_text_from_pdf(pdf_file)
            else:
                logger.error(f"Invalid PDF file object: {type(pdf_file)}")
                return {
                    'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                    'text': '',
                    'analysis': {
                        'is_property_related': False,
                        'confidence': 0.0,
                        'summary': 'Invalid PDF file object',
                        'verification_score': 0.0,
                        'model_used': 'static_fallback',
                        'error': 'Invalid PDF file object'
                    }
                }
            
            analysis = analyze_pdf_content(pdf_text, {})
            
            return {
                'filename': pdf_file.filename,
                'text': pdf_text,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
            return {
                'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                'text': '',
                'analysis': {
                    'is_property_related': False,
                    'confidence': 0.0,
                    'summary': f'Error processing PDF: {str(e)}',
                    'verification_score': 0.0,
                    'model_used': 'static_fallback',
                    'error': str(e)
                }
            }
    
    async def run_analyses_parallel(self, data, consolidated_text, image_analysis, pdf_analysis):
        """Run all analyses in parallel using asyncio and thread pools"""
        try:
            # Prepare property data for price analysis
            property_data = self._prepare_property_data(data)
            price_context = self._create_price_context(data)
            
            # Define analysis tasks with their respective functions
            analysis_tasks = [
                ('summary', self._run_summary_analysis, data),
                ('fraud', self._run_fraud_analysis, consolidated_text, data),
                ('trust', self._run_trust_analysis, consolidated_text, image_analysis, pdf_analysis),
                ('suggestions', self._run_suggestions_analysis, consolidated_text, data),
                ('quality', self._run_quality_analysis, data.get('description_translated', '')),
                ('address', self._run_address_analysis, data),
                ('cross_validation', self._run_cross_validation_analysis, data),
                ('location', self._run_location_analysis, data),
                ('price', self._run_price_analysis, data, price_context, property_data),
                ('legal', self._run_legal_analysis, data.get('legal_details', '')),
                ('specs', self._run_specs_analysis, data),
                ('market', self._run_market_analysis, data)
            ]
            
            # Run tasks in parallel with timeout
            loop = asyncio.get_running_loop()
            tasks = []
            
            for task_name, func, *args in analysis_tasks:
                task = loop.run_in_executor(
                    self.thread_pool, 
                    func, 
                    *args
                )
                tasks.append((task_name, task))
            
            # Wait for all tasks to complete with timeout
            results = {}
            for task_name, task in tasks:
                try:
                    result = await asyncio.wait_for(task, timeout=60)  # per-task timeout in seconds
                    results[task_name] = result
                except asyncio.TimeoutError:
                    logger.error(f"Task {task_name} timed out")
                    results[task_name] = self._get_error_result(f"Task {task_name} timed out")
                except Exception as e:
                    logger.error(f"Task {task_name} failed: {str(e)}")
                    results[task_name] = self._get_error_result(f"Task {task_name} failed: {str(e)}")
            
            return results
            
        except Exception as e:
            logger.error(f"Error in parallel analyses: {str(e)}")
            return self._get_all_error_results(str(e))
    
    def _prepare_property_data(self, data):
        """Prepare property data for price analysis"""
        property_data = {}
        try:
            if data.get('sq_ft'):
                property_data['size'] = float(data['sq_ft'])
            if data.get('market_value'):
                property_data['price'] = float(str(data['market_value']).replace('₹', '').replace(',', ''))
            if data.get('year_built'):
                from datetime import datetime
                current_year = datetime.now().year
                property_data['property_age'] = current_year - int(data['year_built'])
        except Exception as e:
            logger.warning(f"Error preparing property data: {str(e)}")
        return property_data
    
    def _create_price_context(self, data):
        """Create context text for price analysis"""
        return f"""
        Property: {data.get('property_name', '')}
        Type: {data.get('property_type', '')}
        Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}
        Size: {data.get('sq_ft', '')} sq ft
        Market Value: ₹{data.get('market_value', '')}
        Description: {data.get('description', '')}
        Amenities: {data.get('amenities', '')}
        """
    
    def _run_summary_analysis(self, data):
        """Run property summary analysis"""
        try:
            from .property_summary import generate_property_summary
            return generate_property_summary(data)
        except Exception as e:
            logger.error(f"Error in summary analysis: {str(e)}")
            return "Property summary unavailable."
    
    def _run_fraud_analysis(self, consolidated_text, data):
        """Run fraud classification analysis"""
        try:
            from .fraud_classification import classify_fraud
            return classify_fraud(data, consolidated_text)
        except Exception as e:
            logger.error(f"Error in fraud analysis: {str(e)}")
            return self._get_error_result("Fraud analysis failed")
    
    def _run_trust_analysis(self, consolidated_text, image_analysis, pdf_analysis):
        """Run trust score analysis"""
        try:
            from .trust_score import generate_trust_score
            return generate_trust_score(consolidated_text, image_analysis, pdf_analysis)
        except Exception as e:
            logger.error(f"Error in trust analysis: {str(e)}")
            return (0.0, "Trust analysis failed")
    
    def _run_suggestions_analysis(self, consolidated_text, data):
        """Run suggestions analysis"""
        try:
            from .suggestions import generate_suggestions
            return generate_suggestions(consolidated_text, data)
        except Exception as e:
            logger.error(f"Error in suggestions analysis: {str(e)}")
            return self._get_error_result("Suggestions analysis failed")
    
    def _run_quality_analysis(self, description):
        """Run text quality analysis"""
        try:
            from .text_quality import assess_text_quality
            return assess_text_quality(description)
        except Exception as e:
            logger.error(f"Error in quality analysis: {str(e)}")
            return self._get_error_result("Quality analysis failed")
    
    def _run_address_analysis(self, data):
        """Run address verification analysis"""
        try:
            from .address_verification import verify_address
            return verify_address(data)
        except Exception as e:
            logger.error(f"Error in address analysis: {str(e)}")
            return self._get_error_result("Address analysis failed")
    
    def _run_cross_validation_analysis(self, data):
        """Run cross validation analysis"""
        try:
            from .cross_validation import perform_cross_validation
            return perform_cross_validation(data)
        except Exception as e:
            logger.error(f"Error in cross validation analysis: {str(e)}")
            return self._get_error_result("Cross validation analysis failed")
    
    def _run_location_analysis(self, data):
        """Run location analysis"""
        try:
            from .location_analysis import analyze_location
            return analyze_location(data)
        except Exception as e:
            logger.error(f"Error in location analysis: {str(e)}")
            return self._get_error_result("Location analysis failed")
    
    def _run_price_analysis(self, data, price_context, property_data):
        """Run price analysis"""
        try:
            from .price_analysis import analyze_price
            # Pass coordinates and prepared property data along with the price context
            return analyze_price(data, price_context, data.get('latitude'), data.get('longitude'), property_data)
        except Exception as e:
            logger.error(f"Error in price analysis: {str(e)}")
            return self._get_error_result("Price analysis failed")
    
    def _run_legal_analysis(self, legal_details):
        """Run legal analysis"""
        try:
            from .legal_analysis import analyze_legal_details
            return analyze_legal_details(legal_details)
        except Exception as e:
            logger.error(f"Error in legal analysis: {str(e)}")
            return self._get_error_result("Legal analysis failed")
    
    def _run_specs_analysis(self, data):
        """Run property specs analysis"""
        try:
            from .property_specs import verify_property_specs
            return verify_property_specs(data)
        except Exception as e:
            logger.error(f"Error in specs analysis: {str(e)}")
            return self._get_error_result("Specs analysis failed")
    
    def _run_market_analysis(self, data):
        """Run market value analysis"""
        try:
            from .market_value import analyze_market_value
            return analyze_market_value(data)
        except Exception as e:
            logger.error(f"Error in market analysis: {str(e)}")
            return self._get_error_result("Market analysis failed")
    
    def _get_error_result(self, error_message):
        """Get a standardized error result"""
        return {
            'error': error_message,
            'status': 'error',
            'confidence': 0.0
        }
    
    def _get_all_error_results(self, error_message):
        """Get error results for all analyses"""
        return {
            'summary': "Analysis failed",
            'fraud': self._get_error_result(error_message),
            'trust': (0.0, error_message),
            'suggestions': self._get_error_result(error_message),
            'quality': self._get_error_result(error_message),
            'address': self._get_error_result(error_message),
            'cross_validation': self._get_error_result(error_message),
            'location': self._get_error_result(error_message),
            'price': self._get_error_result(error_message),
            'legal': self._get_error_result(error_message),
            'specs': self._get_error_result(error_message),
            'market': self._get_error_result(error_message)
        }

    async def _process_pdf_async(self, pdf_file, property_data):
        """Process a single PDF file asynchronously"""
        try:
            from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content
            
            # Ensure pdf_file is a file object, not a dict
            if hasattr(pdf_file, 'read'):
                # Extract text from the PDF without blocking the event loop
                text = await asyncio.to_thread(extract_text_from_pdf, pdf_file)
                if not text:
                    return {
                        'filename': pdf_file.filename,
                        'text': '',
                        'analysis': {
                            'is_property_related': False,
                            'confidence': 0.0,
                            'summary': 'No text extracted from PDF',
                            'verification_score': 0.0,
                            'model_used': 'static_fallback'
                        }
                    }
                
                # Analyze the content without blocking the event loop
                analysis = await asyncio.to_thread(analyze_pdf_content, text, property_data)
                
                return {
                    'filename': pdf_file.filename,
                    'text': text,
                    'analysis': analysis
                }
            else:
                logger.error(f"Invalid PDF file object in async processing: {type(pdf_file)}")
                return {
                    'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                    'text': '',
                    'analysis': {
                        'is_property_related': False,
                        'confidence': 0.0,
                        'summary': 'Invalid PDF file object',
                        'verification_score': 0.0,
                        'model_used': 'static_fallback',
                        'error': 'Invalid PDF file object'
                    }
                }
            
        except Exception as e:
            logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
            return {
                'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                'text': '',
                'analysis': {
                    'is_property_related': False,
                    'confidence': 0.0,
                    'summary': f'Error processing PDF: {str(e)}',
                    'verification_score': 0.0,
                    'model_used': 'static_fallback',
                    'error': str(e)
                }
            }

# Global instance for easy import
parallel_processor = ParallelProcessor()
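
# Illustrative usage sketch (not part of the module): assumes Flask/Werkzeug-style
# uploaded file objects and form data, and that the analysis submodules imported
# above are available in this package.
#
#     from models.parallel_processor import parallel_processor
#
#     image_results = parallel_processor.process_images_parallel(uploaded_images)
#     pdf_results = parallel_processor.process_pdfs_parallel(uploaded_pdfs)
#     analyses = asyncio.run(
#         parallel_processor.run_analyses_parallel(
#             form_data, consolidated_text, image_results, pdf_results
#         )
#     )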