import os
import re
import json
import html  # stdlib escaping (the original mistakenly imported pydoc's HTMLDoc instance)
from typing import List, Generator, Optional
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
from cachetools import TTLCache
import hashlib
import requests
import pydub
import io
import torchaudio
from PIL import Image
from transformers import CLIPModel, CLIPProcessor, AutoProcessor
from parler_tts import ParlerTTSForConditionalGeneration
from utils.web_search import web_search

logger = logging.getLogger(__name__)

# Cache setup
cache = TTLCache(maxsize=int(os.getenv("QUEUE_SIZE", "100")), ttl=600)

# LaTeX delimiters
LATEX_DELIMS = [
    {"left": "$$", "right": "$$", "display": True},
    {"left": "$", "right": "$", "display": False},
    {"left": "\\[", "right": "\\]", "display": True},
    {"left": "\\(", "right": "\\)", "display": False},
]

# Hugging Face API client configuration
HF_TOKEN = os.getenv("HF_TOKEN")
BACKUP_HF_TOKEN = os.getenv("BACKUP_HF_TOKEN")
ROUTER_API_URL = os.getenv("ROUTER_API_URL", "https://router.huggingface.co")
API_ENDPOINT = os.getenv("API_ENDPOINT", "https://api-inference.huggingface.co/v1")
FALLBACK_API_ENDPOINT = os.getenv("FALLBACK_API_ENDPOINT", "https://api-inference.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "openai/gpt-oss-120b:cerebras")
SECONDARY_MODEL_NAME = os.getenv("SECONDARY_MODEL_NAME", "mistralai/Mixtral-8x7B-Instruct-v0.1")
TERTIARY_MODEL_NAME = os.getenv("TERTIARY_MODEL_NAME", "meta-llama/Llama-3-8b-chat-hf")  # replaces Qwen with an available model
CLIP_BASE_MODEL = os.getenv("CLIP_BASE_MODEL", "Salesforce/blip-image-captioning-large")
CLIP_LARGE_MODEL = os.getenv("CLIP_LARGE_MODEL", "openai/clip-vit-large-patch14")
ASR_MODEL = os.getenv("ASR_MODEL", "openai/whisper-large-v3")
TTS_MODEL = os.getenv("TTS_MODEL", "facebook/mms-tts-ara")

# PROVIDER_ENDPOINTS reduced to Hugging Face only, since that is the only provider used here.
PROVIDER_ENDPOINTS = {
    "huggingface": API_ENDPOINT  # Hugging Face only
}

# Aliases accepted as `preferred_model` in select_model. Assumed mapping: the original
# module referenced MODEL_ALIASES without defining it, which would raise a NameError.
MODEL_ALIASES = {
    "primary": MODEL_NAME,
    "secondary": SECONDARY_MODEL_NAME,
    "tertiary": TERTIARY_MODEL_NAME,
}


def check_model_availability(model_name: str, api_key: str) -> tuple[bool, str, str]:
    try:
        response = requests.get(
            f"{ROUTER_API_URL}/v1/models/{model_name}",
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30,
        )
        if response.status_code == 200:
            logger.info(f"Model {model_name} is available at {API_ENDPOINT}")
            return True, api_key, API_ENDPOINT
        elif response.status_code == 429 and BACKUP_HF_TOKEN and api_key != BACKUP_HF_TOKEN:
            logger.warning(f"Rate limit reached for token {api_key}. Switching to backup token.")
            return check_model_availability(model_name, BACKUP_HF_TOKEN)
        logger.error(f"Model {model_name} not available: {response.status_code} - {response.text}")
        return False, api_key, API_ENDPOINT
    except Exception as e:
        logger.error(f"Failed to check model availability for {model_name}: {e}")
        if BACKUP_HF_TOKEN and api_key != BACKUP_HF_TOKEN:
            logger.warning(f"Retrying with backup token for {model_name}")
            return check_model_availability(model_name, BACKUP_HF_TOKEN)
        return False, api_key, API_ENDPOINT


def select_model(query: str, input_type: str = "text", preferred_model: Optional[str] = None) -> tuple[str, str]:
    if preferred_model and preferred_model in MODEL_ALIASES:
        model_name = MODEL_ALIASES[preferred_model]
        is_available, _, endpoint = check_model_availability(model_name, HF_TOKEN)
        if is_available:
            logger.info(f"Selected preferred model {model_name} with endpoint {endpoint} for query: {query}")
            return model_name, endpoint

    query_lower = query.lower()
    if input_type == "audio" or any(keyword in query_lower for keyword in ["voice", "audio", "speech", "صوت", "تحويل صوت"]):
        logger.info(f"Selected {ASR_MODEL} with endpoint {FALLBACK_API_ENDPOINT} for audio input")
        return ASR_MODEL, FALLBACK_API_ENDPOINT
    if any(keyword in query_lower for keyword in ["text-to-speech", "tts", "تحويل نص إلى صوت"]) or input_type == "tts":
        logger.info(f"Selected {TTS_MODEL} with endpoint {FALLBACK_API_ENDPOINT} for text-to-speech")
        return TTS_MODEL, FALLBACK_API_ENDPOINT

    image_patterns = [
        r"\bimage\b", r"\bpicture\b", r"\bphoto\b", r"\bvisual\b", r"\bصورة\b",
        r"\bتحليل\s+صورة\b", r"\bimage\s+analysis\b", r"\bimage\s+classification\b",
        r"\bimage\s+description\b",
    ]
    for pattern in image_patterns:
        if re.search(pattern, query_lower, re.IGNORECASE):
            logger.info(f"Selected {CLIP_BASE_MODEL} with endpoint {FALLBACK_API_ENDPOINT} for image-related query: {query}")
            return CLIP_BASE_MODEL, FALLBACK_API_ENDPOINT

    available_models = [
        (MODEL_NAME, API_ENDPOINT),
        (SECONDARY_MODEL_NAME, FALLBACK_API_ENDPOINT),
        (TERTIARY_MODEL_NAME, API_ENDPOINT),
    ]
    for model_name, api_endpoint in available_models:
        is_available, _, endpoint = check_model_availability(model_name, HF_TOKEN)
        if is_available:
            logger.info(f"Selected {model_name} with endpoint {endpoint} for query: {query}")
            return model_name, endpoint

    logger.error("No models available. Falling back to default.")
    return MODEL_NAME, API_ENDPOINT


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=2, min=4, max=60))
def request_generation(
    api_key: str,
    api_base: str,
    message: str,
    system_prompt: str,
    model_name: str,
    chat_history: Optional[List[dict]] = None,
    temperature: float = 0.7,
    max_new_tokens: int = 2048,
    reasoning_effort: str = "off",
    tools: Optional[List[dict]] = None,
    tool_choice: Optional[str] = None,
    deep_search: bool = False,
    input_type: str = "text",
    audio_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    output_format: str = "text",
) -> Generator[bytes | str, None, None]:
    is_available, selected_api_key, selected_endpoint = check_model_availability(model_name, api_key)
    if not is_available:
        yield f"Error: Model {model_name} is not available. Please check the model endpoint or token."
        return

    cache_key = hashlib.md5(json.dumps({
        "message": message,
        "system_prompt": system_prompt,
        "model_name": model_name,
        "chat_history": chat_history,
        "temperature": temperature,
        "max_new_tokens": max_new_tokens,
        "output_format": output_format,
    }, sort_keys=True).encode()).hexdigest()
    if cache_key in cache:
        logger.info(f"Cache hit for query: {message[:50]}...")
        for chunk in cache[cache_key]:
            yield chunk
        return

    client = OpenAI(api_key=selected_api_key, base_url=selected_endpoint, timeout=120.0)
    task_type = "general"
    enhanced_system_prompt = system_prompt
    buffer = ""  # define buffer up front to avoid UnboundLocalError

    if model_name == ASR_MODEL and audio_data:
        task_type = "audio_transcription"
        try:
            audio_file = io.BytesIO(audio_data)
            audio = pydub.AudioSegment.from_file(audio_file)
            audio = audio.set_channels(1).set_frame_rate(16000)
            audio_file = io.BytesIO()
            audio.export(audio_file, format="wav")
            audio_file.seek(0)  # rewind before upload, otherwise an empty file is sent
            audio_file.name = "audio.wav"
            transcription = client.audio.transcriptions.create(
                model=model_name,
                file=audio_file,
                response_format="text",
            )
            yield transcription
            cache[cache_key] = [transcription]
            return
        except Exception as e:
            logger.error(f"Audio transcription failed: {e}")
            yield f"Error: Audio transcription failed: {e}"
            return

    if model_name == TTS_MODEL or output_format == "audio":
        task_type = "text_to_speech"
        try:
            # Assumes TTS_MODEL points at a Parler-TTS-compatible checkpoint; the default
            # facebook/mms-tts-ara is a VITS checkpoint and would need a different loader.
            model = ParlerTTSForConditionalGeneration.from_pretrained(TTS_MODEL)
            processor = AutoProcessor.from_pretrained(TTS_MODEL)
            inputs = processor(text=message, return_tensors="pt")
            audio = model.generate(**inputs)
            audio_file = io.BytesIO()
            torchaudio.save(audio_file, audio[0], sample_rate=22050, format="wav")
            audio_file.seek(0)
            audio_data = audio_file.read()
            yield audio_data
            cache[cache_key] = [audio_data]
            return
        except Exception as e:
            logger.error(f"Text-to-speech failed: {e}")
            yield f"Error: Text-to-speech failed: {e}"
            return

    if model_name in [CLIP_BASE_MODEL, CLIP_LARGE_MODEL] and image_data:
        task_type = "image_analysis"
        try:
            # Assumes the selected checkpoint is CLIP-compatible; the default CLIP_BASE_MODEL
            # (a BLIP captioning checkpoint) would need a BLIP loader instead.
            model = CLIPModel.from_pretrained(model_name)
            processor = CLIPProcessor.from_pretrained(model_name)
            image = Image.open(io.BytesIO(image_data)).convert("RGB")
            inputs = processor(text=message, images=image, return_tensors="pt", padding=True)
            outputs = model(**inputs)
            logits_per_image = outputs.logits_per_image
            probs = logits_per_image.softmax(dim=1)
            result = f"Image analysis result: {probs.tolist()}"
            if output_format == "audio":
                model = ParlerTTSForConditionalGeneration.from_pretrained(TTS_MODEL)
                processor = AutoProcessor.from_pretrained(TTS_MODEL)
                inputs = processor(text=result, return_tensors="pt")
                audio = model.generate(**inputs)
                audio_file = io.BytesIO()
                torchaudio.save(audio_file, audio[0], sample_rate=22050, format="wav")
                audio_file.seek(0)
                audio_data = audio_file.read()
                yield audio_data
            else:
                yield result
            cache[cache_key] = [result]
            return
        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            yield f"Error: Image analysis failed: {e}"
            return

    if model_name in [CLIP_BASE_MODEL, CLIP_LARGE_MODEL]:
        task_type = "image"
        enhanced_system_prompt = f"{system_prompt}\nYou are an expert in image analysis and description. Provide detailed descriptions, classifications, or analysis of images based on the query."
    elif any(keyword in message.lower() for keyword in ["code", "programming", "python", "javascript", "react", "django", "flask"]):
        task_type = "code"
        enhanced_system_prompt = f"{system_prompt}\nYou are an expert programmer. Provide accurate, well-commented code with comprehensive examples and detailed explanations."
    elif any(keyword in message.lower() for keyword in ["analyze", "analysis", "تحليل"]):
        task_type = "analysis"
        enhanced_system_prompt = f"{system_prompt}\nProvide detailed analysis with step-by-step reasoning, examples, and data-driven insights."
    elif any(keyword in message.lower() for keyword in ["review", "مراجعة"]):
        task_type = "review"
        enhanced_system_prompt = f"{system_prompt}\nReview the provided content thoroughly, identify issues, and suggest improvements with detailed explanations."
    elif any(keyword in message.lower() for keyword in ["publish", "نشر"]):
        task_type = "publish"
        enhanced_system_prompt = f"{system_prompt}\nPrepare content for publishing, ensuring clarity, professionalism, and adherence to best practices."
    else:
        enhanced_system_prompt = f"{system_prompt}\nFor general queries, provide comprehensive, detailed responses with examples and explanations where applicable."

    if len(message.split()) < 5:
        enhanced_system_prompt += "\nEven for short or general queries, provide a detailed, in-depth response."

    logger.info(f"Task type detected: {task_type}")

    input_messages: List[dict] = [{"role": "system", "content": enhanced_system_prompt}]
    if chat_history:
        for msg in chat_history:
            clean_msg = {"role": msg.get("role"), "content": msg.get("content")}
            if clean_msg["content"]:
                input_messages.append(clean_msg)

    if deep_search:
        try:
            search_result = web_search(message)
            input_messages.append({"role": "user", "content": f"User query: {message}\nWeb search context: {search_result}"})
        except Exception as e:
            logger.error(f"Web search failed: {e}")
            input_messages.append({"role": "user", "content": message})
    else:
        input_messages.append({"role": "user", "content": message})

    tools = tools if tools and model_name in [MODEL_NAME, SECONDARY_MODEL_NAME, TERTIARY_MODEL_NAME] else []
    tool_choice = tool_choice if tool_choice in ["auto", "none", "any", "required"] and model_name in [MODEL_NAME, SECONDARY_MODEL_NAME, TERTIARY_MODEL_NAME] else "none"

    cached_chunks = []
    # Initialize streaming state before the try block so the fallback paths below can use it
    # even when the primary request fails before the loop starts.
    reasoning_started = False
    reasoning_closed = False
    saw_visible_output = False
    last_tool_name = None
    last_tool_args = None
    try:
        stream = client.chat.completions.create(
            model=model_name,
            messages=input_messages,
            temperature=temperature,
            max_tokens=max_new_tokens,
            stream=True,
            tools=tools,
            tool_choice=tool_choice,
        )
        for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                if content == "<|channel|>analysis<|message|>":
                    if not reasoning_started:
                        cached_chunks.append("analysis")
                        yield "analysis"
                        reasoning_started = True
                    continue
                if content == "<|channel|>final<|message|>":
                    if reasoning_started and not reasoning_closed:
                        cached_chunks.append("assistantfinal")
                        yield "assistantfinal"
                        reasoning_closed = True
                    continue
                saw_visible_output = True
                buffer += content
                if "\n" in buffer or len(buffer) > 5000:
                    cached_chunks.append(buffer)
                    yield buffer
                    buffer = ""
                continue
            if chunk.choices[0].delta.tool_calls and model_name in [MODEL_NAME, SECONDARY_MODEL_NAME, TERTIARY_MODEL_NAME]:
                tool_call = chunk.choices[0].delta.tool_calls[0]
                # The delta's `function` is an object, not a dict, so use getattr rather than .get().
                function = getattr(tool_call, "function", None)
                name = getattr(function, "name", None)
                args = getattr(function, "arguments", None)
                if name:
                    last_tool_name = name
                if args:
                    last_tool_args = args
                continue
            if chunk.choices[0].finish_reason in ("stop", "tool_calls", "error", "length"):
                if buffer:
                    cached_chunks.append(buffer)
                    yield buffer
                    buffer = ""
                if reasoning_started and not reasoning_closed:
                    cached_chunks.append("assistantfinal")
                    yield "assistantfinal"
                    reasoning_closed = True
                if not saw_visible_output:
                    msg = "I attempted to call a tool, but tools aren't executed in this environment."
                    if last_tool_name:
                        try:
                            args_text = json.dumps(last_tool_args, ensure_ascii=False, default=str)
                        except Exception:
                            args_text = str(last_tool_args)
                        msg += f"\n\n• Tool requested: **{last_tool_name}**\n• Arguments: `{args_text}`"
                    cached_chunks.append(msg)
                    yield msg
                if chunk.choices[0].finish_reason == "error":
                    cached_chunks.append("Error: Unknown error")
                    yield "Error: Unknown error"
                elif chunk.choices[0].finish_reason == "length":
                    cached_chunks.append("Response truncated due to token limit.")
                    yield "Response truncated due to token limit."
                break

        if buffer:
            cached_chunks.append(buffer)
            yield buffer
        if output_format == "audio":
            try:
                # Note: only the trailing, un-flushed part of the response is left in `buffer` here.
                model = ParlerTTSForConditionalGeneration.from_pretrained(TTS_MODEL)
                processor = AutoProcessor.from_pretrained(TTS_MODEL)
                inputs = processor(text=buffer, return_tensors="pt")
                audio = model.generate(**inputs)
                audio_file = io.BytesIO()
                torchaudio.save(audio_file, audio[0], sample_rate=22050, format="wav")
                audio_file.seek(0)
                audio_data = audio_file.read()
                cached_chunks.append(audio_data)
                yield audio_data
            except Exception as e:
                logger.error(f"Text-to-speech conversion failed: {e}")
                yield f"Error: Text-to-speech conversion failed: {e}"
        cache[cache_key] = cached_chunks
    except Exception as e:
        logger.error(f"[Gateway] Streaming failed for model {model_name}: {e}")
        if selected_api_key != BACKUP_HF_TOKEN and BACKUP_HF_TOKEN:
            logger.warning(f"Retrying with backup token for {model_name}")
            for chunk in request_generation(
                api_key=BACKUP_HF_TOKEN,
                api_base=selected_endpoint,
                message=message,
                system_prompt=system_prompt,
                model_name=model_name,
                chat_history=chat_history,
                temperature=temperature,
                max_new_tokens=max_new_tokens,
                reasoning_effort=reasoning_effort,
                tools=tools,
                tool_choice=tool_choice,
                deep_search=deep_search,
                input_type=input_type,
                audio_data=audio_data,
                image_data=image_data,
                output_format=output_format,
            ):
                yield chunk
            return
        if model_name == MODEL_NAME:
            fallback_model = SECONDARY_MODEL_NAME
            fallback_endpoint = FALLBACK_API_ENDPOINT
            logger.info(f"Retrying with fallback model: {fallback_model} on {fallback_endpoint}")
            try:
                is_available, selected_api_key, selected_endpoint = check_model_availability(fallback_model, selected_api_key)
                if not is_available:
                    yield f"Error: Fallback model {fallback_model} is not available."
                    return
                client = OpenAI(api_key=selected_api_key, base_url=selected_endpoint, timeout=120.0)
                stream = client.chat.completions.create(
                    model=fallback_model,
                    messages=input_messages,
                    temperature=temperature,
                    max_tokens=max_new_tokens,
                    stream=True,
                    tools=[],
                    tool_choice="none",
                )
                buffer = ""  # reset buffer for the fallback model
                for chunk in stream:
                    if chunk.choices[0].delta.content:
                        content = chunk.choices[0].delta.content
                        if content == "<|channel|>analysis<|message|>":
                            if not reasoning_started:
                                cached_chunks.append("analysis")
                                yield "analysis"
                                reasoning_started = True
                            continue
                        if content == "<|channel|>final<|message|>":
                            if reasoning_started and not reasoning_closed:
                                cached_chunks.append("assistantfinal")
                                yield "assistantfinal"
                                reasoning_closed = True
                            continue
                        saw_visible_output = True
                        buffer += content
                        if "\n" in buffer or len(buffer) > 5000:
                            cached_chunks.append(buffer)
                            yield buffer
                            buffer = ""
                        continue
                    if chunk.choices[0].finish_reason in ("stop", "error", "length"):
                        if buffer:
                            cached_chunks.append(buffer)
                            yield buffer
                            buffer = ""
                        if reasoning_started and not reasoning_closed:
                            cached_chunks.append("assistantfinal")
                            yield "assistantfinal"
                            reasoning_closed = True
                        if not saw_visible_output:
                            cached_chunks.append("No visible output produced.")
                            yield "No visible output produced."
                        if chunk.choices[0].finish_reason == "error":
                            cached_chunks.append(f"Error: Unknown error with fallback model {fallback_model}")
                            yield f"Error: Unknown error with fallback model {fallback_model}"
                        elif chunk.choices[0].finish_reason == "length":
                            cached_chunks.append("Response truncated due to token limit.")
                            yield "Response truncated due to token limit."
                        break

                if buffer and output_format == "audio":
                    try:
                        model = ParlerTTSForConditionalGeneration.from_pretrained(TTS_MODEL)
                        processor = AutoProcessor.from_pretrained(TTS_MODEL)
                        inputs = processor(text=buffer, return_tensors="pt")
                        audio = model.generate(**inputs)
                        audio_file = io.BytesIO()
                        torchaudio.save(audio_file, audio[0], sample_rate=22050, format="wav")
                        audio_file.seek(0)
                        audio_data = audio_file.read()
                        cached_chunks.append(audio_data)
                        yield audio_data
                    except Exception as e:
                        logger.error(f"Text-to-speech conversion failed: {e}")
                        yield f"Error: Text-to-speech conversion failed: {e}"
                cache[cache_key] = cached_chunks
            except Exception as e2:
                logger.error(f"[Gateway] Streaming failed for fallback model {fallback_model}: {e2}")
                try:
                    is_available, selected_api_key, selected_endpoint = check_model_availability(TERTIARY_MODEL_NAME, selected_api_key)
                    if not is_available:
                        yield f"Error: Tertiary model {TERTIARY_MODEL_NAME} is not available."
                        return
                    client = OpenAI(api_key=selected_api_key, base_url=selected_endpoint, timeout=120.0)
                    stream = client.chat.completions.create(
                        model=TERTIARY_MODEL_NAME,
                        messages=input_messages,
                        temperature=temperature,
                        max_tokens=max_new_tokens,
                        stream=True,
                        tools=[],
                        tool_choice="none",
                    )
                    buffer = ""  # reset buffer for the tertiary model
                    for chunk in stream:
                        if chunk.choices[0].delta.content:
                            content = chunk.choices[0].delta.content
                            saw_visible_output = True
                            buffer += content
                            if "\n" in buffer or len(buffer) > 5000:
                                cached_chunks.append(buffer)
                                yield buffer
                                buffer = ""
                            continue
                        if chunk.choices[0].finish_reason in ("stop", "error", "length"):
                            if buffer:
                                cached_chunks.append(buffer)
                                yield buffer
                                buffer = ""
                            if not saw_visible_output:
                                cached_chunks.append("No visible output produced.")
                                yield "No visible output produced."
                            if chunk.choices[0].finish_reason == "error":
                                cached_chunks.append(f"Error: Unknown error with tertiary model {TERTIARY_MODEL_NAME}")
                                yield f"Error: Unknown error with tertiary model {TERTIARY_MODEL_NAME}"
                            elif chunk.choices[0].finish_reason == "length":
                                cached_chunks.append("Response truncated due to token limit.")
                                yield "Response truncated due to token limit."
                            break

                    if buffer and output_format == "audio":
                        try:
                            model = ParlerTTSForConditionalGeneration.from_pretrained(TTS_MODEL)
                            processor = AutoProcessor.from_pretrained(TTS_MODEL)
                            inputs = processor(text=buffer, return_tensors="pt")
                            audio = model.generate(**inputs)
                            audio_file = io.BytesIO()
                            torchaudio.save(audio_file, audio[0], sample_rate=22050, format="wav")
                            audio_file.seek(0)
                            audio_data = audio_file.read()
                            cached_chunks.append(audio_data)
                            yield audio_data
                        except Exception as e:
                            logger.error(f"Text-to-speech conversion failed: {e}")
                            yield f"Error: Text-to-speech conversion failed: {e}"
                    cache[cache_key] = cached_chunks
                except Exception as e3:
                    logger.error(f"[Gateway] Streaming failed for tertiary model {TERTIARY_MODEL_NAME}: {e3}")
                    yield f"Error: Failed to load all models: Primary ({model_name}), Secondary ({fallback_model}), Tertiary ({TERTIARY_MODEL_NAME})."
                    return
        else:
            yield f"Error: Failed to load model {model_name}: {e}"
            return


def format_final(analysis_text: str, visible_text: str) -> str:
    reasoning_safe = html.escape((analysis_text or "").strip())
    response = (visible_text or "").strip()
    if not reasoning_safe and not response:
        return "No response generated."
    # Collapsible analysis block followed by the visible response (HTML tags reconstructed;
    # the original markup was lost in extraction).
    return (
        "<details open>\n"
        "<summary>🤔 Analysis</summary>\n\n"
        f"{reasoning_safe}"
        "\n\n</details>\n\n"
        "**💬 Response:**\n\n"
        f"{response}" if response else "No final response available."
    )


def generate(message, history, system_prompt, temperature, reasoning_effort, enable_browsing, max_new_tokens,
             input_type="text", audio_data=None, image_data=None, output_format="text"):
    if not message.strip() and not audio_data and not image_data:
        yield "Please enter a prompt or upload a file."
        return

    model_name, api_endpoint = select_model(message, input_type=input_type)

    chat_history = []
    for h in history:
        if isinstance(h, dict):
            clean_msg = {"role": h.get("role"), "content": h.get("content")}
            if clean_msg["content"]:
                chat_history.append(clean_msg)
        elif isinstance(h, (list, tuple)) and len(h) == 2:
            u, a = h
            if u:
                chat_history.append({"role": "user", "content": u})
            if a:
                chat_history.append({"role": "assistant", "content": a})

    tools = [
        {
            "type": "function",
            "function": {
                "name": "web_search_preview",
                "description": "Perform a web search to gather additional context",
                "parameters": {
                    "type": "object",
                    "properties": {"query": {"type": "string", "description": "Search query"}},
                    "required": ["query"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "code_generation",
                "description": "Generate or modify code for various frameworks",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string", "description": "Existing code to modify or empty for new code"},
                        "framework": {"type": "string", "description": "Framework (e.g., React, Django, Flask)"},
                        "task": {"type": "string", "description": "Task description (e.g., create a component, fix a bug)"},
                    },
                    "required": ["task"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "code_formatter",
                "description": "Format code for readability and consistency",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string", "description": "Code to format"},
                        "language": {"type": "string", "description": "Programming language (e.g., Python, JavaScript)"},
                    },
                    "required": ["code", "language"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "image_analysis",
                "description": "Analyze or describe an image based on the provided query",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "image_url": {"type": "string", "description": "URL of the image to analyze"},
                        "task": {"type": "string", "description": "Task description (e.g., describe, classify)"},
                    },
                    "required": ["task"],
                },
            },
        },
    ] if model_name in [MODEL_NAME, SECONDARY_MODEL_NAME, TERTIARY_MODEL_NAME] else []
    tool_choice = "auto" if model_name in [MODEL_NAME, SECONDARY_MODEL_NAME, TERTIARY_MODEL_NAME] else "none"

    in_analysis = False
    in_visible = False
    raw_analysis = ""
    raw_visible = ""
    raw_started = False
    last_flush_len = 0

    def make_raw_preview() -> str:
        return (
            """```text
Analysis (live):
{raw_analysis}

Response (draft):
{raw_visible}
```""".format(raw_analysis=raw_analysis, raw_visible=raw_visible)
        )

    try:
        stream = request_generation(
            api_key=HF_TOKEN,
            api_base=api_endpoint,
            message=message,
            system_prompt=system_prompt,
            model_name=model_name,
            chat_history=chat_history,
            temperature=temperature,
            max_new_tokens=max_new_tokens,
            tools=tools,
            tool_choice=tool_choice,
            deep_search=enable_browsing,
            input_type=input_type,
            audio_data=audio_data,
            image_data=image_data,
            output_format=output_format,
        )
        for chunk in stream:
            if isinstance(chunk, bytes):
                yield chunk
                continue
            if chunk == "analysis":
                in_analysis, in_visible = True, False
                if not raw_started:
                    raw_started = True
                    yield make_raw_preview()
                continue
            if chunk == "assistantfinal":
                in_analysis, in_visible = False, True
                if not raw_started:
                    raw_started = True
                    yield make_raw_preview()
                continue
            if in_analysis:
                raw_analysis += chunk
            elif in_visible:
                raw_visible += chunk
            else:
                raw_visible += chunk
            total_len = len(raw_analysis) + len(raw_visible)
            if total_len - last_flush_len >= 120 or "\n" in chunk:
                last_flush_len = total_len
                yield make_raw_preview()

        final_markdown = format_final(raw_analysis, raw_visible)
        if final_markdown.count("$") % 2:
            final_markdown += "$"
        yield final_markdown
    except Exception as e:
        logger.exception("Stream failed")
        yield f"❌ Error: {e}"
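

# Minimal local smoke test, a sketch only: it assumes HF_TOKEN is set in the environment
# and the default models are reachable. The real app presumably drives generate() from a
# chat UI instead; the prompt and parameter values below are illustrative, not canonical.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo_history = []  # either [] or a list of {"role": ..., "content": ...} dicts
    for update in generate(
        message="Explain the difference between a list and a tuple in Python.",
        history=demo_history,
        system_prompt="You are a helpful assistant.",
        temperature=0.7,
        reasoning_effort="off",
        enable_browsing=False,
        max_new_tokens=512,
    ):
        # generate() yields text previews/final markdown, or raw bytes when output_format="audio".
        if isinstance(update, bytes):
            print(f"[audio chunk: {len(update)} bytes]")
        else:
            print(update)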