from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import os
import tempfile
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def ensure_full_permissions(path):
    """Grant full permissions to a file or directory"""
    try:
        if os.path.isdir(path):
            # Full permissions for directories (rwxrwxrwx)
            os.chmod(path, 0o777)
            # Apply to all contents recursively
            for root, dirs, files in os.walk(path):
                for d in dirs:
                    os.chmod(os.path.join(root, d), 0o777)
                for f in files:
                    os.chmod(os.path.join(root, f), 0o666)
        else:
            # Full permissions for files (rw-rw-rw-)
            os.chmod(path, 0o666)
        return True
    except Exception as e:
        logger.error(f"Error setting permissions for {path}: {e}")
        return False


def check_directory_permissions(path):
    """Check if a directory exists and grant it full permissions"""
    try:
        if not os.path.exists(path):
            logger.warning(f"Directory does not exist: {path}")
            return False
        # Set full permissions
        ensure_full_permissions(path)
        return True
    except Exception as e:
        logger.error(f"Error checking permissions for {path}: {e}")
        return False


def get_cache_dir():
    """Get a user-accessible cache directory"""
    try:
        # Try the user's home directory first
        home_dir = os.path.expanduser('~')
        if not os.path.exists(home_dir):
            raise Exception(f"Home directory does not exist: {home_dir}")

        cache_dir = os.path.join(home_dir, '.cache', 'answer_grading_app')
        logger.info(f"Attempting to use cache directory: {cache_dir}")

        # Create the directory with full permissions
        os.makedirs(cache_dir, mode=0o777, exist_ok=True)
        ensure_full_permissions(cache_dir)
        logger.info(f"Successfully created and verified cache directory: {cache_dir}")
        return cache_dir
    except Exception as e:
        logger.warning(f"Could not use home directory cache: {e}")

        # Try the system temp directory
        try:
            temp_dir = os.path.join(tempfile.gettempdir(), 'answer_grading_app')
            logger.info(f"Attempting to use temporary directory: {temp_dir}")
            os.makedirs(temp_dir, mode=0o777, exist_ok=True)
            ensure_full_permissions(temp_dir)
            logger.info(f"Using temporary directory: {temp_dir}")
            return temp_dir
        except Exception as e:
            logger.warning(f"Could not use temp directory: {e}")

            # Last resort: use the current directory
            try:
                current_dir = os.path.join(os.getcwd(), '.cache')
                logger.info(f"Attempting to use current directory: {current_dir}")
                os.makedirs(current_dir, mode=0o777, exist_ok=True)
                ensure_full_permissions(current_dir)
                logger.info(f"Using current directory: {current_dir}")
                return current_dir
            except Exception as e:
                logger.error(f"Could not create any cache directory: {e}")
                # If all else fails, use a fresh temporary directory
                temp_dir = tempfile.mkdtemp()
                ensure_full_permissions(temp_dir)
                logger.info(f"Created temporary directory as last resort: {temp_dir}")
                return temp_dir


class ModelSingleton:
    _instance = None
    _initialized = False
    _models = {}
    _reference_counts = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not self._initialized:
            try:
                logger.info("Initializing ModelSingleton...")

                # Set up cache directories
                self.cache_dir = get_cache_dir()
                logger.info(f"Using main cache directory: {self.cache_dir}")

                # Define and create all cache directories
                self.cache_dirs = {
                    'transformers': os.path.join(self.cache_dir, 'transformers'),
                    'huggingface': os.path.join(self.cache_dir, 'huggingface'),
                    'torch': os.path.join(self.cache_dir, 'torch'),
                    'cache': os.path.join(self.cache_dir, 'cache'),
                    'sentence_transformers': os.path.join(self.cache_dir, 'sentence_transformers'),
                    'fasttext': os.path.join(self.cache_dir, 'fasttext')
                }

                # Create and verify each cache directory with full permissions
                for name, path in self.cache_dirs.items():
                    try:
                        # Create the directory with full permissions
                        os.makedirs(path, mode=0o777, exist_ok=True)
                        ensure_full_permissions(path)
                        logger.info(f"Successfully created {name} cache directory: {path}")

                        # Create a test file to verify write permissions
                        test_file = os.path.join(path, '.write_test')
                        try:
                            with open(test_file, 'w') as f:
                                f.write('test')
                            os.chmod(test_file, 0o666)  # Full read/write for test file
                            os.remove(test_file)  # Clean up
                            logger.info(f"Verified write permissions for {name} cache directory")
                        except Exception as e:
                            logger.error(f"Failed to verify write permissions for {name} cache directory: {e}")
                            # Try to fix permissions
                            ensure_full_permissions(path)
                    except Exception as e:
                        logger.error(f"Error creating {name} cache directory: {e}")
                        # Fall back to a directory under the system temp directory
                        temp_path = os.path.join(tempfile.gettempdir(), 'answer_grading_app', name)
                        os.makedirs(temp_path, mode=0o777, exist_ok=True)
                        ensure_full_permissions(temp_path)
                        self.cache_dirs[name] = temp_path
                        logger.info(f"Using fallback directory for {name}: {temp_path}")

                # Set environment variables with verified directories
                os.environ['TRANSFORMERS_CACHE'] = self.cache_dirs['transformers']
                os.environ['HF_HOME'] = self.cache_dirs['huggingface']
                os.environ['TORCH_HOME'] = self.cache_dirs['torch']
                os.environ['XDG_CACHE_HOME'] = self.cache_dirs['cache']
                os.environ['SENTENCE_TRANSFORMERS_HOME'] = self.cache_dirs['sentence_transformers']

                # Verify the environment variables are set correctly
                for env_var, key in [
                    ('TRANSFORMERS_CACHE', 'transformers'),
                    ('HF_HOME', 'huggingface'),
                    ('TORCH_HOME', 'torch'),
                    ('XDG_CACHE_HOME', 'cache'),
                    ('SENTENCE_TRANSFORMERS_HOME', 'sentence_transformers')
                ]:
                    if os.environ.get(env_var) != self.cache_dirs[key]:
                        logger.warning(f"Environment variable {env_var} does not match expected path")
                        os.environ[env_var] = self.cache_dirs[key]

                # Select the device
                self.device = "cuda" if torch.cuda.is_available() else "cpu"
                logger.info(f"Using device: {self.device}")

                # Model handles start as None and are loaded lazily on first use
                self.similarity_tokenizer = None
                self.similarity_model = None
                self.flan_tokenizer = None
                self.flan_model = None
                self.trocr_processor = None
                self.trocr_model = None
                self.vit_model = None
                self.vit_processor = None

                # Initialize reference counts
                self._reference_counts = {
                    'similarity': 0,
                    'flan': 0,
                    'trocr': 0,
                    'vit': 0
                }

                self._initialized = True
                logger.info("ModelSingleton initialization completed successfully")
            except Exception as e:
                logger.error(f"Error during ModelSingleton initialization: {e}")
                raise

    def get_similarity_model(self):
        """Get the sentence transformer model with reference counting"""
        try:
            if self.similarity_model is None:
                logger.info("Loading sentence transformer model...")
                SENTENCE_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
                self.similarity_tokenizer = AutoTokenizer.from_pretrained(
                    SENTENCE_MODEL,
                    cache_dir=os.getenv('TRANSFORMERS_CACHE')
                )
                self.similarity_model = SentenceTransformer(
                    SENTENCE_MODEL,
                    cache_folder=os.getenv('TRANSFORMERS_CACHE')
                )
                self.similarity_model.to(self.device)
                logger.info("Sentence transformer model loaded successfully")
            self._reference_counts['similarity'] += 1
            return self.similarity_model
        except Exception as e:
            logger.error(f"Error loading sentence transformer model: {e}")
            raise

    def get_flan_model(self):
        """Get the Flan-T5 model with reference counting"""
        try:
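            # Note: flan-t5-xl has roughly 3B parameters. Loading in float16 on
            # CUDA approximately halves GPU memory versus float32, and
            # low_cpu_mem_usage avoids materializing a second full copy of the
            # weights in host RAM while the checkpoint is being loaded.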
            if self.flan_model is None:
                logger.info("Loading Flan-T5 model...")
                FLAN_MODEL = "google/flan-t5-xl"
                self.flan_tokenizer = AutoTokenizer.from_pretrained(
                    FLAN_MODEL,
                    cache_dir=os.getenv('TRANSFORMERS_CACHE')
                )
                self.flan_model = AutoModelForSeq2SeqLM.from_pretrained(
                    FLAN_MODEL,
                    cache_dir=os.getenv('TRANSFORMERS_CACHE'),
                    torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                    low_cpu_mem_usage=True
                )
                self.flan_model.to(self.device)
                logger.info("Flan-T5 model loaded successfully")
            self._reference_counts['flan'] += 1
            return self.flan_model
        except Exception as e:
            logger.error(f"Error loading Flan-T5 model: {e}")
            raise

    def get_trocr_model(self):
        """Get the TrOCR model and processor with reference counting"""
        try:
            if self.trocr_model is None:
                from transformers import TrOCRProcessor, VisionEncoderDecoderModel
                logger.info("Loading TrOCR model...")
                MODEL_NAME = "microsoft/trocr-large-handwritten"
                self.trocr_processor = TrOCRProcessor.from_pretrained(MODEL_NAME)
                self.trocr_model = VisionEncoderDecoderModel.from_pretrained(MODEL_NAME)
                self.trocr_model.to(self.device)
                logger.info("TrOCR model loaded successfully")
            self._reference_counts['trocr'] += 1
            return self.trocr_model, self.trocr_processor
        except Exception as e:
            logger.error(f"Error loading TrOCR model: {e}")
            raise

    def get_vit_model(self):
        """Get the ViT model using only local files - no downloads"""
        try:
            if self.vit_model is None:
                from transformers import ViTConfig, ViTImageProcessor, ViTForImageClassification
                logger.info("Loading local ViT model from files...")

                # Resolve the absolute path to the model directory
                model_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                model_path = os.path.join(model_root, 'models', 'vit-base-beans')
                logger.info(f"Using local model directory: {model_path}")

                # Check that the model directory exists
                if not os.path.exists(model_path):
                    raise FileNotFoundError(f"Local model directory not found at: {model_path}")

                # Get paths to the required files
                model_file = os.path.join(model_path, 'model.safetensors')
                config_file = os.path.join(model_path, 'config.json')

                # Verify the files exist
                if not os.path.exists(model_file):
                    raise FileNotFoundError(f"Local model weights file not found at: {model_file}")
                if not os.path.exists(config_file):
                    raise FileNotFoundError(f"Local model config file not found at: {config_file}")

                logger.info("Found all required local model files:")
                logger.info(f"- Using model weights: {model_file}")
                logger.info(f"- Using config file: {config_file}")

                # Load the config directly from file
                logger.info("Loading model configuration from local file...")
                config = ViTConfig.from_json_file(config_file)

                # Create the image processor from the local config
                logger.info("Creating image processor from local config...")
                self.vit_processor = ViTImageProcessor(
                    do_resize=True,
                    size=config.image_size,
                    do_normalize=True
                )

                # Load the model weights directly from local files
                logger.info("Loading model weights from local file...")
                self.vit_model = ViTForImageClassification.from_pretrained(
                    model_path,
                    config=config,
                    local_files_only=True,
                    use_safetensors=True,
                    trust_remote_code=False,
                    from_tf=False,
                    _fast_init=True
                )

                logger.info(f"Moving model to {self.device}...")
                self.vit_model.to(self.device)
                self.vit_model.eval()
                logger.info("Local model loaded successfully!")
            self._reference_counts['vit'] += 1
            return self.vit_model, self.vit_processor
        except Exception as e:
            logger.error(f"Error loading local ViT model: {e}")
            raise

    def release_similarity_model(self):
        """Release a reference to the similarity model"""
        self._reference_counts['similarity'] -= 1
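        # A non-positive count means no caller still holds this model, so its
        # resources can be freed.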
        if self._reference_counts['similarity'] <= 0:
            # Clamp so extra releases cannot drive the count negative
            self._reference_counts['similarity'] = 0
            self._cleanup_similarity_model()

    def release_flan_model(self):
        """Release a reference to the Flan-T5 model"""
        self._reference_counts['flan'] -= 1
        if self._reference_counts['flan'] <= 0:
            self._reference_counts['flan'] = 0
            self._cleanup_flan_model()

    def release_trocr_model(self):
        """Release a reference to the TrOCR model"""
        self._reference_counts['trocr'] -= 1
        if self._reference_counts['trocr'] <= 0:
            self._reference_counts['trocr'] = 0
            self._cleanup_trocr_model()

    def release_vit_model(self):
        """Release a reference to the ViT model"""
        self._reference_counts['vit'] -= 1
        if self._reference_counts['vit'] <= 0:
            self._reference_counts['vit'] = 0
            self._cleanup_vit_model()

    def _cleanup_similarity_model(self):
        """Clean up similarity model resources"""
        if self.similarity_model is not None:
            del self.similarity_model
            self.similarity_model = None
            self.similarity_tokenizer = None
            torch.cuda.empty_cache()
            logger.info("Similarity model resources cleaned up")

    def _cleanup_flan_model(self):
        """Clean up Flan-T5 model resources"""
        if self.flan_model is not None:
            del self.flan_model
            self.flan_model = None
            self.flan_tokenizer = None
            torch.cuda.empty_cache()
            logger.info("Flan-T5 model resources cleaned up")

    def _cleanup_trocr_model(self):
        """Clean up TrOCR model resources"""
        if self.trocr_model is not None:
            del self.trocr_model
            del self.trocr_processor
            self.trocr_model = None
            self.trocr_processor = None
            torch.cuda.empty_cache()
            logger.info("TrOCR model resources cleaned up")

    def _cleanup_vit_model(self):
        """Clean up ViT model resources"""
        if self.vit_model is not None:
            del self.vit_model
            del self.vit_processor
            self.vit_model = None
            self.vit_processor = None
            torch.cuda.empty_cache()
            logger.info("ViT model resources cleaned up")

    def cleanup(self):
        """Clean up all model resources"""
        try:
            logger.info("Starting model cleanup...")

            # Clean up each loaded model type
            if self._reference_counts.get('similarity', 0) > 0:
                self._cleanup_similarity_model()
            if self._reference_counts.get('flan', 0) > 0:
                self._cleanup_flan_model()
            if self._reference_counts.get('trocr', 0) > 0:
                self._cleanup_trocr_model()
            if self._reference_counts.get('vit', 0) > 0:
                self._cleanup_vit_model()

            # Reset reference counts
            for model_type in self._reference_counts:
                self._reference_counts[model_type] = 0

            # Force CUDA cache cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            logger.info("Model cleanup completed successfully")
        except Exception as e:
            logger.error(f"Error during model cleanup: {e}")
            # Continue even if cleanup fails partway


# Create the global singleton instance
models = ModelSingleton()


# Module-level convenience wrapper around the global instance's cleanup
def cleanup_models():
    models.cleanup()
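
# --- Example usage (illustrative sketch, not part of the app's entry points) ---
# A minimal demonstration of the intended acquire/use/release lifecycle,
# assuming sentence-transformers can find or download the MiniLM checkpoint.
# Registering cleanup_models() with atexit ensures model memory is released
# even if a caller forgets to release its references.
if __name__ == "__main__":
    import atexit

    atexit.register(cleanup_models)

    similarity_model = models.get_similarity_model()
    try:
        # Encode a sample sentence to confirm the model works end to end
        embeddings = similarity_model.encode(["A sample student answer."])
        logger.info(f"Computed embedding of shape {embeddings.shape}")
    finally:
        models.release_similarity_model()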