"""Consensus analysis utilities: semantic similarity, clustering, disagreement
analysis, and synthesis across responses from multiple LLMs."""

from typing import Dict, List, Any, Tuple
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import plotly.graph_objects as go
import plotly.express as px
import re
from collections import defaultdict


def strip_formatting(text: str) -> str:
    """Remove markdown, HTML tags, and normalize whitespace/case."""
    # Remove HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    # Remove markdown bold/italic/code/headers/links/images
    text = re.sub(r'\*\*|__|\*|`|#+|!\[[^\]]*\]\([^\)]*\)|\[[^\]]*\]\([^\)]*\)', '', text)
    # Collapse whitespace and lowercase
    text = re.sub(r'\s+', ' ', text).strip().lower()
    return text


class ConsensusAnalyzer:
    def __init__(self, llm_client=None):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.llm_client = llm_client  # Store the LLM client

    def calculate_similarity_matrix(self, responses: List[str]) -> np.ndarray:
        """Calculate semantic similarity between all responses (formatting stripped)."""
        cleaned = [strip_formatting(r) for r in responses]
        embeddings = self.model.encode(cleaned)
        return cosine_similarity(embeddings)

    def calculate_consensus(self, responses: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate consensus metrics from model responses (formatting stripped)."""
        # Extract successful responses
        valid_responses = {
            model: data["response"]
            for model, data in responses.items()
            if data["status"] == "success"
        }

        if not valid_responses:
            return {
                "consensus_score": 0,
                "error": "No valid responses received"
            }

        # Use cleaned responses for all downstream analysis
        cleaned_responses = {m: strip_formatting(r) for m, r in valid_responses.items()}
        response_texts = list(cleaned_responses.values())
        similarity_matrix = self.calculate_similarity_matrix(response_texts)

        # Calculate consensus score (average similarity)
        consensus_score = float(np.mean(similarity_matrix))

        # Identify clusters of similar responses
        clusters = self._identify_clusters(similarity_matrix, threshold=0.5)

        # Analyze disagreements with enhanced analysis
        disagreements = self._analyze_disagreements_enhanced(cleaned_responses, clusters)

        # Extract key topics and themes
        topics = self._extract_topics(cleaned_responses)

        # Analyze confidence patterns
        confidence_analysis = self._analyze_confidence_patterns(cleaned_responses, similarity_matrix)

        return {
            "consensus_score": consensus_score,
            "clusters": clusters,
            "disagreements": disagreements,
            "similarity_matrix": similarity_matrix.tolist(),
            "topics": topics,
            "confidence_analysis": confidence_analysis
        }

    def _identify_clusters(self, similarity_matrix: np.ndarray, threshold: float) -> List[List[int]]:
        """Identify clusters of similar responses via greedy grouping."""
        n = len(similarity_matrix)
        clusters = []
        used = set()

        for i in range(n):
            if i in used:
                continue
            cluster = [i]
            used.add(i)
            for j in range(i + 1, n):
                if j not in used and similarity_matrix[i, j] >= threshold:
                    cluster.append(j)
                    used.add(j)
            clusters.append(cluster)

        return clusters

    def _analyze_disagreements_enhanced(self, responses: Dict[str, Any],
                                        clusters: List[List[int]]) -> List[Dict[str, Any]]:
        """Enhanced disagreement analysis with topic extraction and reasoning patterns."""
        disagreements = []
        model_names = list(responses.keys())
        response_texts = list(responses.values())

        # Extract reasoning patterns and key arguments
        reasoning_patterns = self._extract_reasoning_patterns(response_texts)

        # Analyze disagreements by topic areas (informational; not folded into the per-pair output)
        topic_disagreements = self._analyze_topic_disagreements(response_texts, model_names)

        # Iterate through all unique pairs of models
        for i in range(len(model_names)):
            for j in range(i + 1, len(model_names)):
                model1_name = model_names[i]
                model2_name = model_names[j]
                response1 = response_texts[i]
                response2 = response_texts[j]

                # Calculate similarity between the two responses
                similarity = self.calculate_similarity_matrix([response1, response2])[0, 1]

                # Enhanced disagreement categorization
                disagreement_type = self._categorize_disagreement_enhanced([response1], [response2])

                # Extract specific points of disagreement
                disagreement_points = self._extract_disagreement_points(response1, response2)

                # Generate detailed explanation
                explanation = self._generate_disagreement_explanation_enhanced(
                    [response1], [response2], disagreement_points, self.llm_client
                )

                disagreement = {
                    "type": disagreement_type,
                    "cluster1": [model1_name],
                    "cluster2": [model2_name],
                    "explanation": explanation,
                    "disagreement_points": disagreement_points,
                    "similarity_score": float(similarity),
                    # Reasoning patterns are keyed positionally ("model_0", "model_1", ...)
                    # by _extract_reasoning_patterns, so look them up by index.
                    "reasoning_patterns": {
                        "model1": reasoning_patterns.get(f"model_{i}", {}),
                        "model2": reasoning_patterns.get(f"model_{j}", {})
                    }
                }
                disagreements.append(disagreement)

        return disagreements

    def _extract_reasoning_patterns(self, responses: List[str]) -> Dict[str, Dict[str, Any]]:
        """Extract reasoning patterns from responses."""
        patterns = {}
        for i, response in enumerate(responses):
            pattern = {
                "uses_examples": bool(re.search(r'\b(example|instance|case|such as|like)\b', response, re.IGNORECASE)),
                "uses_evidence": bool(re.search(r'\b(evidence|data|research|study|fact|statistic)\b', response, re.IGNORECASE)),
                "uses_conditional": bool(re.search(r'\b(if|when|unless|provided that|assuming)\b', response, re.IGNORECASE)),
                "uses_comparison": bool(re.search(r'\b(however|but|although|while|whereas|on the other hand)\b', response, re.IGNORECASE)),
                "uses_authority": bool(re.search(r'\b(according to|research shows|experts say|studies indicate)\b', response, re.IGNORECASE)),
                "sentence_count": len(re.split(r'[.!?]+', response)),
                "word_count": len(response.split()),
                "has_conclusion": bool(re.search(r'\b(therefore|thus|consequently|in conclusion|overall)\b', response, re.IGNORECASE))
            }
            patterns[f"model_{i}"] = pattern
        return patterns

    def _analyze_topic_disagreements(self, responses: List[str], model_names: List[str]) -> Dict[str, List[str]]:
        """Analyze disagreements by topic area."""
        # Simple keyword-based topic extraction
        topics = {
            "technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
            "ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
            "practical": ["practical", "feasible", "realistic", "implementable"],
            "economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
            "social": ["social", "community", "people", "society", "impact", "benefit"],
            "legal": ["legal", "law", "regulation", "compliance", "policy"],
            "safety": ["safety", "security", "risk", "danger", "protect"]
        }

        topic_disagreements = defaultdict(list)
        for topic, keywords in topics.items():
            topic_responses = [response for response in responses
                               if any(keyword in response.lower() for keyword in keywords)]

            if len(topic_responses) > 1:
                # Check for disagreements within this topic
                topic_similarity = self.calculate_similarity_matrix(topic_responses)
                avg_similarity = np.mean(topic_similarity)
                if avg_similarity < 0.8:  # Threshold for disagreement
                    topic_disagreements[topic].extend(topic_responses)

        return dict(topic_disagreements)

    def _extract_disagreement_points(self, response1: str, response2: str) -> List[str]:
        """Extract specific points where responses disagree."""
        # Split responses into sentences
        sentences1 = [s.strip() for s in re.split(r'[.!?]+', response1) if s.strip()]
        sentences2 = [s.strip() for s in re.split(r'[.!?]+', response2) if s.strip()]

        disagreement_points = []

        # Look for contrasting statements
        contrast_keywords = ["however", "but", "although", "while", "whereas",
                             "on the other hand", "in contrast"]

        for sent1 in sentences1:
            for sent2 in sentences2:
                # Check if either sentence contains a contrasting keyword
                has_contrast = any(keyword in sent1.lower() or keyword in sent2.lower()
                                   for keyword in contrast_keywords)
                if has_contrast:
                    # Calculate similarity between these sentences
                    similarity = self.calculate_similarity_matrix([sent1, sent2])[0, 1]
                    if similarity < 0.7:  # Low similarity indicates disagreement
                        disagreement_points.append(f"'{sent1}' vs '{sent2}'")

        return disagreement_points[:3]  # Limit to top 3 disagreements

    def _categorize_disagreement_enhanced(self, responses1: List[str], responses2: List[str]) -> str:
        """Enhanced categorization of disagreements."""
        # Get embeddings for all responses
        all_responses = responses1 + responses2
        embeddings = self.model.encode(all_responses)

        # Calculate average embeddings for each cluster
        cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
        cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)

        # Calculate cosine similarity between cluster averages
        similarity = np.dot(cluster1_avg, cluster2_avg) / (
            np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg)
        )

        # Enhanced categorization with more granular types
        if similarity < 0.7:
            return "Fundamental Disagreement - Models have completely different perspectives"
        elif similarity < 0.8:
            return "Major Disagreement - Models agree on some aspects but differ significantly"
        elif similarity < 0.9:
            return "Moderate Disagreement - Models mostly agree but have important differences"
        elif similarity < 0.95:
            return "Minor Disagreement - Models agree with slight variations"
        else:
            return "Strong Agreement - Models are essentially in consensus"

    def _generate_disagreement_explanation_enhanced(self, responses1: List[str], responses2: List[str],
                                                    disagreement_points: List[str],
                                                    llm_client: Any = None) -> str:
        """Generate enhanced explanation for disagreements, using the LLM when available."""
        if llm_client and responses1 and responses2:
            try:
                prompt = f"""Analyze the disagreement between these two AI model responses. Focus on the key differences in reasoning, assumptions, or conclusions.

Response 1: {responses1[0]}

Response 2: {responses2[0]}

Specific disagreement points: {disagreement_points}

Provide a concise analysis (2-3 sentences) explaining:
1. What the main disagreement is about
2. Why the models might have different perspectives
3. Which aspects they agree on (if any)

Format as a clear, objective analysis."""

                explanation = llm_client._sync_query("meta-llama/Meta-Llama-3.1-70B-Instruct", prompt)
                return explanation
            except Exception as e:
                print(f"Error generating LLM-based explanation: {e}")
                # Fall through to the similarity-based explanation below

        # Fallback explanation based on similarity
        all_responses = responses1 + responses2
        embeddings = self.model.encode(all_responses)
        cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
        cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)
        similarity = np.dot(cluster1_avg, cluster2_avg) / (
            np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg)
        )

        if similarity < 0.7:
            return ("Models have fundamentally different perspectives, possibly due to different "
                    "training data, reasoning approaches, or interpretation of the question.")
        elif similarity < 0.9:
            return ("Models agree on core concepts but differ in their reasoning, emphasis, "
                    "or specific conclusions.")
        else:
            return ("Models are in strong agreement with only minor differences in expression "
                    "or emphasis.")

    def _extract_topics(self, responses: Dict[str, str]) -> Dict[str, List[str]]:
        """Extract key topics from responses."""
        # Simple keyword-based topic extraction
        topic_keywords = {
            "technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
            "ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
            "practical": ["practical", "feasible", "realistic", "implementable"],
            "economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
            "social": ["social", "community", "people", "society", "impact", "benefit"],
            "legal": ["legal", "law", "regulation", "compliance", "policy"],
            "safety": ["safety", "security", "risk", "danger", "protect"]
        }

        topics = defaultdict(list)
        for model, response in responses.items():
            response_lower = response.lower()
            for topic, keywords in topic_keywords.items():
                if any(keyword in response_lower for keyword in keywords):
                    topics[topic].append(model)

        return dict(topics)

    def _analyze_confidence_patterns(self, responses: Dict[str, str],
                                     similarity_matrix: np.ndarray) -> Dict[str, Any]:
        """Analyze confidence patterns across models.

        A model's "confidence" here is its average similarity to the other models'
        responses, i.e. how close it sits to the group consensus.
        """
        model_names = list(responses.keys())

        # Calculate average similarity to all other models, for each model
        avg_similarities = []
        for i in range(len(similarity_matrix)):
            similarities = [similarity_matrix[i][j] for j in range(len(similarity_matrix)) if i != j]
            avg_similarities.append(np.mean(similarities))

        # Find most and least confident models
        most_confident_idx = np.argmax(avg_similarities)
        least_confident_idx = np.argmin(avg_similarities)

        return {
            "most_confident_model": model_names[most_confident_idx],
            "least_confident_model": model_names[least_confident_idx],
            "confidence_scores": dict(zip(model_names, avg_similarities)),
            "confidence_variance": float(np.var(avg_similarities))
        }

    def create_visualization(self, consensus_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create visualizations for the consensus analysis."""
        # Create similarity heatmap
        similarity_matrix = np.array(consensus_data["similarity_matrix"])
        heatmap = go.Figure(data=go.Heatmap(
            z=similarity_matrix,
            colorscale='RdYlGn',
            zmin=0,
            zmax=1
        ))

        # Create consensus score gauge
        gauge = go.Figure(go.Indicator(
            mode="gauge+number",
            value=consensus_data["consensus_score"] * 100,
            title={'text': "Consensus Score"},
            gauge={'axis': {'range': [0, 100]},
                   'bar': {'color': "darkblue"},
                   'steps': [
                       {'range': [0, 33], 'color': "red"},
                       {'range': [33, 66], 'color': "yellow"},
                       {'range': [66, 100], 'color': "green"}
                   ]}
        ))

        return {
            "heatmap": heatmap.to_json(),
            "gauge": gauge.to_json()
        }

    def synthesize_consensus_response(self, responses: Dict[str, Any],
                                      disagreements: List[Dict[str, Any]]) -> str:
        """Enhanced synthesis using LLM to intelligently combine responses."""
        # Collect all successful responses
        successful_responses = [data["response"] for data in responses.values()
                                if data["status"] == "success"]

        if not successful_responses:
            return "No successful model responses to synthesize."

        # Use LLM for intelligent synthesis if available
        if self.llm_client and len(successful_responses) > 1:
            try:
                # Prepare the synthesis prompt
                responses_text = "\n\n".join([f"Model {i+1}: {response}"
                                              for i, response in enumerate(successful_responses)])

                # Include disagreement analysis if available
                disagreement_summary = ""
                if disagreements:
                    disagreement_summary = "\n\nKey Disagreements:\n"
                    for i, d in enumerate(disagreements[:3]):  # Top 3 disagreements
                        disagreement_summary += f"- {d['type']}: {d['explanation']}\n"

                synthesis_prompt = f"""You are an expert consensus synthesizer. Analyze the following AI model responses and create a comprehensive, well-reasoned synthesis that:

1. Identifies the core points of agreement
2. Addresses key disagreements with balanced reasoning
3. Provides a coherent, evidence-based consensus response
4. Acknowledges uncertainty where appropriate
5. Suggests areas for further investigation if needed

Model Responses:
{responses_text}
{disagreement_summary}

Create a synthesis that is:
- Comprehensive but concise (2-3 paragraphs)
- Balanced and objective
- Well-structured with clear sections
- Professional in tone

Format your response with clear sections: Summary, Key Agreements, Addressing Disagreements, and Consensus Conclusion."""

                # Use a powerful model for synthesis
                synthesized_response = self.llm_client._sync_query(
                    "meta-llama/Meta-Llama-3.1-405B-Instruct", synthesis_prompt)

                # Add metadata about the synthesis
                metadata = f"""---
**Synthesis Metadata:**
- Models consulted: {len(successful_responses)}
- Consensus score: {self._calculate_overall_consensus(successful_responses):.2f}
- Disagreements identified: {len(disagreements)}
- Synthesis method: LLM-enhanced consensus
---"""

                return metadata + "\n\n" + synthesized_response
            except Exception as e:
                print(f"Error in LLM synthesis: {e}")
                # Fallback to basic synthesis

        # Fallback synthesis (original method)
        synthesized_response = []
        synthesized_response.append("### Synthesized Consensus Response\n\n")
        synthesized_response.append("Based on the input from various models, here is a consolidated view:\n\n")

        # Add a general summary of common points
        synthesized_response.append("**Common Themes:** All models generally agree on the core aspects.\n\n")

        # Elaborate on disagreements if any
        if disagreements:
            synthesized_response.append("**Areas of Divergence:**\n")
            for i, d in enumerate(disagreements):
                synthesized_response.append(
                    f"- **Disagreement {i+1} ({d['type']}):** Models {', '.join(d['cluster1'])} "
                    f"and {', '.join(d['cluster2'])} have differing views. "
                    f"{d['explanation']}\n"
                )
        else:
            synthesized_response.append("Models are in strong agreement on this topic.\n")

        # Append all responses for reference
        synthesized_response.append("\n---\n\n**Individual Model Responses (for reference):**\n\n")
        for model_name, model_data in responses.items():
            if model_data["status"] == "success":
                synthesized_response.append(f"**{model_name}:**\n{model_data['response']}\n\n")

        return "".join(synthesized_response)

    def _calculate_overall_consensus(self, responses: List[str]) -> float:
        """Calculate overall consensus score (mean pairwise similarity) from responses."""
        if len(responses) < 2:
            return 1.0
        similarity_matrix = self.calculate_similarity_matrix(responses)
        return float(np.mean(similarity_matrix))
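

# Illustrative usage sketch (not part of the original module): the payload shape
# {model_name: {"status": ..., "response": ...}} is what calculate_consensus() and
# synthesize_consensus_response() expect. The model names and response texts below
# are placeholder examples, and no llm_client is passed, so the LLM-backed
# explanation and synthesis paths fall back to their similarity-based versions.
if __name__ == "__main__":
    analyzer = ConsensusAnalyzer()

    sample_responses = {
        "model-a": {"status": "success",
                    "response": "Renewable energy adoption is mainly an economic question of cost."},
        "model-b": {"status": "success",
                    "response": "However, adoption is primarily limited by policy and regulation."},
        "model-c": {"status": "error", "response": None},  # failed calls are filtered out
    }

    result = analyzer.calculate_consensus(sample_responses)
    print(f"Consensus score: {result['consensus_score']:.2f}")
    print(f"Topics detected: {result['topics']}")
    print(f"Pairwise disagreements analyzed: {len(result['disagreements'])}")

    # Falls back to the non-LLM synthesis because no llm_client was provided.
    print(analyzer.synthesize_consensus_response(sample_responses, result["disagreements"]))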