Arthur Passuello
initial commit
5e1a30c
"""
NLP-based Query Analyzer Implementation.
This module provides query analysis using spaCy NLP capabilities for
entity extraction, linguistic analysis, and advanced query understanding.
Features:
- Named entity recognition
- Technical term extraction
- Linguistic complexity analysis
- Intent classification
- Query optimization suggestions
"""
import logging
from typing import Dict, Any, List, Optional, Set
from pathlib import Path
import sys
# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent.parent
sys.path.append(str(project_root))
from ..base import QueryAnalysis
from .base_analyzer import BaseQueryAnalyzer
logger = logging.getLogger(__name__)
class NLPAnalyzer(BaseQueryAnalyzer):
"""
NLP-based query analyzer using spaCy for linguistic analysis.
This analyzer provides advanced query understanding by leveraging
spaCy's NLP capabilities including entity recognition, POS tagging,
dependency parsing, and technical term identification.
Configuration Options:
- model: spaCy model name (default: "en_core_web_sm")
- extract_entities: Enable named entity recognition (default: True)
- extract_technical_terms: Enable technical term detection (default: True)
- complexity_scoring: Enable complexity scoring (default: True)
- min_confidence: Minimum confidence for entity extraction (default: 0.7)
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize NLP analyzer with spaCy model.
Args:
config: Configuration dictionary
"""
# Initialize attributes first before calling super().__init__
self._nlp = None
self._model_name = (config or {}).get('model', 'en_core_web_sm')
self._extract_entities = (config or {}).get('extract_entities', True)
self._extract_technical_terms = (config or {}).get('extract_technical_terms', True)
self._complexity_scoring = (config or {}).get('complexity_scoring', True)
self._min_confidence = (config or {}).get('min_confidence', 0.7)
# Now call super().__init__ which may call configure()
super().__init__(config)
# Technical term patterns (can be extended via configuration)
self._technical_patterns = set(self._config.get('technical_patterns', [
'api', 'sdk', 'framework', 'library', 'protocol', 'algorithm',
'implementation', 'architecture', 'design pattern', 'interface',
'configuration', 'deployment', 'optimization', 'performance',
'scalability', 'security', 'authentication', 'authorization',
'database', 'query', 'index', 'cache', 'memory', 'cpu', 'processor',
'network', 'http', 'tcp', 'udp', 'ssl', 'tls', 'json', 'xml',
'yaml', 'markdown', 'regex', 'parse', 'serialize', 'encode',
'decode', 'encrypt', 'decrypt', 'hash', 'token', 'session'
]))
# Load spaCy model
self._load_nlp_model()
def _extract_basic_technical_terms(self, query: str) -> List[str]:
"""
Extract technical terms using simple pattern matching when spaCy is not available.
Args:
query: Query string to analyze
Returns:
List of technical terms found
"""
technical_terms = []
query_lower = query.lower()
# Check for individual technical patterns
for pattern in self._technical_patterns:
if pattern in query_lower:
# Find the actual case-preserved term
words = query.split()
for word in words:
if word.lower() == pattern:
technical_terms.append(word)
elif pattern in word.lower():
technical_terms.append(word)
# Remove duplicates while preserving order
seen = set()
unique_terms = []
for term in technical_terms:
if term.lower() not in seen:
seen.add(term.lower())
unique_terms.append(term)
return unique_terms
def _load_nlp_model(self) -> None:
"""Load spaCy NLP model with error handling."""
try:
import spacy
# Try to load the model
try:
self._nlp = spacy.load(self._model_name)
logger.info(f"Loaded spaCy model: {self._model_name}")
except OSError:
# Fallback to basic English model
logger.warning(f"Model {self._model_name} not found, trying en_core_web_sm")
self._nlp = spacy.load("en_core_web_sm")
self._model_name = "en_core_web_sm"
except ImportError:
logger.error("spaCy not available, NLP analysis will be limited")
self._nlp = None
except Exception as e:
logger.error(f"Failed to load spaCy model: {e}")
self._nlp = None
def _analyze_query(self, query: str) -> QueryAnalysis:
"""
Perform NLP-based query analysis.
Args:
query: Clean, validated query string
Returns:
QueryAnalysis with NLP-extracted characteristics
"""
# Start with basic features
basic_features = self._extract_basic_features(query)
# Perform NLP analysis if available
if self._nlp is not None:
nlp_features = self._extract_nlp_features(query)
basic_features.update(nlp_features)
else:
logger.warning("NLP model not available, using basic analysis only")
# Add basic technical term extraction when spaCy is not available
basic_features['technical_terms'] = self._extract_basic_technical_terms(query)
# Extract query characteristics
entities = basic_features.get('entities', [])
technical_terms = basic_features.get('technical_terms', [])
complexity_score = self._calculate_complexity_score(query, basic_features)
intent_category = self._determine_intent_category(query, basic_features)
suggested_k = self._suggest_retrieval_k(query, basic_features)
confidence = self._calculate_confidence(basic_features)
# Epic 2 feature analysis
epic2_features = self._analyze_epic2_features(query, basic_features)
return QueryAnalysis(
query=query,
complexity_score=complexity_score,
technical_terms=technical_terms,
entities=entities,
intent_category=intent_category,
suggested_k=suggested_k,
confidence=confidence,
metadata={
'analyzer_type': 'nlp',
'model_used': self._model_name,
'nlp_available': self._nlp is not None,
'features': basic_features,
'epic2_features': epic2_features,
'analysis_version': '2.0'
}
)
def _extract_nlp_features(self, query: str) -> Dict[str, Any]:
"""
Extract features using spaCy NLP analysis.
Args:
query: Query string to analyze
Returns:
Dictionary with NLP-extracted features
"""
features = {}
try:
# Process query with spaCy
doc = self._nlp(query)
# Extract named entities
if self._extract_entities:
entities = []
for ent in doc.ents:
if ent.label_ in ['PERSON', 'ORG', 'PRODUCT', 'TECHNOLOGY']:
entities.append({
'text': ent.text,
'label': ent.label_,
'confidence': getattr(ent, 'confidence', 1.0)
})
features['entities'] = [e['text'] for e in entities if e['confidence'] >= self._min_confidence]
features['entity_details'] = entities
# Extract technical terms
if self._extract_technical_terms:
technical_terms = self._extract_technical_terms_from_doc(doc)
features['technical_terms'] = technical_terms
# Linguistic analysis
features['pos_tags'] = [token.pos_ for token in doc]
features['dependencies'] = [(token.text, token.dep_, token.head.text) for token in doc]
# Complexity indicators
features['avg_word_length'] = sum(len(token.text) for token in doc if token.is_alpha) / max(1, sum(1 for token in doc if token.is_alpha))
features['noun_count'] = sum(1 for token in doc if token.pos_ == 'NOUN')
features['verb_count'] = sum(1 for token in doc if token.pos_ == 'VERB')
features['adj_count'] = sum(1 for token in doc if token.pos_ == 'ADJ')
# Sentence structure
features['sentence_structures'] = []
for sent in doc.sents:
features['sentence_structures'].append({
'length': len([token for token in sent if token.is_alpha]),
'complexity': self._analyze_sentence_complexity(sent)
})
except Exception as e:
logger.warning(f"NLP feature extraction failed: {e}")
features['nlp_error'] = str(e)
return features
def _extract_technical_terms_from_doc(self, doc) -> List[str]:
"""
Extract technical terms from spaCy document.
Args:
doc: spaCy document object
Returns:
List of technical terms found
"""
technical_terms = []
# Check individual tokens
for token in doc:
if token.text.lower() in self._technical_patterns:
technical_terms.append(token.text)
# Check noun phrases for multi-word technical terms
for chunk in doc.noun_chunks:
chunk_text = chunk.text.lower()
if any(pattern in chunk_text for pattern in self._technical_patterns):
technical_terms.append(chunk.text)
# Remove duplicates while preserving order
seen = set()
unique_terms = []
for term in technical_terms:
if term.lower() not in seen:
seen.add(term.lower())
unique_terms.append(term)
return unique_terms
def _analyze_sentence_complexity(self, sent) -> str:
"""
Analyze complexity of a single sentence.
Args:
sent: spaCy sentence object
Returns:
Complexity level: 'simple', 'medium', 'complex'
"""
# Count syntactic features
word_count = len([token for token in sent if token.is_alpha])
clause_count = sum(1 for token in sent if token.dep_ in ['ccomp', 'xcomp', 'advcl'])
subordinate_count = sum(1 for token in sent if token.dep_ in ['mark', 'prep'])
# Determine complexity
if word_count < 8 and clause_count == 0:
return 'simple'
elif word_count < 15 and clause_count <= 1:
return 'medium'
else:
return 'complex'
def _calculate_complexity_score(self, query: str, features: Dict[str, Any]) -> float:
"""
Calculate numerical complexity score for the query.
Args:
query: Original query string
features: Extracted features
Returns:
Complexity score between 0.0 and 1.0
"""
if not self._complexity_scoring:
return 0.5 # Default medium complexity
score = 0.0
# Word count factor (0.0 - 0.3)
word_count = features.get('word_count', 0)
word_factor = min(0.3, word_count / 20.0)
score += word_factor
# Technical terms factor (0.0 - 0.2)
tech_terms = len(features.get('technical_terms', []))
tech_factor = min(0.2, tech_terms / 5.0)
score += tech_factor
# Entity count factor (0.0 - 0.2)
entities = len(features.get('entities', []))
entity_factor = min(0.2, entities / 3.0)
score += entity_factor
# Linguistic complexity factor (0.0 - 0.3)
if 'avg_word_length' in features:
avg_word_len = features['avg_word_length']
length_factor = min(0.15, (avg_word_len - 4.0) / 10.0) if avg_word_len > 4.0 else 0.0
score += length_factor
if 'sentence_structures' in features:
complex_sentences = sum(1 for s in features['sentence_structures'] if s['complexity'] == 'complex')
structure_factor = min(0.15, complex_sentences / 2.0)
score += structure_factor
return min(1.0, max(0.0, score))
def _calculate_confidence(self, features: Dict[str, Any]) -> float:
"""
Calculate confidence in the analysis results.
Args:
features: Extracted features
Returns:
Confidence score between 0.0 and 1.0
"""
confidence = 0.5 # Base confidence
# Higher confidence if NLP model worked
if self._nlp is not None and 'nlp_error' not in features:
confidence += 0.3
# Higher confidence for queries with clear characteristics
if features.get('technical_terms'):
confidence += 0.1
if features.get('entities'):
confidence += 0.1
if features.get('has_question_words'):
confidence += 0.1
return min(1.0, max(0.0, confidence))
def get_supported_features(self) -> List[str]:
"""
Return list of features this NLP analyzer supports.
Returns:
List of feature names
"""
base_features = super().get_supported_features()
nlp_features = [
'entities',
'technical_terms',
'complexity_scoring',
'intent_classification',
'linguistic_analysis',
'pos_tagging',
'dependency_parsing'
]
if self._nlp is None:
nlp_features = ['basic_' + feature for feature in nlp_features]
return base_features + nlp_features
def configure(self, config: Dict[str, Any]) -> None:
"""
Configure the NLP analyzer with provided settings.
Args:
config: Configuration dictionary
"""
super().configure(config)
# Update NLP-specific configuration
old_model = self._model_name
self._model_name = config.get('model', self._model_name)
self._extract_entities = config.get('extract_entities', self._extract_entities)
self._extract_technical_terms = config.get('extract_technical_terms', self._extract_technical_terms)
self._complexity_scoring = config.get('complexity_scoring', self._complexity_scoring)
self._min_confidence = config.get('min_confidence', self._min_confidence)
# Update technical patterns if provided
if 'technical_patterns' in config:
additional_patterns = config['technical_patterns']
if isinstance(additional_patterns, list):
self._technical_patterns.update(additional_patterns)
# Reload model if changed
if old_model != self._model_name:
logger.info(f"Model changed from {old_model} to {self._model_name}, reloading...")
self._load_nlp_model()