"""Deployment helpers: dependency detection, README tagging, Space import and deploy."""

import os
import re
import ast
import sys
import tempfile
import time
import webbrowser
import urllib.parse
from typing import Optional, List, Dict, Tuple, Union

import gradio as gr
from huggingface_hub import HfApi, duplicate_space, list_repo_files
import requests

from utils import get_inference_client
from code_processing import (
    parse_transformers_js_output,
    parse_svelte_output,
    parse_multipage_html_output,
    validate_and_autofix_files,
)
from web_utils import check_hf_space_url, parse_repo_or_model_url


# Model used to turn import statements into a requirements.txt.
_REQUIREMENTS_MODEL = "Qwen/Qwen3-Coder-480B-A35B-Instruct"

_NO_DEPENDENCIES = "# No additional dependencies required\n"

# Modules that must never end up in requirements.txt. The hand-written list is
# kept for interpreters without ``sys.stdlib_module_names`` (Python < 3.10);
# on newer interpreters the authoritative set is merged in.
_STDLIB_MODULES = frozenset({
    'os', 'sys', 'json', 'time', 'datetime', 'random', 'math', 're',
    'collections', 'itertools', 'functools', 'pathlib', 'urllib', 'http',
    'email', 'html', 'xml', 'csv', 'tempfile', 'shutil', 'subprocess',
    'threading', 'multiprocessing', 'asyncio', 'logging', 'typing', 'base64',
    'hashlib', 'secrets', 'uuid', 'copy', 'pickle', 'io', 'contextlib',
    'warnings', 'sqlite3', 'gzip', 'zipfile', 'tarfile', 'socket', 'ssl',
    'platform', 'getpass', 'pwd', 'grp', 'stat', 'glob', 'fnmatch',
    'linecache', 'traceback', 'inspect', 'keyword', 'token', 'tokenize',
    'ast', 'code', 'codeop', 'dis', 'py_compile', 'compileall', 'importlib',
    'pkgutil', 'modulefinder', 'runpy', 'site', 'sysconfig',
}) | frozenset(getattr(sys, "stdlib_module_names", ()))

# Candidate entry-point files, in priority order, for each Space SDK.
_SDK_MAIN_FILES = {
    "static": ["index.html"],
    "gradio": ["app.py", "main.py", "gradio_app.py"],
    "streamlit": [
        "streamlit_app.py", "src/streamlit_app.py",
        "app.py", "src/app.py",
        "main.py", "src/main.py",
        "Home.py", "src/Home.py",
        "🏠_Home.py", "src/🏠_Home.py",
        "1_🏠_Home.py", "src/1_🏠_Home.py",
    ],
}
_DEFAULT_MAIN_FILES = [
    "app.py", "src/app.py",
    "index.html",
    "streamlit_app.py", "src/streamlit_app.py",
    "main.py", "src/main.py",
    "Home.py", "src/Home.py",
]


def _is_third_party(module_name: str) -> bool:
    """Return True if ``module_name`` is a non-empty, public, non-stdlib top-level module."""
    return (
        bool(module_name)
        and module_name not in _STDLIB_MODULES
        and not module_name.startswith('_')
    )


def _top_level_module(statement: str) -> str:
    """Return the top-level module named by an ``import``/``from`` line, or '' if none.

    A relative import such as ``from .x import y`` yields '' because the text
    before the first dot is empty.
    """
    tokens = statement.split()
    if len(tokens) < 2:
        return ""
    return tokens[1].split('.')[0]


def _upload_text_file(api: HfApi, text: str, path_in_repo: str, repo_id: str, suffix: str) -> None:
    """Write ``text`` to a temporary UTF-8 file and upload it to a Space.

    The temporary file is removed even when writing or uploading fails.
    UTF-8 is forced so content with emoji or other non-ASCII characters does
    not depend on the platform's default encoding.
    """
    handle = tempfile.NamedTemporaryFile("w", suffix=suffix, delete=False, encoding="utf-8")
    try:
        with handle:
            handle.write(text)
        api.upload_file(
            path_or_fileobj=handle.name,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="space",
        )
    finally:
        os.unlink(handle.name)


def _escape_for_triple_quotes(text: str) -> str:
    """Escape ``text`` so it can sit inside a ``\"\"\"...\"\"\"`` Python literal.

    Backslashes are doubled first so sequences such as ``\\n`` inside embedded
    JavaScript survive verbatim. Trailing double quotes are escaped one by one
    so they cannot merge with the closing delimiter.
    """
    escaped = text.replace('\\', '\\\\')
    core = escaped.rstrip('"')
    trailing_quotes = len(escaped) - len(core)
    return core.replace('"""', '\\"\\"\\"') + '\\"' * trailing_quotes


class DependencyManager:
    """Handles Python dependency extraction and requirements.txt generation"""

    @staticmethod
    def extract_import_statements(code: str) -> List[str]:
        """Extract third-party import statements from generated code.

        The code is parsed with ``ast``; if it is not valid Python, a
        line-based scan is used instead. Standard-library modules, private
        modules (leading underscore) and relative imports are skipped.

        Returns:
            A sorted, de-duplicated list of import statements.
        """
        found = set()
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # ValueError covers sources containing NUL bytes on older Pythons.
            tree = None

        if tree is not None:
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        if _is_third_party(alias.name.split('.')[0]):
                            found.add(f"import {alias.name}")
                elif isinstance(node, ast.ImportFrom):
                    # level > 0 means a relative import, i.e. project-local code.
                    if node.module and node.level == 0:
                        if _is_third_party(node.module.split('.')[0]):
                            names = ', '.join(alias.name for alias in node.names)
                            found.add(f"from {node.module} import {names}")
        else:
            # Fallback: scan line by line for import statements.
            for raw_line in code.split('\n'):
                line = raw_line.strip()
                if not line.startswith(('import ', 'from ')):
                    continue
                if _is_third_party(_top_level_module(line)):
                    found.add(line)

        return sorted(found)

    @staticmethod
    def generate_requirements_txt_with_llm(import_statements: List[str]) -> str:
        """Generate requirements.txt content using an LLM based on import statements.

        Falls back to ``_generate_requirements_fallback`` when the LLM call
        fails for any reason.

        Returns:
            requirements.txt text ending in a newline.
        """
        if not import_statements:
            return _NO_DEPENDENCIES

        try:
            client = get_inference_client(_REQUIREMENTS_MODEL, "auto")
            imports_text = '\n'.join(import_statements)

            prompt = f"""Based on the following Python import statements, generate a comprehensive requirements.txt file with all necessary and commonly used related packages:

{imports_text}

Instructions:
- Include the direct packages needed for the imports
- Include commonly used companion packages and dependencies for better functionality
- Use correct PyPI package names (e.g., cv2 -> opencv-python, PIL -> Pillow, sklearn -> scikit-learn)
- Examples of comprehensive dependencies:
  * transformers often needs: accelerate, torch, tokenizers, datasets
  * gradio often needs: requests, Pillow for image handling
  * pandas often needs: numpy, openpyxl for Excel files
  * matplotlib often needs: numpy, pillow for image saving
  * sklearn often needs: numpy, scipy, joblib
  * streamlit often needs: pandas, numpy, requests
  * opencv-python often needs: numpy, pillow
  * fastapi often needs: uvicorn, pydantic
  * torch often needs: torchvision, torchaudio (if doing computer vision/audio)
- Include packages for common file formats if relevant (openpyxl, python-docx, PyPDF2)
- Do not include Python built-in modules
- Do not specify versions unless there are known compatibility issues
- One package per line
- If no external packages are needed, return "# No additional dependencies required"

Generate a comprehensive requirements.txt that ensures the application will work smoothly:"""

            messages = [
                {"role": "system", "content": "You are a Python packaging expert specializing in creating comprehensive, production-ready requirements.txt files. Your goal is to ensure applications work smoothly by including not just direct dependencies but also commonly needed companion packages, popular extensions, and supporting libraries that developers typically need together."},
                {"role": "user", "content": prompt},
            ]

            response = client.chat.completions.create(
                model=_REQUIREMENTS_MODEL,
                messages=messages,
                max_tokens=1024,
                temperature=0.1,
            )

            requirements_content = response.choices[0].message.content.strip()

            # If the model wrapped its answer in Markdown fences, keep only
            # the lines inside the fenced block(s).
            if '```' in requirements_content:
                in_code_block = False
                clean_lines = []
                for line in requirements_content.split('\n'):
                    if line.strip().startswith('```'):
                        in_code_block = not in_code_block
                        continue
                    if in_code_block:
                        clean_lines.append(line)
                requirements_content = '\n'.join(clean_lines).strip()

            if not requirements_content:
                return _NO_DEPENDENCIES
            # Ensure it ends with a newline
            if not requirements_content.endswith('\n'):
                requirements_content += '\n'
            return requirements_content

        except Exception as e:
            print(f"[Dependencies] LLM generation failed, using fallback: {e}")
            # Fallback: simple extraction with basic mapping
            return DependencyManager._generate_requirements_fallback(import_statements)

    @staticmethod
    def _generate_requirements_fallback(import_statements: List[str]) -> str:
        """Fallback requirements generation with a basic import-name -> PyPI-name mapping."""
        special_cases = {
            'cv2': 'opencv-python',
            'PIL': 'Pillow',
            'sklearn': 'scikit-learn',
            'skimage': 'scikit-image',
            'bs4': 'beautifulsoup4',
        }

        dependencies = set()
        for stmt in import_statements:
            if not stmt.startswith(('import ', 'from ')):
                continue
            module_name = _top_level_module(stmt)
            if not module_name:
                # Relative import or malformed statement: nothing to install.
                continue
            dependencies.add(special_cases.get(module_name, module_name))

        if dependencies:
            return '\n'.join(sorted(dependencies)) + '\n'
        return _NO_DEPENDENCIES


class SpaceManager:
    """Handles Hugging Face Space operations"""

    @staticmethod
    def _with_anycoder_tag(content: str) -> str:
        """Return README ``content`` with ``anycoder`` added to its YAML front-matter tags.

        Only block-style tag lists (``tags:`` followed by ``- item`` lines) are
        extended; an inline list (``tags: [a, b]``) is left untouched.
        """
        if not content.startswith('---'):
            # No frontmatter, add it at the beginning
            return f"---\ntags:\n- anycoder\n---\n\n{content}"

        parts = content.split('---', 2)
        if len(parts) < 3:
            # Malformed frontmatter, just add tags right after the opening marker
            return content.replace('---', '---\ntags:\n- anycoder\n---', 1)

        frontmatter = parts[1].strip()
        body = parts[2]

        if re.search(r'^tags:', frontmatter, flags=re.MULTILINE):
            if '- anycoder' not in frontmatter:
                def append_tag(match):
                    block = match.group(1)
                    # Reuse the indentation of the existing items so the
                    # resulting YAML sequence stays valid.
                    item = re.search(r'\n([ \t]*)-', block)
                    indent = item.group(1) if item else ''
                    return f"{block}{indent}- anycoder\n"

                # A temporary trailing newline lets the pattern consume the
                # last list item even when it ends the (stripped) frontmatter.
                frontmatter = re.sub(
                    r'^(tags:\s*\n(?:\s*-\s*[^\n]+\n)*)',
                    append_tag,
                    frontmatter + '\n',
                    count=1,
                    flags=re.MULTILINE,
                ).rstrip('\n')
        else:
            # Add tags section with anycoder
            frontmatter += '\ntags:\n- anycoder'

        return f"---\n{frontmatter}\n---{body}"

    @staticmethod
    def add_anycoder_tag_to_readme(api: HfApi, repo_id: str) -> None:
        """Download the Space's README, add the anycoder tag, and upload it back.

        Best-effort: any failure is logged and swallowed so that tagging never
        aborts a deployment.
        """
        try:
            readme_path = api.hf_hub_download(
                repo_id=repo_id,
                filename="README.md",
                repo_type="space",
            )
            with open(readme_path, 'r', encoding='utf-8') as f:
                content = f.read()

            new_content = SpaceManager._with_anycoder_tag(content)
            if new_content == content:
                # Tag already present; avoid an empty commit.
                return

            _upload_text_file(api, new_content, "README.md", repo_id, ".md")
            print(f"[SpaceManager] Added anycoder tag to {repo_id}")
        except Exception as e:
            print(f"[SpaceManager] Could not modify README.md: {e}")


class ProjectImporter:
    """Handles importing projects from various sources"""

    @staticmethod
    def _download_first(api: HfApi, repo_id: str, candidates: List[str]) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(filename, local_path)`` for the first candidate that downloads, else ``(None, None)``."""
        for candidate in candidates:
            try:
                local_path = api.hf_hub_download(
                    repo_id=repo_id,
                    filename=candidate,
                    repo_type="space",
                )
            except Exception:
                # Missing file (or transient failure): try the next candidate.
                continue
            return candidate, local_path
        return None, None

    @staticmethod
    def fetch_hf_space_content(username: str, project_name: str) -> str:
        """Fetch the main file of a Hugging Face Space.

        Returns:
            On success, a text block starting with ``IMPORTED PROJECT`` that
            holds a short metadata header followed by the file content. On
            failure, a message starting with ``Error``.
        """
        repo_id = f"{username}/{project_name}"
        try:
            api = HfApi()
            space_info = api.space_info(repo_id)
            sdk = space_info.sdk
            file_patterns = _SDK_MAIN_FILES.get(sdk, _DEFAULT_MAIN_FILES)

            main_file, local_path = ProjectImporter._download_first(api, repo_id, file_patterns)

            # If no main file found, list repository files and try Python files
            # at the repository root or under src/.
            if not main_file and sdk in ["streamlit", "gradio"]:
                try:
                    files = list_repo_files(repo_id=repo_id, repo_type="space")
                except Exception:
                    files = []
                python_files = [
                    f for f in files
                    if f.endswith('.py')
                    and not f.startswith('.')
                    and (('/' not in f) or f.startswith('src/'))
                ]
                main_file, local_path = ProjectImporter._download_first(api, repo_id, python_files)

            if main_file:
                with open(local_path, 'r', encoding='utf-8') as f:
                    file_content = f.read()

                return f"""IMPORTED PROJECT FROM HUGGING FACE SPACE
==============================================

Space: {username}/{project_name}
SDK: {sdk}
Main File: {main_file}

{file_content}"""

            # Try to report which files are available instead.
            try:
                files = list_repo_files(repo_id=repo_id, repo_type="space")
                available_files = [f for f in files if not f.startswith('.') and not f.endswith('.md')]
                return f"Error: Could not find main file in space {username}/{project_name}.\n\nSDK: {sdk}\nAvailable files: {', '.join(available_files[:10])}{'...' if len(available_files) > 10 else ''}\n\nTried looking for: {', '.join(file_patterns)}"
            except Exception:
                return f"Error: Could not find main file in space {username}/{project_name}. Expected files for {sdk} SDK: {', '.join(file_patterns)}"

        except Exception as e:
            return f"Error fetching space content: {str(e)}"

    @staticmethod
    def load_project_from_url(url: str) -> Tuple[str, str]:
        """Load a project from a Hugging Face Space URL.

        Returns:
            ``(status_message, code)``; ``code`` is empty when loading failed.
        """
        is_valid, username, project_name = check_hf_space_url(url)
        if not is_valid:
            return "Error: Please enter a valid Hugging Face Spaces URL.\n\nExpected format: https://huggingface.co/spaces/username/project", ""

        content = ProjectImporter.fetch_hf_space_content(username, project_name)

        # fetch_hf_space_content reports failures both as "Error: ..." and as
        # "Error fetching space content: ..."; both must count as failures.
        if content.startswith(("Error:", "Error fetching")):
            return content, ""

        # Drop the metadata header: the code starts at the first non-blank
        # line that is not one of the header lines.
        metadata_prefixes = ('=', 'IMPORTED PROJECT', 'Space:', 'SDK:', 'Main File:')
        lines = content.split('\n')
        code_start = len(lines)  # empty file -> empty code, not the header
        for i, line in enumerate(lines):
            if line.strip() and not line.startswith(metadata_prefixes):
                code_start = i
                break

        code_content = '\n'.join(lines[code_start:])
        return f"✅ Successfully imported project from {username}/{project_name}", code_content


class Deployer:
    """Handles deployment to various platforms"""

    def __init__(self):
        self.dependency_manager = DependencyManager()
        self.space_manager = SpaceManager()

    def deploy_to_user_space(self, code: str, space_name: str, sdk_name: str,
                             profile: Optional[gr.OAuthProfile] = None,
                             token: Optional[gr.OAuthToken] = None) -> str:
        """Deploy code to the user's Hugging Face Space.

        Args:
            code: Generated source to deploy.
            space_name: Either a bare name (a new Space under the logged-in
                user) or ``owner/name`` (update of an existing Space).
            sdk_name: UI label of the target SDK; anything unrecognised is
                treated as Gradio.
            profile: OAuth profile of the logged-in user.
            token: OAuth token of the logged-in user.

        Returns:
            A user-facing status message (Markdown on success, ``Error...`` on failure).
        """
        if not code or not code.strip():
            return "No code to deploy."

        if profile is None or token is None:
            return "Please log in with your Hugging Face account to deploy to your own Space."

        # Check token validity
        if not token.token or token.token == "hf_":
            return "Error: Invalid token. Please log in again with your Hugging Face account to get a valid write token."

        if not space_name or not space_name.strip():
            return "Error: Please provide a Space name."
        space_name = space_name.strip()

        # A slash means the caller named an existing "owner/name" Space.
        is_update = "/" in space_name
        if is_update:
            repo_id = space_name
            space_username = repo_id.split('/')[0]
            if space_username != profile.username:
                return f"Error: You can only update your own spaces. This space belongs to {space_username}."

            # Verify access for updates
            try:
                api = HfApi(token=token.token)
                space_info = api.space_info(repo_id)
                if not space_info:
                    return f"Error: Could not access space {repo_id}. Please check your permissions."
            except Exception as e:
                return f"Error: No write access to space {repo_id}. Error: {str(e)}"
        else:
            repo_id = f"{profile.username}/{space_name}"

        # Route to appropriate deployment method
        try:
            if sdk_name == "Streamlit (Python)":
                return self._deploy_streamlit(code, repo_id, is_update, token)
            elif sdk_name == "Transformers.js":
                return self._deploy_transformers_js(code, repo_id, is_update, token)
            elif sdk_name == "Svelte":
                return self._deploy_svelte(code, repo_id, is_update, token)
            elif sdk_name == "Static (HTML)":
                return self._deploy_static_html(code, repo_id, is_update, token)
            else:  # Gradio (Python)
                return self._deploy_gradio(code, repo_id, is_update, token)
        except Exception as e:
            return f"Error during deployment: {str(e)}"

    @staticmethod
    def _success_message(repo_id: str, is_update: bool, label: str = "Space") -> str:
        """Build the Markdown success message linking to the Space."""
        space_url = f"https://huggingface.co/spaces/{repo_id}"
        action_text = "Updated" if is_update else "Deployed"
        return f"✅ {action_text}! [Open your {label} here]({space_url})"

    @staticmethod
    def _error_message(error: Exception, repo_id: str, prefix: str) -> str:
        """Map an exception to a user-facing message, special-casing HTTP 403."""
        error_msg = str(error)
        if "403 Forbidden" in error_msg:
            return f"Error: Permission denied. Please ensure you have write access to {repo_id}."
        return f"{prefix}: {error_msg}"

    def _upload_requirements(self, api: HfApi, code: str, repo_id: str) -> None:
        """Derive requirements.txt from the code's imports and upload it."""
        import_statements = self.dependency_manager.extract_import_statements(code)
        requirements_content = self.dependency_manager.generate_requirements_txt_with_llm(import_statements)
        _upload_text_file(api, requirements_content, "requirements.txt", repo_id, ".txt")

    def _deploy_streamlit(self, code: str, repo_id: str, is_update: bool, token) -> str:
        """Deploy Streamlit app"""
        api = HfApi(token=token.token)
        try:
            if not is_update:
                # Duplicate template space
                duplicate_space(
                    from_id="streamlit/streamlit-template-space",
                    to_id=repo_id.split('/')[-1],
                    token=token.token,
                    exist_ok=True,
                )

            self._upload_requirements(api, code, repo_id)
            self.space_manager.add_anycoder_tag_to_readme(api, repo_id)
            _upload_text_file(api, code, "src/streamlit_app.py", repo_id, ".py")
            return self._success_message(repo_id, is_update)
        except Exception as e:
            action_verb = "updating" if is_update else "creating"
            return self._error_message(e, repo_id, f"Error {action_verb} Streamlit space")

    def _deploy_transformers_js(self, code: str, repo_id: str, is_update: bool, token) -> str:
        """Deploy Transformers.js app"""
        api = HfApi(token=token.token)
        try:
            if not is_update:
                # Duplicate template space
                duplicate_space(
                    from_id="static-templates/transformers.js",
                    to_id=repo_id,
                    token=token.token,
                    exist_ok=True,
                )

            # Parse transformers.js output
            files = parse_transformers_js_output(code)
            if not all(files.values()):
                return "Error: Could not parse transformers.js output. Please regenerate the code."

            # Upload the three files
            for file_name in ("index.html", "index.js", "style.css"):
                suffix = os.path.splitext(file_name)[1]
                _upload_text_file(api, files[file_name], file_name, repo_id, suffix)

            self.space_manager.add_anycoder_tag_to_readme(api, repo_id)

            # Restart space if updating (best-effort).
            if is_update:
                try:
                    api.restart_space(repo_id=repo_id)
                except Exception as restart_error:
                    print(f"[Deploy] Could not restart space after update: {restart_error}")

            return self._success_message(repo_id, is_update, "Transformers.js Space")
        except Exception as e:
            action_verb = "updating" if is_update else "creating"
            return self._error_message(e, repo_id, f"Error {action_verb} Transformers.js space")

    def _deploy_svelte(self, code: str, repo_id: str, is_update: bool, token) -> str:
        """Deploy Svelte app"""
        if is_update:
            return "Error: Svelte space updates not yet supported. Please create a new space."

        api = HfApi(token=token.token)
        try:
            # Duplicate template space
            duplicate_space(
                from_id="static-templates/svelte",
                to_id=repo_id,
                token=token.token,
                exist_ok=True,
            )

            # Parse Svelte output
            files = parse_svelte_output(code)
            app_source = files.get('src/App.svelte')
            if not app_source:
                return "Error: Could not parse Svelte output. Please regenerate the code."

            _upload_text_file(api, app_source, "src/App.svelte", repo_id, ".svelte")

            # Upload src/app.css if present
            app_css = files.get('src/app.css')
            if app_css:
                _upload_text_file(api, app_css, "src/app.css", repo_id, ".css")

            self.space_manager.add_anycoder_tag_to_readme(api, repo_id)

            space_url = f"https://huggingface.co/spaces/{repo_id}"
            return f"✅ Deployed! [Open your Svelte Space here]({space_url})"
        except Exception as e:
            return f"Error creating Svelte space: {str(e)}"

    def _deploy_static_html(self, code: str, repo_id: str, is_update: bool, token) -> str:
        """Deploy static HTML app"""
        api = HfApi(token=token.token)
        try:
            if not is_update:
                # Create the static space
                api.create_repo(
                    repo_id=repo_id,
                    repo_type="space",
                    space_sdk="static",
                    exist_ok=True,
                )

            self.space_manager.add_anycoder_tag_to_readme(api, repo_id)

            # Detect multi-file structure
            files = parse_multipage_html_output(code)
            files = validate_and_autofix_files(files)

            if isinstance(files, dict) and files.get('index.html'):
                # Multi-file deployment
                with tempfile.TemporaryDirectory() as tmpdir:
                    # Write each file preserving subdirectories
                    for rel_path, content in files.items():
                        safe_rel_path = os.path.normpath(rel_path.strip().lstrip('/'))
                        # Paths come from generated output: never write
                        # outside the temporary upload directory.
                        if (
                            safe_rel_path in ('', '.', '..')
                            or safe_rel_path.startswith('..' + os.sep)
                            or os.path.isabs(safe_rel_path)
                        ):
                            print(f"[Deploy] Skipping unsafe file path: {rel_path!r}")
                            continue
                        abs_path = os.path.join(tmpdir, safe_rel_path)
                        os.makedirs(os.path.dirname(abs_path), exist_ok=True)
                        with open(abs_path, 'w', encoding='utf-8') as fh:
                            fh.write(content)

                    # Upload the entire folder
                    api.upload_folder(
                        folder_path=tmpdir,
                        repo_id=repo_id,
                        repo_type="space",
                    )
            else:
                # Single HTML file deployment
                _upload_text_file(api, code, "index.html", repo_id, ".html")

            return self._success_message(repo_id, is_update)
        except Exception as e:
            return self._error_message(e, repo_id, "Error deploying static HTML")

    def _deploy_gradio(self, code: str, repo_id: str, is_update: bool, token) -> str:
        """Deploy Gradio app"""
        api = HfApi(token=token.token)
        try:
            if not is_update:
                # Create the Gradio space
                api.create_repo(
                    repo_id=repo_id,
                    repo_type="space",
                    space_sdk="gradio",
                    exist_ok=True,
                )

            self._upload_requirements(api, code, repo_id)
            self.space_manager.add_anycoder_tag_to_readme(api, repo_id)
            _upload_text_file(api, code, "app.py", repo_id, ".py")
            return self._success_message(repo_id, is_update)
        except Exception as e:
            return self._error_message(e, repo_id, "Error deploying Gradio app")

    @staticmethod
    def _open_new_space_tab(sdk: str, path: str, content: str) -> None:
        """Open the Hugging Face "new Space" page pre-filled with a single file."""
        base_url = "https://huggingface.co/new-space"
        params = urllib.parse.urlencode({
            "name": "new-space",
            "sdk": sdk,
        })
        files_params = urllib.parse.urlencode({
            "files[0][path]": path,
            "files[0][content]": content,
        })
        webbrowser.open_new_tab(f"{base_url}?{params}&{files_params}")

    def deploy_to_spaces(self, code: str) -> None:
        """Deploy using the old method (opens in new tab)"""
        if not code or not code.strip():
            return

        # Wrap HTML in Gradio app
        app_py = self._wrap_html_in_gradio_app(code.strip())
        self._open_new_space_tab("gradio", "app.py", app_py)

    def deploy_to_spaces_static(self, code: str) -> None:
        """Deploy using static SDK (opens in new tab)"""
        if not code or not code.strip():
            return

        self._open_new_space_tab("static", "index.html", code.strip())

    def _wrap_html_in_gradio_app(self, html_code: str) -> str:
        """Wrap HTML code in a minimal Gradio app and return its source.

        If import-like lines are detected in the input, a commented-out
        requirements.txt suggestion is prepended to the generated source.
        """
        safe_html = _escape_for_triple_quotes(html_code)

        # Extract import statements and generate requirements.txt comment
        import_statements = self.dependency_manager.extract_import_statements(html_code)
        requirements_comment = ""
        if import_statements:
            requirements_content = self.dependency_manager.generate_requirements_txt_with_llm(import_statements)
            requirements_comment = (
                "# Generated requirements.txt content (create this file manually if needed):\n"
                + '\n'.join(f"# {line}" for line in requirements_content.strip().split('\n'))
                + '\n\n'
            )

        return (
            f'{requirements_comment}'
            'import gradio as gr\n\n'
            'def show_html():\n'
            f'    return """{safe_html}"""\n\n'
            'demo = gr.Interface(fn=show_html, inputs=None, outputs=gr.HTML())\n\n'
            'if __name__ == "__main__":\n'
            '    demo.launch()\n'
        )


# Global deployer instance
deployer = Deployer()
project_importer = ProjectImporter()


# Export main functions
def deploy_to_user_space(code: str, space_name: str, sdk_name: str,
                         profile: Optional[gr.OAuthProfile] = None,
                         token: Optional[gr.OAuthToken] = None) -> str:
    """Module-level shortcut for ``Deployer.deploy_to_user_space``."""
    return deployer.deploy_to_user_space(code, space_name, sdk_name, profile, token)


def deploy_to_spaces(code: str) -> None:
    """Module-level shortcut for ``Deployer.deploy_to_spaces``."""
    return deployer.deploy_to_spaces(code)


def deploy_to_spaces_static(code: str) -> None:
    """Module-level shortcut for ``Deployer.deploy_to_spaces_static``."""
    return deployer.deploy_to_spaces_static(code)


def load_project_from_url(url: str) -> Tuple[str, str]:
    """Module-level shortcut for ``ProjectImporter.load_project_from_url``."""
    return project_importer.load_project_from_url(url)


def extract_import_statements(code: str) -> List[str]:
    """Module-level shortcut for ``DependencyManager.extract_import_statements``."""
    return deployer.dependency_manager.extract_import_statements(code)


def generate_requirements_txt_with_llm(import_statements: List[str]) -> str:
    """Module-level shortcut for ``DependencyManager.generate_requirements_txt_with_llm``."""
    return deployer.dependency_manager.generate_requirements_txt_with_llm(import_statements)