import os
from typing import List, Dict, Any, Optional
import modal
from dotenv import load_dotenv
import asyncio
from openai import OpenAI
from datetime import datetime
import logging
from consensus_logic import ConsensusAnalyzer
import numpy as np

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Initialize Modal app
app = modal.App("consensus-builder")

# Create Modal image with dependencies
image = (
    modal.Image.debian_slim()
    .pip_install("python-dotenv>=1.0.0")
    .pip_install("sentence-transformers>=2.2.2")
    .pip_install("scikit-learn>=1.3.0")
    .pip_install("plotly>=5.18.0")
    .pip_install("PyPDF2>=3.0.0")
    .pip_install("python-docx>=0.8.11")
    .pip_install_from_requirements("requirements.txt")
    .add_local_file("consensus_logic.py", "/root/consensus_logic.py")
)


# Nebius client using the OpenAI SDK and an API key from the environment
class NebiusClient:
    def __init__(self, api_key: str):
        self.client = OpenAI(
            base_url="https://api.studio.nebius.com/v1/",
            api_key=api_key
        )

    async def query_model(self, model_name: str, prompt: str) -> str:
        # The OpenAI SDK is synchronous, so run the call in a worker thread
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_query, model_name, prompt)

    def _sync_query(self, model_name: str, prompt: str) -> str:
        try:
            logger.info(f"Sending request to {model_name} with prompt: {prompt[:100]}...")
            completion = self.client.chat.completions.create(
                model=model_name,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1000
            )
            logger.info(f"Raw response from {model_name}: {completion}")
            if not completion:
                raise Exception(f"Empty response from {model_name}")
            if not completion.choices:
                raise Exception(f"No choices in response from {model_name}")
            if not completion.choices[0].message:
                raise Exception(f"No message in response from {model_name}")
            content = completion.choices[0].message.content
            logger.info(f"Processed response from {model_name}: {content[:100]}...")
            return content
        except Exception as e:
            logger.error(f"Error querying {model_name}: {str(e)}")
            raise Exception(f"Error querying {model_name}: {str(e)}") from e
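# Example (illustrative sketch, not part of the server): querying a single
# model directly through NebiusClient. Assumes NEBIUS_API_KEY is set locally;
# kept commented out so nothing executes at import time.
#
#   client = NebiusClient(api_key=os.environ["NEBIUS_API_KEY"])
#   answer = asyncio.run(
#       client.query_model("meta-llama/Meta-Llama-3.1-70B-Instruct", "Define consensus.")
#   )
#   print(answer)
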
class MCPServer:
    def __init__(self):
        try:
            nebius_api_key = os.environ.get("NEBIUS_API_KEY")
            if not nebius_api_key:
                raise ValueError("NEBIUS_API_KEY environment variable not set within MCPServer initialization.")
            self.nebius_client = NebiusClient(api_key=nebius_api_key)
            self.models = [
                # Large models for comprehensive analysis
                "meta-llama/Meta-Llama-3.1-70B-Instruct",
                "meta-llama/Meta-Llama-3.1-405B-Instruct",
                # Latest models
                "meta-llama/Llama-3.3-70B-Instruct",
                "google/gemma-3-27b-it",
                # Additional models
                "Qwen/Qwen2.5-72B-Instruct"
            ]
            self.session_state = {}
        except Exception as e:
            logger.error(f"Failed to initialize MCPServer: {str(e)}")
            raise

    def calculate_response_confidence(self, response: str, other_responses: list) -> float:
        """Calculate a confidence score for a model's response based on its similarity to the other responses."""
        try:
            from sentence_transformers import SentenceTransformer

            # Load the sentence transformer model (reloaded on every call; cache it if this becomes a bottleneck)
            model = SentenceTransformer('all-MiniLM-L6-v2')

            # Get embeddings for all responses
            all_responses = [response] + other_responses
            embeddings = model.encode(all_responses)

            # Cosine similarity between this response and each of the others
            similarities = []
            for i in range(1, len(embeddings)):
                similarity = np.dot(embeddings[0], embeddings[i]) / (
                    np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[i])
                )
                similarities.append(similarity)

            # Confidence is the average similarity to the other responses
            confidence = np.mean(similarities) if similarities else 0.0
            return float(confidence)
        except Exception as e:
            logger.error(f"Error calculating confidence: {str(e)}")
            return 0.0

    async def process_query(self, query: str, document_content: Optional[str] = None) -> Dict[str, Any]:
        # Create a new session
        session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.session_state[session_id] = {
            "query": query,
            "document": document_content,
            "responses": {},
            "status": "processing"
        }
        try:
            # Prepare the prompt with document context if provided
            full_prompt = query
            if document_content:
                full_prompt = f"Context from document:\n{document_content}\n\nQuestion: {query}"
            logger.info(f"Processing query with prompt: {full_prompt[:100]}...")

            # Query all models in parallel
            tasks = [self.nebius_client.query_model(model, full_prompt) for model in self.models]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

            # Process responses
            processed_responses = {}
            successful_responses = []

            # First pass: collect successful responses
            for model, response in zip(self.models, responses):
                if isinstance(response, Exception):
                    logger.error(f"Error from {model}: {str(response)}")
                    processed_responses[model] = {
                        "error": str(response),
                        "status": "failed"
                    }
                else:
                    try:
                        if response is None:
                            raise Exception("Received None response")
                        successful_responses.append(response)
                        processed_responses[model] = {
                            "response": response,
                            "status": "success"
                        }
                        logger.info(f"Successfully processed response from {model}")
                    except Exception as e:
                        logger.error(f"Error processing response from {model}: {str(e)}")
                        processed_responses[model] = {
                            "error": f"Error processing response: {str(e)}",
                            "status": "failed"
                        }

            # Second pass: calculate confidence scores
            for model, response_data in processed_responses.items():
                if response_data["status"] == "success":
                    # Compare against the other successful responses
                    other_responses = [r for r in successful_responses if r != response_data["response"]]
                    confidence = self.calculate_response_confidence(response_data["response"], other_responses)
                    response_data["confidence"] = confidence

            # Update session state
            self.session_state[session_id]["responses"] = processed_responses
            self.session_state[session_id]["status"] = "completed"
            logger.info(f"Completed processing query. Session ID: {session_id}")
            return {
                "session_id": session_id,
                "responses": processed_responses
            }
        except Exception as e:
            logger.error(f"Error processing query: {str(e)}")
            self.session_state[session_id]["status"] = "failed"
            self.session_state[session_id]["error"] = str(e)
            raise

    def get_session_status(self, session_id: str) -> Dict[str, Any]:
        return self.session_state.get(session_id, {"error": "Session not found"})

    def clear_session(self, session_id: str) -> bool:
        if session_id in self.session_state:
            del self.session_state[session_id]
            return True
        return False
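# Example (illustrative sketch): driving MCPServer locally without Modal.
# Assumes NEBIUS_API_KEY is set; kept commented out so nothing executes at
# import time. The query string is a placeholder.
#
#   server = MCPServer()
#   result = asyncio.run(server.process_query("What causes inflation?"))
#   for model, data in result["responses"].items():
#       print(model, data["status"], data.get("confidence"))
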
# Initialize Modal functions with the Nebius secret
@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def parallel_model_query(query: str, models: List[str], document_content: Optional[str] = None):
    # Log only whether the key is present; never print the secret itself
    print("NEBIUS_API_KEY present in Modal cloud:", bool(os.environ.get("NEBIUS_API_KEY")))
    server = MCPServer()
    return asyncio.run(server.process_query(query, document_content))


@app.function(image=image)
def consensus_algorithm(responses: Dict[str, Any]) -> Dict[str, Any]:
    """Calculate consensus between model responses with enhanced analysis."""
    try:
        # Initialize consensus analyzer
        analyzer = ConsensusAnalyzer()

        # Calculate consensus with enhanced features
        consensus_result = analyzer.calculate_consensus(responses)

        return {
            "consensus_score": consensus_result.get("consensus_score", 0),
            "clusters": consensus_result.get("clusters", []),
            "disagreements": consensus_result.get("disagreements", []),
            "similarity_matrix": consensus_result.get("similarity_matrix", []),
            "topics": consensus_result.get("topics", {}),
            "confidence_analysis": consensus_result.get("confidence_analysis", {})
        }
    except Exception as e:
        logger.error(f"Error in consensus algorithm: {str(e)}")
        return {
            "consensus_score": 0,
            "clusters": [],
            "disagreements": [],
            "similarity_matrix": [],
            "topics": {},
            "confidence_analysis": {}
        }


@app.function(image=image, secrets=[modal.Secret.from_name("nebius")])
def disagreement_analyzer(responses: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Enhanced disagreement analyzer with topic extraction and confidence analysis."""
    try:
        # Initialize consensus analyzer with an LLM client
        nebius_client_for_analyzer = NebiusClient(api_key=api_key)
        analyzer = ConsensusAnalyzer(llm_client=nebius_client_for_analyzer)

        # Calculate enhanced consensus (which includes disagreement analysis)
        consensus_result = analyzer.calculate_consensus(responses)

        # Extract enhanced disagreement information
        disagreements = consensus_result.get("disagreements", [])
        topics = consensus_result.get("topics", {})
        confidence_analysis = consensus_result.get("confidence_analysis", {})

        # Generate a comprehensive explanation
        if disagreements:
            # Count disagreement types
            disagreement_types = {}
            for d in disagreements:
                d_type = d.get("type", "Unknown")
                disagreement_types[d_type] = disagreement_types.get(d_type, 0) + 1

            # Create a detailed explanation
            explanation_parts = [f"Analysis of {len(disagreements)} disagreements found:"]
            for d_type, count in disagreement_types.items():
                explanation_parts.append(f"- {count} {d_type.lower()}")
            if topics:
                explanation_parts.append(f"\nKey topics discussed: {', '.join(topics.keys())}")
            if confidence_analysis:
                most_confident = confidence_analysis.get("most_confident_model", "Unknown")
                least_confident = confidence_analysis.get("least_confident_model", "Unknown")
                explanation_parts.append(
                    f"\nConfidence analysis: {most_confident} shows highest agreement with others, "
                    f"{least_confident} shows lowest."
                )
            explanation = "\n".join(explanation_parts)
        else:
            explanation = "Models are in strong agreement with no significant disagreements detected."

        return {
            "explanation": explanation,
            "disagreements": disagreements,
            "topics": topics,
            "confidence_analysis": confidence_analysis,
            "disagreement_summary": {
                "total_disagreements": len(disagreements),
                "disagreement_types": {
                    d.get("type", "Unknown"): len([x for x in disagreements if x.get("type") == d.get("type")])
                    for d in disagreements
                },
                "avg_similarity": np.mean([d.get("similarity_score", 0) for d in disagreements]) if disagreements else 1.0
            }
        }
    except Exception as e:
        logger.error(f"Error in enhanced disagreement analyzer: {str(e)}")
        return {
            "explanation": f"Error analyzing disagreements: {str(e)}",
            "disagreements": [],
            "topics": {},
            "confidence_analysis": {},
            "disagreement_summary": {
                "total_disagreements": 0,
                "disagreement_types": {},
                "avg_similarity": 0.0
            }
        }
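# For reference, the shape of the `responses` mapping these Modal functions
# consume, as produced by MCPServer.process_query (values are illustrative):
#
#   {
#       "meta-llama/Meta-Llama-3.1-70B-Instruct": {
#           "response": "...model answer...",
#           "status": "success",
#           "confidence": 0.83,
#       },
#       "google/gemma-3-27b-it": {
#           "error": "Error querying ...",
#           "status": "failed",
#       },
#   }
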
explanation = "Models are in strong agreement with no significant disagreements detected." return { "explanation": explanation, "disagreements": disagreements, "topics": topics, "confidence_analysis": confidence_analysis, "disagreement_summary": { "total_disagreements": len(disagreements), "disagreement_types": {d.get("type", "Unknown"): len([x for x in disagreements if x.get("type") == d.get("type")]) for d in disagreements}, "avg_similarity": np.mean([d.get("similarity_score", 0) for d in disagreements]) if disagreements else 1.0 } } except Exception as e: logger.error(f"Error in enhanced disagreement analyzer: {str(e)}") return { "explanation": f"Error analyzing disagreements: {str(e)}", "disagreements": [], "topics": {}, "confidence_analysis": {}, "disagreement_summary": { "total_disagreements": 0, "disagreement_types": {}, "avg_similarity": 0.0 } } @app.function(image=image, secrets=[modal.Secret.from_name("nebius")]) def synthesize_consensus(responses: Dict[str, Any], disagreements: List[Dict[str, Any]], api_key: str) -> str: """Enhanced consensus synthesis using LLM for intelligent response combination.""" try: analyzer = ConsensusAnalyzer(llm_client=NebiusClient(api_key=api_key)) return analyzer.synthesize_consensus_response(responses, disagreements) except Exception as e: logger.error(f"Error in enhanced consensus synthesis: {str(e)}") return f"Error synthesizing consensus response: {str(e)}"