import os
import re
import mimetypes
import tempfile
import uuid
import datetime
import base64
import time
import threading
import atexit
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path

import PyPDF2
import docx
import cv2
import numpy as np
from PIL import Image
import pytesseract
from huggingface_hub import InferenceClient, HfApi
import gradio as gr

from config import HF_TOKEN, SEARCH_START, DIVIDER, REPLACE_END, TEMP_DIR_TTL_SECONDS

# Global temp file tracking
MEDIA_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_media")
VIDEO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_videos")
AUDIO_TEMP_DIR = os.path.join(tempfile.gettempdir(), "anycoder_audio")

# Per-session lists of temp file paths, each guarded by its own lock.
_SESSION_MEDIA_FILES: Dict[str, List[str]] = {}
_SESSION_VIDEO_FILES: Dict[str, List[str]] = {}
_SESSION_AUDIO_FILES: Dict[str, List[str]] = {}
_MEDIA_FILES_LOCK = threading.Lock()
_VIDEO_FILES_LOCK = threading.Lock()
_AUDIO_FILES_LOCK = threading.Lock()

# Registry of files created by create_temp_media_url, keyed by "<media_type>_<id>".
temp_media_files = {}


def ensure_temp_dirs():
    """Ensure all temporary directories exist.

    Creation is best-effort: a failure for one directory is ignored so the
    remaining directories are still attempted.
    """
    for temp_dir in [MEDIA_TEMP_DIR, VIDEO_TEMP_DIR, AUDIO_TEMP_DIR]:
        try:
            os.makedirs(temp_dir, exist_ok=True)
        except Exception:
            # Deliberately best-effort; callers fail later when they try to
            # write into a directory that could not be created.
            pass


def get_inference_client(model_id: str, provider: str = "auto"):
    """Return an API client appropriate for ``model_id``.

    Args:
        model_id: Identifier of the model to talk to.
        provider: Inference provider passed to ``InferenceClient``. It is
            overridden when ``model_id`` has an entry in the provider map.

    Returns:
        An ``openai.OpenAI`` client for the OpenAI-compatible models, a
        ``mistralai.Mistral`` client for the Mistral models, otherwise a
        ``huggingface_hub.InferenceClient``.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is empty.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN environment variable is not set")

    # Special API handling for specific models
    openai_models = {
        "qwen3-30b-a3b-instruct-2507": {
            "api_key": os.getenv("DASHSCOPE_API_KEY"),
            "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
        },
        "gpt-5": {
            "api_key": os.getenv("POE_API_KEY"),
            "base_url": "https://api.poe.com/v1",
        },
        "kimi-k2-turbo-preview": {
            "api_key": os.getenv("MOONSHOT_API_KEY"),
            "base_url": "https://api.moonshot.ai/v1",
        },
        "gemini-2.5-flash": {
            "api_key": os.getenv("GEMINI_API_KEY"),
            "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/",
        },
    }

    if model_id in openai_models:
        # Imported lazily so the dependency is only needed for these models.
        from openai import OpenAI

        client_config = openai_models[model_id]
        return OpenAI(
            api_key=client_config["api_key"],
            base_url=client_config["base_url"],
        )

    # Mistral models
    if model_id in ("codestral-2508", "mistral-medium-2508"):
        from mistralai import Mistral

        return Mistral(api_key=os.getenv("MISTRAL_API_KEY"))

    # Provider-specific routing
    provider_map = {
        "openai/gpt-oss-120b": "groq",
        "openai/gpt-oss-20b": "groq",
        "Qwen/Qwen3-235B-A22B": "cerebras",
        "Qwen/Qwen3-Coder-480B-A35B-Instruct": "cerebras",
        "deepseek-ai/DeepSeek-V3.1": "novita",
        "zai-org/GLM-4.5": "fireworks-ai",
    }
    if model_id in provider_map:
        provider = provider_map[model_id]

    return InferenceClient(
        provider=provider,
        api_key=HF_TOKEN,
        bill_to="huggingface",
    )


def remove_code_block(text: str) -> str:
    """Remove Markdown code-fence markers from ``text``.

    The first fenced block found is returned without its fence. A leading
    language-marker line is dropped, and any preface before an HTML root
    (``<!DOCTYPE html`` / ``<html``) is trimmed. Text without a fence is
    returned stripped, with the same HTML-root trimming applied.

    Args:
        text: Raw model output.

    Returns:
        The extracted code, or ``text`` unchanged when it is empty/None.
    """
    if not text:
        return text

    patterns = [
        r'```(?:html|HTML)\n([\s\S]+?)\n```',
        r'```\n([\s\S]+?)\n```',
        r'```([\s\S]+?)```',
    ]
    # NOTE(review): the HTML tag literals in this function were stripped from
    # the stored source and have been reconstructed; confirm against history.
    html_roots = ['<!DOCTYPE html', '<html']

    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            extracted = match.group(1).strip()

            # Remove language marker line if present
            lines = extracted.split('\n', 1)
            if lines[0].strip().lower() in ['python', 'html', 'css', 'javascript', 'json']:
                return lines[1] if len(lines) > 1 else ''

            # Handle HTML content with potential prefixes
            for tag in html_roots:
                idx = extracted.find(tag)
                if idx > 0:
                    return extracted[idx:].strip()
            return extracted

    # Check if the entire text is HTML
    stripped = text.strip()
    if stripped.startswith(('<!DOCTYPE html>', '<html', '<')):
        for tag in html_roots:
            idx = stripped.find(tag)
            if idx > 0:
                return stripped[idx:].strip()
        return stripped

    return text.strip()


def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using Tesseract OCR.

    The image is converted to grayscale and binarized with Otsu's threshold
    before being handed to ``pytesseract``.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The recognized text, ``"No text found in image"`` when OCR yields
        nothing, or a message starting with ``"Error"`` on failure. This
        function does not raise.
    """
    try:
        # Check if tesseract is available
        try:
            pytesseract.get_tesseract_version()
        except Exception:
            return "Error: Tesseract OCR is not installed. Please install Tesseract to extract text from images."

        # Read and process image
        image = cv2.imread(image_path)
        if image is None:
            return "Error: Could not read image file"

        # Convert and preprocess
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        gray = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2GRAY)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Extract text
        text = pytesseract.image_to_string(binary, config='--psm 6')
        cleaned = text.strip()
        return cleaned if cleaned else "No text found in image"
    except Exception as e:
        return f"Error extracting text from image: {e}"


def extract_text_from_file(file_path: str) -> str:
    """Extract text from a file, dispatching on its extension.

    Supported: ``.pdf``, ``.txt``/``.md``/``.csv`` (read as UTF-8), ``.docx``
    and common image formats (via OCR).

    Args:
        file_path: Path of the file to read.

    Returns:
        The extracted text; ``""`` when the path is empty, missing or has an
        unsupported extension; an ``"Error extracting text: ..."`` message if
        reading fails.
    """
    if not file_path or not os.path.exists(file_path):
        return ""

    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext == ".pdf":
            with open(file_path, "rb") as f:
                reader = PyPDF2.PdfReader(f)
                # extract_text() may return None for a page; treat it as empty.
                return "\n".join(page.extract_text() or "" for page in reader.pages)
        elif ext in [".txt", ".md", ".csv"]:
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        elif ext == ".docx":
            doc = docx.Document(file_path)
            return "\n".join([para.text for para in doc.paragraphs])
        elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".gif", ".webp"]:
            return extract_text_from_image(file_path)
        else:
            return ""
    except Exception as e:
        return f"Error extracting text: {e}"


def compress_media_for_data_uri(media_bytes: bytes, media_type: str = "video", max_size_mb: int = 8) -> bytes:
    """Compress media bytes with ffmpeg so they fit in a data URI.

    Args:
        media_bytes: Raw media content.
        media_type: ``"video"`` for video; any other value is treated as audio.
        max_size_mb: Size limit in MiB; input at or under it is returned as is.

    Returns:
        The compressed bytes (H.264 MP4 for video, MP3 for audio), or the
        original ``media_bytes`` when no compression is needed, ffmpeg is
        unavailable or fails, or the result is not smaller than the input.
    """
    max_size = max_size_mb * 1024 * 1024
    if len(media_bytes) <= max_size:
        return media_bytes

    print(f"[MediaCompress] {media_type} size {len(media_bytes)} bytes exceeds {max_size_mb}MB limit, attempting compression")

    try:
        import subprocess

        is_video = media_type == "video"
        # ffmpeg picks the output container from the file extension, so it
        # must be a real one (".vid"/".aud" are not recognized formats).
        extension = ".mp4" if is_video else ".mp3"

        # Create temp files
        with tempfile.NamedTemporaryFile(suffix=extension, delete=False) as temp_input:
            temp_input.write(media_bytes)
            temp_input_path = temp_input.name
        input_root, _ = os.path.splitext(temp_input_path)
        temp_output_path = f"{input_root}_compressed{extension}"

        try:
            if is_video:
                # Compress video with ffmpeg
                command = [
                    'ffmpeg', '-i', temp_input_path,
                    '-vcodec', 'libx264', '-crf', '30', '-preset', 'fast',
                    '-vf', 'scale=480:-1', '-r', '15',
                    '-an',  # Remove audio
                    '-y', temp_output_path,
                ]
            else:
                # audio
                command = [
                    'ffmpeg', '-i', temp_input_path,
                    '-codec:a', 'libmp3lame', '-b:a', '64k',
                    '-y', temp_output_path,
                ]

            # capture_output=True cannot be combined with an explicit stderr
            # (subprocess raises ValueError), so discard both streams instead.
            subprocess.run(
                command,
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

            # Read compressed media
            with open(temp_output_path, 'rb') as f:
                compressed_bytes = f.read()

            if not compressed_bytes or len(compressed_bytes) >= len(media_bytes):
                print(f"[MediaCompress] Compression did not reduce size, using original {media_type}")
                return media_bytes

            print(f"[MediaCompress] Compressed from {len(media_bytes)} to {len(compressed_bytes)} bytes")
            return compressed_bytes
        except (subprocess.CalledProcessError, FileNotFoundError):
            # FileNotFoundError: the ffmpeg executable is not installed.
            print(f"[MediaCompress] ffmpeg compression failed, using original {media_type}")
            return media_bytes
        finally:
            # Clean up temp files
            for path in [temp_input_path, temp_output_path]:
                try:
                    if os.path.exists(path):
                        os.remove(path)
                except Exception:
                    pass
    except Exception as e:
        print(f"[MediaCompress] Compression failed: {e}, using original {media_type}")
        return media_bytes


def create_temp_media_url(media_bytes: bytes, filename: str, media_type: str = "image", session_id: Optional[str] = None) -> str:
    """Write ``media_bytes`` to a temp file and return a ``file://`` URL.

    The file is recorded in ``temp_media_files`` and, when ``session_id`` is
    given, tracked for per-session cleanup.

    Args:
        media_bytes: Content to write.
        filename: Original file name; only its base name is used on disk.
        media_type: Label used as prefix of the generated name and registry key.
        session_id: Optional session to associate the file with.

    Returns:
        ``file://<path>`` on success, otherwise a message starting with
        ``"Error creating temporary"``. This function does not raise.
    """
    try:
        # Create unique filename
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        # Use only the base name so a caller-supplied path cannot place the
        # file outside MEDIA_TEMP_DIR.
        safe_name = os.path.basename(filename)
        base_name, ext = os.path.splitext(safe_name)
        unique_filename = f"{media_type}_{timestamp}_{unique_id}_{base_name}{ext}"

        # Create temporary file
        ensure_temp_dirs()
        temp_path = os.path.join(MEDIA_TEMP_DIR, unique_filename)
        with open(temp_path, 'wb') as f:
            f.write(media_bytes)

        # Track file for cleanup
        if session_id:
            track_session_media_file(session_id, temp_path)

        # Store file info
        file_id = f"{media_type}_{unique_id}"
        temp_media_files[file_id] = {
            'path': temp_path,
            'filename': filename,
            'media_type': media_type,
            'media_bytes': media_bytes,
        }

        file_url = f"file://{temp_path}"
        print(f"[TempMedia] Created temporary {media_type} file: {file_url}")
        return file_url
    except Exception as e:
        print(f"[TempMedia] Failed to create temporary file: {str(e)}")
        return f"Error creating temporary {media_type} file: {str(e)}"


def track_session_media_file(session_id: Optional[str], file_path: str) -> None:
    """Record ``file_path`` under ``session_id`` for later cleanup.

    Does nothing when either argument is empty.
    """
    if not session_id or not file_path:
        return
    with _MEDIA_FILES_LOCK:
        _SESSION_MEDIA_FILES.setdefault(session_id, []).append(file_path)


def cleanup_session_media(session_id: Optional[str]) -> None:
    """Delete the media files tracked for ``session_id`` and forget them.

    Deletion is best-effort; individual failures are ignored.
    """
    if not session_id:
        return
    # Detach the list under the lock, then delete without holding it.
    with _MEDIA_FILES_LOCK:
        files_to_clean = _SESSION_MEDIA_FILES.pop(session_id, [])
    for path in files_to_clean:
        try:
            if path and os.path.exists(path):
                os.unlink(path)
        except Exception:
            pass


def reap_old_media(ttl_seconds: int = TEMP_DIR_TTL_SECONDS) -> None:
    """Delete files in the temp directories older than ``ttl_seconds``.

    Age is measured from each file's modification time. All errors are
    swallowed; this is a best-effort sweep.
    """
    try:
        ensure_temp_dirs()
        now_ts = time.time()
        for temp_dir in [MEDIA_TEMP_DIR, VIDEO_TEMP_DIR, AUDIO_TEMP_DIR]:
            if not os.path.exists(temp_dir):
                continue
            for name in os.listdir(temp_dir):
                path = os.path.join(temp_dir, name)
                if os.path.isfile(path):
                    try:
                        mtime = os.path.getmtime(path)
                        if (now_ts - mtime) > ttl_seconds:
                            os.unlink(path)
                    except Exception:
                        pass
    except Exception:
        pass


def cleanup_all_temp_media():
    """Delete every file in the registry and every session-tracked media file.

    Registered below as an ``atexit`` handler. Deletion is best-effort.
    """
    try:
        print("[Cleanup] Cleaning up temporary media files...")

        # Clean up temp_media_files registry. Iterate over a snapshot so a
        # concurrent insertion cannot break the loop.
        for file_info in list(temp_media_files.values()):
            try:
                if os.path.exists(file_info['path']):
                    os.unlink(file_info['path'])
            except Exception:
                pass
        temp_media_files.clear()

        # Clean up all session files
        with _MEDIA_FILES_LOCK:
            for session_files in _SESSION_MEDIA_FILES.values():
                for path in session_files:
                    try:
                        if path and os.path.exists(path):
                            os.unlink(path)
                    except Exception:
                        pass
            _SESSION_MEDIA_FILES.clear()

        print("[Cleanup] Temporary media cleanup completed")
    except Exception as e:
        print(f"[Cleanup] Error during cleanup: {str(e)}")


def process_image_for_model(image) -> Optional[str]:
    """Convert an image to a base64 PNG data URI for model input.

    Args:
        image: A numpy array or an object with a PIL-style
            ``save(buffer, format=...)`` method, or ``None``.

    Returns:
        A ``data:image/png;base64,...`` string, or ``None`` when ``image`` is
        ``None``.
    """
    if image is None:
        return None

    import io

    # Handle numpy array from Gradio
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image)

    buffer = io.BytesIO()
    image.save(buffer, format='PNG')
    img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return f"data:image/png;base64,{img_str}"


def create_multimodal_message(text: str, image=None) -> Dict:
    """Create a user chat message, noting whether an image was supplied.

    The image itself is not embedded; only a textual note is appended.
    """
    if image is None:
        return {"role": "user", "content": text}

    # For broad provider compatibility, use string content with note
    return {"role": "user", "content": f"{text}\n\n[An image was provided as reference.]"}


def apply_search_replace_changes(original_content: str, changes_text: str) -> str:
    """Apply search/replace changes to ``original_content``.

    ``changes_text`` normally holds blocks delimited by ``SEARCH_START``,
    ``DIVIDER`` and ``REPLACE_END`` marker lines. When none of the markers is
    present, ``selector { ... }`` rules found in it are used to replace the
    body of the first matching rule in the content.

    Args:
        original_content: Text to modify.
        changes_text: Change description.

    Returns:
        The modified content; ``original_content`` when ``changes_text`` is
        blank. A search text that is not found is reported with a printed
        warning and skipped.
    """
    if not changes_text.strip():
        return original_content

    # CSS rule fallback for non-block formats
    if (SEARCH_START not in changes_text) and (DIVIDER not in changes_text) and (REPLACE_END not in changes_text):
        try:
            updated_content = original_content
            replaced_any_rule = False

            # Find CSS-like rule blocks
            css_blocks = re.findall(r"([^{]+)\{([\s\S]*?)\}", changes_text, flags=re.MULTILINE)
            for selector_raw, body_raw in css_blocks:
                selector = selector_raw.strip()
                body = body_raw.strip()
                if not selector:
                    continue

                pattern = re.compile(rf"({re.escape(selector)}\s*\{{)([\s\S]*?)(\}})")

                def _replace_rule(match):
                    nonlocal replaced_any_rule
                    replaced_any_rule = True
                    prefix, existing_body, suffix = match.groups()

                    # Preserve indentation
                    first_line_indent = ""
                    for line in existing_body.splitlines():
                        stripped = line.lstrip(" \t")
                        if stripped:
                            first_line_indent = line[: len(line) - len(stripped)]
                            break

                    if body:
                        new_body_lines = [
                            first_line_indent + line if line.strip() else line
                            for line in body.splitlines()
                        ]
                        new_body_text = "\n" + "\n".join(new_body_lines) + "\n"
                    else:
                        # An empty replacement body keeps the existing rule.
                        new_body_text = existing_body
                    return f"{prefix}{new_body_text}{suffix}"

                # Only the first occurrence of each selector is rewritten.
                updated_content = pattern.sub(_replace_rule, updated_content, count=1)

            if replaced_any_rule:
                return updated_content
        except Exception:
            # Fall through to the block-based parser below.
            pass

    # Parse search/replace blocks
    blocks = []
    current_block = ""
    lines = changes_text.split('\n')

    for line in lines:
        if line.strip() == SEARCH_START:
            if current_block.strip():
                blocks.append(current_block.strip())
            current_block = line + '\n'
        elif line.strip() == REPLACE_END:
            current_block += line + '\n'
            blocks.append(current_block.strip())
            current_block = ""
        else:
            current_block += line + '\n'

    if current_block.strip():
        blocks.append(current_block.strip())

    modified_content = original_content
    for block in blocks:
        if not block.strip():
            continue

        lines = block.split('\n')
        search_lines = []
        replace_lines = []
        in_search = False
        in_replace = False

        for line in lines:
            if line.strip() == SEARCH_START:
                in_search = True
                in_replace = False
            elif line.strip() == DIVIDER:
                in_search = False
                in_replace = True
            elif line.strip() == REPLACE_END:
                in_replace = False
            elif in_search:
                search_lines.append(line)
            elif in_replace:
                replace_lines.append(line)

        if search_lines:
            search_text = '\n'.join(search_lines).strip()
            replace_text = '\n'.join(replace_lines).strip()

            if search_text in modified_content:
                modified_content = modified_content.replace(search_text, replace_text)
            else:
                print(f"Warning: Search text not found: {search_text[:100]}...")

    return modified_content


def validate_video_html(video_html: str) -> bool:
    """Check that ``video_html`` contains a complete ``<video>`` element.

    Returns:
        ``True`` when the required markers are present, ``False`` otherwise
        (including on any unexpected error).
    """
    # NOTE(review): the tag literals in this function were stripped from the
    # stored source; the checks below are reconstructed from the surviving
    # fragments. The '<source' requirement in particular should be confirmed
    # against version history.
    try:
        if not video_html or not video_html.strip():
            return False

        if '<video' not in video_html or '</video>' not in video_html:
            return False

        if '<source' not in video_html:
            return False

        video_start = video_html.find('<video')
        # 8 == len('</video>'), so a failed find (-1) yields 7.
        video_end = video_html.find('</video>') + 8
        if video_start == -1 or video_end == 7:
            return False

        return True
    except Exception:
        return False


# Register cleanup handler
atexit.register(cleanup_all_temp_media)