File size: 11,446 Bytes
0d52beb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# import os
# import re
# import tempfile
# import shutil
# import git
# from huggingface_hub import (
# create_repo,
# upload_folder,
# list_repo_files,
# Repository,
# whoami,
# hf_hub_download, # New import
# )
# import logging
# from pathlib import Path
# from PIL import Image
# try:
# from keylock_decode import decode_from_image_pil
# KEYLOCK_DECODE_AVAILABLE = True
# except ImportError:
# KEYLOCK_DECODE_AVAILABLE = False
# decode_from_image_pil = None
# logging.warning("keylock-decode library not found. KeyLock Wallet image feature will be disabled.")
# logging.basicConfig(
# level=logging.INFO,
# format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# )
# logger = logging.getLogger(__name__)
# # --- Helper Function to Get API Token (Unchanged) ---
# def _get_api_token(ui_token_from_textbox=None):
# env_token = os.getenv('HF_TOKEN')
# if env_token: return env_token, None
# if ui_token_from_textbox: return ui_token_from_textbox, None
# return None, "Error: Hugging Face API token not provided."
# # --- load_token_from_image_and_set_env (Unchanged - Terminology and debug logic as before) ---
# def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str):
# if not KEYLOCK_DECODE_AVAILABLE: return "Error: KeyLock-Decode library is not installed."
# if image_pil_object is None: return "Error: No KeyLock Wallet image provided."
# if not password: return "Error: Password cannot be empty."
# status_messages_display = []
# # ... (rest of the function, ensure debug logic is as intended or removed)
# try:
# logger.info(f"Attempting to decode from KeyLock Wallet image...")
# decoded_data, status_msgs_from_lib = decode_from_image_pil(image_pil_object, password, set_environment_variables=True)
# status_messages_display.extend(status_msgs_from_lib)
# if decoded_data:
# status_messages_display.append("\nDecoded Data Summary (sensitive values masked):")
# for key, value in decoded_data.items():
# display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
# status_messages_display.append(f"- {key}: {display_value}")
# if os.getenv('HF_TOKEN'): status_messages_display.append(f"\nSUCCESS: HF_TOKEN set from KeyLock Wallet image.")
# # ... (other status messages)
# except ValueError as e: status_messages_display.append(f"Decoding Error: {e}")
# except Exception as e: status_messages_display.append(f"Unexpected decoding error: {str(e)}")
# return "\n".join(status_messages_display)
# # --- parse_markdown (Unchanged from previous corrected version) ---
# def parse_markdown(markdown_input):
# space_info = {"repo_name_md": "", "owner_md": "", "files": []}
# current_file_path = None; current_file_content_lines = []
# in_file_definition = False; in_code_block = False
# lines = markdown_input.strip().split("\n")
# for line_content_orig in lines:
# line_content_stripped = line_content_orig.strip()
# if line_content_stripped.startswith("### File:"):
# if current_file_path and in_file_definition:
# space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
# current_file_path = line_content_stripped.replace("### File:", "").strip()
# current_file_content_lines = []
# in_file_definition = True; in_code_block = False
# continue
# if not in_file_definition:
# if line_content_stripped.startswith("# Space:"):
# full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
# if "/" in full_space_name_md: space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
# else: space_info["repo_name_md"] = full_space_name_md
# continue
# if line_content_stripped.startswith("```"):
# in_code_block = not in_code_block
# continue
# current_file_content_lines.append(line_content_orig)
# if current_file_path and in_file_definition:
# space_info["files"].append({"path": current_file_path, "content": "\n".join(current_file_content_lines)})
# space_info["files"] = [f for f in space_info["files"] if f.get("path")]
# return space_info
# # --- _determine_repo_id (Unchanged) ---
# def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
# if not space_name_ui: return None, "Error: Space Name cannot be empty."
# if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field."
# final_owner = owner_ui; error_message = None
# if not final_owner:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return None, token_err
# if not resolved_api_token: return None, "Error: API token required for auto owner determination."
# try:
# user_info = whoami(token=resolved_api_token)
# if user_info and 'name' in user_info: final_owner = user_info['name']
# else: error_message = "Error: Could not retrieve username. Check token/permissions or specify Owner."
# except Exception as e: error_message = f"Error retrieving username: {str(e)}. Specify Owner."
# if error_message: return None, error_message
# if not final_owner: return None, "Error: Owner could not be determined."
# return f"{final_owner}/{space_name_ui}", None
# # --- New Function to Fetch File Content from Hub ---
# def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
# """Fetches the UTF-8 text content of one file from a Hugging Face Space; returns (content, None) on success or (None, error_message) on failure."""
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err:
# return None, token_err # Return error as second element for consistency
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err:
# return None, err
# repo_id_for_error_logging = repo_id
# if not file_path_in_repo:
# return None, "Error: File path cannot be empty."
# logger.info(f"Attempting to download file: {file_path_in_repo} from Space: {repo_id}")
# downloaded_file_path = hf_hub_download(
# repo_id=repo_id,
# filename=file_path_in_repo,
# repo_type="space",
# token=resolved_api_token,
# # revision="main", # Optional: specify a branch/commit
# # cache_dir=... # Optional: manage cache
# )
# content = Path(downloaded_file_path).read_text(encoding="utf-8")
# logger.info(f"Successfully downloaded and read file: {file_path_in_repo} from {repo_id}")
# return content, None # Return content and no error
# except Exception as e:
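# # TODO: catch huggingface_hub.utils.EntryNotFoundError / RepositoryNotFoundError explicitly instead of matching "404" / "not found" in the message text (HFValidationError only covers malformed repo ids).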
# if "404" in str(e) or "not found" in str(e).lower():
# logger.warning(f"File not found {file_path_in_repo} in {repo_id_for_error_logging}: {e}")
# return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging}'."
# logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging}:")
# return None, f"Error fetching file content: {str(e)}"
# # --- Function to list files (reused, but now distinct from fetching content) ---
# def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
# """Lists files in a Hugging Face Space; returns (files, None) on success, ([], info_message) when the Space has no files, or (None, error_message) on failure."""
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return None, token_err
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err: return None, err
# repo_id_for_error_logging = repo_id
# files = list_repo_files(repo_id=repo_id, token=resolved_api_token, repo_type="space")
# if not files:
# return [], f"No files found in Space {repo_id}." # Return empty list and info message
# return files, None # Return list of files and no error
# except Exception as e:
# logger.exception(f"Error listing files for {repo_id_for_error_logging}:")
# return None, f"Error listing files for {repo_id_for_error_logging}: {str(e)}"
# # --- Core Functions: create_space, update_space_file (Unchanged from previous correct versions) ---
# def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return token_err
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err: return err
# repo_id_for_error_logging = repo_id
# space_info = parse_markdown(markdown_input)
# if not space_info["files"]: return "Error: No files found in markdown."
# with tempfile.TemporaryDirectory() as temp_dir:
# repo_staging_path = Path(temp_dir) / "repo_staging_content"
# repo_staging_path.mkdir(exist_ok=True)
# for file_info in space_info["files"]:
# if not file_info.get("path"): continue
# file_path_abs = repo_staging_path / file_info["path"]
# file_path_abs.parent.mkdir(parents=True, exist_ok=True)
# with open(file_path_abs, "w", encoding="utf-8") as f: f.write(file_info["content"])
# try:
# create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False)
# except Exception as e:
# err_str = str(e).lower()
# if not ("already exists" in err_str or "you already created this repo" in err_str or "exists" in err_str):
# return f"Error creating Space '{repo_id}': {str(e)}"
# upload_folder(repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=f"Initial Space setup of {repo_id} via Builder")
# return f"Successfully created/updated Space: {repo_id}"
# except Exception as e:
# logger.exception(f"Error in create_space for {repo_id_for_error_logging}:")
# return f"Error during Space creation/update: {str(e)}"
# def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
# repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
# try:
# resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
# if token_err: return token_err
# repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
# if err: return err
# repo_id_for_error_logging = repo_id
# if not file_path_in_repo: return "Error: File Path to update cannot be empty."
# file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
# commit_message_ui = commit_message_ui or f"Update {file_path_in_repo} via Space Builder"
# with tempfile.TemporaryDirectory() as temp_dir_for_update:
# repo_local_clone_path = Path(temp_dir_for_update) / "update_clone"
# cloned_repo = Repository(local_dir=str(repo_local_clone_path), clone_from=f"https://huggingface.co/spaces/{repo_id}", repo_type="space", use_auth_token=resolved_api_token, git_user="Space Builder Bot", git_email="space-builder@huggingface.co")
# full_local_file_path = Path(cloned_repo.local_dir) / file_path_in_repo
# full_local_file_path.parent.mkdir(parents=True, exist_ok=True)
# with open(full_local_file_path, "w", encoding="utf-8") as f: f.write(file_content)
# cloned_repo.push_to_hub(commit_message=commit_message_ui)
# return f"Successfully updated {file_path_in_repo} in Space {repo_id}"
# except Exception as e:
# logger.exception(f"Error in update_space_file for {repo_id_for_error_logging}, file {file_path_in_repo}:")
# return f"Error updating file for {repo_id_for_error_logging}: {str(e)}"