# models/parallel_processor.py
import multiprocessing as mp
import concurrent.futures
import asyncio
import threading
from functools import partial
from typing import Dict, Any, List, Tuple

from .logging_config import logger


class ParallelProcessor:
    """Handles parallel processing of property verification analyses"""

    def __init__(self, max_workers=None):
        self.max_workers = max_workers or min(mp.cpu_count(), 8)
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
        self.process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=min(4, mp.cpu_count()))

    def __del__(self):
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)

    def process_images_parallel(self, image_files):
        """Process multiple images in parallel"""
        try:
            max_workers = min(8, mp.cpu_count(), len(image_files)) if image_files else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for img_file in image_files:
                    future = executor.submit(self._process_single_image, img_file)
                    futures.append(future)

                results = []
                for future in concurrent.futures.as_completed(futures):
                    try:
                        result = future.result(timeout=30)
                        if isinstance(result, dict):
                            result['parallelization_info'] = {'worker_count': max_workers}
                        results.append(result)
                    except Exception as e:
                        logger.error(f"Error processing image: {str(e)}")
                        results.append({
                            'error': str(e),
                            'is_property_related': False,
                            'parallelization_info': {'worker_count': max_workers}
                        })

                return results
        except Exception as e:
            logger.error(f"Error in parallel image processing: {str(e)}")
            return []

    def _process_single_image(self, img_file):
        """Process a single image"""
        try:
            from PIL import Image
            import base64
            import io
            from .image_analysis import analyze_image

            img = Image.open(img_file)
            # JPEG cannot encode RGBA/palette images; convert before saving
            if img.mode != 'RGB':
                img = img.convert('RGB')
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG")
            img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
            analysis = analyze_image(img)

            return {
                'image_data': img_str,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing image {getattr(img_file, 'filename', 'unknown')}: {str(e)}")
            return {'error': str(e), 'is_property_related': False}

    def process_pdfs_parallel(self, pdf_files):
        """Process multiple PDFs in parallel"""
        try:
            max_workers = min(8, mp.cpu_count(), len(pdf_files)) if pdf_files else 1
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []
                for pdf_file in pdf_files:
                    future = executor.submit(self._process_single_pdf, pdf_file)
                    futures.append(future)

                results = []
                for future in concurrent.futures.as_completed(futures):
                    try:
                        result = future.result(timeout=60)
                        if isinstance(result, dict):
                            result['parallelization_info'] = {'worker_count': max_workers}
                        results.append(result)
                    except Exception as e:
                        logger.error(f"Error processing PDF: {str(e)}")
                        results.append({
                            'error': str(e),
                            'parallelization_info': {'worker_count': max_workers}
                        })

                return results
        except Exception as e:
            logger.error(f"Error in parallel PDF processing: {str(e)}")
            return []

    def _process_single_pdf(self, pdf_file):
        """Process a single PDF"""
        try:
            from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content

            # Ensure pdf_file is a file object, not a dict
            if hasattr(pdf_file, 'read'):
                pdf_text = extract_text_from_pdf(pdf_file)
            else:
                logger.error(f"Invalid PDF file object: {type(pdf_file)}")
                return {
                    'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                    'text': '',
                    'analysis': {
                        'is_property_related': False,
                        'confidence': 0.0,
                        'summary': 'Invalid PDF file object',
                        'verification_score': 0.0,
                        'model_used': 'static_fallback',
                        'error': 'Invalid PDF file object'
                    }
                }

            analysis = analyze_pdf_content(pdf_text, {})

            return {
                'filename': pdf_file.filename,
                'text': pdf_text,
                'analysis': analysis
            }
        except Exception as e:
            logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}")
            return {
                'filename': getattr(pdf_file, 'filename', 'unknown.pdf'),
                'text': '',
                'analysis': {
                    'is_property_related': False,
                    'confidence': 0.0,
                    'summary': f'Error processing PDF: {str(e)}',
                    'verification_score': 0.0,
                    'model_used': 'static_fallback',
                    'error': str(e)
                }
            }
    async def run_analyses_parallel(self, data, consolidated_text, image_analysis, pdf_analysis):
        """Run all analyses in parallel using asyncio and thread pools"""
        try:
            # Prepare property data for price analysis
            property_data = self._prepare_property_data(data)
            price_context = self._create_price_context(data)

            # Define analysis tasks with their respective functions
            analysis_tasks = [
                ('summary', self._run_summary_analysis, data),
                ('fraud', self._run_fraud_analysis, consolidated_text, data),
                ('trust', self._run_trust_analysis, consolidated_text, image_analysis, pdf_analysis),
                ('suggestions', self._run_suggestions_analysis, consolidated_text, data),
                ('quality', self._run_quality_analysis, data.get('description_translated', '')),
                ('address', self._run_address_analysis, data),
                ('cross_validation', self._run_cross_validation_analysis, data),
                ('location', self._run_location_analysis, data),
                ('price', self._run_price_analysis, data, price_context, property_data),
                ('legal', self._run_legal_analysis, data.get('legal_details', '')),
                ('specs', self._run_specs_analysis, data),
                ('market', self._run_market_analysis, data)
            ]

            # Run tasks in parallel with timeout
            # get_running_loop() is the non-deprecated way to fetch the loop inside a coroutine
            loop = asyncio.get_running_loop()
            tasks = []
            for task_name, func, *args in analysis_tasks:
                task = loop.run_in_executor(self.thread_pool, func, *args)
                tasks.append((task_name, task))

            # Wait for all tasks to complete with timeout
            results = {}
            for task_name, task in tasks:
                try:
                    result = await asyncio.wait_for(task, timeout=60)  # Reduced from 120 to 60 seconds
                    results[task_name] = result
                except asyncio.TimeoutError:
                    logger.error(f"Task {task_name} timed out")
                    results[task_name] = self._get_error_result(f"Task {task_name} timed out")
                except Exception as e:
                    logger.error(f"Task {task_name} failed: {str(e)}")
                    results[task_name] = self._get_error_result(f"Task {task_name} failed: {str(e)}")

            return results
        except Exception as e:
            logger.error(f"Error in parallel analyses: {str(e)}")
            return self._get_all_error_results(str(e))

    def _prepare_property_data(self, data):
        """Prepare property data for price analysis"""
        property_data = {}
        try:
            if data.get('sq_ft'):
                property_data['size'] = float(data['sq_ft'])
            if data.get('market_value'):
                property_data['price'] = float(data['market_value'].replace('₹', '').replace(',', ''))
            if data.get('year_built'):
                from datetime import datetime
                current_year = datetime.now().year
                property_data['property_age'] = current_year - int(data['year_built'])
        except Exception as e:
            logger.warning(f"Error preparing property data: {str(e)}")
        return property_data

    def _create_price_context(self, data):
        """Create context text for price analysis"""
        return f"""
        Property: {data.get('property_name', '')}
        Type: {data.get('property_type', '')}
        Location: {data.get('address', '')}, {data.get('city', '')}, {data.get('state', '')}
        Size: {data.get('sq_ft', '')} sq ft
        Market Value: ₹{data.get('market_value', '')}
        Description: {data.get('description', '')}
        Amenities: {data.get('amenities', '')}
        """
    def _run_summary_analysis(self, data):
        """Run property summary analysis"""
        try:
            from .property_summary import generate_property_summary
            return generate_property_summary(data)
        except Exception as e:
            logger.error(f"Error in summary analysis: {str(e)}")
            return "Property summary unavailable."

    def _run_fraud_analysis(self, consolidated_text, data):
        """Run fraud classification analysis"""
        try:
            from .fraud_classification import classify_fraud
            return classify_fraud(data, consolidated_text)
        except Exception as e:
            logger.error(f"Error in fraud analysis: {str(e)}")
            return self._get_error_result("Fraud analysis failed")

    def _run_trust_analysis(self, consolidated_text, image_analysis, pdf_analysis):
        """Run trust score analysis"""
        try:
            from .trust_score import generate_trust_score
            return generate_trust_score(consolidated_text, image_analysis, pdf_analysis)
        except Exception as e:
            logger.error(f"Error in trust analysis: {str(e)}")
            return (0.0, "Trust analysis failed")

    def _run_suggestions_analysis(self, consolidated_text, data):
        """Run suggestions analysis"""
        try:
            from .suggestions import generate_suggestions
            return generate_suggestions(consolidated_text, data)
        except Exception as e:
            logger.error(f"Error in suggestions analysis: {str(e)}")
            return self._get_error_result("Suggestions analysis failed")

    def _run_quality_analysis(self, description):
        """Run text quality analysis"""
        try:
            from .text_quality import assess_text_quality
            return assess_text_quality(description)
        except Exception as e:
            logger.error(f"Error in quality analysis: {str(e)}")
            return self._get_error_result("Quality analysis failed")

    def _run_address_analysis(self, data):
        """Run address verification analysis"""
        try:
            from .address_verification import verify_address
            return verify_address(data)
        except Exception as e:
            logger.error(f"Error in address analysis: {str(e)}")
            return self._get_error_result("Address analysis failed")

    def _run_cross_validation_analysis(self, data):
        """Run cross validation analysis"""
        try:
            from .cross_validation import perform_cross_validation
            return perform_cross_validation(data)
        except Exception as e:
            logger.error(f"Error in cross validation analysis: {str(e)}")
            return self._get_error_result("Cross validation analysis failed")

    def _run_location_analysis(self, data):
        """Run location analysis"""
        try:
            from .location_analysis import analyze_location
            return analyze_location(data)
        except Exception as e:
            logger.error(f"Error in location analysis: {str(e)}")
            return self._get_error_result("Location analysis failed")

    def _run_price_analysis(self, data, price_context, property_data):
        """Run price analysis"""
        try:
            from .price_analysis import analyze_price
            # Pass rental information to price analysis
            return analyze_price(data, price_context, data.get('latitude'), data.get('longitude'), property_data)
        except Exception as e:
            logger.error(f"Error in price analysis: {str(e)}")
            return self._get_error_result("Price analysis failed")

    def _run_legal_analysis(self, legal_details):
        """Run legal analysis"""
        try:
            from .legal_analysis import analyze_legal_details
            return analyze_legal_details(legal_details)
        except Exception as e:
            logger.error(f"Error in legal analysis: {str(e)}")
            return self._get_error_result("Legal analysis failed")

    def _run_specs_analysis(self, data):
        """Run property specs analysis"""
        try:
            from .property_specs import verify_property_specs
            return verify_property_specs(data)
        except Exception as e:
            logger.error(f"Error in specs analysis: {str(e)}")
            return self._get_error_result("Specs analysis failed")

    def _run_market_analysis(self, data):
        """Run market value analysis"""
        try:
            from .market_value import analyze_market_value
            return analyze_market_value(data)
        except Exception as e:
            logger.error(f"Error in market analysis: {str(e)}")
            return self._get_error_result("Market analysis failed")
data): """Run market value analysis""" try: from .market_value import analyze_market_value return analyze_market_value(data) except Exception as e: logger.error(f"Error in market analysis: {str(e)}") return self._get_error_result("Market analysis failed") def _get_error_result(self, error_message): """Get a standardized error result""" return { 'error': error_message, 'status': 'error', 'confidence': 0.0 } def _get_all_error_results(self, error_message): """Get error results for all analyses""" return { 'summary': "Analysis failed", 'fraud': self._get_error_result(error_message), 'trust': (0.0, error_message), 'suggestions': self._get_error_result(error_message), 'quality': self._get_error_result(error_message), 'address': self._get_error_result(error_message), 'cross_validation': self._get_error_result(error_message), 'location': self._get_error_result(error_message), 'price': self._get_error_result(error_message), 'legal': self._get_error_result(error_message), 'specs': self._get_error_result(error_message), 'market': self._get_error_result(error_message) } async def _process_pdf_async(self, pdf_file, property_data): """Process a single PDF file asynchronously""" try: from .pdf_analysis import extract_text_from_pdf, analyze_pdf_content # Ensure pdf_file is a file object, not a dict if hasattr(pdf_file, 'read'): # Extract text from PDF text = extract_text_from_pdf(pdf_file) if not text: return { 'filename': pdf_file.filename, 'text': '', 'analysis': { 'is_property_related': False, 'confidence': 0.0, 'summary': 'No text extracted from PDF', 'verification_score': 0.0, 'model_used': 'static_fallback' } } # Analyze the content analysis = analyze_pdf_content(text, property_data) return { 'filename': pdf_file.filename, 'text': text, 'analysis': analysis } else: logger.error(f"Invalid PDF file object in async processing: {type(pdf_file)}") return { 'filename': getattr(pdf_file, 'filename', 'unknown.pdf'), 'text': '', 'analysis': { 'is_property_related': False, 'confidence': 0.0, 'summary': 'Invalid PDF file object', 'verification_score': 0.0, 'model_used': 'static_fallback', 'error': 'Invalid PDF file object' } } except Exception as e: logger.error(f"Error processing PDF {getattr(pdf_file, 'filename', 'unknown.pdf')}: {str(e)}") return { 'filename': getattr(pdf_file, 'filename', 'unknown.pdf'), 'text': '', 'analysis': { 'is_property_related': False, 'confidence': 0.0, 'summary': f'Error processing PDF: {str(e)}', 'verification_score': 0.0, 'model_used': 'static_fallback', 'error': str(e) } } # Global instance for easy import parallel_processor = ParallelProcessor()