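"""Consensus analysis for multi-model LLM responses.

Strips formatting from model outputs, measures pairwise semantic similarity with
sentence-transformers embeddings, clusters similar responses, analyzes disagreements
and topics, builds Plotly visualizations, and synthesizes a consensus answer
(optionally LLM-assisted).
"""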
from typing import Dict, List, Any, Tuple
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import plotly.graph_objects as go
import plotly.express as px
import re
from collections import defaultdict
def strip_formatting(text: str) -> str:
"""Remove markdown, HTML tags, and normalize whitespace/case."""
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Remove markdown bold/italic/code/headers/links/images
text = re.sub(r'\*\*|__|\*|`|#+|!\[[^\]]*\]\([^\)]*\)|\[[^\]]*\]\([^\)]*\)', '', text)
# Remove extra whitespace and lowercase
text = re.sub(r'\s+', ' ', text).strip().lower()
return text
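# Illustrative example (for reference, not executed):
#   strip_formatting("**Hello** <b>World</b>")  ->  "hello world"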
class ConsensusAnalyzer:
def __init__(self, llm_client=None):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.llm_client = llm_client # Store the LLM client
def calculate_similarity_matrix(self, responses: List[str]) -> np.ndarray:
"""Calculate semantic similarity between all responses (formatting stripped)."""
cleaned = [strip_formatting(r) for r in responses]
embeddings = self.model.encode(cleaned)
return cosine_similarity(embeddings)
def calculate_consensus(self, responses: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate consensus metrics from model responses (formatting stripped)."""
# Extract successful responses
valid_responses = {
model: data["response"]
for model, data in responses.items()
if data["status"] == "success"
}
if not valid_responses:
return {
"consensus_score": 0,
"error": "No valid responses received"
}
# Use cleaned responses for all downstream analysis
cleaned_responses = {m: strip_formatting(r) for m, r in valid_responses.items()}
        response_texts = list(cleaned_responses.values())
        similarity_matrix = self.calculate_similarity_matrix(response_texts)
        # Calculate consensus score (mean over the full similarity matrix,
        # including the diagonal of self-similarities)
        consensus_score = float(np.mean(similarity_matrix))
# Identify clusters of similar responses
clusters = self._identify_clusters(similarity_matrix, threshold=0.5)
# Analyze disagreements with enhanced analysis
disagreements = self._analyze_disagreements_enhanced(cleaned_responses, clusters)
# Extract key topics and themes
topics = self._extract_topics(cleaned_responses)
# Analyze confidence patterns
confidence_analysis = self._analyze_confidence_patterns(cleaned_responses, similarity_matrix)
return {
"consensus_score": consensus_score,
"clusters": clusters,
"disagreements": disagreements,
"similarity_matrix": similarity_matrix.tolist(),
"topics": topics,
"confidence_analysis": confidence_analysis
}
def _identify_clusters(self, similarity_matrix: np.ndarray, threshold: float) -> List[List[int]]:
"""Identify clusters of similar responses."""
n = len(similarity_matrix)
clusters = []
used = set()
for i in range(n):
if i in used:
continue
cluster = [i]
used.add(i)
for j in range(i + 1, n):
if j not in used and similarity_matrix[i, j] >= threshold:
cluster.append(j)
used.add(j)
clusters.append(cluster)
return clusters
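    # Illustrative example of the greedy clustering above (threshold = 0.5):
    #   [[1.0, 0.8, 0.2],
    #    [0.8, 1.0, 0.3],
    #    [0.2, 0.3, 1.0]]  ->  clusters [[0, 1], [2]]
    #   Responses 0 and 1 group together; response 2 stands alone.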
def _analyze_disagreements_enhanced(self, responses: Dict[str, Any], clusters: List[List[int]]) -> List[Dict[str, Any]]:
"""Enhanced disagreement analysis with topic extraction and reasoning patterns."""
disagreements = []
model_names = list(responses.keys())
response_texts = list(responses.values())
        # Extract reasoning patterns; _extract_reasoning_patterns keys them positionally
        # ("model_0", "model_1", ...), so re-key them by model name for the output below
        raw_patterns = self._extract_reasoning_patterns(response_texts)
        reasoning_patterns = {name: raw_patterns.get(f"model_{i}", {}) for i, name in enumerate(model_names)}
        # Analyze disagreements by topic areas
        topic_disagreements = self._analyze_topic_disagreements(response_texts, model_names)
# Iterate through all unique pairs of models
for i in range(len(model_names)):
for j in range(i + 1, len(model_names)):
model1_name = model_names[i]
model2_name = model_names[j]
response1 = response_texts[i]
response2 = response_texts[j]
# Calculate similarity between the two responses
similarity = self.calculate_similarity_matrix([response1, response2])[0, 1]
# Enhanced disagreement categorization
disagreement_type = self._categorize_disagreement_enhanced([response1], [response2])
# Extract specific points of disagreement
disagreement_points = self._extract_disagreement_points(response1, response2)
# Generate detailed explanation
explanation = self._generate_disagreement_explanation_enhanced(
[response1], [response2], disagreement_points, self.llm_client
)
disagreement = {
"type": disagreement_type,
"cluster1": [model1_name],
"cluster2": [model2_name],
"explanation": explanation,
"disagreement_points": disagreement_points,
"similarity_score": float(similarity),
"reasoning_patterns": {
"model1": reasoning_patterns.get(model1_name, {}),
"model2": reasoning_patterns.get(model2_name, {})
}
}
disagreements.append(disagreement)
return disagreements
def _extract_reasoning_patterns(self, responses: List[str]) -> Dict[str, Dict[str, Any]]:
"""Extract reasoning patterns from responses."""
patterns = {}
for i, response in enumerate(responses):
pattern = {
"uses_examples": bool(re.search(r'\b(example|instance|case|such as|like)\b', response, re.IGNORECASE)),
"uses_evidence": bool(re.search(r'\b(evidence|data|research|study|fact|statistic)\b', response, re.IGNORECASE)),
"uses_conditional": bool(re.search(r'\b(if|when|unless|provided that|assuming)\b', response, re.IGNORECASE)),
"uses_comparison": bool(re.search(r'\b(however|but|although|while|whereas|on the other hand)\b', response, re.IGNORECASE)),
"uses_authority": bool(re.search(r'\b(according to|research shows|experts say|studies indicate)\b', response, re.IGNORECASE)),
"sentence_count": len(re.split(r'[.!?]+', response)),
"word_count": len(response.split()),
"has_conclusion": bool(re.search(r'\b(therefore|thus|consequently|in conclusion|overall)\b', response, re.IGNORECASE))
}
patterns[f"model_{i}"] = pattern
return patterns
def _analyze_topic_disagreements(self, responses: List[str], model_names: List[str]) -> Dict[str, List[str]]:
"""Analyze disagreements by topic areas."""
# Simple keyword-based topic extraction
topics = {
"technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
"ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
"practical": ["practical", "feasible", "realistic", "implementable", "practical"],
"economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
"social": ["social", "community", "people", "society", "impact", "benefit"],
"legal": ["legal", "law", "regulation", "compliance", "policy"],
"safety": ["safety", "security", "risk", "danger", "protect"]
}
topic_disagreements = defaultdict(list)
for topic, keywords in topics.items():
topic_responses = []
for response in responses:
if any(keyword in response.lower() for keyword in keywords):
topic_responses.append(response)
if len(topic_responses) > 1:
# Check for disagreements within this topic
topic_similarity = self.calculate_similarity_matrix(topic_responses)
avg_similarity = np.mean(topic_similarity)
if avg_similarity < 0.8: # Threshold for disagreement
topic_disagreements[topic].extend(topic_responses)
return dict(topic_disagreements)
def _extract_disagreement_points(self, response1: str, response2: str) -> List[str]:
"""Extract specific points where responses disagree."""
# Split responses into sentences
sentences1 = [s.strip() for s in re.split(r'[.!?]+', response1) if s.strip()]
sentences2 = [s.strip() for s in re.split(r'[.!?]+', response2) if s.strip()]
disagreement_points = []
# Look for contrasting statements
contrast_keywords = ["however", "but", "although", "while", "whereas", "on the other hand", "in contrast"]
for sent1 in sentences1:
for sent2 in sentences2:
# Check if sentences contain contrasting keywords
has_contrast = any(keyword in sent1.lower() or keyword in sent2.lower()
for keyword in contrast_keywords)
if has_contrast:
# Calculate similarity between these sentences
similarity = self.calculate_similarity_matrix([sent1, sent2])[0, 1]
if similarity < 0.7: # Low similarity indicates disagreement
disagreement_points.append(f"'{sent1}' vs '{sent2}'")
return disagreement_points[:3] # Limit to top 3 disagreements
def _categorize_disagreement_enhanced(self, responses1: List[str], responses2: List[str]) -> str:
"""Enhanced categorization of disagreements."""
# Get embeddings for all responses
all_responses = responses1 + responses2
embeddings = self.model.encode(all_responses)
# Calculate average embeddings for each cluster
cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)
# Calculate cosine similarity between cluster averages
similarity = np.dot(cluster1_avg, cluster2_avg) / (np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg))
# Enhanced categorization with more granular types
if similarity < 0.7:
return "Fundamental Disagreement - Models have completely different perspectives"
elif similarity < 0.8:
return "Major Disagreement - Models agree on some aspects but differ significantly"
elif similarity < 0.9:
return "Moderate Disagreement - Models mostly agree but have important differences"
elif similarity < 0.95:
return "Minor Disagreement - Models agree with slight variations"
else:
return "Strong Agreement - Models are essentially in consensus"
def _generate_disagreement_explanation_enhanced(self, responses1: List[str], responses2: List[str],
disagreement_points: List[str], llm_client: Any = None) -> str:
"""Generate enhanced explanation for disagreements using LLM."""
if llm_client and responses1 and responses2:
try:
prompt = f"""Analyze the disagreement between these two AI model responses.
Focus on the key differences in reasoning, assumptions, or conclusions.
Response 1: {responses1[0]}
Response 2: {responses2[0]}
Specific disagreement points: {disagreement_points}
Provide a concise analysis (2-3 sentences) explaining:
1. What the main disagreement is about
2. Why the models might have different perspectives
3. Which aspects they agree on (if any)
Format as a clear, objective analysis."""
explanation = llm_client._sync_query("meta-llama/Meta-Llama-3.1-70B-Instruct", prompt)
return explanation
except Exception as e:
print(f"Error generating LLM-based explanation: {e}")
                # fall through to the embedding-similarity fallback below
        # Fallback explanation based on embedding similarity between the two response sets
all_responses = responses1 + responses2
embeddings = self.model.encode(all_responses)
cluster1_avg = np.mean(embeddings[:len(responses1)], axis=0)
cluster2_avg = np.mean(embeddings[len(responses1):], axis=0)
similarity = np.dot(cluster1_avg, cluster2_avg) / (np.linalg.norm(cluster1_avg) * np.linalg.norm(cluster2_avg))
if similarity < 0.7:
return "Models have fundamentally different perspectives, possibly due to different training data, reasoning approaches, or interpretation of the question."
elif similarity < 0.9:
return "Models agree on core concepts but differ in their reasoning, emphasis, or specific conclusions."
else:
return "Models are in strong agreement with only minor differences in expression or emphasis."
def _extract_topics(self, responses: Dict[str, str]) -> Dict[str, List[str]]:
"""Extract key topics from responses."""
# Simple keyword-based topic extraction
topic_keywords = {
"technical": ["algorithm", "implementation", "code", "system", "technical", "architecture"],
"ethical": ["ethical", "moral", "right", "wrong", "fair", "bias", "justice"],
"practical": ["practical", "feasible", "realistic", "implementable", "practical"],
"economic": ["cost", "economic", "financial", "budget", "expensive", "affordable"],
"social": ["social", "community", "people", "society", "impact", "benefit"],
"legal": ["legal", "law", "regulation", "compliance", "policy"],
"safety": ["safety", "security", "risk", "danger", "protect"]
}
topics = defaultdict(list)
for model, response in responses.items():
response_lower = response.lower()
for topic, keywords in topic_keywords.items():
if any(keyword in response_lower for keyword in keywords):
topics[topic].append(model)
return dict(topics)
def _analyze_confidence_patterns(self, responses: Dict[str, str], similarity_matrix: np.ndarray) -> Dict[str, Any]:
"""Analyze confidence patterns across models."""
model_names = list(responses.keys())
# Calculate average similarity for each model
avg_similarities = []
for i in range(len(similarity_matrix)):
similarities = [similarity_matrix[i][j] for j in range(len(similarity_matrix)) if i != j]
avg_similarities.append(np.mean(similarities))
# Find most and least confident models
most_confident_idx = np.argmax(avg_similarities)
least_confident_idx = np.argmin(avg_similarities)
return {
"most_confident_model": model_names[most_confident_idx],
"least_confident_model": model_names[least_confident_idx],
"confidence_scores": dict(zip(model_names, avg_similarities)),
"confidence_variance": float(np.var(avg_similarities))
}
def create_visualization(self, consensus_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create visualizations for the consensus analysis."""
# Create similarity heatmap
similarity_matrix = np.array(consensus_data["similarity_matrix"])
heatmap = go.Figure(data=go.Heatmap(
z=similarity_matrix,
colorscale='RdYlGn',
zmin=0,
zmax=1
))
# Create consensus score gauge
gauge = go.Figure(go.Indicator(
mode="gauge+number",
value=consensus_data["consensus_score"] * 100,
title={'text': "Consensus Score"},
gauge={'axis': {'range': [0, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 33], 'color': "red"},
{'range': [33, 66], 'color': "yellow"},
{'range': [66, 100], 'color': "green"}
]}
))
return {
"heatmap": heatmap.to_json(),
"gauge": gauge.to_json()
}
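    # Both figures are returned as Plotly JSON strings (Figure.to_json()); they can be
    # rebuilt into Figure objects with plotly.io.from_json(...) or passed to a frontend.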
def synthesize_consensus_response(self, responses: Dict[str, Any], disagreements: List[Dict[str, Any]]) -> str:
"""Enhanced synthesis using LLM to intelligently combine responses."""
# Collect all successful responses
successful_responses = [data["response"] for data in responses.values() if data["status"] == "success"]
if not successful_responses:
return "No successful model responses to synthesize."
# Use LLM for intelligent synthesis if available
if self.llm_client and len(successful_responses) > 1:
try:
# Prepare the synthesis prompt
responses_text = "\n\n".join([f"Model {i+1}: {response}" for i, response in enumerate(successful_responses)])
# Include disagreement analysis if available
disagreement_summary = ""
if disagreements:
disagreement_summary = "\n\nKey Disagreements:\n"
for i, d in enumerate(disagreements[:3]): # Top 3 disagreements
disagreement_summary += f"- {d['type']}: {d['explanation']}\n"
synthesis_prompt = f"""You are an expert consensus synthesizer. Analyze the following AI model responses and create a comprehensive, well-reasoned synthesis that:
1. Identifies the core points of agreement
2. Addresses key disagreements with balanced reasoning
3. Provides a coherent, evidence-based consensus response
4. Acknowledges uncertainty where appropriate
5. Suggests areas for further investigation if needed
Model Responses:
{responses_text}
{disagreement_summary}
Create a synthesis that is:
- Comprehensive but concise (2-3 paragraphs)
- Balanced and objective
- Well-structured with clear sections
- Professional in tone
Format your response with clear sections: Summary, Key Agreements, Addressing Disagreements, and Consensus Conclusion."""
# Use a powerful model for synthesis
synthesized_response = self.llm_client._sync_query("meta-llama/Meta-Llama-3.1-405B-Instruct", synthesis_prompt)
# Add metadata about the synthesis
metadata = f"""
---
**Synthesis Metadata:**
- Models consulted: {len(successful_responses)}
- Consensus score: {self._calculate_overall_consensus(successful_responses):.2f}
- Disagreements identified: {len(disagreements)}
- Synthesis method: LLM-enhanced consensus
---
"""
return metadata + "\n\n" + synthesized_response
except Exception as e:
print(f"Error in LLM synthesis: {e}")
                # fall through to the basic synthesis below
        # Fallback synthesis (used when no LLM client is available or the LLM call fails)
synthesized_response = []
synthesized_response.append("### Synthesized Consensus Response\n\n")
synthesized_response.append("Based on the input from various models, here is a consolidated view:\n\n")
# Add a general summary of common points
synthesized_response.append("**Common Themes:** All models generally agree on the core aspects.\n\n")
# Elaborate on disagreements if any
if disagreements:
synthesized_response.append("**Areas of Divergence:**\n")
for i, d in enumerate(disagreements):
synthesized_response.append(f"- **Disagreement {i+1} ({d['type']}):** Models {', '.join(d['cluster1'])} and {', '.join(d['cluster2'])} have differing views. {d['explanation']}\n")
else:
synthesized_response.append("Models are in strong agreement on this topic.\n")
# Append all responses for reference
synthesized_response.append("\n--- \n\n**Individual Model Responses (for reference):**\n\n")
        for model_name, data in responses.items():
            if data["status"] == "success":
                synthesized_response.append(f"**{model_name}:**\n{data['response']}\n\n")
return "".join(synthesized_response)
def _calculate_overall_consensus(self, responses: List[str]) -> float:
"""Calculate overall consensus score from responses."""
if len(responses) < 2:
return 1.0
similarity_matrix = self.calculate_similarity_matrix(responses)
return float(np.mean(similarity_matrix)) |
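

# Minimal usage sketch (illustrative). Assumes no LLM client, so disagreement explanations
# and synthesis use the embedding-based fallbacks; the response dict shape matches what
# calculate_consensus() expects. Running this downloads the all-MiniLM-L6-v2 model.
if __name__ == "__main__":
    sample_responses = {
        "model-a": {"status": "success", "response": "Remote work **improves** productivity for most teams."},
        "model-b": {"status": "success", "response": "Productivity generally rises with remote work when communication is managed well."},
        "model-c": {"status": "success", "response": "Remote work tends to hurt collaboration and slow teams down."},
    }
    analyzer = ConsensusAnalyzer()  # no llm_client: heuristic fallbacks are used throughout
    result = analyzer.calculate_consensus(sample_responses)
    print(f"Consensus score: {result['consensus_score']:.2f}")
    print(f"Clusters (by response index): {result['clusters']}")
    for d in result["disagreements"]:
        print(f"- {d['cluster1']} vs {d['cluster2']}: {d['type']}")
    print(analyzer.synthesize_consensus_response(sample_responses, result["disagreements"]))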