|
import os
|
|
from typing import List, Dict, Any, Optional
|
|
import modal
|
|
from dotenv import load_dotenv
|
|
import json
|
|
import asyncio
|
|
from openai import OpenAI
|
|
from datetime import datetime
|
|
import logging
|
|
from consensus_logic import ConsensusAnalyzer
|
|
import numpy as np
|
|
|
|
|
|
# Configure process-wide logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file (python-dotenv), if present.
load_dotenv()

# Modal application object that the @app.function definitions in this file attach to.
app = modal.App("consensus-builder")

# Container image shared by the Modal functions: a Debian-slim base with the
# listed pip packages, everything from requirements.txt, and the local
# consensus_logic.py copied to /root so it can be imported inside the container.
image = (
    modal.Image.debian_slim()
    .pip_install("python-dotenv>=1.0.0")
    .pip_install("sentence-transformers>=2.2.2")
    .pip_install("scikit-learn>=1.3.0")
    .pip_install("plotly>=5.18.0")
    .pip_install("PyPDF2>=3.0.0")
    .pip_install("python-docx>=0.8.11")
    .pip_install_from_requirements("requirements.txt")
    .add_local_file("consensus_logic.py", "/root/consensus_logic.py")
)
|
|
|
|
|
|
class NebiusClient:
    """Chat-completion client for the OpenAI-compatible Nebius endpoint."""

    def __init__(self, api_key: str):
        """Create the underlying OpenAI client pointed at the Nebius base URL.

        Args:
            api_key: API key forwarded to the OpenAI client.
        """
        self.client = OpenAI(
            base_url="https://api.studio.nebius.com/v1/",
            api_key=api_key,
        )

    async def query_model(self, model_name: str, prompt: str) -> str:
        """Query a model without blocking the event loop.

        The blocking ``_sync_query`` call is dispatched to the loop's default
        executor so several models can be queried concurrently.

        Args:
            model_name: Model identifier passed to the chat-completions API.
            prompt: User prompt sent as a single ``user`` message.

        Returns:
            The text content of the first choice.

        Raises:
            Exception: Propagated from ``_sync_query`` on any failure.
        """
        # We are inside a coroutine, so a running loop is guaranteed to exist.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_query, model_name, prompt)

    def _sync_query(self, model_name: str, prompt: str) -> str:
        """Send one blocking chat-completion request and return its text.

        Args:
            model_name: Model identifier passed to the chat-completions API.
            prompt: User prompt sent as a single ``user`` message.

        Returns:
            The ``content`` string of the first choice's message.

        Raises:
            Exception: If the request fails or the response has no choices,
                no message, or no content. The original error is chained as
                ``__cause__``.
        """
        try:
            logger.info(f"Sending request to {model_name} with prompt: {prompt[:100]}...")
            completion = self.client.chat.completions.create(
                model=model_name,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1000,
            )
            logger.info(f"Raw response from {model_name}: {completion}")

            if not completion:
                raise Exception(f"Empty response from {model_name}")
            if not completion.choices:
                raise Exception(f"No choices in response from {model_name}")
            if not completion.choices[0].message:
                raise Exception(f"No message in response from {model_name}")

            content = completion.choices[0].message.content
            # The content field can be None; fail with a clear message instead
            # of an opaque TypeError from slicing None in the log line below.
            if content is None:
                raise Exception(f"No content in response from {model_name}")

            logger.info(f"Processed response from {model_name}: {content[:100]}...")
            return content
        except Exception as e:
            logger.error(f"Error querying {model_name}: {str(e)}")
            raise Exception(f"Error querying {model_name}: {str(e)}") from e
|
|
|
|
class MCPServer:
    """Fan a query out to several models and keep per-session results in memory."""

    def __init__(self):
        """Build the Nebius client from ``NEBIUS_API_KEY`` and initialise state.

        Raises:
            ValueError: If ``NEBIUS_API_KEY`` is missing or empty.
        """
        try:
            nebius_api_key = os.environ.get("NEBIUS_API_KEY")
            if not nebius_api_key:
                raise ValueError("NEBIUS_API_KEY environment variable not set within MCPServer initialization.")
            self.nebius_client = NebiusClient(api_key=nebius_api_key)
            # Models queried by process_query, in this order.
            self.models = [
                "meta-llama/Meta-Llama-3.1-70B-Instruct",
                "meta-llama/Meta-Llama-3.1-405B-Instruct",
                "meta-llama/Llama-3.3-70B-Instruct",
                "google/gemma-3-27b-it",
                "Qwen/Qwen2.5-72B-Instruct",
            ]
            # session_id -> {"query", "document", "responses", "status"[, "error"]}
            self.session_state = {}
            # Sentence-embedding model, loaded lazily on first confidence call.
            self._embedding_model = None
        except Exception as e:
            logger.error(f"Failed to initialize MCPServer: {str(e)}")
            raise

    def _get_embedding_model(self):
        """Return the sentence-embedding model, loading it once per instance."""
        model = getattr(self, "_embedding_model", None)
        if model is None:
            # Imported here because the package is heavy and only needed
            # when confidence scores are actually computed.
            from sentence_transformers import SentenceTransformer

            model = SentenceTransformer('all-MiniLM-L6-v2')
            self._embedding_model = model
        return model

    @staticmethod
    def _mean_cosine_similarity(embeddings) -> float:
        """Return the mean cosine similarity between row 0 and every other row.

        Returns 0.0 when there are fewer than two rows.
        """
        vectors = np.asarray(embeddings)
        if len(vectors) < 2:
            return 0.0
        reference, others = vectors[0], vectors[1:]
        denominators = np.linalg.norm(others, axis=1) * np.linalg.norm(reference)
        return float(np.mean((others @ reference) / denominators))

    @staticmethod
    def _peer_responses(processed_responses: Dict[str, Any], model: str) -> list:
        """Return the successful responses of every model except ``model``.

        Peers are selected by model name rather than by response text, so two
        models that gave the same answer still count as agreeing peers.
        """
        return [
            data["response"]
            for name, data in processed_responses.items()
            if name != model and data["status"] == "success"
        ]

    def calculate_response_confidence(self, response: str, other_responses: list) -> float:
        """Calculate confidence score for a model's response based on its similarity to other responses.

        Args:
            response: The response being scored.
            other_responses: Responses from the other models.

        Returns:
            Mean cosine similarity between the embedding of ``response`` and
            the embeddings of ``other_responses``; 0.0 when there are no other
            responses or when scoring fails (the error is logged).
        """
        if not other_responses:
            # Nothing to compare against; skip loading the embedding model.
            return 0.0
        try:
            all_responses = [response] + list(other_responses)
            embeddings = self._get_embedding_model().encode(all_responses)
            return self._mean_cosine_similarity(embeddings)
        except Exception as e:
            logger.error(f"Error calculating confidence: {str(e)}")
            return 0.0

    async def process_query(self, query: str, document_content: Optional[str] = None) -> Dict[str, Any]:
        """Query every configured model concurrently and score the answers.

        Args:
            query: The user's question.
            document_content: Optional document text prepended as context.

        Returns:
            ``{"session_id": ..., "responses": {...}}`` where ``responses``
            maps each model name to either
            ``{"response", "status": "success", "confidence"}`` or
            ``{"error", "status": "failed"}``.

        Raises:
            Exception: Any unexpected error is recorded on the session
                (status ``"failed"``) and re-raised.
        """
        # NOTE(review): IDs have one-second resolution, so two queries started
        # on the same instance within the same second share a session entry.
        session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.session_state[session_id] = {
            "query": query,
            "document": document_content,
            "responses": {},
            "status": "processing",
        }

        try:
            full_prompt = query
            if document_content:
                full_prompt = f"Context from document:\n{document_content}\n\nQuestion: {query}"

            logger.info(f"Processing query with prompt: {full_prompt[:100]}...")

            tasks = [self.nebius_client.query_model(model, full_prompt) for model in self.models]
            # return_exceptions=True keeps one failing model from aborting the rest.
            responses = await asyncio.gather(*tasks, return_exceptions=True)

            processed_responses = {}
            for model, response in zip(self.models, responses):
                # BaseException also covers asyncio.CancelledError results.
                if isinstance(response, BaseException):
                    logger.error(f"Error from {model}: {str(response)}")
                    processed_responses[model] = {
                        "error": str(response),
                        "status": "failed",
                    }
                elif response is None:
                    message = "Received None response"
                    logger.error(f"Error processing response from {model}: {message}")
                    processed_responses[model] = {
                        "error": f"Error processing response: {message}",
                        "status": "failed",
                    }
                else:
                    processed_responses[model] = {
                        "response": response,
                        "status": "success",
                    }
                    logger.info(f"Successfully processed response from {model}")

            for model, response_data in processed_responses.items():
                if response_data["status"] == "success":
                    other_responses = self._peer_responses(processed_responses, model)
                    response_data["confidence"] = self.calculate_response_confidence(
                        response_data["response"], other_responses
                    )

            self.session_state[session_id]["responses"] = processed_responses
            self.session_state[session_id]["status"] = "completed"
            logger.info(f"Completed processing query. Session ID: {session_id}")

            return {
                "session_id": session_id,
                "responses": processed_responses,
            }

        except Exception as e:
            logger.error(f"Error processing query: {str(e)}")
            self.session_state[session_id]["status"] = "failed"
            self.session_state[session_id]["error"] = str(e)
            raise

    def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """Return the stored state for ``session_id``, or an error dict if unknown."""
        return self.session_state.get(session_id, {"error": "Session not found"})

    def clear_session(self, session_id: str) -> bool:
        """Delete a session's state; return True if it existed, False otherwise."""
        if session_id in self.session_state:
            del self.session_state[session_id]
            return True
        return False
|
|
|
|
|
|
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def parallel_model_query(query: str, models: List[str], document_content: Optional[str] = None):
    """Run a query against the server's models inside a Modal container.

    Args:
        query: The user's question.
        models: Requested model names.
            NOTE(review): this argument is currently not used; MCPServer
            queries its own built-in model list. Confirm whether callers
            expect it to be honored.
        document_content: Optional document text used as context.

    Returns:
        The dict returned by ``MCPServer.process_query``.
    """
    # Report only whether the key is available; never write the secret itself
    # to stdout or the logs.
    logger.info("NEBIUS_API_KEY present in Modal Cloud: %s", bool(os.environ.get("NEBIUS_API_KEY")))
    server = MCPServer()
    return asyncio.run(server.process_query(query, document_content))
|
|
|
|
@app.function(image=image)
def consensus_algorithm(responses: Dict[str, Any]) -> Dict[str, Any]:
    """Run ConsensusAnalyzer over the model responses and return its key metrics.

    Args:
        responses: Model responses handed to ``ConsensusAnalyzer.calculate_consensus``.

    Returns:
        A dict with ``consensus_score``, ``clusters``, ``disagreements``,
        ``similarity_matrix``, ``topics`` and ``confidence_analysis``. Missing
        entries, or any failure (which is logged), yield the empty defaults.
    """

    def _shape(source: Dict[str, Any]) -> Dict[str, Any]:
        # One place defines the output keys and their defaults, so the
        # success path and the fallback path cannot drift apart.
        return {
            "consensus_score": source.get("consensus_score", 0),
            "clusters": source.get("clusters", []),
            "disagreements": source.get("disagreements", []),
            "similarity_matrix": source.get("similarity_matrix", []),
            "topics": source.get("topics", {}),
            "confidence_analysis": source.get("confidence_analysis", {}),
        }

    try:
        outcome = ConsensusAnalyzer().calculate_consensus(responses)
        return _shape(outcome)
    except Exception as e:
        logger.error(f"Error in consensus algorithm: {str(e)}")
        return _shape({})
|
|
|
|
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def disagreement_analyzer(responses: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Enhanced disagreement analyzer with topic extraction and confidence analysis.

    Args:
        responses: Model responses handed to ``ConsensusAnalyzer.calculate_consensus``.
        api_key: Nebius API key used to build the analyzer's LLM client.

    Returns:
        A dict with a human-readable ``explanation``, the raw
        ``disagreements``, ``topics`` and ``confidence_analysis`` from the
        analyzer, and a ``disagreement_summary`` (total count, count per type,
        average similarity score). On any failure the error is logged and an
        empty result carrying the error text in ``explanation`` is returned.
    """
    # Local import so this block does not depend on the file's import header.
    from collections import Counter

    try:
        nebius_client_for_analyzer = NebiusClient(api_key=api_key)
        analyzer = ConsensusAnalyzer(llm_client=nebius_client_for_analyzer)

        consensus_result = analyzer.calculate_consensus(responses)

        # Each disagreement is read as a dict with optional "type" and
        # "similarity_score" keys.
        disagreements = consensus_result.get("disagreements", [])
        topics = consensus_result.get("topics", {})
        confidence_analysis = consensus_result.get("confidence_analysis", {})

        # Count each type once and reuse the result for both the explanation
        # and the summary so the two can never disagree.
        disagreement_types = dict(Counter(d.get("type", "Unknown") for d in disagreements))

        if disagreements:
            explanation_parts = [f"Analysis of {len(disagreements)} disagreements found:"]
            explanation_parts.extend(
                f"- {count} {d_type.lower()}" for d_type, count in disagreement_types.items()
            )

            if topics:
                explanation_parts.append(f"\nKey topics discussed: {', '.join(topics.keys())}")

            if confidence_analysis:
                most_confident = confidence_analysis.get("most_confident_model", "Unknown")
                least_confident = confidence_analysis.get("least_confident_model", "Unknown")
                explanation_parts.append(f"\nConfidence analysis: {most_confident} shows highest agreement with others, {least_confident} shows lowest.")

            explanation = "\n".join(explanation_parts)
            # Plain float so the result does not carry a NumPy scalar.
            avg_similarity = float(np.mean([d.get("similarity_score", 0) for d in disagreements]))
        else:
            explanation = "Models are in strong agreement with no significant disagreements detected."
            avg_similarity = 1.0

        return {
            "explanation": explanation,
            "disagreements": disagreements,
            "topics": topics,
            "confidence_analysis": confidence_analysis,
            "disagreement_summary": {
                "total_disagreements": len(disagreements),
                "disagreement_types": disagreement_types,
                "avg_similarity": avg_similarity,
            },
        }
    except Exception as e:
        logger.error(f"Error in enhanced disagreement analyzer: {str(e)}")
        return {
            "explanation": f"Error analyzing disagreements: {str(e)}",
            "disagreements": [],
            "topics": {},
            "confidence_analysis": {},
            "disagreement_summary": {
                "total_disagreements": 0,
                "disagreement_types": {},
                "avg_similarity": 0.0,
            },
        }
|
|
|
|
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def synthesize_consensus(responses: Dict[str, Any], disagreements: List[Dict[str, Any]], api_key: str) -> str:
    """Produce a single synthesized answer from the model responses via an LLM.

    Args:
        responses: Model responses to combine.
        disagreements: Disagreements passed through to the analyzer.
        api_key: Nebius API key used to build the analyzer's LLM client.

    Returns:
        The text returned by ``ConsensusAnalyzer.synthesize_consensus_response``,
        or an error message string if anything fails (the error is logged).
    """
    try:
        llm_client = NebiusClient(api_key=api_key)
        consensus_analyzer = ConsensusAnalyzer(llm_client=llm_client)
        synthesized = consensus_analyzer.synthesize_consensus_response(responses, disagreements)
    except Exception as e:
        logger.error(f"Error in enhanced consensus synthesis: {str(e)}")
        return f"Error synthesizing consensus response: {str(e)}"
    return synthesized