import os
import sys
import logging
import tempfile

import numpy as np
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sentence_transformers import util
from gensim import corpora
from gensim.models import KeyedVectors
from gensim.similarities import SparseTermSimilarityMatrix, WordEmbeddingSimilarityIndex

# Make the project root importable so the shared model helpers can be found
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from all_models import models, get_cache_dir, check_directory_permissions
from utils import log_print

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def verify_model_file(model_path):
    """Verify that a model file exists and is readable"""
    try:
        if not os.path.exists(model_path):
            logger.error(f"Model file does not exist: {model_path}")
            return False

        # Check file size
        size = os.path.getsize(model_path)
        if size == 0:
            logger.error(f"Model file is empty: {model_path}")
            return False

        # Try to read the first few bytes to confirm the file is readable
        with open(model_path, 'rb') as f:
            f.read(1024)

        logger.info(f"Model file is readable: {model_path}")
        return True
    except Exception as e:
        logger.error(f"Error verifying model file {model_path}: {e}")
        return False

def get_fasttext_cache_dir():
    """Get the cache directory for the FastText model"""
    cache_dir = get_cache_dir()
    fasttext_dir = os.path.join(cache_dir, 'fasttext')
    logger.info(f"Setting up FastText cache directory: {fasttext_dir}")
    try:
        os.makedirs(fasttext_dir, mode=0o755, exist_ok=True)
        if check_directory_permissions(fasttext_dir):
            logger.info(f"FastText cache directory is ready: {fasttext_dir}")
            return fasttext_dir
    except Exception as e:
        logger.error(f"Error creating FastText cache directory: {e}")

    # Fall back to a temporary directory if the cache directory is unusable
    temp_dir = tempfile.mkdtemp()
    logger.info(f"Using temporary directory for FastText: {temp_dir}")
    return temp_dir

def ensure_full_permissions(path):
    """Grant full permissions to a file or directory"""
    try:
        if os.path.isdir(path):
            # Full permissions for directories (rwxrwxrwx)
            os.chmod(path, 0o777)
            # Apply to all contents recursively
            for root, dirs, files in os.walk(path):
                for d in dirs:
                    os.chmod(os.path.join(root, d), 0o777)
                for f in files:
                    os.chmod(os.path.join(root, f), 0o666)
        else:
            # Full permissions for files (rw-rw-rw-)
            os.chmod(path, 0o666)
        return True
    except Exception as e:
        logger.error(f"Error setting permissions for {path}: {e}")
        return False

def load_fasttext_model():
    """Load the FastText model with proper error handling"""
    try:
        # Get the model directory from the environment, with a sensible default
        model_dir = os.getenv('GENSIM_DATA_DIR')
        if not model_dir:
            model_dir = os.path.join(os.path.expanduser('~'), '.cache', 'answer_grading_app', 'gensim')
        os.makedirs(model_dir, mode=0o777, exist_ok=True)
        ensure_full_permissions(model_dir)

        model_path = os.path.join(model_dir, 'fasttext-wiki-news-subwords-300.gz')
        logger.info(f"Attempting to load FastText model from: {model_path}")

        if os.path.exists(model_path):
            # Set full permissions for the existing model file
            ensure_full_permissions(model_path)
            logger.info("Loading FastText model from cache...")
            try:
                model = KeyedVectors.load_word2vec_format(model_path)
                logger.info("Successfully loaded FastText model from cache")
                return model
            except Exception as e:
                logger.error(f"Error loading cached model: {str(e)}")
                # If loading fails, remove the corrupted file and try downloading again
                try:
                    os.remove(model_path)
                    logger.info("Removed corrupted model file, will try downloading again")
                except Exception as rm_error:
                    logger.error(f"Could not remove corrupted model file: {rm_error}")

        # Download the model if it was not found or was corrupted
        logger.info("Downloading FastText model...")
        try:
            import gensim.downloader as api
            model = api.load('fasttext-wiki-news-subwords-300')
            logger.info("Successfully downloaded FastText model")
            # Save the model with full permissions
            try:
                os.makedirs(os.path.dirname(model_path), mode=0o777, exist_ok=True)
                model.save_word2vec_format(model_path)
                ensure_full_permissions(model_path)
                logger.info(f"Saved FastText model to: {model_path}")
            except Exception as save_error:
                logger.warning(f"Could not save model to cache: {str(save_error)}")
            return model
        except Exception as e:
            logger.error(f"Error downloading FastText model: {str(e)}")
            return DummyFasttext()
    except Exception as e:
        logger.error(f"Error in load_fasttext_model: {str(e)}")
        return DummyFasttext()

class DummyFasttext:
    """Fallback class used when the FastText model fails to load"""
    def __init__(self):
        self.vector_size = 300
        log_print("Using dummy FastText model due to loading error", "WARNING")

    def __contains__(self, word):
        # Report every word as out-of-vocabulary so callers fall back to zero vectors
        return False

    def get_vector(self, word):
        return np.zeros(self.vector_size)

    def __getitem__(self, word):
        return self.get_vector(word)

    def most_similar(self, word, topn=10):
        return []

    def to(self, device):
        """Dummy to() method so device moves are a no-op"""
        return self

# Load the model once at module level
try:
    fasttext_model = load_fasttext_model()
    # No need to move the FastText model to any device - it is just word vectors
    logger.info("FastText model loaded successfully")
except Exception as e:
    logger.error(f"Error loading FastText model at module level: {e}")
    fasttext_model = DummyFasttext()

def question_vector_sentence(correct_answer):
    """Get a sentence embedding using the shared similarity model"""
    try:
        # Get the shared model instance
        model = models.get_similarity_model()
        # Encode on the model's device and return a tensor
        embedding = model.encode(correct_answer, convert_to_tensor=True, device=models.device)
        return embedding
    except Exception as e:
        logger.error(f"Error in question_vector_sentence: {str(e)}")
        return None
    finally:
        # Release the model reference
        models.release_similarity_model()

def similarity_model_score(sentence_vectors, answer):
    """Calculate a similarity score using the sentence transformer"""
    try:
        # Get the shared model instance
        model = models.get_similarity_model()
        # Get the answer embedding
        answer_embedding = model.encode(answer, convert_to_tensor=True, device=models.device)

        # Compare against every reference vector and keep the best match
        similarities = []
        for vec in sentence_vectors:
            if vec is not None:
                similarity = util.pytorch_cos_sim(answer_embedding, vec).item()
                similarities.append(similarity)

        if not similarities:
            return 0.0
        return max(similarities)
    except Exception as e:
        logger.error(f"Error in similarity_model_score: {str(e)}")
        return 0.0
    finally:
        # Release the model reference
        models.release_similarity_model()

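# Illustrative usage sketch (not part of the grading pipeline itself): the texts
# below are made-up reference/student answers; question_vector_sentence() and
# similarity_model_score() are the functions defined above.
#
#     refs = ["Photosynthesis converts light energy into chemical energy."]
#     sentence_vectors = [question_vector_sentence(r) for r in refs]
#     score = similarity_model_score(sentence_vectors, "Plants turn sunlight into energy.")
#     # `score` is the best cosine similarity across references, or 0.0 on failure
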
def preprocess(sentence):
    """Preprocess text by lowercasing, tokenizing, and removing stopwords"""
    try:
        # Lowercase the text
        sentence = sentence.lower()
        # Tokenize
        words = word_tokenize(sentence)
        # Remove stop words (build the set once instead of once per word)
        stop_words = set(stopwords.words('english'))
        words = [word for word in words if word not in stop_words]
        return words
    except Exception as e:
        logger.error(f"Error in preprocess: {str(e)}")
        return []

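# Illustrative example: the exact output depends on the installed NLTK tokenizer
# and stopword list. Only stopwords are filtered, so punctuation tokens survive.
#
#     preprocess("The mitochondria is the powerhouse of the cell")
#     # -> roughly ['mitochondria', 'powerhouse', 'cell'], plus any punctuation
#     #    tokens that word_tokenize emits separately
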
def sentence_to_vec(tokens, model):
    """Convert sentence tokens to a single averaged vector using the model"""
    try:
        # Keep only words that are in the embedding vocabulary
        valid_words = [word for word in tokens if word in model]

        # If there are no valid words, return a zero vector
        if not valid_words:
            return np.zeros(model.vector_size)

        # Compute the average vector
        word_vectors = [model[word] for word in valid_words]
        return np.mean(word_vectors, axis=0)
    except Exception as e:
        logger.error(f"Error in sentence_to_vec: {str(e)}")
        return np.zeros(300)  # Return a zero vector as fallback

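# Illustrative example: with the 300-dimensional FastText vectors loaded above,
#
#     sentence_to_vec(['plants', 'sunlight'], fasttext_model)
#
# returns a numpy array of shape (300,), the element-wise mean of the two word
# vectors, and a zero vector of the same shape when no token is in vocabulary.
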
def compute_scm(tokens1, tokens2, model):
    """Compute the soft cosine measure between two token lists"""
    try:
        dictionary = corpora.Dictionary([tokens1, tokens2])
        bow1 = dictionary.doc2bow(tokens1)
        bow2 = dictionary.doc2bow(tokens2)
        termsim_index = WordEmbeddingSimilarityIndex(model)
        termsim_matrix = SparseTermSimilarityMatrix(termsim_index, dictionary)
        similarity = termsim_matrix.inner_product(bow1, bow2, normalized=(True, True))
        return float(similarity)  # Convert to float for JSON serialization
    except Exception as e:
        logger.error(f"Error in compute_scm: {str(e)}")
        return 0.5  # Return a neutral default similarity score

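# Illustrative usage sketch: comparing two made-up answers with the soft cosine
# measure. This assumes fasttext_model loaded as real KeyedVectors; with the
# DummyFasttext fallback the computation fails and the default 0.5 is returned.
#
#     compute_scm(preprocess("Plants absorb sunlight"),
#                 preprocess("Leaves capture light energy"),
#                 fasttext_model)
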
def question_vector_word(correct_answer):
    """Get an averaged word vector for the answer using the FastText model"""
    try:
        # Preprocess the answer
        tokens = preprocess(correct_answer)
        if not tokens:
            return None

        # Look up word vectors, skipping out-of-vocabulary tokens
        vectors = []
        for token in tokens:
            try:
                vectors.append(fasttext_model[token])
            except KeyError:
                continue

        if not vectors:
            return None

        # Return the average of the word vectors
        return np.mean(vectors, axis=0)
    except Exception as e:
        logger.error(f"Error in question_vector_word: {str(e)}")
        return None

def fasttext_similarity(word_vectors, answer):
    """Calculate a similarity score using FastText word embeddings"""
    try:
        # Get the answer's averaged word embedding
        answer_embedding = question_vector_word(answer)
        if answer_embedding is None:
            return 0.0

        # Calculate cosine similarity against each reference vector
        similarities = []
        answer_norm = np.linalg.norm(answer_embedding)
        for vec in word_vectors:
            if vec is not None:
                denom = answer_norm * np.linalg.norm(vec)
                if denom == 0:
                    continue  # Skip zero vectors to avoid division by zero
                similarities.append(np.dot(answer_embedding, vec) / denom)

        if not similarities:
            return 0.0
        return max(similarities)
    except Exception as e:
        logger.error(f"Error in fasttext_similarity: {str(e)}")
        return 0.0

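
if __name__ == '__main__':
    # Minimal smoke test (illustrative): the reference and student answers are
    # made-up, and the sentence-transformer path assumes the shared `models`
    # object behaves as used above. Each path returns its documented fallback
    # value if its model failed to load.
    reference = "Photosynthesis converts light energy into chemical energy."
    student = "Plants turn sunlight into chemical energy."

    # FastText word-vector path
    ref_word_vec = question_vector_word(reference)
    print("FastText similarity:", fasttext_similarity([ref_word_vec], student))

    # Soft cosine measure path
    print("SCM similarity:", compute_scm(preprocess(reference), preprocess(student), fasttext_model))

    # Sentence-transformer path
    ref_sent_vec = question_vector_sentence(reference)
    print("Sentence similarity:", similarity_model_score([ref_sent_vec], student))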