# Such.AI / consensus_logic.py
from typing import Dict, List, Any
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import plotly.graph_objects as go
import re
from collections import defaultdict
def strip_formatting(text: str) -> str:
"""Remove markdown, HTML tags, and normalize whitespace/case."""
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Remove markdown bold/italic/code/headers/links/images
text = re.sub(r'\*\*|__|\*|`|#+|!\[[^\]]*\]\([^\)]*\)|\[[^\]]*\]\([^\)]*\)', '', text)
# Remove extra whitespace and lowercase
text = re.sub(r'\s+', ' ', text).strip().lower()
return text
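# Illustrative example (hypothetical input/output):
#   strip_formatting("## **Bold** answer with <em>HTML</em> and a [link](https://x.y)")
#   -> "bold answer with html and a"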
class ConsensusAnalyzer:
def __init__(self, llm_client=None):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.llm_client = llm_client # Store the LLM client
def calculate_similarity_matrix(self, responses: List[str]) -> np.ndarray:
"""Calculate semantic similarity between all responses (formatting stripped)."""
cleaned = [strip_formatting(r) for r in responses]
embeddings = self.model.encode(cleaned)
return cosine_similarity(embeddings)
def calculate_consensus(self, responses: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate consensus metrics from model responses (formatting stripped)."""
# Extract successful responses
valid_responses = {
model: data["response"]
for model, data in responses.items()
if data["status"] == "success"
}
if not valid_responses:
return {
"consensus_score": 0,
"error": "No valid responses received"
}
# Use cleaned responses for all downstream analysis
cleaned_responses = {m: strip_formatting(r) for m, r in valid_responses.items()}
        # calculate_similarity_matrix strips formatting itself, so the raw responses can be passed directly
        similarity_matrix = self.calculate_similarity_matrix(list(valid_responses.values()))
        # Calculate consensus score: mean of the full similarity matrix (the 1.0 diagonal is included)
        consensus_score = float(np.mean(similarity_matrix))
# Identify clusters of similar responses
clusters = self._identify_clusters(similarity_matrix, threshold=0.5)
# Analyze disagreements with enhanced analysis
disagreements = self._analyze_disagreements_enhanced(cleaned_responses, clusters)
# Extract key topics and themes
topics = self._extract_topics(cleaned_responses)
# Analyze confidence patterns
confidence_analysis = self._analyze_confidence_patterns(cleaned_responses, similarity_matrix)
return {
"consensus_score": consensus_score,
"clusters": clusters,
"disagreements": disagreements,
"similarity_matrix": similarity_matrix.tolist(),
"topics": topics,
"confidence_analysis": confidence_analysis
}
def _identify_clusters(self, similarity_matrix: np.ndarray, threshold: float) -> List[List[int]]:
"""Identify clusters of similar responses."""
n = len(similarity_matrix)
clusters = []
used = set()
for i in range(n):
if i in used:
continue
cluster = [i]
used.add(i)
for j in range(i + 1, n):
if j not in used and similarity_matrix[i, j] >= threshold:
cluster.append(j)
used.add(j)
clusters.append(cluster)
return clusters
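    # Illustrative example (hypothetical matrix): with threshold=0.5 and
    #   similarity_matrix = [[1.0, 0.8, 0.2],
    #                        [0.8, 1.0, 0.3],
    #                        [0.2, 0.3, 1.0]]
    # the single greedy pass yields [[0, 1], [2]]: response 1 joins response 0's
    # cluster and is never used as a seed of its own.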
def _analyze_disagreements_enhanced(self, responses: Dict[str, Any], clusters: List[List[int]]) -> List[Dict[str, Any]]:
"""Enhanced disagreement analysis with topic extraction and reasoning patterns."""
disagreements = []
model_names = list(responses.keys())
response_texts = list(responses.values())
# Extract reasoning patterns and key arguments
reasoning_patterns = self._extract_reasoning_patterns(response_texts)
# Analyze disagreements by topic areas
topic_disagreements = self._analyze_topic_disagreements(response_texts, model_names)
# Iterate through all unique pairs of models
for i in range(len(model_names)):
for j in range(i + 1, len(model_names)):
model1_name = model_names[i]
model2_name = model_names[j]
response1 = response_texts[i]
response2 = response_texts[j]
# Calculate similarity between the two responses
similarity = self.calculate_similarity_matrix([response1, response2])[0, 1]
# Enhanced disagreement categorization
disagreement_type = self._categorize_disagreement_enhanced([response1], [response2])
# Extract specific points of disagreement
disagreement_points = self._extract_disagreement_points(response1, response2)
# Generate detailed explanation
explanation = self._generate_disagreement_explanation_enhanced(
[response1], [response2], disagreement_points, self.llm_client
)
disagreement = {
"type": disagreement_type,
"cluster1": [model1_name],
"cluster2": [model2_name],
"explanation": explanation,
"disagreement_points": disagreement_points,
"similarity_score": float(similarity),
"reasoning_patterns": {
"model1": reasoning_patterns.get(model1_name, {}),
"model2": reasoning_patterns.get(model2_name, {})
}
}
disagreements.append(disagreement)
return disagreements
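    # Illustrative shape of a single disagreement entry (hypothetical values):
    #   {"type": "Moderate Disagreement - ...", "cluster1": ["model-a"], "cluster2": ["model-b"],
    #    "explanation": "...", "disagreement_points": ["'...' vs '...'"],
    #    "similarity_score": 0.84, "reasoning_patterns": {"model1": {...}, "model2": {...}}}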
def _extract_reasoning_patterns(self, responses: List[str]) -> Dict[str, Dict[str, Any]]:
"""Extract reasoning patterns from responses."""
patterns = {}
for i, response in enumerate(responses):
pattern = {
"uses_examples": bool(re.search(r'\b(example|instance|case|such as|like)\b', response, re.IGNORECASE)),
"uses_evidence": bool(re.search(r'\b(evidence|data|research|study|fact|statistic)\b', response, re.IGNORECASE)),
"uses_conditional": bool(re.search(r'\b(if|when|unless|provided that|assuming)\b', response, re.IGNORECASE)),
"uses_comparison": bool(re.search(r'\b(however|but|although|while|whereas|on the other hand)\b', response, re.IGNORECASE)),
"uses_authority": bool(re.search(r'\b(according to|research shows|experts say|studies indicate)\b', response, re.IGNORECASE)),
"sentence_count": len(re.split(r'[.!?]+', response)),
"word_count": len(response.split()),
"has_conclusion": bool(re.search(r'\b(therefore|thus|consequently|in conclusion|overall)\b', response, re.IGNORECASE))
}
patterns[f"model_{i}"] = pattern
return patterns
def _analyze_topic_disagreements(self, responses: List[str], model_names: List[str]) -> Dict[str, List[str]]:
"""Analyze disagreements by topic areas."""
# Simple keyword-based topic extraction
topics = {
"technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
"ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
"practical": ["practical", "feasible", "realistic", "implementable", "practical"],
"economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
"social": ["social", "community", "people", "society", "impact", "benefit"],
"legal": ["legal", "law", "regulation", "compliance", "policy"],
"safety": ["safety", "security", "risk", "danger", "protect"]
}
topic_disagreements = defaultdict(list)
for topic, keywords in topics.items():
topic_responses = []
for response in responses:
if any(keyword in response.lower() for keyword in keywords):
topic_responses.append(response)
if len(topic_responses) > 1:
# Check for disagreements within this topic
topic_similarity = self.calculate_similarity_matrix(topic_responses)
avg_similarity = np.mean(topic_similarity)
if avg_similarity < 0.8: # Threshold for disagreement
topic_disagreements[topic].extend(topic_responses)
return dict(topic_disagreements)
def _extract_disagreement_points(self, response1: str, response2: str) -> List[str]:
"""Extract specific points where responses disagree."""
# Split responses into sentences
sentences1 = [s.strip() for s in re.split(r'[.!?]+', response1) if s.strip()]
sentences2 = [s.strip() for s in re.split(r'[.!?]+', response2) if s.strip()]
disagreement_points = []
# Look for contrasting statements
contrast_keywords = ["however", "but", "although", "while", "whereas", "on the other hand", "in contrast"]
for sent1 in sentences1:
for sent2 in sentences2:
# Check if sentences contain contrasting keywords
has_contrast = any(keyword in sent1.lower() or keyword in sent2.lower()
for keyword in contrast_keywords)
if has_contrast:
# Calculate similarity between these sentences
similarity = self.calculate_similarity_matrix([sent1, sent2])[0, 1]
if similarity < 0.7: # Low similarity indicates disagreement
disagreement_points.append(f"'{sent1}' vs '{sent2}'")
return disagreement_points[:3] # Limit to top 3 disagreements
def _categorize_disagreement_enhanced(self, responses1: List[str], responses2: List[str]) -> str:
"""Enhanced categorization of disagreements."""
# Get embeddings for all responses
all_responses = responses1 + responses2
embeddings = self.model.encode(all_responses)
# Calculate average embeddings for each cluster
cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)
# Calculate cosine similarity between cluster averages
similarity = np.dot(cluster1_avg, cluster2_avg) / (np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg))
# Enhanced categorization with more granular types
if similarity < 0.7:
return "Fundamental Disagreement - Models have completely different perspectives"
elif similarity < 0.8:
return "Major Disagreement - Models agree on some aspects but differ significantly"
elif similarity < 0.9:
return "Moderate Disagreement - Models mostly agree but have important differences"
elif similarity < 0.95:
return "Minor Disagreement - Models agree with slight variations"
else:
return "Strong Agreement - Models are essentially in consensus"
def _generate_disagreement_explanation_enhanced(self, responses1: List[str], responses2: List[str],
disagreement_points: List[str], llm_client: Any = None) -> str:
"""Generate enhanced explanation for disagreements using LLM."""
if llm_client and responses1 and responses2:
try:
prompt = f"""Analyze the disagreement between these two AI model responses.
Focus on the key differences in reasoning, assumptions, or conclusions.
Response 1: {responses1[0]}
Response 2: {responses2[0]}
Specific disagreement points: {disagreement_points}
Provide a concise analysis (2-3 sentences) explaining:
1. What the main disagreement is about
2. Why the models might have different perspectives
3. Which aspects they agree on (if any)
Format as a clear, objective analysis."""
explanation = llm_client._sync_query("meta-llama/Meta-Llama-3.1-70B-Instruct", prompt)
return explanation
except Exception as e:
print(f"Error generating LLM-based explanation: {e}")
# Fallback to similarity-based explanation
# Fallback explanation based on similarity
all_responses = responses1 + responses2
embeddings = self.model.encode(all_responses)
cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)
similarity = np.dot(cluster1_avg, cluster2_avg) / (np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg))
if similarity < 0.7:
return "Models have fundamentally different perspectives, possibly due to different training data, reasoning approaches, or interpretation of the question."
elif similarity < 0.9:
return "Models agree on core concepts but differ in their reasoning, emphasis, or specific conclusions."
else:
return "Models are in strong agreement with only minor differences in expression or emphasis."
def _extract_topics(self, responses: Dict[str, str]) -> Dict[str, List[str]]:
"""Extract key topics from responses."""
# Simple keyword-based topic extraction
topic_keywords = {
"technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
"ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
"practical": ["practical", "feasible", "realistic", "implementable", "practical"],
"economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
"social": ["social", "community", "people", "society", "impact", "benefit"],
"legal": ["legal", "law", "regulation", "compliance", "policy"],
"safety": ["safety", "security", "risk", "danger", "protect"]
}
topics = defaultdict(list)
for model, response in responses.items():
response_lower = response.lower()
for topic, keywords in topic_keywords.items():
if any(keyword in response_lower for keyword in keywords):
topics[topic].append(model)
return dict(topics)
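    # Illustrative output (hypothetical model names):
    #   {"technical": ["model-a", "model-b"], "safety": ["model-a"]}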
def _analyze_confidence_patterns(self, responses: Dict[str, str], similarity_matrix: np.ndarray) -> Dict[str, Any]:
"""Analyze confidence patterns across models."""
model_names = list(responses.keys())
# Calculate average similarity for each model
avg_similarities = []
        for i in range(len(similarity_matrix)):
            similarities = [similarity_matrix[i][j] for j in range(len(similarity_matrix)) if i != j]
            # With a single valid response there are no peers to compare against; treat it as fully confident
            avg_similarities.append(np.mean(similarities) if similarities else 1.0)
# Find most and least confident models
most_confident_idx = np.argmax(avg_similarities)
least_confident_idx = np.argmin(avg_similarities)
return {
"most_confident_model": model_names[most_confident_idx],
"least_confident_model": model_names[least_confident_idx],
"confidence_scores": dict(zip(model_names, avg_similarities)),
"confidence_variance": float(np.var(avg_similarities))
}
def create_visualization(self, consensus_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create visualizations for the consensus analysis."""
# Create similarity heatmap
similarity_matrix = np.array(consensus_data["similarity_matrix"])
heatmap = go.Figure(data=go.Heatmap(
z=similarity_matrix,
colorscale='RdYlGn',
zmin=0,
zmax=1
))
# Create consensus score gauge
gauge = go.Figure(go.Indicator(
mode="gauge+number",
value=consensus_data["consensus_score"] * 100,
title={'text': "Consensus Score"},
gauge={'axis': {'range': [0, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 33], 'color': "red"},
{'range': [33, 66], 'color': "yellow"},
{'range': [66, 100], 'color': "green"}
]}
))
return {
"heatmap": heatmap.to_json(),
"gauge": gauge.to_json()
}
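    # Illustrative usage of the JSON output (assumes plotly is available to the caller):
    #   import plotly.io as pio
    #   viz = analyzer.create_visualization(consensus_data)
    #   pio.from_json(viz["heatmap"]).show()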
def synthesize_consensus_response(self, responses: Dict[str, Any], disagreements: List[Dict[str, Any]]) -> str:
"""Enhanced synthesis using LLM to intelligently combine responses."""
# Collect all successful responses
successful_responses = [data["response"] for data in responses.values() if data["status"] == "success"]
if not successful_responses:
return "No successful model responses to synthesize."
# Use LLM for intelligent synthesis if available
if self.llm_client and len(successful_responses) > 1:
try:
# Prepare the synthesis prompt
responses_text = "\n\n".join([f"Model {i+1}: {response}" for i, response in enumerate(successful_responses)])
# Include disagreement analysis if available
disagreement_summary = ""
if disagreements:
disagreement_summary = "\n\nKey Disagreements:\n"
for i, d in enumerate(disagreements[:3]): # Top 3 disagreements
disagreement_summary += f"- {d['type']}: {d['explanation']}\n"
synthesis_prompt = f"""You are an expert consensus synthesizer. Analyze the following AI model responses and create a comprehensive, well-reasoned synthesis that:
1. Identifies the core points of agreement
2. Addresses key disagreements with balanced reasoning
3. Provides a coherent, evidence-based consensus response
4. Acknowledges uncertainty where appropriate
5. Suggests areas for further investigation if needed
Model Responses:
{responses_text}
{disagreement_summary}
Create a synthesis that is:
- Comprehensive but concise (2-3 paragraphs)
- Balanced and objective
- Well-structured with clear sections
- Professional in tone
Format your response with clear sections: Summary, Key Agreements, Addressing Disagreements, and Consensus Conclusion."""
# Use a powerful model for synthesis
synthesized_response = self.llm_client._sync_query("meta-llama/Meta-Llama-3.1-405B-Instruct", synthesis_prompt)
# Add metadata about the synthesis
metadata = f"""
---
**Synthesis Metadata:**
- Models consulted: {len(successful_responses)}
- Consensus score: {self._calculate_overall_consensus(successful_responses):.2f}
- Disagreements identified: {len(disagreements)}
- Synthesis method: LLM-enhanced consensus
---
"""
return metadata + "\n\n" + synthesized_response
except Exception as e:
print(f"Error in LLM synthesis: {e}")
# Fallback to basic synthesis
# Fallback synthesis (original method)
synthesized_response = []
synthesized_response.append("### Synthesized Consensus Response\n\n")
synthesized_response.append("Based on the input from various models, here is a consolidated view:\n\n")
# Add a general summary of common points
synthesized_response.append("**Common Themes:** All models generally agree on the core aspects.\n\n")
# Elaborate on disagreements if any
if disagreements:
synthesized_response.append("**Areas of Divergence:**\n")
for i, d in enumerate(disagreements):
synthesized_response.append(f"- **Disagreement {i+1} ({d['type']}):** Models {', '.join(d['cluster1'])} and {', '.join(d['cluster2'])} have differing views. {d['explanation']}\n")
else:
synthesized_response.append("Models are in strong agreement on this topic.\n")
# Append all responses for reference
synthesized_response.append("\n--- \n\n**Individual Model Responses (for reference):**\n\n")
        for model_name, data in responses.items():
            if data["status"] == "success":
                synthesized_response.append(f"**{model_name}:**\n{data['response']}\n\n")
return "".join(synthesized_response)
def _calculate_overall_consensus(self, responses: List[str]) -> float:
"""Calculate overall consensus score from responses."""
if len(responses) < 2:
return 1.0
similarity_matrix = self.calculate_similarity_matrix(responses)
return float(np.mean(similarity_matrix))
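
# Minimal usage sketch (illustrative only): the model names and response texts below are
# assumptions for demonstration, not part of the module. With llm_client=None the analyzer
# falls back to its similarity-based explanations and the non-LLM synthesis path.
if __name__ == "__main__":
    analyzer = ConsensusAnalyzer(llm_client=None)
    sample_responses = {
        "model-a": {"status": "success", "response": "Renewable energy adoption should be accelerated."},
        "model-b": {"status": "success", "response": "Accelerating renewable energy adoption is advisable."},
        "model-c": {"status": "success", "response": "A gradual transition is safer than rapid acceleration."},
    }
    consensus = analyzer.calculate_consensus(sample_responses)
    print(f"Consensus score: {consensus['consensus_score']:.2f}")
    print(analyzer.synthesize_consensus_response(sample_responses, consensus.get("disagreements", [])))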