""" | |
NLP-based Query Analyzer Implementation. | |
This module provides query analysis using spaCy NLP capabilities for | |
entity extraction, linguistic analysis, and advanced query understanding. | |
Features: | |
- Named entity recognition | |
- Technical term extraction | |
- Linguistic complexity analysis | |
- Intent classification | |
- Query optimization suggestions | |
""" | |
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent.parent
sys.path.append(str(project_root))

from ..base import QueryAnalysis
from .base_analyzer import BaseQueryAnalyzer

logger = logging.getLogger(__name__)


class NLPAnalyzer(BaseQueryAnalyzer):
    """
    NLP-based query analyzer using spaCy for linguistic analysis.

    This analyzer provides advanced query understanding by leveraging
    spaCy's NLP capabilities, including entity recognition, POS tagging,
    dependency parsing, and technical term identification.

    Configuration Options:
    - model: spaCy model name (default: "en_core_web_sm")
    - extract_entities: Enable named entity recognition (default: True)
    - extract_technical_terms: Enable technical term detection (default: True)
    - complexity_scoring: Enable complexity scoring (default: True)
    - min_confidence: Minimum confidence for entity extraction (default: 0.7)
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize NLP analyzer with spaCy model.

        Args:
            config: Configuration dictionary
        """
        # Initialize attributes before calling super().__init__, which may
        # call configure() and therefore expects these to exist already.
        self._nlp = None
        cfg = config or {}
        self._model_name = cfg.get('model', 'en_core_web_sm')
        self._extract_entities = cfg.get('extract_entities', True)
        self._extract_technical_terms = cfg.get('extract_technical_terms', True)
        self._complexity_scoring = cfg.get('complexity_scoring', True)
        self._min_confidence = cfg.get('min_confidence', 0.7)

        # Technical term patterns (can be overridden via configuration).
        # Defined before super().__init__ so that configure() can safely
        # update the set if it is invoked during construction.
        self._technical_patterns = set(cfg.get('technical_patterns', [
            'api', 'sdk', 'framework', 'library', 'protocol', 'algorithm',
            'implementation', 'architecture', 'design pattern', 'interface',
            'configuration', 'deployment', 'optimization', 'performance',
            'scalability', 'security', 'authentication', 'authorization',
            'database', 'query', 'index', 'cache', 'memory', 'cpu', 'processor',
            'network', 'http', 'tcp', 'udp', 'ssl', 'tls', 'json', 'xml',
            'yaml', 'markdown', 'regex', 'parse', 'serialize', 'encode',
            'decode', 'encrypt', 'decrypt', 'hash', 'token', 'session'
        ]))

        # Now call super().__init__, which may call configure()
        super().__init__(config)

        # Load spaCy model
        self._load_nlp_model()

    def _extract_basic_technical_terms(self, query: str) -> List[str]:
        """
        Extract technical terms using simple pattern matching when spaCy is
        not available.

        Args:
            query: Query string to analyze

        Returns:
            List of technical terms found
        """
        technical_terms = []
        query_lower = query.lower()
        words = query.split()

        # Check each technical pattern against the whole query, then collect
        # the case-preserved words that contain it. Note: multi-word patterns
        # such as 'design pattern' pass the substring check but cannot match
        # a single word, so they are effectively skipped here.
        for pattern in self._technical_patterns:
            if pattern in query_lower:
                for word in words:
                    # Substring match covers exact matches as well as words
                    # with attached punctuation (e.g. "API?").
                    if pattern in word.lower():
                        technical_terms.append(word)

        # Remove duplicates while preserving order
        seen = set()
        unique_terms = []
        for term in technical_terms:
            if term.lower() not in seen:
                seen.add(term.lower())
                unique_terms.append(term)

        return unique_terms
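
    # Example (sketch): with the default patterns,
    #   _extract_basic_technical_terms("Parse the JSON API response")
    # matches 'parse', 'json', and 'api' and returns the case-preserved
    # tokens ['Parse', 'JSON', 'API'] (relative order may vary, since the
    # patterns are iterated as a set).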

    def _load_nlp_model(self) -> None:
        """Load spaCy NLP model with error handling."""
        try:
            import spacy

            # Try to load the configured model
            try:
                self._nlp = spacy.load(self._model_name)
                logger.info(f"Loaded spaCy model: {self._model_name}")
            except OSError:
                # Fall back to the basic English model, unless that is what
                # already failed (the outer handler then applies)
                if self._model_name == "en_core_web_sm":
                    raise
                logger.warning(f"Model {self._model_name} not found, trying en_core_web_sm")
                self._nlp = spacy.load("en_core_web_sm")
                self._model_name = "en_core_web_sm"
        except ImportError:
            logger.error("spaCy not available, NLP analysis will be limited")
            self._nlp = None
        except Exception as e:
            logger.error(f"Failed to load spaCy model: {e}")
            self._nlp = None
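
    # Note: spaCy models ship separately from the library itself and can be
    # installed with `python -m spacy download en_core_web_sm`; without a
    # model, the analyzer degrades to pattern-based analysis only.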

    def _analyze_query(self, query: str) -> QueryAnalysis:
        """
        Perform NLP-based query analysis.

        Args:
            query: Clean, validated query string

        Returns:
            QueryAnalysis with NLP-extracted characteristics
        """
        # Start with basic features
        basic_features = self._extract_basic_features(query)

        # Perform NLP analysis if available
        if self._nlp is not None:
            nlp_features = self._extract_nlp_features(query)
            basic_features.update(nlp_features)
        else:
            logger.warning("NLP model not available, using basic analysis only")
            # Fall back to pattern-based technical term extraction
            basic_features['technical_terms'] = self._extract_basic_technical_terms(query)

        # Extract query characteristics
        entities = basic_features.get('entities', [])
        technical_terms = basic_features.get('technical_terms', [])
        complexity_score = self._calculate_complexity_score(query, basic_features)
        intent_category = self._determine_intent_category(query, basic_features)
        suggested_k = self._suggest_retrieval_k(query, basic_features)
        confidence = self._calculate_confidence(basic_features)

        # Epic 2 feature analysis
        epic2_features = self._analyze_epic2_features(query, basic_features)

        return QueryAnalysis(
            query=query,
            complexity_score=complexity_score,
            technical_terms=technical_terms,
            entities=entities,
            intent_category=intent_category,
            suggested_k=suggested_k,
            confidence=confidence,
            metadata={
                'analyzer_type': 'nlp',
                'model_used': self._model_name,
                'nlp_available': self._nlp is not None,
                'features': basic_features,
                'epic2_features': epic2_features,
                'analysis_version': '2.0'
            }
        )

    def _extract_nlp_features(self, query: str) -> Dict[str, Any]:
        """
        Extract features using spaCy NLP analysis.

        Args:
            query: Query string to analyze

        Returns:
            Dictionary with NLP-extracted features
        """
        features = {}

        try:
            # Process query with spaCy
            doc = self._nlp(query)

            # Extract named entities. Note: 'TECHNOLOGY' is not a standard
            # spaCy label and only matches if a custom model provides it.
            if self._extract_entities:
                entities = []
                for ent in doc.ents:
                    if ent.label_ in ['PERSON', 'ORG', 'PRODUCT', 'TECHNOLOGY']:
                        entities.append({
                            'text': ent.text,
                            'label': ent.label_,
                            # Standard spaCy entities carry no confidence
                            # attribute, so this defaults to 1.0
                            'confidence': getattr(ent, 'confidence', 1.0)
                        })
                features['entities'] = [e['text'] for e in entities if e['confidence'] >= self._min_confidence]
                features['entity_details'] = entities

            # Extract technical terms
            if self._extract_technical_terms:
                features['technical_terms'] = self._extract_technical_terms_from_doc(doc)

            # Linguistic analysis
            features['pos_tags'] = [token.pos_ for token in doc]
            features['dependencies'] = [(token.text, token.dep_, token.head.text) for token in doc]

            # Complexity indicators
            alpha_tokens = [token for token in doc if token.is_alpha]
            features['avg_word_length'] = sum(len(token.text) for token in alpha_tokens) / max(1, len(alpha_tokens))
            features['noun_count'] = sum(1 for token in doc if token.pos_ == 'NOUN')
            features['verb_count'] = sum(1 for token in doc if token.pos_ == 'VERB')
            features['adj_count'] = sum(1 for token in doc if token.pos_ == 'ADJ')

            # Sentence structure
            features['sentence_structures'] = []
            for sent in doc.sents:
                features['sentence_structures'].append({
                    'length': len([token for token in sent if token.is_alpha]),
                    'complexity': self._analyze_sentence_complexity(sent)
                })
        except Exception as e:
            logger.warning(f"NLP feature extraction failed: {e}")
            features['nlp_error'] = str(e)

        return features
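
    # Example (sketch): for "How does TLS encrypt a session token?", the
    # returned dict would typically include 'technical_terms' containing
    # 'TLS', 'encrypt', 'session', and 'token' (plus any matching noun
    # chunks), alongside 'pos_tags', 'dependencies', the POS counts, and
    # one 'sentence_structures' entry for the single sentence.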

    def _extract_technical_terms_from_doc(self, doc) -> List[str]:
        """
        Extract technical terms from spaCy document.

        Args:
            doc: spaCy document object

        Returns:
            List of technical terms found
        """
        technical_terms = []

        # Check individual tokens
        for token in doc:
            if token.text.lower() in self._technical_patterns:
                technical_terms.append(token.text)

        # Check noun phrases for multi-word technical terms
        for chunk in doc.noun_chunks:
            chunk_text = chunk.text.lower()
            if any(pattern in chunk_text for pattern in self._technical_patterns):
                technical_terms.append(chunk.text)

        # Remove duplicates while preserving order
        seen = set()
        unique_terms = []
        for term in technical_terms:
            if term.lower() not in seen:
                seen.add(term.lower())
                unique_terms.append(term)

        return unique_terms

    def _analyze_sentence_complexity(self, sent) -> str:
        """
        Analyze complexity of a single sentence.

        Args:
            sent: spaCy sentence object

        Returns:
            Complexity level: 'simple', 'medium', or 'complex'
        """
        # Count syntactic features
        word_count = len([token for token in sent if token.is_alpha])
        clause_count = sum(1 for token in sent if token.dep_ in ['ccomp', 'xcomp', 'advcl'])
        # Subordination markers are counted for reference but do not yet
        # feed into the classification below
        subordinate_count = sum(1 for token in sent if token.dep_ in ['mark', 'prep'])

        # Determine complexity
        if word_count < 8 and clause_count == 0:
            return 'simple'
        elif word_count < 15 and clause_count <= 1:
            return 'medium'
        else:
            return 'complex'
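
    # Example: "What is an API?" has 4 alphabetic tokens and no clausal
    # dependents -> 'simple'; a 12-word sentence with one ccomp clause
    # -> 'medium'; anything longer or more clause-heavy -> 'complex'.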

    def _calculate_complexity_score(self, query: str, features: Dict[str, Any]) -> float:
        """
        Calculate numerical complexity score for the query.

        Args:
            query: Original query string
            features: Extracted features

        Returns:
            Complexity score between 0.0 and 1.0
        """
        if not self._complexity_scoring:
            return 0.5  # Default medium complexity

        score = 0.0

        # Word count factor (0.0 - 0.3)
        word_count = features.get('word_count', 0)
        word_factor = min(0.3, word_count / 20.0)
        score += word_factor

        # Technical terms factor (0.0 - 0.2)
        tech_terms = len(features.get('technical_terms', []))
        tech_factor = min(0.2, tech_terms / 5.0)
        score += tech_factor

        # Entity count factor (0.0 - 0.2)
        entities = len(features.get('entities', []))
        entity_factor = min(0.2, entities / 3.0)
        score += entity_factor

        # Linguistic complexity factor (0.0 - 0.3)
        if 'avg_word_length' in features:
            avg_word_len = features['avg_word_length']
            length_factor = min(0.15, (avg_word_len - 4.0) / 10.0) if avg_word_len > 4.0 else 0.0
            score += length_factor

        if 'sentence_structures' in features:
            complex_sentences = sum(1 for s in features['sentence_structures'] if s['complexity'] == 'complex')
            structure_factor = min(0.15, complex_sentences / 2.0)
            score += structure_factor

        return min(1.0, max(0.0, score))
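
    # Worked example (sketch, basic features only, no NLP factors): for the
    # 7-word query "How do I optimize database index performance?" with 3
    # matched technical terms and no entities:
    #   word_factor   = min(0.3, 7/20) = 0.3
    #   tech_factor   = min(0.2, 3/5)  = 0.2
    #   entity_factor = 0.0
    #   score         = 0.5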

    def _calculate_confidence(self, features: Dict[str, Any]) -> float:
        """
        Calculate confidence in the analysis results.

        Args:
            features: Extracted features

        Returns:
            Confidence score between 0.0 and 1.0
        """
        confidence = 0.5  # Base confidence

        # Higher confidence if the NLP model worked
        if self._nlp is not None and 'nlp_error' not in features:
            confidence += 0.3

        # Higher confidence for queries with clear characteristics
        if features.get('technical_terms'):
            confidence += 0.1
        if features.get('entities'):
            confidence += 0.1
        if features.get('has_question_words'):
            confidence += 0.1

        return min(1.0, max(0.0, confidence))
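
    # Example: a basic-only analysis (no NLP model) that still finds
    # technical terms and question words scores 0.5 + 0.1 + 0.1 = 0.7;
    # with the model loaded and error-free, the same query reaches 1.0.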

    def get_supported_features(self) -> List[str]:
        """
        Return list of features this NLP analyzer supports.

        Returns:
            List of feature names
        """
        base_features = super().get_supported_features()
        nlp_features = [
            'entities',
            'technical_terms',
            'complexity_scoring',
            'intent_classification',
            'linguistic_analysis',
            'pos_tagging',
            'dependency_parsing'
        ]

        if self._nlp is None:
            nlp_features = ['basic_' + feature for feature in nlp_features]

        return base_features + nlp_features
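
    # Example: when spaCy is unavailable, the NLP feature names come back
    # prefixed (e.g. 'basic_entities', 'basic_technical_terms') so callers
    # can detect that the analyzer is running in degraded, pattern-based mode.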

    def configure(self, config: Dict[str, Any]) -> None:
        """
        Configure the NLP analyzer with provided settings.

        Args:
            config: Configuration dictionary
        """
        super().configure(config)

        # Update NLP-specific configuration
        old_model = self._model_name
        self._model_name = config.get('model', self._model_name)
        self._extract_entities = config.get('extract_entities', self._extract_entities)
        self._extract_technical_terms = config.get('extract_technical_terms', self._extract_technical_terms)
        self._complexity_scoring = config.get('complexity_scoring', self._complexity_scoring)
        self._min_confidence = config.get('min_confidence', self._min_confidence)

        # Extend (rather than replace) technical patterns if provided
        if 'technical_patterns' in config:
            additional_patterns = config['technical_patterns']
            if isinstance(additional_patterns, list):
                self._technical_patterns.update(additional_patterns)

        # Reload model if changed
        if old_model != self._model_name:
            logger.info(f"Model changed from {old_model} to {self._model_name}, reloading...")
            self._load_nlp_model()
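
    # Reconfiguration sketch (illustrative): switching models at runtime
    # triggers a reload, and new patterns extend the existing set:
    #   analyzer.configure({'model': 'en_core_web_md',
    #                       'technical_patterns': ['kubernetes', 'grpc']})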