from sentence_transformers import util from nltk.corpus import stopwords from nltk.tokenize import word_tokenize from gensim.models import KeyedVectors import numpy as np import nltk from gensim import corpora from gensim.models import FastText from gensim.similarities import SparseTermSimilarityMatrix, WordEmbeddingSimilarityIndex from gensim.downloader import load import sys import os import tempfile sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) from all_models import models, get_cache_dir, check_directory_permissions import torch import logging from utils import log_print # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def verify_model_file(model_path): """Verify that model file exists and is readable""" try: if not os.path.exists(model_path): logger.error(f"Model file does not exist: {model_path}") return False # Check file size size = os.path.getsize(model_path) if size == 0: logger.error(f"Model file is empty: {model_path}") return False # Try to open file with open(model_path, 'rb') as f: # Try to read first few bytes f.read(1024) logger.info(f"Model file is readable: {model_path}") return True except Exception as e: logger.error(f"Error verifying model file {model_path}: {e}") return False def get_fasttext_cache_dir(): """Get cache directory for FastText model""" cache_dir = get_cache_dir() fasttext_dir = os.path.join(cache_dir, 'fasttext') logger.info(f"Setting up FastText cache directory: {fasttext_dir}") try: os.makedirs(fasttext_dir, mode=0o755, exist_ok=True) if check_directory_permissions(fasttext_dir): logger.info(f"FastText cache directory is ready: {fasttext_dir}") return fasttext_dir except Exception as e: logger.error(f"Error creating FastText cache directory: {e}") # Fallback to temporary directory temp_dir = tempfile.mkdtemp() logger.info(f"Using temporary directory for FastText: {temp_dir}") return temp_dir def ensure_full_permissions(path): """Grant full permissions to a file or directory""" try: if os.path.isdir(path): # Full permissions for directories (rwxrwxrwx) os.chmod(path, 0o777) # Apply to all contents recursively for root, dirs, files in os.walk(path): for d in dirs: os.chmod(os.path.join(root, d), 0o777) for f in files: os.chmod(os.path.join(root, f), 0o666) else: # Full permissions for files (rw-rw-rw-) os.chmod(path, 0o666) return True except Exception as e: logger.error(f"Error setting permissions for {path}: {e}") return False def load_fasttext_model(): """Load FastText model with proper error handling""" try: # Get model directory from environment variable model_dir = os.getenv('GENSIM_DATA_DIR') if not model_dir: model_dir = os.path.join(os.path.expanduser('~'), '.cache', 'answer_grading_app', 'gensim') os.makedirs(model_dir, mode=0o777, exist_ok=True) ensure_full_permissions(model_dir) model_path = os.path.join(model_dir, 'fasttext-wiki-news-subwords-300.gz') logger.info(f"Attempting to load FastText model from: {model_path}") if os.path.exists(model_path): # Set full permissions for existing model file ensure_full_permissions(model_path) logger.info("Loading FastText model from cache...") try: model = KeyedVectors.load_word2vec_format(model_path) logger.info("Successfully loaded FastText model from cache") return model except Exception as e: logger.error(f"Error loading cached model: {str(e)}") # If loading fails, try downloading again try: os.remove(model_path) logger.info("Removed corrupted model file, will try downloading again") except Exception as rm_error: logger.error(f"Could not remove corrupted model file: {rm_error}") # Download model if not found or corrupted logger.info("Downloading FastText model...") try: import gensim.downloader as api model = api.load('fasttext-wiki-news-subwords-300') logger.info("Successfully downloaded FastText model") # Save the model with full permissions try: os.makedirs(os.path.dirname(model_path), mode=0o777, exist_ok=True) model.save_word2vec_format(model_path) ensure_full_permissions(model_path) logger.info(f"Saved FastText model to: {model_path}") except Exception as save_error: logger.warning(f"Could not save model to cache: {str(save_error)}") return model except Exception as e: logger.error(f"Error downloading FastText model: {str(e)}") return DummyFasttext() except Exception as e: logger.error(f"Error in load_fasttext_model: {str(e)}") return DummyFasttext() class DummyFasttext: """Fallback class when FastText model fails to load""" def __init__(self): self.vector_size = 300 log_print("Using dummy FastText model due to loading error", "WARNING") def get_vector(self, word): return np.zeros(self.vector_size) def __getitem__(self, word): return self.get_vector(word) def most_similar(self, word, topn=10): return [] def to(self, device): """Add dummy to() method to prevent errors""" return self # Load the model once at module level try: fasttext_model = load_fasttext_model() # No need to move FastText model to any device - it's just word vectors logger.info("FastText model loaded successfully") except Exception as e: logger.error(f"Error loading FastText model at module level: {e}") fasttext_model = DummyFasttext() def question_vector_sentence(correct_answer): """Get sentence embedding using shared model""" try: # Get model instance model = models.get_similarity_model() # Convert to tensor and move to correct device embedding = model.encode(correct_answer, convert_to_tensor=True, device=models.device) return embedding except Exception as e: logger.error(f"Error in question_vector_sentence: {str(e)}") return None finally: # Release model reference models.release_similarity_model() def similarity_model_score(sentence_vectors, answer): """Calculate similarity score using sentence transformer""" try: # Get model instance model = models.get_similarity_model() # Get answer embedding answer_embedding = model.encode(answer, convert_to_tensor=True, device=models.device) # Calculate similarities similarities = [] for vec in sentence_vectors: if vec is not None: similarity = util.pytorch_cos_sim(answer_embedding, vec).item() similarities.append(similarity) if not similarities: return 0.0 return max(similarities) except Exception as e: logger.error(f"Error in similarity_model_score: {str(e)}") return 0.0 finally: # Release model reference models.release_similarity_model() def preprocess(sentence): """Preprocess text by tokenizing and removing stopwords""" try: # Lowercase and remove punctuation sentence = sentence.lower() # Tokenize words = word_tokenize(sentence) # Remove stop words words = [word for word in words if word not in stopwords.words('english')] return words except Exception as e: logger.error(f"Error in preprocess: {str(e)}") return [] def sentence_to_vec(tokens, model): """Convert sentence tokens to vector using the model""" try: # Filter words that are in the Word2Vec vocabulary valid_words = [word for word in tokens if word in model] # If there are no valid words, return a zero vector if not valid_words: return np.zeros(model.vector_size) # Compute the average vector word_vectors = [model[word] for word in valid_words] sentence_vector = np.mean(word_vectors, axis=0) return sentence_vector except Exception as e: logger.error(f"Error in sentence_to_vec: {str(e)}") return np.zeros(300) # Return zero vector as fallback def compute_scm(tokens1, tokens2, model): """Compute semantic similarity between token sets""" try: dictionary = corpora.Dictionary([tokens1, tokens2]) tokens1 = dictionary.doc2bow(tokens1) tokens2 = dictionary.doc2bow(tokens2) termsim_index = WordEmbeddingSimilarityIndex(model) termsim_matrix = SparseTermSimilarityMatrix(termsim_index, dictionary) similarity = termsim_matrix.inner_product(tokens1, tokens2, normalized=(True, True)) return float(similarity) # Convert to float for JSON serialization except Exception as e: logger.error(f"Error in compute_scm: {str(e)}") return 0.5 # Return default similarity score def question_vector_word(correct_answer): """Get word vectors using FastText model""" try: # Preprocess the answer tokens = preprocess(correct_answer) if not tokens: return None # Get word vectors vectors = [] for token in tokens: try: vector = fasttext_model[token] vectors.append(vector) except KeyError: # Skip words not in vocabulary continue if not vectors: return None # Return average of word vectors return np.mean(vectors, axis=0) except Exception as e: logger.error(f"Error in question_vector_word: {str(e)}") return None def fasttext_similarity(word_vectors, answer): """Calculate similarity score using FastText word embeddings""" try: # Get answer word embedding answer_embedding = question_vector_word(answer) # Calculate similarities similarities = [] for vec in word_vectors: if vec is not None: similarity = np.dot(answer_embedding, vec) / (np.linalg.norm(answer_embedding) * np.linalg.norm(vec)) similarities.append(similarity) if not similarities: return 0.0 return max(similarities) except Exception as e: logger.error(f"Error in fasttext_similarity: {str(e)}") return 0.0