# Core evaluation engine with heuristic algorithms
"""
This module implements the core evaluation logic including DAS calculation,
volatility-aware thresholds, WAT scoring, and macro-adjusted evaluation metrics.
"""

import numpy as np
from typing import Dict, List, Optional
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EvaluationEngine:
    """
    Core engine for evaluating FinBERT predictions against market movements.
    """

    def __init__(self, volatility_multiplier: float = 1.0, confidence_threshold: float = 0.7):
        """
        Initialize the evaluation engine.

        Args:
            volatility_multiplier: k factor for volatility thresholds (default: 1.0);
                threshold = k * 14-day volatility
            confidence_threshold: Minimum confidence for high-confidence predictions (default: 0.7)
        """
        self.volatility_multiplier = volatility_multiplier  # k = 1.0 per framework
        self.confidence_threshold = confidence_threshold

    def calculate_das(self, sentiment_direction: int, price_return: float, volatility: float) -> float:
        """
        Calculate Directional Alignment Score (DAS).

        Args:
            sentiment_direction: 1 for positive, -1 for negative, 0 for neutral
            price_return: Stock return percentage
            volatility: Stock volatility percentage

        Returns:
            DAS score between 0 and 1
        """
        try:
            # Handle neutral sentiment
            if sentiment_direction == 0:
                # For neutral sentiment, score based on how close to zero the return is
                threshold = volatility * self.volatility_multiplier
                if abs(price_return) <= threshold:
                    return 1.0  # Perfect neutral prediction
                else:
                    # Decay score based on how far from neutral
                    excess = abs(price_return) - threshold
                    return max(0.0, 1.0 - (excess / (threshold * 2)))

            # For positive/negative sentiment
            expected_direction = sentiment_direction
            actual_direction = 1 if price_return > 0 else -1 if price_return < 0 else 0

            # Base alignment check
            if expected_direction == actual_direction:
                # Correct direction - score based on magnitude
                magnitude_factor = min(abs(price_return) / (volatility * self.volatility_multiplier), 2.0)
                return min(1.0, 0.7 + 0.3 * magnitude_factor)
            else:
                # Wrong direction - score based on how wrong
                threshold = volatility * self.volatility_multiplier
                if abs(price_return) <= threshold:
                    # Small move in wrong direction - partial credit
                    return 0.3
                else:
                    # Large move in wrong direction - low score
                    return max(0.0, 0.3 - (abs(price_return) - threshold) / (threshold * 3))

        except Exception as e:
            logger.error(f"Error calculating DAS: {str(e)}")
            return 0.0

    def calculate_wat_weight(self, confidence: float, impact: float, days_ago: int = 0,
                             decay_factor: float = 0.95) -> float:
        """
        Calculate Weighted Accuracy over Time (WAT) weight.

        Args:
            confidence: Model confidence score
            impact: Impact magnitude (absolute return)
            days_ago: Days since prediction (for decay)
            decay_factor: Decay factor for time-based weighting

        Returns:
            WAT weight for the prediction
        """
        try:
            # Base weight from confidence and impact
            confidence_weight = confidence
            impact_weight = min(impact / 5.0, 2.0)  # Cap at 2x for very large moves

            # Time decay (optional)
            time_weight = decay_factor ** days_ago if days_ago > 0 else 1.0

            # Combined weight
            wat_weight = confidence_weight * impact_weight * time_weight

            return float(wat_weight)

        except Exception as e:
            logger.error(f"Error calculating WAT weight: {str(e)}")
            return 1.0
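
    # Worked example of the scoring heuristics above (illustrative only; the
    # figures are hypothetical and assume the default k = 1.0):
    #   Positive sentiment, return_24h = +3.0%, volatility_14d = 2.0%
    #     -> directions agree, magnitude_factor = min(3.0 / 2.0, 2.0) = 1.5
    #     -> DAS = min(1.0, 0.7 + 0.3 * 1.5) = 1.0
    #   Positive sentiment, return_24h = -1.0%, volatility_14d = 2.0%
    #     -> directions disagree but |return| <= threshold (2.0%), so DAS = 0.3
    #   WAT weight for confidence = 0.9, impact = 3.0, same-day prediction
    #     -> 0.9 * min(3.0 / 5.0, 2.0) * 1.0 = 0.54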

    def evaluate_prediction(self, sentiment_data: Dict, market_data: Dict, news_date: datetime) -> Dict:
        """
        Comprehensive evaluation of a single prediction.

        Args:
            sentiment_data: Output from FinBERT analyzer
            market_data: Output from market data service
            news_date: Date when news was published

        Returns:
            Complete evaluation results
        """
        try:
            # Extract key values
            sentiment = sentiment_data.get("sentiment", "neutral")
            confidence = sentiment_data.get("confidence", 0.0)
            return_24h = market_data.get("return_24h")
            volatility_14d = market_data.get("volatility_14d")
            alpha_adjusted = market_data.get("alpha_adjusted")

            # Check for missing data
            if return_24h is None or volatility_14d is None:
                return {
                    "error": "Insufficient market data for evaluation",
                    "sentiment": sentiment,
                    "confidence": confidence
                }

            # Convert sentiment to direction
            sentiment_direction = self._get_sentiment_direction(sentiment)

            # Calculate volatility threshold
            threshold = volatility_14d * self.volatility_multiplier

            # Calculate DAS
            das_score = self.calculate_das(sentiment_direction, return_24h, volatility_14d)

            # Determine correctness
            is_correct = self._is_prediction_correct(sentiment_direction, return_24h, threshold)

            # Calculate WAT weight
            impact = abs(return_24h)
            wat_weight = self.calculate_wat_weight(confidence, impact)

            # Prepare results
            results = {
                "ticker": market_data.get("ticker", "Unknown"),
                "news_date": news_date.strftime("%Y-%m-%d"),
                "sentiment": sentiment,
                "confidence": confidence,
                "return_24h": return_24h,
                "volatility_14d": volatility_14d,
                "threshold": threshold,
                "das_score": das_score,
                "is_correct": is_correct,
                "wat_weight": wat_weight,
                "impact": impact,
                "alpha_adjusted": alpha_adjusted,
                "sentiment_direction": sentiment_direction,
                "evaluation_summary": self._generate_summary(
                    sentiment, confidence, return_24h, das_score, is_correct
                )
            }

            logger.info(f"Evaluation completed - DAS: {das_score:.3f}, Correct: {is_correct}")
            return results

        except Exception as e:
            logger.error(f"Error in prediction evaluation: {str(e)}")
            return {"error": str(e)}

    def _get_sentiment_direction(self, sentiment: str) -> int:
        """Convert sentiment to numerical direction."""
        sentiment_map = {
            "positive": 1,
            "negative": -1,
            "neutral": 0
        }
        return sentiment_map.get(sentiment.lower(), 0)

    def _is_prediction_correct(self, sentiment_direction: int, price_return: float, threshold: float) -> bool:
        """
        Determine if prediction is correct based on volatility-aware thresholds.
        """
        if sentiment_direction == 0:  # Neutral
            return abs(price_return) <= threshold
        elif sentiment_direction == 1:  # Positive
            return price_return > threshold
        elif sentiment_direction == -1:  # Negative
            return price_return < -threshold
        else:
            return False

    def _generate_summary(self, sentiment: str, confidence: float, return_24h: float,
                          das_score: float, is_correct: bool) -> str:
        """Generate human-readable evaluation summary."""
        direction = "📈" if return_24h > 0 else "📉" if return_24h < 0 else "➡️"

        # More nuanced verdict based on DAS score
        if is_correct:
            verdict = "✅ Aligned"
        else:
            if das_score > 0.7:
                verdict = "⚠️ Directionally Right, Magnitude Wrong"  # Right direction, wrong magnitude
            elif das_score > 0.3:
                verdict = "🔄 Partially Aligned"  # Some alignment
            else:
                verdict = "❌ Misaligned"  # Completely wrong

        confidence_level = "High" if confidence > 0.8 else "Medium" if confidence > 0.6 else "Low"

        return (f"{verdict} | {sentiment.title()} sentiment ({confidence_level} conf: {confidence:.2f}) "
                f"vs {direction} {return_24h:+.2f}% return | DAS: {das_score:.3f}")
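
    # Illustrative correctness check (hypothetical numbers, default k = 1.0):
    #   volatility_14d = 2.0%  -> threshold = 2.0%
    #   Positive sentiment, return_24h = +1.5%  -> is_correct is False (move stays inside
    #     the noise band), yet DAS = 0.7 + 0.3 * (1.5 / 2.0) ≈ 0.93, so the summary
    #     still flags it as directionally right.
    #   Positive sentiment, return_24h = +2.5%  -> correct (return exceeds the threshold).
    #   Neutral sentiment,  return_24h = -1.0%  -> correct (|return| stays within the band).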

    def calculate_batch_metrics(self, evaluations: List[Dict]) -> Dict:
        """
        Calculate aggregate metrics for a batch of evaluations.

        Args:
            evaluations: List of evaluation results

        Returns:
            Dictionary with aggregate metrics
        """
        try:
            if not evaluations:
                return {"error": "No evaluations provided"}

            # Filter out error results
            valid_evals = [e for e in evaluations if "error" not in e]

            if not valid_evals:
                return {"error": "No valid evaluations found"}

            # Calculate metrics
            das_scores = [e["das_score"] for e in valid_evals]
            correctness = [e["is_correct"] for e in valid_evals]
            confidences = [e["confidence"] for e in valid_evals]
            wat_weights = [e["wat_weight"] for e in valid_evals]

            # Aggregate metrics
            avg_das = float(np.mean(das_scores))
            accuracy = float(np.mean(correctness))
            avg_confidence = float(np.mean(confidences))

            # Weighted accuracy
            weighted_correctness = [float(c) * float(w) for c, w in zip(correctness, wat_weights)]
            total_weight = sum(wat_weights)
            weighted_accuracy = float(sum(weighted_correctness) / total_weight) if total_weight > 0 else 0.0

            # Confidence-accuracy correlation (handle single evaluation case)
            if len(confidences) > 1:
                try:
                    corr_matrix = np.corrcoef(confidences, correctness)
                    confidence_correlation = float(corr_matrix[0, 1])
                    # Handle NaN case (when all values are the same)
                    if np.isnan(confidence_correlation):
                        confidence_correlation = 0.0
                except Exception:
                    confidence_correlation = 0.0
            else:
                confidence_correlation = 0.0  # Cannot calculate correlation with a single point

            # Count high/low confidence predictions
            high_confidence_count = sum(1 for c in confidences if c > self.confidence_threshold)
            low_confidence_count = sum(1 for c in confidences if c < 0.6)

            return {
                "total_evaluations": len(valid_evals),
                "average_das": avg_das,
                "accuracy": accuracy,
                "weighted_accuracy": weighted_accuracy,
                "average_confidence": avg_confidence,
                "confidence_accuracy_correlation": confidence_correlation,
                "high_confidence_predictions": high_confidence_count,
                "low_confidence_predictions": low_confidence_count
            }

        except Exception as e:
            logger.error(f"Error calculating batch metrics: {str(e)}")
            return {"error": str(e)}
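

# Minimal usage sketch (illustrative only): the dictionaries below carry made-up
# values, and the keys simply mirror those read by evaluate_prediction and
# calculate_batch_metrics. In the real pipeline these inputs would come from the
# FinBERT analyzer and the market data service.
if __name__ == "__main__":
    engine = EvaluationEngine(volatility_multiplier=1.0, confidence_threshold=0.7)

    sentiment_data = {"sentiment": "positive", "confidence": 0.88}
    market_data = {
        "ticker": "AAPL",          # hypothetical example ticker
        "return_24h": 2.4,         # percent
        "volatility_14d": 1.8,     # percent
        "alpha_adjusted": 1.1,     # percent, excess over benchmark
    }

    evaluation = engine.evaluate_prediction(sentiment_data, market_data, datetime(2024, 1, 15))
    print(evaluation.get("evaluation_summary", evaluation))

    # Aggregate metrics over a (single-element) batch of evaluations
    print(engine.calculate_batch_metrics([evaluation]))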