Spaces:
Configuration error
Configuration error
File size: 11,446 Bytes
0d52beb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# import os
# import re
# import tempfile
# import shutil
# import git
# from huggingface_hub import (
# create_repo,
# upload_folder,
# list_repo_files,
# Repository,
# whoami,
# hf_hub_download, # New import
# )
# import logging
# from pathlib import Path
# from PIL import Image
# try:
# from keylock_decode import decode_from_image_pil
# KEYLOCK_DECODE_AVAILABLE = True
# except ImportError:
# KEYLOCK_DECODE_AVAILABLE = False
# decode_from_image_pil = None
# logging.warning("keylock-decode library not found. KeyLock Wallet image feature will be disabled.")
# logging.basicConfig(
# level=logging.INFO,
# format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# )
# logger = logging.getLogger(name)
# # --- Helper Function to Get API Token (Unchanged) ---
# def _get_api_token(ui_token_from_textbox=None):
# env_token = os.getenv('HF_TOKEN')
# if env_token: return env_token, None
# if ui_token_from_textbox: return ui_token_from_textbox, None
# return None, "Error: Hugging Face API token not provided."
# # --- load_token_from_image_and_set_env (Unchanged - Terminology and debug logic as before) ---
# def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str):
# if not KEYLOCK_DECODE_AVAILABLE: return "Error: KeyLock-Decode library is not installed."
# if image_pil_object is None: return "Error: No KeyLock Wallet image provided."
# if not password: return "Error: Password cannot be empty."
# status_messages_display = []
# # ... (rest of the function, ensure debug logic is as intended or removed)
# try:
# logger.info(f"Attempting to decode from KeyLock Wallet image...")
# decoded_data, status_msgs_from_lib = decode_from_image_pil(image_pil_object, password, set_environment_variables=True)
# status_messages_display.extend(status_msgs_from_lib)
# if decoded_data:
# status_messages_display.append("\nDecoded Data Summary (sensitive values masked):")
# for key, value in decoded_data.items():
# display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
# status_messages_display.append(f"- {key}: {display_value}")
# if os.getenv('HF_TOKEN'): status_messages_display.append(f"\nSUCCESS: HF_TOKEN set from KeyLock Wallet image.")
# # ... (other status messages)
# except ValueError as e: status_messages_display.append(f"Decoding Error: {e}")
# except Exception as e: status_messages_display.append(f"Unexpected decoding error: {str(e)}")
# return "\n".join(status_messages_display)
# # --- parse_markdown (Unchanged from previous corrected version) ---
# def parse_markdown(markdown_input):
# space_info = {"repo_name_md": "", "owner_md": "", "files": []}
# current_file_path = None; current_file_content_lines = []
# in_file_definition = False; in_code_block = False
# lines = markdown_input.strip().split("\n")
# for line_content_orig in lines:
# line_content_stripped = line_content_orig.strip()
# if line_content_stripped.startswith("### File:"):
# if current_file_path and in_file_definition:
# space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
# current_file_path = line_content_stripped.replace("### File:", "").strip()
# current_file_content_lines = []
# in_file_definition = True; in_code_block = False
# continue
# if not in_file_definition:
# if line_content_stripped.startswith("# Space:"):
# full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
# if "/" in full_space_name_md: space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
# else: space_info["repo_name_md"] = full_space_name_md
# continue
# if line_content_stripped.startswith("```"):
# in_code_block = not in_code_block
# continue
# current_file_content_lines.append(line_content_orig)
# if current_file_path and in_file_definition:
# space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
# space_info["files"] = [f for f in space_info["files"] if f.get("path")]
# return space_info
# # --- _determine_repo_id (Unchanged) ---
# def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
# if not space_name_ui: return None, "Error: Space Name cannot be empty."
# if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field."
# final_owner = owner_ui; error_message = None
# if not final_owner:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return None, token_err
# if not resolved_api_token: return None, "Error: API token required for auto owner determination."
# try:
# user_info = whoami(token=resolved_api_token)
# if user_info and 'name' in user_info: final_owner = user_info['name']
# else: error_message = "Error: Could not retrieve username. Check token/permissions or specify Owner."
# except Exception as e: error_message = f"Error retrieving username: {str(e)}. Specify Owner."
# if error_message: return None, error_message
# if not final_owner: return None, "Error: Owner could not be determined."
# return f"{final_owner}/{space_name_ui}", None
# # --- New Function to Fetch File Content from Hub ---
# def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
# """Fetches content of a specific file from a Hugging Face Space."""
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err:
# return None, token_err # Return error as second element for consistency
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err:
# return None, err
# repo_id_for_error_logging = repo_id
# if not file_path_in_repo:
# return None, "Error: File path cannot be empty."
# logger.info(f"Attempting to download file: {file_path_in_repo} from Space: {repo_id}")
# downloaded_file_path = hf_hub_download(
# repo_id=repo_id,
# filename=file_path_in_repo,
# repo_type="space",
# token=resolved_api_token,
# # revision="main", # Optional: specify a branch/commit
# # cache_dir=... # Optional: manage cache
# )
# content = Path(downloaded_file_path).read_text(encoding="utf-8")
# logger.info(f"Successfully downloaded and read file: {file_path_in_repo} from {repo_id}")
# return content, None # Return content and no error
# except Exception as e:
# # Catch specific huggingface_hub.utils.HFValidationError for not found etc.
# if "404" in str(e) or "not found" in str(e).lower():
# logger.warning(f"File not found {file_path_in_repo} in {repo_id_for_error_logging}: {e}")
# return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging}'."
# logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging}:")
# return None, f"Error fetching file content: {str(e)}"
# # --- Function to list files (reused, but now distinct from fetching content) ---
# def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
# """Lists files in a Hugging Face Space, returns list or error."""
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return None, token_err
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err: return None, err
# repo_id_for_error_logging = repo_id
# files = list_repo_files(repo_id=repo_id, token=resolved_api_token, repo_type="space")
# if not files:
# return [], f"No files found in Space {repo_id}." # Return empty list and info message
# return files, None # Return list of files and no error
# except Exception as e:
# logger.exception(f"Error listing files for {repo_id_for_error_logging}:")
# return None, f"Error listing files for {repo_id_for_error_logging}: {str(e)}"
# # --- Core Functions: create_space, update_space_file (Unchanged from previous correct versions) ---
# def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return token_err
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err: return err
# repo_id_for_error_logging = repo_id
# space_info = parse_markdown(markdown_input)
# if not space_info["files"]: return "Error: No files found in markdown."
# with tempfile.TemporaryDirectory() as temp_dir:
# repo_staging_path = Path(temp_dir) / "repo_staging_content"
# repo_staging_path.mkdir(exist_ok=True)
# for file_info in space_info["files"]:
# if not file_info.get("path"): continue
# file_path_abs = repo_staging_path / file_info["path"]
# file_path_abs.parent.mkdir(parents=True, exist_ok=True)
# with open(file_path_abs, "w", encoding="utf-8") as f: f.write(file_info["content"])
# try:
# create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False)
# except Exception as e:
# err_str = str(e).lower()
# if not ("already exists" in err_str or "you already created this repo" in err_str or "exists" in err_str):
# return f"Error creating Space '{repo_id}': {str(e)}"
# upload_folder(repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=f"Initial Space setup of {repo_id} via Builder")
# return f"Successfully created/updated Space: {repo_id}"
# except Exception as e:
# logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
# return f"Error during Space creation/update: {str(e)}"
# def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return token_err
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err: return err
# repo_id_for_error_logging = repo_id
# if not file_path_in_repo: return "Error: File Path to update cannot be empty."
# file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
# commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"
# with tempfile.TemporaryDirectory() as temp_dir_for_update:
# repo_local_clone_path = Path(temp_dir_for_update) / "update_clone"
# cloned_repo = Repository(local_dir=str(repo_local_clone_path), clone_from=f"https://huggingface.co/spaces/{repo_id}", repo_type="space", use_auth_token=resolved_api_token, git_user="Space Builder Bot", git_email="space-builder@huggingface.co")
# full_local_file_path = Path(cloned_repo.local_dir) / file_path_in_repo
# full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
# with open(full_local_file_path, "w", encoding="utf-8") as f: f.write(file_content)
# cloned_repo.push_to_hub(commit_message=commit_message_ui)
# return f"Successfully updated {file_path_in_repo} in Space {repo_id}"
# except Exception as e:
# logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}, file {file_path_in_repo}:")
# return f"Error updating file for {repo_id_for_error_logging}: {str(e)}" |