"""Consensus analysis for multi-model LLM responses.

Computes semantic similarity between model outputs, clusters agreeing
models, characterizes disagreements, and synthesizes a consensus answer.
"""

from typing import Dict, List, Any

import re
from collections import defaultdict

import numpy as np
import plotly.graph_objects as go
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity


def strip_formatting(text: str) -> str:
    """Remove HTML tags and markdown markup, then normalize whitespace and case."""
    # Drop HTML tags.
    text = re.sub(r'<[^>]+>', '', text)
    # Drop images entirely, but keep the visible text of markdown links.
    text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', '', text)
    text = re.sub(r'\[([^\]]*)\]\([^\)]*\)', r'\1', text)
    # Drop emphasis markers, inline code backticks, and heading hashes.
    text = re.sub(r'\*\*|__|\*|`|#+', '', text)
    # Collapse whitespace and lowercase so formatting differences do not
    # skew embedding similarity.
    text = re.sub(r'\s+', ' ', text).strip().lower()
    return text
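
# Illustrative example:
#   strip_formatting("## **Bold** point, see [the docs](https://example.com)")
#   -> "bold point, see the docs"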


class ConsensusAnalyzer:
    """Analyzes agreement and disagreement across responses from multiple LLMs."""

    # Keyword lexicon shared by _extract_topics and
    # _analyze_topic_disagreements.
    TOPIC_KEYWORDS = {
        "technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
        "ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
        "practical": ["practical", "feasible", "realistic", "implementable"],
        "economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
        "social": ["social", "community", "people", "society", "impact", "benefit"],
        "legal": ["legal", "law", "regulation", "compliance", "policy"],
        "safety": ["safety", "security", "risk", "danger", "protect"]
    }

    def __init__(self, llm_client=None):
        # Small, fast sentence-embedding model; adequate for short answers.
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        # Optional LLM client used for generated explanations and synthesis.
        self.llm_client = llm_client

    def calculate_similarity_matrix(self, responses: List[str]) -> np.ndarray:
        """Return pairwise cosine similarity between responses (formatting stripped)."""
        cleaned = [strip_formatting(r) for r in responses]
        embeddings = self.model.encode(cleaned)
        return cosine_similarity(embeddings)
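
    # Illustrative usage, assuming `analyzer = ConsensusAnalyzer()` (the
    # similarity values are hypothetical; they depend on the embedding model):
    #   analyzer.calculate_similarity_matrix(["cats are mammals",
    #                                         "felines are mammals",
    #                                         "stocks fell today"])
    #   -> array([[1.00, 0.85, 0.05],
    #             [0.85, 1.00, 0.07],
    #             [0.05, 0.07, 1.00]])  # symmetric, ones on the diagonal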

    def calculate_consensus(self, responses: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate consensus metrics from model responses (formatting stripped)."""
        valid_responses = {
            model: data["response"]
            for model, data in responses.items()
            if data["status"] == "success"
        }

        if not valid_responses:
            return {
                "consensus_score": 0.0,
                "error": "No valid responses received"
            }

        cleaned_responses = {m: strip_formatting(r) for m, r in valid_responses.items()}
        response_texts = list(cleaned_responses.values())
        similarity_matrix = self.calculate_similarity_matrix(response_texts)

        # Average pairwise similarity, excluding the diagonal: self-similarity
        # is always 1.0 and would otherwise inflate the score.
        n = len(response_texts)
        if n > 1:
            consensus_score = float(np.mean(similarity_matrix[~np.eye(n, dtype=bool)]))
        else:
            consensus_score = 1.0

        clusters = self._identify_clusters(similarity_matrix, threshold=0.5)
        disagreements = self._analyze_disagreements_enhanced(cleaned_responses, clusters)
        topics = self._extract_topics(cleaned_responses)
        confidence_analysis = self._analyze_confidence_patterns(cleaned_responses, similarity_matrix)

        return {
            "consensus_score": consensus_score,
            "clusters": clusters,
            "disagreements": disagreements,
            "similarity_matrix": similarity_matrix.tolist(),
            "topics": topics,
            "confidence_analysis": confidence_analysis
        }

    def _identify_clusters(self, similarity_matrix: np.ndarray, threshold: float) -> List[List[int]]:
        """Greedily group response indices whose similarity meets the threshold."""
        n = len(similarity_matrix)
        clusters = []
        used = set()

        for i in range(n):
            if i in used:
                continue

            # Seed a new cluster with i, then absorb every unused response
            # that is sufficiently similar to the seed.
            cluster = [i]
            used.add(i)
            for j in range(i + 1, n):
                if j not in used and similarity_matrix[i, j] >= threshold:
                    cluster.append(j)
                    used.add(j)

            clusters.append(cluster)

        return clusters
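
    # Worked example (hypothetical 3x3 matrix, threshold 0.5):
    #   [[1.0, 0.9, 0.2],
    #    [0.9, 1.0, 0.3],
    #    [0.2, 0.3, 1.0]]  ->  [[0, 1], [2]]
    # Responses 0 and 1 agree (0.9 >= 0.5); response 2 stands alone.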

    def _analyze_disagreements_enhanced(self, responses: Dict[str, str], clusters: List[List[int]]) -> List[Dict[str, Any]]:
        """Pairwise disagreement analysis enriched with reasoning-pattern features."""
        disagreements = []
        model_names = list(responses.keys())
        response_texts = list(responses.values())

        # Keys are positional ("model_0", "model_1", ...) to match the
        # enumeration below.
        reasoning_patterns = self._extract_reasoning_patterns(response_texts)

        # Encode once; per-pair similarities are read from this matrix.
        similarity_matrix = self.calculate_similarity_matrix(response_texts)

        for i in range(len(model_names)):
            for j in range(i + 1, len(model_names)):
                model1_name = model_names[i]
                model2_name = model_names[j]
                response1 = response_texts[i]
                response2 = response_texts[j]

                similarity = similarity_matrix[i, j]

                disagreement_type = self._categorize_disagreement_enhanced([response1], [response2])
                disagreement_points = self._extract_disagreement_points(response1, response2)
                explanation = self._generate_disagreement_explanation_enhanced(
                    [response1], [response2], disagreement_points, self.llm_client
                )

                disagreements.append({
                    "type": disagreement_type,
                    "cluster1": [model1_name],
                    "cluster2": [model2_name],
                    "explanation": explanation,
                    "disagreement_points": disagreement_points,
                    "similarity_score": float(similarity),
                    "reasoning_patterns": {
                        "model1": reasoning_patterns.get(f"model_{i}", {}),
                        "model2": reasoning_patterns.get(f"model_{j}", {})
                    }
                })

        return disagreements

    def _extract_reasoning_patterns(self, responses: List[str]) -> Dict[str, Dict[str, Any]]:
        """Extract coarse reasoning features, keyed by position ("model_0", "model_1", ...)."""
        patterns = {}

        for i, response in enumerate(responses):
            patterns[f"model_{i}"] = {
                "uses_examples": bool(re.search(r'\b(example|instance|case|such as|like)\b', response, re.IGNORECASE)),
                "uses_evidence": bool(re.search(r'\b(evidence|data|research|study|fact|statistic)\b', response, re.IGNORECASE)),
                "uses_conditional": bool(re.search(r'\b(if|when|unless|provided that|assuming)\b', response, re.IGNORECASE)),
                "uses_comparison": bool(re.search(r'\b(however|but|although|while|whereas|on the other hand)\b', response, re.IGNORECASE)),
                "uses_authority": bool(re.search(r'\b(according to|research shows|experts say|studies indicate)\b', response, re.IGNORECASE)),
                # Ignore empty fragments produced by trailing punctuation.
                "sentence_count": len([s for s in re.split(r'[.!?]+', response) if s.strip()]),
                "word_count": len(response.split()),
                "has_conclusion": bool(re.search(r'\b(therefore|thus|consequently|in conclusion|overall)\b', response, re.IGNORECASE))
            }

        return patterns

    def _analyze_topic_disagreements(self, responses: List[str], model_names: List[str]) -> Dict[str, List[str]]:
        """Group responses by topic area and flag topics where they diverge."""
        topic_disagreements = defaultdict(list)

        for topic, keywords in self.TOPIC_KEYWORDS.items():
            topic_responses = [
                response for response in responses
                if any(keyword in response.lower() for keyword in keywords)
            ]

            if len(topic_responses) > 1:
                topic_similarity = self.calculate_similarity_matrix(topic_responses)
                avg_similarity = np.mean(topic_similarity)

                if avg_similarity < 0.8:
                    topic_disagreements[topic].extend(topic_responses)

        return dict(topic_disagreements)
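
    # Illustrative result (hypothetical): if two responses both mention a
    # keyword like "cost" but embed far apart (mean similarity < 0.8), the
    # return value looks like {"economic": ["<response a>", "<response b>"]}.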

    def _extract_disagreement_points(self, response1: str, response2: str) -> List[str]:
        """Extract up to three sentence pairs where the responses diverge."""
        sentences1 = [s.strip() for s in re.split(r'[.!?]+', response1) if s.strip()]
        sentences2 = [s.strip() for s in re.split(r'[.!?]+', response2) if s.strip()]

        if not sentences1 or not sentences2:
            return []

        disagreement_points = []
        contrast_keywords = ["however", "but", "although", "while", "whereas", "on the other hand", "in contrast"]

        # Encode every sentence once instead of re-embedding each pair.
        similarity = self.calculate_similarity_matrix(sentences1 + sentences2)

        for i, sent1 in enumerate(sentences1):
            for j, sent2 in enumerate(sentences2):
                has_contrast = any(keyword in sent1.lower() or keyword in sent2.lower()
                                   for keyword in contrast_keywords)
                # A contrastive marker plus low similarity suggests a
                # genuine point of divergence.
                if has_contrast and similarity[i, len(sentences1) + j] < 0.7:
                    disagreement_points.append(f"'{sent1}' vs '{sent2}'")

        return disagreement_points[:3]

    def _centroid_similarity(self, responses1: List[str], responses2: List[str]) -> float:
        """Cosine similarity between the mean embeddings of two response groups."""
        embeddings = self.model.encode(responses1 + responses2)
        cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
        cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)
        return float(np.dot(cluster1_avg, cluster2_avg)
                     / (np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg)))

    def _categorize_disagreement_enhanced(self, responses1: List[str], responses2: List[str]) -> str:
        """Bucket the severity of a disagreement by centroid similarity."""
        similarity = self._centroid_similarity(responses1, responses2)

        if similarity < 0.7:
            return "Fundamental Disagreement - Models have completely different perspectives"
        elif similarity < 0.8:
            return "Major Disagreement - Models agree on some aspects but differ significantly"
        elif similarity < 0.9:
            return "Moderate Disagreement - Models mostly agree but have important differences"
        elif similarity < 0.95:
            return "Minor Disagreement - Models agree with slight variations"
        else:
            return "Strong Agreement - Models are essentially in consensus"

    def _generate_disagreement_explanation_enhanced(self, responses1: List[str], responses2: List[str],
                                                    disagreement_points: List[str], llm_client: Any = None) -> str:
        """Explain a disagreement, via the LLM when available, else heuristically."""
        if llm_client and responses1 and responses2:
            try:
                prompt = f"""Analyze the disagreement between these two AI model responses.
Focus on the key differences in reasoning, assumptions, or conclusions.

Response 1: {responses1[0]}
Response 2: {responses2[0]}

Specific disagreement points: {disagreement_points}

Provide a concise analysis (2-3 sentences) explaining:
1. What the main disagreement is about
2. Why the models might have different perspectives
3. Which aspects they agree on (if any)

Format as a clear, objective analysis."""

                return llm_client._sync_query("meta-llama/Meta-Llama-3.1-70B-Instruct", prompt)
            except Exception as e:
                # Fall through to the embedding-based heuristic below.
                print(f"Error generating LLM-based explanation: {e}")

        similarity = self._centroid_similarity(responses1, responses2)

        if similarity < 0.7:
            return "Models have fundamentally different perspectives, possibly due to different training data, reasoning approaches, or interpretation of the question."
        elif similarity < 0.9:
            return "Models agree on core concepts but differ in their reasoning, emphasis, or specific conclusions."
        else:
            return "Models are in strong agreement with only minor differences in expression or emphasis."

    def _extract_topics(self, responses: Dict[str, str]) -> Dict[str, List[str]]:
        """Map each topic area to the models whose responses touch on it."""
        topics = defaultdict(list)

        for model, response in responses.items():
            response_lower = response.lower()
            for topic, keywords in self.TOPIC_KEYWORDS.items():
                if any(keyword in response_lower for keyword in keywords):
                    topics[topic].append(model)

        return dict(topics)

    def _analyze_confidence_patterns(self, responses: Dict[str, str], similarity_matrix: np.ndarray) -> Dict[str, Any]:
        """Analyze confidence patterns, proxied by each model's average agreement with its peers."""
        model_names = list(responses.keys())
        n = len(similarity_matrix)

        if n < 2:
            # With a single response there are no peers to compare against.
            return {
                "most_confident_model": model_names[0],
                "least_confident_model": model_names[0],
                "confidence_scores": {model_names[0]: 1.0},
                "confidence_variance": 0.0
            }

        avg_similarities = []
        for i in range(n):
            similarities = [similarity_matrix[i][j] for j in range(n) if i != j]
            avg_similarities.append(np.mean(similarities))

        most_confident_idx = np.argmax(avg_similarities)
        least_confident_idx = np.argmin(avg_similarities)

        return {
            "most_confident_model": model_names[most_confident_idx],
            "least_confident_model": model_names[least_confident_idx],
            "confidence_scores": dict(zip(model_names, [float(s) for s in avg_similarities])),
            "confidence_variance": float(np.var(avg_similarities))
        }

    def create_visualization(self, consensus_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create Plotly visualizations (serialized to JSON) for the consensus analysis."""
        similarity_matrix = np.array(consensus_data["similarity_matrix"])
        heatmap = go.Figure(data=go.Heatmap(
            z=similarity_matrix,
            colorscale='RdYlGn',
            zmin=0,
            zmax=1
        ))
        heatmap.update_layout(title="Pairwise Response Similarity")

        gauge = go.Figure(go.Indicator(
            mode="gauge+number",
            value=consensus_data["consensus_score"] * 100,
            title={'text': "Consensus Score"},
            gauge={'axis': {'range': [0, 100]},
                   'bar': {'color': "darkblue"},
                   'steps': [
                       {'range': [0, 33], 'color': "red"},
                       {'range': [33, 66], 'color': "yellow"},
                       {'range': [66, 100], 'color': "green"}
                   ]}
        ))

        return {
            "heatmap": heatmap.to_json(),
            "gauge": gauge.to_json()
        }
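
    # The returned JSON payloads can be rehydrated for display, e.g.:
    #   import plotly.io as pio
    #   viz = analyzer.create_visualization(consensus_data)
    #   pio.from_json(viz["heatmap"]).show()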

    def synthesize_consensus_response(self, responses: Dict[str, Any], disagreements: List[Dict[str, Any]]) -> str:
        """Synthesize a consensus answer, via the LLM when available, else by template."""
        successful_responses = [data["response"] for data in responses.values() if data["status"] == "success"]

        if not successful_responses:
            return "No successful model responses to synthesize."

        if self.llm_client and len(successful_responses) > 1:
            try:
                responses_text = "\n\n".join(f"Model {i + 1}: {response}"
                                             for i, response in enumerate(successful_responses))

                disagreement_summary = ""
                if disagreements:
                    disagreement_summary = "\n\nKey Disagreements:\n"
                    for d in disagreements[:3]:
                        disagreement_summary += f"- {d['type']}: {d['explanation']}\n"

                synthesis_prompt = f"""You are an expert consensus synthesizer. Analyze the following AI model responses and create a comprehensive, well-reasoned synthesis that:

1. Identifies the core points of agreement
2. Addresses key disagreements with balanced reasoning
3. Provides a coherent, evidence-based consensus response
4. Acknowledges uncertainty where appropriate
5. Suggests areas for further investigation if needed

Model Responses:
{responses_text}
{disagreement_summary}

Create a synthesis that is:
- Comprehensive but concise (2-3 paragraphs)
- Balanced and objective
- Well-structured with clear sections
- Professional in tone

Format your response with clear sections: Summary, Key Agreements, Addressing Disagreements, and Consensus Conclusion."""

                synthesized_response = self.llm_client._sync_query("meta-llama/Meta-Llama-3.1-405B-Instruct", synthesis_prompt)

                metadata = f"""
---
**Synthesis Metadata:**
- Models consulted: {len(successful_responses)}
- Consensus score: {self._calculate_overall_consensus(successful_responses):.2f}
- Disagreements identified: {len(disagreements)}
- Synthesis method: LLM-enhanced consensus
---
"""
                return metadata + "\n\n" + synthesized_response

            except Exception as e:
                # Fall back to the template-based synthesis below.
                print(f"Error in LLM synthesis: {e}")

        # Template-based fallback when no LLM client is available or the
        # LLM call failed.
        parts = []
        parts.append("### Synthesized Consensus Response\n\n")
        parts.append("Based on the input from various models, here is a consolidated view:\n\n")
        parts.append("**Common Themes:** All models generally agree on the core aspects.\n\n")

        if disagreements:
            parts.append("**Areas of Divergence:**\n")
            for i, d in enumerate(disagreements):
                parts.append(f"- **Disagreement {i + 1} ({d['type']}):** Models {', '.join(d['cluster1'])} and {', '.join(d['cluster2'])} have differing views. {d['explanation']}\n")
        else:
            parts.append("Models are in strong agreement on this topic.\n")

        parts.append("\n---\n\n**Individual Model Responses (for reference):**\n\n")
        for model_name, data in responses.items():
            if data["status"] == "success":
                parts.append(f"**{model_name}:**\n{data['response']}\n\n")

        return "".join(parts)

    def _calculate_overall_consensus(self, responses: List[str]) -> float:
        """Calculate the overall consensus score for a list of responses."""
        n = len(responses)
        if n < 2:
            return 1.0

        similarity_matrix = self.calculate_similarity_matrix(responses)
        # Off-diagonal mean, consistent with calculate_consensus().
        return float(np.mean(similarity_matrix[~np.eye(n, dtype=bool)]))
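

if __name__ == "__main__":
    # Minimal usage sketch / smoke test. The per-model input shape
    # ({"status": ..., "response": ...}) matches what calculate_consensus
    # expects; the model names here are made up, and the first run downloads
    # the all-MiniLM-L6-v2 weights.
    analyzer = ConsensusAnalyzer(llm_client=None)  # no LLM: heuristic paths only

    demo_responses = {
        "model-a": {"status": "success",
                    "response": "Remote work boosts productivity, but teams need deliberate communication."},
        "model-b": {"status": "success",
                    "response": "**Remote work** generally improves productivity; however, it can weaken team cohesion."},
        "model-c": {"status": "error", "response": None},  # excluded from analysis
    }

    result = analyzer.calculate_consensus(demo_responses)
    print(f"Consensus score: {result['consensus_score']:.2f}")
    print(f"Clusters: {result['clusters']}")
    print(analyzer.synthesize_consensus_response(demo_responses, result["disagreements"]))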