# Such.AI / mcp_server.py
import os
from typing import List, Dict, Any, Optional
import modal
from dotenv import load_dotenv
import json
import asyncio
from openai import OpenAI
from datetime import datetime
import logging
from consensus_logic import ConsensusAnalyzer
import numpy as np
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Initialize Modal app
app = modal.App("consensus-builder")
# Create Modal image with dependencies
image = (
    modal.Image.debian_slim()
    .pip_install(
        "python-dotenv>=1.0.0",
        "sentence-transformers>=2.2.2",
        "scikit-learn>=1.3.0",
        "plotly>=5.18.0",
        "PyPDF2>=3.0.0",
        "python-docx>=0.8.11",
    )
    .pip_install_from_requirements("requirements.txt")
    .add_local_file("consensus_logic.py", "/root/consensus_logic.py")
)
# Nebius client built on the OpenAI SDK (Nebius exposes an OpenAI-compatible API)
class NebiusClient:
def __init__(self, api_key: str):
self.client = OpenAI(
base_url="https://api.studio.nebius.com/v1/",
api_key=api_key
)
    async def query_model(self, model_name: str, prompt: str) -> str:
        # The OpenAI SDK call is synchronous, so run it in a worker thread
        # to avoid blocking the event loop
        return await asyncio.to_thread(self._sync_query, model_name, prompt)
def _sync_query(self, model_name: str, prompt: str) -> str:
try:
logger.info(f"Sending request to {model_name} with prompt: {prompt[:100]}...")
completion = self.client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1000
)
logger.info(f"Raw response from {model_name}: {completion}")
if not completion:
raise Exception(f"Empty response from {model_name}")
if not completion.choices:
raise Exception(f"No choices in response from {model_name}")
if not completion.choices[0].message:
raise Exception(f"No message in response from {model_name}")
content = completion.choices[0].message.content
logger.info(f"Processed response from {model_name}: {content[:100]}...")
return content
except Exception as e:
logger.error(f"Error querying {model_name}: {str(e)}")
raise Exception(f"Error querying {model_name}: {str(e)}")
class MCPServer:
def __init__(self):
try:
nebius_api_key = os.environ.get("NEBIUS_API_KEY")
if not nebius_api_key:
raise ValueError("NEBIUS_API_KEY environment variable not set within MCPServer initialization.")
self.nebius_client = NebiusClient(api_key=nebius_api_key)
self.models = [
# Large models for comprehensive analysis
"meta-llama/Meta-Llama-3.1-70B-Instruct",
"meta-llama/Meta-Llama-3.1-405B-Instruct",
# Latest models
"meta-llama/Llama-3.3-70B-Instruct",
"google/gemma-3-27b-it",
# Additional models
"Qwen/Qwen2.5-72B-Instruct"
]
self.session_state = {}
except Exception as e:
logger.error(f"Failed to initialize MCPServer: {str(e)}")
raise
    # Cached sentence-transformer, loaded lazily on first use so the slow
    # model load is not repeated on every confidence calculation.
    _st_model = None

    def calculate_response_confidence(self, response: str, other_responses: list) -> float:
        """Score a response by its mean cosine similarity to the other models' responses."""
        try:
            from sentence_transformers import SentenceTransformer
            # Load the embedding model once and reuse it across calls
            if MCPServer._st_model is None:
                MCPServer._st_model = SentenceTransformer('all-MiniLM-L6-v2')
            model = MCPServer._st_model
            # Embed this response together with all the others
            all_responses = [response] + other_responses
            embeddings = model.encode(all_responses)
            # Cosine similarity between this response and each other response
            similarities = []
            for i in range(1, len(embeddings)):
                similarity = np.dot(embeddings[0], embeddings[i]) / (
                    np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[i])
                )
                similarities.append(similarity)
            # Confidence is the average similarity to the other responses
            confidence = np.mean(similarities) if similarities else 0.0
            return float(confidence)
        except Exception as e:
            logger.error(f"Error calculating confidence: {str(e)}")
            return 0.0
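    # Worked example (hypothetical strings; exact scores depend on the embedding
    # model, but a close paraphrase typically lands near 1.0 and unrelated text
    # near 0.0):
    #
    #   server.calculate_response_confidence(
    #       "Inflation is a sustained rise in the general price level.",
    #       ["A sustained, economy-wide increase in prices is inflation.",
    #        "Paris is the capital of France."],
    #   )  # roughly the mean of one high and one low cosine similarity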
async def process_query(self, query: str, document_content: Optional[str] = None) -> Dict[str, Any]:
        # Create a new session (microseconds keep IDs unique under rapid calls)
        session_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
self.session_state[session_id] = {
"query": query,
"document": document_content,
"responses": {},
"status": "processing"
}
try:
# Prepare the prompt with document context if provided
full_prompt = query
if document_content:
full_prompt = f"Context from document:\n{document_content}\n\nQuestion: {query}"
logger.info(f"Processing query with prompt: {full_prompt[:100]}...")
            # Query all models concurrently; collect exceptions instead of failing fast
tasks = [self.nebius_client.query_model(model, full_prompt) for model in self.models]
responses = await asyncio.gather(*tasks, return_exceptions=True)
            # Process responses
            processed_responses = {}
            successful = {}  # model -> response text, kept for confidence scoring
            # First pass: collect successful responses
            for model, response in zip(self.models, responses):
                if isinstance(response, Exception):
                    logger.error(f"Error from {model}: {str(response)}")
                    processed_responses[model] = {
                        "error": str(response),
                        "status": "failed"
                    }
                elif response is None:
                    logger.error(f"Received None response from {model}")
                    processed_responses[model] = {
                        "error": "Received None response",
                        "status": "failed"
                    }
                else:
                    successful[model] = response
                    processed_responses[model] = {
                        "response": response,
                        "status": "success"
                    }
                    logger.info(f"Successfully processed response from {model}")
            # Second pass: score each successful response against the others.
            # Exclude this model's own answer by key rather than by text equality,
            # so two models that give identical answers still count as agreeing.
            for model, response_data in processed_responses.items():
                if response_data["status"] == "success":
                    other_responses = [r for m, r in successful.items() if m != model]
                    response_data["confidence"] = self.calculate_response_confidence(
                        response_data["response"], other_responses
                    )
# Update session state
self.session_state[session_id]["responses"] = processed_responses
self.session_state[session_id]["status"] = "completed"
logger.info(f"Completed processing query. Session ID: {session_id}")
return {
"session_id": session_id,
"responses": processed_responses
}
except Exception as e:
logger.error(f"Error processing query: {str(e)}")
self.session_state[session_id]["status"] = "failed"
self.session_state[session_id]["error"] = str(e)
raise
def get_session_status(self, session_id: str) -> Dict[str, Any]:
return self.session_state.get(session_id, {"error": "Session not found"})
def clear_session(self, session_id: str) -> bool:
if session_id in self.session_state:
del self.session_state[session_id]
return True
return False
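# Example session lifecycle (a sketch of local usage; the query string is
# illustrative):
#
#   server = MCPServer()
#   result = asyncio.run(server.process_query("What causes inflation?"))
#   server.get_session_status(result["session_id"])  # {"status": "completed", ...}
#   server.clear_session(result["session_id"])       # True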
# Initialize Modal functions with secret
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def parallel_model_query(query: str, models: List[str], document_content: Optional[str] = None):
    # `models` is currently unused; MCPServer queries its own model list.
    # Log only whether the secret is mounted -- never the key itself.
    logger.info("NEBIUS_API_KEY present in Modal cloud: %s", bool(os.environ.get("NEBIUS_API_KEY")))
    server = MCPServer()
    return asyncio.run(server.process_query(query, document_content))
@app.function(image=image)
def consensus_algorithm(responses: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate consensus between model responses with enhanced analysis."""
try:
# Initialize consensus analyzer
analyzer = ConsensusAnalyzer()
# Calculate consensus with enhanced features
consensus_result = analyzer.calculate_consensus(responses)
return {
"consensus_score": consensus_result.get("consensus_score", 0),
"clusters": consensus_result.get("clusters", []),
"disagreements": consensus_result.get("disagreements", []),
"similarity_matrix": consensus_result.get("similarity_matrix", []),
"topics": consensus_result.get("topics", {}),
"confidence_analysis": consensus_result.get("confidence_analysis", {})
}
except Exception as e:
logger.error(f"Error in consensus algorithm: {str(e)}")
return {
"consensus_score": 0,
"clusters": [],
"disagreements": [],
"similarity_matrix": [],
"topics": {},
"confidence_analysis": {}
}
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def disagreement_analyzer(responses: Dict[str, Any], api_key: str) -> Dict[str, Any]:
"""Enhanced disagreement analyzer with topic extraction and confidence analysis."""
try:
# Initialize consensus analyzer with LLM client
nebius_client_for_analyzer = NebiusClient(api_key=api_key)
analyzer = ConsensusAnalyzer(llm_client=nebius_client_for_analyzer)
# Calculate enhanced consensus (which includes disagreement analysis)
consensus_result = analyzer.calculate_consensus(responses)
# Extract enhanced disagreement information
disagreements = consensus_result.get("disagreements", [])
topics = consensus_result.get("topics", {})
confidence_analysis = consensus_result.get("confidence_analysis", {})
        # Count disagreement types once; the tally is reused in both the
        # explanation and the summary below
        disagreement_types = {}
        for d in disagreements:
            d_type = d.get("type", "Unknown")
            disagreement_types[d_type] = disagreement_types.get(d_type, 0) + 1
        # Generate a comprehensive explanation
        if disagreements:
            explanation_parts = [f"Analysis of {len(disagreements)} disagreements found:"]
            for d_type, count in disagreement_types.items():
                explanation_parts.append(f"- {count} {d_type.lower()}")
            if topics:
                explanation_parts.append(f"\nKey topics discussed: {', '.join(topics.keys())}")
            if confidence_analysis:
                most_confident = confidence_analysis.get("most_confident_model", "Unknown")
                least_confident = confidence_analysis.get("least_confident_model", "Unknown")
                explanation_parts.append(
                    f"\nConfidence analysis: {most_confident} shows highest agreement "
                    f"with others, {least_confident} shows lowest."
                )
            explanation = "\n".join(explanation_parts)
        else:
            explanation = "Models are in strong agreement with no significant disagreements detected."
        return {
            "explanation": explanation,
            "disagreements": disagreements,
            "topics": topics,
            "confidence_analysis": confidence_analysis,
            "disagreement_summary": {
                "total_disagreements": len(disagreements),
                "disagreement_types": disagreement_types,
                # Cast to a plain float so the result stays JSON-serializable
                "avg_similarity": float(np.mean([d.get("similarity_score", 0) for d in disagreements])) if disagreements else 1.0
            }
}
except Exception as e:
logger.error(f"Error in enhanced disagreement analyzer: {str(e)}")
return {
"explanation": f"Error analyzing disagreements: {str(e)}",
"disagreements": [],
"topics": {},
"confidence_analysis": {},
"disagreement_summary": {
"total_disagreements": 0,
"disagreement_types": {},
"avg_similarity": 0.0
}
}
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def synthesize_consensus(responses: Dict[str, Any], disagreements: List[Dict[str, Any]], api_key: str) -> str:
"""Enhanced consensus synthesis using LLM for intelligent response combination."""
try:
analyzer = ConsensusAnalyzer(llm_client=NebiusClient(api_key=api_key))
return analyzer.synthesize_consensus_response(responses, disagreements)
except Exception as e:
logger.error(f"Error in enhanced consensus synthesis: {str(e)}")
return f"Error synthesizing consensus response: {str(e)}"