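"""Coordinated LLM provider.

Orchestrates a HuggingFace Inference Endpoint (the primary expert) and a local
Ollama model (the mentor/commentator), combining both outputs into a single
formatted response. Falls back to an Ollama-only response with self-commentary
when the HF Endpoint is unavailable.
"""
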
import logging
from typing import List, Dict, Optional, Union
from src.llm.base_provider import LLMProvider
from src.llm.hf_provider import HuggingFaceProvider
from src.llm.ollama_provider import OllamaProvider
from utils.config import config

logger = logging.getLogger(__name__)

class CoordinatedProvider(LLMProvider):
    """Coordinated provider that orchestrates Ollama and HF interaction"""
    
    def __init__(self, model_name: str, timeout: int = 120, max_retries: int = 2):
        super().__init__(model_name, timeout, max_retries)
        self.hf_provider = None
        self.ollama_provider = None
        
        # Initialize providers
        try:
            if config.hf_token:
                self.hf_provider = HuggingFaceProvider(
                    model_name="DavidAU/OpenAi-GPT-oss-20b-abliterated-uncensored-NEO-Imatrix-gguf",
                    timeout=120
                )
        except Exception as e:
            logger.warning(f"Failed to initialize HF provider: {e}")
            
        try:
            if config.ollama_host:
                self.ollama_provider = OllamaProvider(
                    model_name=config.local_model_name,
                    timeout=60
                )
        except Exception as e:
            logger.warning(f"Failed to initialize Ollama provider: {e}")

    def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Generate coordinated response using HF+Ollama"""
        try:
            return self._retry_with_backoff(self._generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated generation failed: {e}")
            raise

    def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]:
        """Generate coordinated response with streaming support"""
        try:
            return self._retry_with_backoff(self._stream_generate_coordinated_response, prompt, conversation_history)
        except Exception as e:
            logger.error(f"Coordinated stream generation failed: {e}")
            raise

    def _generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> str:
        """Main method that orchestrates both providers"""
        try:
            # Step 1: Get response from HF Endpoint (primary expert)
            hf_response = self._get_hf_response(prompt, conversation_history)
            
            # Step 2: Get Ollama commentary on HF response
            ollama_commentary = self._get_ollama_commentary(prompt, hf_response, conversation_history)
            
            # Step 3: Combine responses with clear formatting
            coordinated_response = self._format_coordinated_response(hf_response, ollama_commentary)
            
            return coordinated_response
            
        except Exception as e:
            logger.error(f"Coordinated response generation failed: {e}")
            # Fallback to Ollama only
            if self.ollama_provider:
                try:
                    ollama_response = self.ollama_provider.generate(prompt, conversation_history)
                    # Generate self-commentary
                    self_commentary = self._get_ollama_self_commentary(prompt, ollama_response, conversation_history)
                    return self._format_fallback_response(ollama_response, self_commentary)
                except Exception as fallback_error:
                    logger.error(f"Ollama fallback also failed: {fallback_error}")
            
            raise Exception(f"Both HF Endpoint and Ollama failed: {str(e)}")

    def _stream_generate_coordinated_response(self, prompt: str, conversation_history: List[Dict]) -> List[str]:
        """Implementation of streaming coordinated generation"""
        # For simplicity, return the full coordinated response as a single chunk
        full_response = self._generate_coordinated_response(prompt, conversation_history)
        return [full_response]

    def _get_hf_response(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get response from HF Endpoint (primary expert)"""
        if not self.hf_provider:
            return None
            
        try:
            logger.info("πŸš€ Getting detailed response from HF Endpoint (primary expert)...")
            response = self.hf_provider.generate(prompt, conversation_history)
            logger.info("βœ… HF Endpoint expert response received")
            return response
        except Exception as e:
            logger.error(f"HF Endpoint expert failed: {e}")
            return None

    def _get_ollama_commentary(self, user_prompt: str, hf_response: Optional[str], conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama commentary on HF response"""
        if not self.ollama_provider or not hf_response:
            return None
            
        try:
            logger.info("🐱 Getting Ollama commentary on HF response...")
            commentary = self.ollama_provider.generate_commentary(user_prompt, hf_response, conversation_history)
            logger.info("βœ… Ollama commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama commentary failed: {e}")
            return None

    def _get_ollama_self_commentary(self, user_prompt: str, ollama_response: str, conversation_history: List[Dict]) -> Optional[str]:
        """Get Ollama self-commentary when HF fails"""
        if not self.ollama_provider:
            return None
            
        try:
            logger.info("🐱 Getting Ollama self-commentary on own response...")
            commentary = self.ollama_provider.generate_self_commentary(user_prompt, ollama_response, conversation_history)
            logger.info("βœ… Ollama self-commentary received")
            return commentary
        except Exception as e:
            logger.warning(f"Ollama self-commentary failed: {e}")
            return None

    def _format_coordinated_response(self, hf_response: Optional[str], ollama_commentary: Optional[str]) -> str:
        """Format combined response with clear separation"""
        response_parts = []
        
        # Add HF Expert response
        if hf_response:
            response_parts.append("πŸ€– HF Expert Analysis\n" + hf_response)
        else:
            response_parts.append("πŸ€– HF Expert Analysis\n*No response from HF Expert*")
        
        # Add separator
        response_parts.append("\n" + "="*50 + "\n")
        
        # Add Ollama Mentor commentary
        if ollama_commentary:
            response_parts.append("🐱 Ollama Mentor Commentary\n" + ollama_commentary)
        else:
            response_parts.append("🐱 Ollama Mentor Commentary\n*I've reviewed the HF expert's response but couldn't provide additional insights.*")
        
        return "\n\n".join(response_parts)

    def _format_fallback_response(self, ollama_response: Optional[str], self_commentary: Optional[str]) -> str:
        """Format fallback response when HF fails"""
        response_parts = []
        
        # Add Ollama main response with fallback indication
        if ollama_response:
            response_parts.append("πŸ¦™ Ollama Primary Response (HF Expert Unavailable)\n" + ollama_response)
        else:
            response_parts.append("πŸ¦™ Ollama Primary Response (HF Expert Unavailable)\n*No response generated*")
        
        # Add separator
        response_parts.append("\n" + "="*50 + "\n")
        
        # Add Ollama self-commentary
        if self_commentary:
            response_parts.append("🐱 Ollama Self-Commentary\n" + self_commentary)
        else:
            response_parts.append("🐱 Ollama Self-Commentary\n*I've reviewed my own response but couldn't provide additional insights.*")
        
        return "\n\n".join(response_parts)

    def validate_model(self) -> bool:
        """Validate if coordinated system is available"""
        # At least one provider must be available
        hf_available = bool(self.hf_provider and self.hf_provider.validate_model())
        ollama_available = bool(self.ollama_provider and self.ollama_provider.validate_model())
        return hf_available or ollama_available

# Global instance
coordinated_provider = CoordinatedProvider("coordinated_model")
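

if __name__ == "__main__":
    # Illustrative sketch only: exercises the coordinated provider end to end.
    # The prompt text and the {"role": ..., "content": ...} history shape are
    # assumptions for demonstration, not a documented contract of the base provider.
    if coordinated_provider.validate_model():
        history = [{"role": "user", "content": "Hello"}]
        reply = coordinated_provider.generate(
            "Briefly explain what this coordinated provider does.", history
        )
        print(reply)
    else:
        logger.warning("No provider available; configure hf_token or ollama_host first.")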