import os

from googlesearch import search as google_search
from smolagents import tool, Tool
from curl_scraper import BasicScraper
from huggingface_hub import HfApi, hf_hub_download, upload_folder
class FileReader(Tool):
    name = "read_file"
    description = "Reads a file and returns the contents as a string."
    inputs = {"file": {"type": "string", "description": "The path of the file to be read, relative to the repository root."}}
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        super().__init__()
        self.space_id = space_id
        self.folder_path = folder_path

    def forward(self, file: str) -> str:
        file_path = os.path.join(self.folder_path, file)
        # Fetch the file from the Space on first access; later reads hit the local copy.
        if not os.path.exists(file_path):
            hf_hub_download(
                repo_id=self.space_id,
                filename=file,
                repo_type="space",
                local_dir=self.folder_path,
                local_dir_use_symlinks=False,
            )
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return content
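# Minimal usage sketch (the Space id and folder below are hypothetical; adjust
# them to your own Space before uncommenting):
# reader = FileReader(space_id="user/my-space", folder_path="/tmp/my-space")
# print(reader.forward("app.py"))  # downloads app.py from the Space if not cached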
class FileWriter(Tool):
    name = "write_file"
    description = "Overwrite or append content to a file. Use for creating new files, appending content, or modifying existing files."
    inputs = {
        "file": {"type": "string", "description": "The path of the file to be written, relative to the repository root."},
        "content": {"type": "string", "description": "The content to be written to the file."},
        "append": {"type": "boolean", "description": "Whether to append to the file or overwrite it.", "nullable": True},
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        super().__init__()
        self.space_id = space_id
        self.folder_path = folder_path

    def forward(self, file: str, content: str, append: bool = False) -> str:
        file_path = os.path.join(self.folder_path, file)
        mode = 'a' if append else 'w'
        with open(file_path, mode, encoding='utf-8') as f:
            # Strip surrounding whitespace on a full overwrite; keep it verbatim when appending.
            f.write(content if append else content.strip())
        return f"Successfully {'appended to' if append else 'wrote to'} file at {file_path}"
@tool
def search(query: str) -> list:
    """
    Search the web for pages matching a query.

    Args:
        query (str): The search query.

    Returns:
        list: List of top web results, each carrying the url, title, and description from the search results.
    """
    # advanced=True yields result objects (url, title, description) instead of
    # bare URLs; unique=True de-duplicates result URLs.
    results = []
    for result in google_search(query, advanced=True, unique=True):
        results.append(result)
    return results
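# Minimal usage sketch (the attribute names come from googlesearch's advanced
# result objects; the query is only an illustration):
# for result in search("smolagents documentation"):
#     print(result.url, result.title, result.description)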
@tool
def scraper(url: str, response_type: str = 'plain_text') -> str:
    """
    Scrapes the content of a web page. Important: don't use regex to extract information from the output of this scraper tool; think it through and find the answer yourself.

    Args:
        url (str): The URL of the web page to scrape.
        response_type (str): The type of response to return. Valid options are 'markdown' and 'plain_text'. Default is 'plain_text'.

    Returns:
        str: The content of the web page in the specified format.
    """
    response = BasicScraper(timeout=5).get(url)
    if response_type == 'markdown':
        return response.markdown
    elif response_type == 'plain_text':
        return response.plain_text
    else:
        raise ValueError("Invalid response_type. Use 'markdown' or 'plain_text'.")
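# Minimal usage sketch (hypothetical URL; assumes curl_scraper's BasicScraper
# can reach it within the 5-second timeout set above):
# markdown = scraper("https://example.com", response_type="markdown")
# print(markdown[:200])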
class CommitChanges(Tool):
    name = "commit_changes"
    description = "Commits changes to a repository."
    inputs = {
        "commit_message": {"type": "string", "description": "The commit message."},
        "commit_description": {"type": "string", "description": "The commit description."},
    }
    output_type = "string"

    def __init__(self, space_id, folder_path, **kwargs):
        super().__init__()
        self.space_id = space_id
        self.folder_path = folder_path

    def forward(self, commit_message: str, commit_description: str) -> str:
        try:
            # Upload the working folder as a pull request against the Space
            # rather than committing directly to the main branch.
            upload_folder(
                folder_path=self.folder_path,
                repo_id=self.space_id,
                repo_type="space",
                create_pr=True,
                commit_message=commit_message,
                commit_description=commit_description,
            )
            return "Changes committed successfully."
        except Exception as e:
            return f"Error committing changes: {str(e)}"
def get_repository_structure(space_id: str) -> str:
    """Render the file tree of a Space as an indented string."""
    api = HfApi()
    space_info = api.space_info(space_id)
    printed_dirs = set()
    output_lines = []
    sorted_siblings = sorted(space_info.siblings, key=lambda x: x.rfilename)
    for sibling in sorted_siblings:
        rfilename = sibling.rfilename
        path_parts = rfilename.split('/')
        # Print each not-yet-seen directory on this file's path, indented by depth.
        current_dir_path_parts = []
        for i in range(len(path_parts) - 1):
            dir_name = path_parts[i]
            current_dir_path_parts.append(dir_name)
            dir_path = '/'.join(current_dir_path_parts)
            if dir_path not in printed_dirs:
                depth = i
                indent = ' ' * depth
                output_lines.append(f"{indent}{dir_name}")
                printed_dirs.add(dir_path)
        file_name = path_parts[-1]
        file_indent_depth = len(path_parts) - 1
        file_indent = ' ' * file_indent_depth
        if file_indent_depth == 0:
            # Top-level files are listed bare, without a tree prefix.
            output_lines.append(file_name)
        else:
            # A file is "last" in its directory when no later sibling lives under the same path.
            is_last_file = sibling == sorted_siblings[-1] or not any(
                s.rfilename.startswith('/'.join(path_parts[:-1]) + '/')
                for s in sorted_siblings[sorted_siblings.index(sibling) + 1:]
            )
            file_prefix = '└─ ' if is_last_file else '├─ '
            output_lines.append(f"{file_indent}{file_prefix}{file_name}")
    return '\n'.join(output_lines)
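# Minimal usage sketch (hypothetical Space id, for illustration only):
# print(get_repository_structure("user/my-space"))
# Top-level files print bare; nested files print under their directory with
# '├─ ' / '└─ ' prefixes, indented one space per level of nesting.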