|
|
|
|
|
import os |
|
import re |
|
import ast |
|
import tempfile |
|
import time |
|
import webbrowser |
|
import urllib.parse |
|
from typing import Optional, List, Dict, Tuple, Union |
|
import gradio as gr |
|
from huggingface_hub import HfApi, duplicate_space, list_repo_files |
|
import requests |
|
|
|
from utils import get_inference_client |
|
from code_processing import parse_transformers_js_output, parse_svelte_output, parse_multipage_html_output, validate_and_autofix_files |
|
from web_utils import check_hf_space_url, parse_repo_or_model_url |
|
|
|
class DependencyManager: |
|
"""Handles Python dependency extraction and requirements.txt generation""" |
|
|
|
@staticmethod |
|
def extract_import_statements(code: str) -> List[str]: |
|
"""Extract import statements from generated code""" |
|
import_statements = [] |
|
|
|
|
|
builtin_modules = { |
|
'os', 'sys', 'json', 'time', 'datetime', 'random', 'math', 're', 'collections', |
|
'itertools', 'functools', 'pathlib', 'urllib', 'http', 'email', 'html', 'xml', |
|
'csv', 'tempfile', 'shutil', 'subprocess', 'threading', 'multiprocessing', |
|
'asyncio', 'logging', 'typing', 'base64', 'hashlib', 'secrets', 'uuid', |
|
'copy', 'pickle', 'io', 'contextlib', 'warnings', 'sqlite3', 'gzip', 'zipfile', |
|
'tarfile', 'socket', 'ssl', 'platform', 'getpass', 'pwd', 'grp', 'stat', |
|
'glob', 'fnmatch', 'linecache', 'traceback', 'inspect', 'keyword', 'token', |
|
'tokenize', 'ast', 'code', 'codeop', 'dis', 'py_compile', 'compileall', |
|
'importlib', 'pkgutil', 'modulefinder', 'runpy', 'site', 'sysconfig' |
|
} |
|
|
|
try: |
|
|
|
tree = ast.parse(code) |
|
|
|
for node in ast.walk(tree): |
|
if isinstance(node, ast.Import): |
|
for alias in node.names: |
|
module_name = alias.name.split('.')[0] |
|
if module_name not in builtin_modules and not module_name.startswith('_'): |
|
import_statements.append(f"import {alias.name}") |
|
|
|
elif isinstance(node, ast.ImportFrom): |
|
if node.module: |
|
module_name = node.module.split('.')[0] |
|
if module_name not in builtin_modules and not module_name.startswith('_'): |
|
names = [alias.name for alias in node.names] |
|
import_statements.append(f"from {node.module} import {', '.join(names)}") |
|
|
|
except SyntaxError: |
|
|
|
for line in code.split('\n'): |
|
line = line.strip() |
|
if line.startswith('import ') or line.startswith('from '): |
|
|
|
if line.startswith('import '): |
|
module_name = line.split()[1].split('.')[0] |
|
elif line.startswith('from '): |
|
module_name = line.split()[1].split('.')[0] |
|
|
|
if module_name not in builtin_modules and not module_name.startswith('_'): |
|
import_statements.append(line) |
|
|
|
return list(set(import_statements)) |
|
|
|
@staticmethod |
|
def generate_requirements_txt_with_llm(import_statements: List[str]) -> str: |
|
"""Generate requirements.txt content using LLM based on import statements""" |
|
if not import_statements: |
|
return "# No additional dependencies required\n" |
|
|
|
try: |
|
client = get_inference_client("Qwen/Qwen3-Coder-480B-A35B-Instruct", "auto") |
|
|
|
imports_text = '\n'.join(import_statements) |
|
|
|
prompt = f"""Based on the following Python import statements, generate a comprehensive requirements.txt file with all necessary and commonly used related packages: |
|
|
|
{imports_text} |
|
|
|
Instructions: |
|
- Include the direct packages needed for the imports |
|
- Include commonly used companion packages and dependencies for better functionality |
|
- Use correct PyPI package names (e.g., cv2 -> opencv-python, PIL -> Pillow, sklearn -> scikit-learn) |
|
- Examples of comprehensive dependencies: |
|
* transformers often needs: accelerate, torch, tokenizers, datasets |
|
* gradio often needs: requests, Pillow for image handling |
|
* pandas often needs: numpy, openpyxl for Excel files |
|
* matplotlib often needs: numpy, pillow for image saving |
|
* sklearn often needs: numpy, scipy, joblib |
|
* streamlit often needs: pandas, numpy, requests |
|
* opencv-python often needs: numpy, pillow |
|
* fastapi often needs: uvicorn, pydantic |
|
* torch often needs: torchvision, torchaudio (if doing computer vision/audio) |
|
- Include packages for common file formats if relevant (openpyxl, python-docx, PyPDF2) |
|
- Do not include Python built-in modules |
|
- Do not specify versions unless there are known compatibility issues |
|
- One package per line |
|
- If no external packages are needed, return "# No additional dependencies required" |
|
|
|
Generate a comprehensive requirements.txt that ensures the application will work smoothly:""" |
|
|
|
messages = [ |
|
{"role": "system", "content": "You are a Python packaging expert specializing in creating comprehensive, production-ready requirements.txt files. Your goal is to ensure applications work smoothly by including not just direct dependencies but also commonly needed companion packages, popular extensions, and supporting libraries that developers typically need together."}, |
|
{"role": "user", "content": prompt} |
|
] |
|
|
|
response = client.chat.completions.create( |
|
model="Qwen/Qwen3-Coder-480B-A35B-Instruct", |
|
messages=messages, |
|
max_tokens=1024, |
|
temperature=0.1 |
|
) |
|
|
|
requirements_content = response.choices[0].message.content.strip() |
|
|
|
|
|
if '```' in requirements_content: |
|
lines = requirements_content.split('\n') |
|
in_code_block = False |
|
clean_lines = [] |
|
for line in lines: |
|
if line.strip().startswith('```'): |
|
in_code_block = not in_code_block |
|
continue |
|
if in_code_block: |
|
clean_lines.append(line) |
|
requirements_content = '\n'.join(clean_lines).strip() |
|
|
|
|
|
if requirements_content and not requirements_content.endswith('\n'): |
|
requirements_content += '\n' |
|
|
|
return requirements_content if requirements_content else "# No additional dependencies required\n" |
|
|
|
except Exception as e: |
|
print(f"[Dependencies] LLM generation failed, using fallback: {e}") |
|
|
|
return DependencyManager._generate_requirements_fallback(import_statements) |
|
|
|
@staticmethod |
|
def _generate_requirements_fallback(import_statements: List[str]) -> str: |
|
"""Fallback requirements generation with basic mapping""" |
|
dependencies = set() |
|
special_cases = { |
|
'cv2': 'opencv-python', |
|
'PIL': 'Pillow', |
|
'sklearn': 'scikit-learn', |
|
'skimage': 'scikit-image', |
|
'bs4': 'beautifulsoup4' |
|
} |
|
|
|
for stmt in import_statements: |
|
if stmt.startswith('import '): |
|
module_name = stmt.split()[1].split('.')[0] |
|
package_name = special_cases.get(module_name, module_name) |
|
dependencies.add(package_name) |
|
elif stmt.startswith('from '): |
|
module_name = stmt.split()[1].split('.')[0] |
|
package_name = special_cases.get(module_name, module_name) |
|
dependencies.add(package_name) |
|
|
|
if dependencies: |
|
return '\n'.join(sorted(dependencies)) + '\n' |
|
else: |
|
return "# No additional dependencies required\n" |
|
|
|
class SpaceManager: |
|
"""Handles Hugging Face Space operations""" |
|
|
|
@staticmethod |
|
def add_anycoder_tag_to_readme(api: HfApi, repo_id: str) -> None: |
|
"""Download existing README, add anycoder tag, and upload back""" |
|
try: |
|
|
|
readme_path = api.hf_hub_download( |
|
repo_id=repo_id, |
|
filename="README.md", |
|
repo_type="space" |
|
) |
|
|
|
|
|
with open(readme_path, 'r', encoding='utf-8') as f: |
|
content = f.read() |
|
|
|
|
|
if content.startswith('---'): |
|
|
|
parts = content.split('---', 2) |
|
if len(parts) >= 3: |
|
frontmatter = parts[1].strip() |
|
body = parts[2] if len(parts) > 2 else "" |
|
|
|
|
|
if 'tags:' in frontmatter: |
|
|
|
if '- anycoder' not in frontmatter: |
|
frontmatter = re.sub(r'(tags:\s*\n(?:\s*-\s*[^\n]+\n)*)', r'\1- anycoder\n', frontmatter) |
|
else: |
|
|
|
frontmatter += '\ntags:\n- anycoder' |
|
|
|
|
|
new_content = f"---\n{frontmatter}\n---{body}" |
|
else: |
|
|
|
new_content = content.replace('---', '---\ntags:\n- anycoder\n---', 1) |
|
else: |
|
|
|
new_content = f"---\ntags:\n- anycoder\n---\n\n{content}" |
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".md", delete=False, encoding='utf-8') as f: |
|
f.write(new_content) |
|
temp_path = f.name |
|
|
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo="README.md", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
|
|
os.unlink(temp_path) |
|
print(f"[SpaceManager] Added anycoder tag to {repo_id}") |
|
|
|
except Exception as e: |
|
print(f"[SpaceManager] Could not modify README.md: {e}") |
|
|
|
class ProjectImporter: |
|
"""Handles importing projects from various sources""" |
|
|
|
@staticmethod |
|
def fetch_hf_space_content(username: str, project_name: str) -> str: |
|
"""Fetch content from a Hugging Face Space""" |
|
try: |
|
api = HfApi() |
|
space_info = api.space_info(f"{username}/{project_name}") |
|
|
|
|
|
sdk = space_info.sdk |
|
main_file = None |
|
|
|
|
|
if sdk == "static": |
|
file_patterns = ["index.html"] |
|
elif sdk == "gradio": |
|
file_patterns = ["app.py", "main.py", "gradio_app.py"] |
|
elif sdk == "streamlit": |
|
file_patterns = [ |
|
"streamlit_app.py", "src/streamlit_app.py", "app.py", "src/app.py", |
|
"main.py", "src/main.py", "Home.py", "src/Home.py", |
|
"🏠_Home.py", "src/🏠_Home.py", "1_🏠_Home.py", "src/1_🏠_Home.py" |
|
] |
|
else: |
|
file_patterns = [ |
|
"app.py", "src/app.py", "index.html", "streamlit_app.py", |
|
"src/streamlit_app.py", "main.py", "src/main.py", "Home.py", "src/Home.py" |
|
] |
|
|
|
|
|
for file in file_patterns: |
|
try: |
|
content = api.hf_hub_download( |
|
repo_id=f"{username}/{project_name}", |
|
filename=file, |
|
repo_type="space" |
|
) |
|
main_file = file |
|
break |
|
except: |
|
continue |
|
|
|
|
|
if not main_file and sdk in ["streamlit", "gradio"]: |
|
try: |
|
files = list_repo_files(repo_id=f"{username}/{project_name}", repo_type="space") |
|
|
|
|
|
python_files = [f for f in files if f.endswith('.py') and not f.startswith('.') and |
|
(('/' not in f) or f.startswith('src/'))] |
|
|
|
for py_file in python_files: |
|
try: |
|
content = api.hf_hub_download( |
|
repo_id=f"{username}/{project_name}", |
|
filename=py_file, |
|
repo_type="space" |
|
) |
|
main_file = py_file |
|
break |
|
except: |
|
continue |
|
except: |
|
pass |
|
|
|
if main_file: |
|
content = api.hf_hub_download( |
|
repo_id=f"{username}/{project_name}", |
|
filename=main_file, |
|
repo_type="space" |
|
) |
|
|
|
|
|
with open(content, 'r', encoding='utf-8') as f: |
|
file_content = f.read() |
|
|
|
return f"""IMPORTED PROJECT FROM HUGGING FACE SPACE |
|
============================================== |
|
|
|
Space: {username}/{project_name} |
|
SDK: {sdk} |
|
Main File: {main_file} |
|
|
|
{file_content}""" |
|
else: |
|
|
|
try: |
|
files = list_repo_files(repo_id=f"{username}/{project_name}", repo_type="space") |
|
available_files = [f for f in files if not f.startswith('.') and not f.endswith('.md')] |
|
return f"Error: Could not find main file in space {username}/{project_name}.\n\nSDK: {sdk}\nAvailable files: {', '.join(available_files[:10])}{'...' if len(available_files) > 10 else ''}\n\nTried looking for: {', '.join(file_patterns)}" |
|
except: |
|
return f"Error: Could not find main file in space {username}/{project_name}. Expected files for {sdk} SDK: {', '.join(file_patterns) if 'file_patterns' in locals() else 'standard files'}" |
|
|
|
except Exception as e: |
|
return f"Error fetching space content: {str(e)}" |
|
|
|
@staticmethod |
|
def load_project_from_url(url: str) -> Tuple[str, str]: |
|
"""Load project from Hugging Face Space URL""" |
|
|
|
is_valid, username, project_name = check_hf_space_url(url) |
|
|
|
if not is_valid: |
|
return "Error: Please enter a valid Hugging Face Spaces URL.\n\nExpected format: https://huggingface.co/spaces/username/project", "" |
|
|
|
|
|
content = ProjectImporter.fetch_hf_space_content(username, project_name) |
|
|
|
if content.startswith("Error:"): |
|
return content, "" |
|
|
|
|
|
lines = content.split('\n') |
|
code_start = 0 |
|
for i, line in enumerate(lines): |
|
|
|
if (line.strip() and |
|
not line.startswith('=') and |
|
not line.startswith('IMPORTED PROJECT') and |
|
not line.startswith('Space:') and |
|
not line.startswith('SDK:') and |
|
not line.startswith('Main File:')): |
|
code_start = i |
|
break |
|
|
|
code_content = '\n'.join(lines[code_start:]) |
|
|
|
return f"✅ Successfully imported project from {username}/{project_name}", code_content |
|
|
|
class Deployer: |
|
"""Handles deployment to various platforms""" |
|
|
|
def __init__(self): |
|
self.dependency_manager = DependencyManager() |
|
self.space_manager = SpaceManager() |
|
|
|
def deploy_to_user_space(self, code: str, space_name: str, sdk_name: str, |
|
profile: Optional[gr.OAuthProfile] = None, |
|
token: Optional[gr.OAuthToken] = None) -> str: |
|
"""Deploy code to user's Hugging Face Space""" |
|
if not code or not code.strip(): |
|
return "No code to deploy." |
|
|
|
if profile is None or token is None: |
|
return "Please log in with your Hugging Face account to deploy to your own Space." |
|
|
|
|
|
if not token.token or token.token == "hf_": |
|
return "Error: Invalid token. Please log in again with your Hugging Face account to get a valid write token." |
|
|
|
|
|
is_update = "/" in space_name.strip() |
|
if is_update: |
|
repo_id = space_name.strip() |
|
space_username = repo_id.split('/')[0] |
|
if space_username != profile.username: |
|
return f"Error: You can only update your own spaces. This space belongs to {space_username}." |
|
else: |
|
repo_id = f"{profile.username}/{space_name.strip()}" |
|
|
|
|
|
if is_update: |
|
try: |
|
api = HfApi(token=token.token) |
|
space_info = api.space_info(repo_id) |
|
if not space_info: |
|
return f"Error: Could not access space {repo_id}. Please check your permissions." |
|
except Exception as e: |
|
return f"Error: No write access to space {repo_id}. Error: {str(e)}" |
|
|
|
|
|
try: |
|
if sdk_name == "Streamlit (Python)": |
|
return self._deploy_streamlit(code, repo_id, is_update, token) |
|
elif sdk_name == "Transformers.js": |
|
return self._deploy_transformers_js(code, repo_id, is_update, token) |
|
elif sdk_name == "Svelte": |
|
return self._deploy_svelte(code, repo_id, is_update, token) |
|
elif sdk_name == "Static (HTML)": |
|
return self._deploy_static_html(code, repo_id, is_update, token) |
|
else: |
|
return self._deploy_gradio(code, repo_id, is_update, token) |
|
except Exception as e: |
|
return f"Error during deployment: {str(e)}" |
|
|
|
def _deploy_streamlit(self, code: str, repo_id: str, is_update: bool, token) -> str: |
|
"""Deploy Streamlit app""" |
|
api = HfApi(token=token.token) |
|
|
|
try: |
|
if not is_update: |
|
|
|
duplicated_repo = duplicate_space( |
|
from_id="streamlit/streamlit-template-space", |
|
to_id=repo_id.split('/')[-1], |
|
token=token.token, |
|
exist_ok=True |
|
) |
|
|
|
|
|
import_statements = self.dependency_manager.extract_import_statements(code) |
|
requirements_content = self.dependency_manager.generate_requirements_txt_with_llm(import_statements) |
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f: |
|
f.write(requirements_content) |
|
requirements_temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=requirements_temp_path, |
|
path_in_repo="requirements.txt", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
finally: |
|
os.unlink(requirements_temp_path) |
|
|
|
|
|
self.space_manager.add_anycoder_tag_to_readme(api, repo_id) |
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f: |
|
f.write(code) |
|
temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo="src/streamlit_app.py", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
|
|
space_url = f"https://huggingface.co/spaces/{repo_id}" |
|
action_text = "Updated" if is_update else "Deployed" |
|
return f"✅ {action_text}! [Open your Space here]({space_url})" |
|
|
|
finally: |
|
os.unlink(temp_path) |
|
|
|
except Exception as e: |
|
error_msg = str(e) |
|
if "403 Forbidden" in error_msg: |
|
return f"Error: Permission denied. Please ensure you have write access to {repo_id}." |
|
else: |
|
action_verb = "updating" if is_update else "creating" |
|
return f"Error {action_verb} Streamlit space: {error_msg}" |
|
|
|
def _deploy_transformers_js(self, code: str, repo_id: str, is_update: bool, token) -> str: |
|
"""Deploy Transformers.js app""" |
|
api = HfApi(token=token.token) |
|
|
|
try: |
|
if not is_update: |
|
|
|
duplicated_repo = duplicate_space( |
|
from_id="static-templates/transformers.js", |
|
to_id=repo_id, |
|
token=token.token, |
|
exist_ok=True |
|
) |
|
|
|
|
|
files = parse_transformers_js_output(code) |
|
if not all(files.values()): |
|
return "Error: Could not parse transformers.js output. Please regenerate the code." |
|
|
|
|
|
files_to_upload = [ |
|
("index.html", files['index.html']), |
|
("index.js", files['index.js']), |
|
("style.css", files['style.css']) |
|
] |
|
|
|
for file_name, file_content in files_to_upload: |
|
with tempfile.NamedTemporaryFile("w", suffix=f".{file_name.split('.')[-1]}", delete=False) as f: |
|
f.write(file_content) |
|
temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo=file_name, |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
finally: |
|
os.unlink(temp_path) |
|
|
|
|
|
self.space_manager.add_anycoder_tag_to_readme(api, repo_id) |
|
|
|
|
|
if is_update: |
|
try: |
|
api.restart_space(repo_id=repo_id) |
|
except Exception as restart_error: |
|
print(f"[Deploy] Could not restart space after update: {restart_error}") |
|
|
|
space_url = f"https://huggingface.co/spaces/{repo_id}" |
|
action_text = "Updated" if is_update else "Deployed" |
|
return f"✅ {action_text}! [Open your Transformers.js Space here]({space_url})" |
|
|
|
except Exception as e: |
|
error_msg = str(e) |
|
if "403 Forbidden" in error_msg: |
|
return f"Error: Permission denied. Please ensure you have write access to {repo_id}." |
|
else: |
|
action_verb = "updating" if is_update else "creating" |
|
return f"Error {action_verb} Transformers.js space: {error_msg}" |
|
|
|
def _deploy_svelte(self, code: str, repo_id: str, is_update: bool, token) -> str: |
|
"""Deploy Svelte app""" |
|
if is_update: |
|
return "Error: Svelte space updates not yet supported. Please create a new space." |
|
|
|
api = HfApi(token=token.token) |
|
|
|
try: |
|
|
|
duplicated_repo = duplicate_space( |
|
from_id="static-templates/svelte", |
|
to_id=repo_id, |
|
token=token.token, |
|
exist_ok=True |
|
) |
|
|
|
|
|
files = parse_svelte_output(code) |
|
if not files['src/App.svelte']: |
|
return "Error: Could not parse Svelte output. Please regenerate the code." |
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".svelte", delete=False) as f: |
|
f.write(files['src/App.svelte']) |
|
temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo="src/App.svelte", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
finally: |
|
os.unlink(temp_path) |
|
|
|
|
|
if files['src/app.css']: |
|
with tempfile.NamedTemporaryFile("w", suffix=".css", delete=False) as f: |
|
f.write(files['src/app.css']) |
|
temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo="src/app.css", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
finally: |
|
os.unlink(temp_path) |
|
|
|
|
|
self.space_manager.add_anycoder_tag_to_readme(api, repo_id) |
|
|
|
space_url = f"https://huggingface.co/spaces/{repo_id}" |
|
return f"✅ Deployed! [Open your Svelte Space here]({space_url})" |
|
|
|
except Exception as e: |
|
return f"Error creating Svelte space: {str(e)}" |
|
|
|
def _deploy_static_html(self, code: str, repo_id: str, is_update: bool, token) -> str: |
|
"""Deploy static HTML app""" |
|
api = HfApi(token=token.token) |
|
|
|
try: |
|
if not is_update: |
|
|
|
api.create_repo( |
|
repo_id=repo_id, |
|
repo_type="space", |
|
space_sdk="static", |
|
exist_ok=True |
|
) |
|
|
|
|
|
self.space_manager.add_anycoder_tag_to_readme(api, repo_id) |
|
|
|
|
|
files = parse_multipage_html_output(code) |
|
files = validate_and_autofix_files(files) |
|
|
|
if isinstance(files, dict) and files.get('index.html'): |
|
|
|
with tempfile.TemporaryDirectory() as tmpdir: |
|
|
|
for rel_path, content in files.items(): |
|
safe_rel_path = rel_path.strip().lstrip('/') |
|
abs_path = os.path.join(tmpdir, safe_rel_path) |
|
os.makedirs(os.path.dirname(abs_path), exist_ok=True) |
|
with open(abs_path, 'w') as fh: |
|
fh.write(content) |
|
|
|
|
|
api.upload_folder( |
|
folder_path=tmpdir, |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
else: |
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".html", delete=False) as f: |
|
f.write(code) |
|
temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo="index.html", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
finally: |
|
os.unlink(temp_path) |
|
|
|
space_url = f"https://huggingface.co/spaces/{repo_id}" |
|
action_text = "Updated" if is_update else "Deployed" |
|
return f"✅ {action_text}! [Open your Space here]({space_url})" |
|
|
|
except Exception as e: |
|
error_msg = str(e) |
|
if "403 Forbidden" in error_msg: |
|
return f"Error: Permission denied. Please ensure you have write access to {repo_id}." |
|
else: |
|
return f"Error deploying static HTML: {error_msg}" |
|
|
|
def _deploy_gradio(self, code: str, repo_id: str, is_update: bool, token) -> str: |
|
"""Deploy Gradio app""" |
|
api = HfApi(token=token.token) |
|
|
|
try: |
|
if not is_update: |
|
|
|
api.create_repo( |
|
repo_id=repo_id, |
|
repo_type="space", |
|
space_sdk="gradio", |
|
exist_ok=True |
|
) |
|
|
|
|
|
import_statements = self.dependency_manager.extract_import_statements(code) |
|
requirements_content = self.dependency_manager.generate_requirements_txt_with_llm(import_statements) |
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f: |
|
f.write(requirements_content) |
|
requirements_temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=requirements_temp_path, |
|
path_in_repo="requirements.txt", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
finally: |
|
os.unlink(requirements_temp_path) |
|
|
|
|
|
self.space_manager.add_anycoder_tag_to_readme(api, repo_id) |
|
|
|
|
|
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f: |
|
f.write(code) |
|
temp_path = f.name |
|
|
|
try: |
|
api.upload_file( |
|
path_or_fileobj=temp_path, |
|
path_in_repo="app.py", |
|
repo_id=repo_id, |
|
repo_type="space" |
|
) |
|
|
|
space_url = f"https://huggingface.co/spaces/{repo_id}" |
|
action_text = "Updated" if is_update else "Deployed" |
|
return f"✅ {action_text}! [Open your Space here]({space_url})" |
|
|
|
finally: |
|
os.unlink(temp_path) |
|
|
|
except Exception as e: |
|
error_msg = str(e) |
|
if "403 Forbidden" in error_msg: |
|
return f"Error: Permission denied. Please ensure you have write access to {repo_id}." |
|
else: |
|
return f"Error deploying Gradio app: {error_msg}" |
|
|
|
def deploy_to_spaces(self, code: str) -> None: |
|
"""Deploy using the old method (opens in new tab)""" |
|
if not code or not code.strip(): |
|
return |
|
|
|
|
|
app_py = self._wrap_html_in_gradio_app(code.strip()) |
|
base_url = "https://huggingface.co/new-space" |
|
params = urllib.parse.urlencode({ |
|
"name": "new-space", |
|
"sdk": "gradio" |
|
}) |
|
files_params = urllib.parse.urlencode({ |
|
"files[0][path]": "app.py", |
|
"files[0][content]": app_py |
|
}) |
|
full_url = f"{base_url}?{params}&{files_params}" |
|
webbrowser.open_new_tab(full_url) |
|
|
|
def deploy_to_spaces_static(self, code: str) -> None: |
|
"""Deploy using static SDK (opens in new tab)""" |
|
if not code or not code.strip(): |
|
return |
|
|
|
base_url = "https://huggingface.co/new-space" |
|
params = urllib.parse.urlencode({ |
|
"name": "new-space", |
|
"sdk": "static" |
|
}) |
|
files_params = urllib.parse.urlencode({ |
|
"files[0][path]": "index.html", |
|
"files[0][content]": code.strip() |
|
}) |
|
full_url = f"{base_url}?{params}&{files_params}" |
|
webbrowser.open_new_tab(full_url) |
|
|
|
def _wrap_html_in_gradio_app(self, html_code: str) -> str: |
|
"""Wrap HTML code in a Gradio app""" |
|
|
|
safe_html = html_code.replace('"""', r'\"\"\"') |
|
|
|
|
|
import_statements = self.dependency_manager.extract_import_statements(html_code) |
|
requirements_comment = "" |
|
if import_statements: |
|
requirements_content = self.dependency_manager.generate_requirements_txt_with_llm(import_statements) |
|
requirements_comment = ( |
|
"# Generated requirements.txt content (create this file manually if needed):\n" |
|
+ '\n'.join(f"# {line}" for line in requirements_content.strip().split('\n')) + '\n\n' |
|
) |
|
|
|
return ( |
|
f'{requirements_comment}' |
|
'import gradio as gr\n\n' |
|
'def show_html():\n' |
|
f' return """{safe_html}"""\n\n' |
|
'demo = gr.Interface(fn=show_html, inputs=None, outputs=gr.HTML())\n\n' |
|
'if __name__ == "__main__":\n' |
|
' demo.launch()\n' |
|
) |
|
|
|
|
|
deployer = Deployer() |
|
project_importer = ProjectImporter() |
|
|
|
|
|
def deploy_to_user_space(code: str, space_name: str, sdk_name: str, |
|
profile: Optional[gr.OAuthProfile] = None, |
|
token: Optional[gr.OAuthToken] = None) -> str: |
|
return deployer.deploy_to_user_space(code, space_name, sdk_name, profile, token) |
|
|
|
def deploy_to_spaces(code: str) -> None: |
|
return deployer.deploy_to_spaces(code) |
|
|
|
def deploy_to_spaces_static(code: str) -> None: |
|
return deployer.deploy_to_spaces_static(code) |
|
|
|
def load_project_from_url(url: str) -> Tuple[str, str]: |
|
return project_importer.load_project_from_url(url) |
|
|
|
def extract_import_statements(code: str) -> List[str]: |
|
return deployer.dependency_manager.extract_import_statements(code) |
|
|
|
def generate_requirements_txt_with_llm(import_statements: List[str]) -> str: |
|
return deployer.dependency_manager.generate_requirements_txt_with_llm(import_statements) |