import hashlib
import os
import shutil
import tempfile
import zipfile
from urllib.parse import urlparse

import gradio as gr
import scrapy
from scrapy.crawler import CrawlerProcess
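# Runtime assumptions (not stated in this file): the Space installs gradio and scrapy,
# e.g. via requirements.txt, and the hard-coded site-packages path patched below
# matches the container's Python 3.10 layout.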
# Fix for a gradio_client issue: get_type() assumes every schema value is a dict,
# so a non-dict value makes it crash. The patch inserts a type guard into the
# installed copy of gradio_client/utils.py.
def patch_gradio_client():
    file_path = "/usr/local/lib/python3.10/site-packages/gradio_client/utils.py"
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Check if the patch has already been applied
        if any("if not isinstance(schema, dict):" in line for line in lines):
            return

        new_lines = []
        inserted = False
        for line in lines:
            if not inserted and "def get_type(schema):" in line:
                new_lines.append(line)
                new_lines.append("    if not isinstance(schema, dict):\n")
                new_lines.append("        return str(type(schema))\n")
                inserted = True
            else:
                new_lines.append(line)

        # Write the patched file to a temporary location
        with tempfile.NamedTemporaryFile(mode="w", delete=False, encoding="utf-8") as tmp_file:
            tmp_file.writelines(new_lines)

        # Replace the original file
        shutil.move(tmp_file.name, file_path)
        print("Successfully patched gradio_client")
    except Exception as e:
        print(f"Could not patch gradio_client: {e}")


# Apply the patch
patch_gradio_client()
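# Caveat: gradio (and therefore gradio_client) is imported at the top of this file,
# before patch_gradio_client() runs, so the edit above only changes the copy on disk;
# the module already loaded in memory is unchanged, and the patch effectively takes
# effect on the next process start (e.g. after a Space restart).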
class LinkSpider(scrapy.Spider):
    name = "link_spider"

    def __init__(self, start_url, output_dir, max_pages=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_urls = [start_url]
        self.output_dir = output_dir
        self.visited_urls = set()
        self.max_pages = max_pages
        self.page_count = 0

    def parse(self, response):
        if response.url in self.visited_urls or self.page_count >= self.max_pages:
            return
        self.visited_urls.add(response.url)
        self.page_count += 1

        # Save the file under a name derived from the URL path
        parsed_url = urlparse(response.url)
        path = parsed_url.path.strip("/")
        filename = "index.html" if not path else path.replace("/", "_") + ".html"

        # Ensure filename is valid
        filename = "".join(c for c in filename if c.isalnum() or c in ('_', '.', '-'))
        if not filename.endswith(".html"):
            filename += ".html"

        file_path = os.path.join(self.output_dir, filename)
        try:
            with open(file_path, "wb") as f:
                f.write(response.body)
        except OSError:
            # If filename is too long or invalid, use a hash
            filename = hashlib.md5(response.url.encode()).hexdigest() + ".html"
            file_path = os.path.join(self.output_dir, filename)
            with open(file_path, "wb") as f:
                f.write(response.body)

        # Follow links if we haven't reached the limit
        if self.page_count < self.max_pages:
            for href in response.css('a::attr(href)').getall():
                if href.startswith(('http://', 'https://')):
                    yield response.follow(href, self.parse)
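# Note: the spider above follows every absolute link it finds, so it can wander off
# the starting site. A minimal sketch (not wired in, assuming the same LinkSpider
# structure) of restricting the crawl to the start URL's host:
#
#   start_host = urlparse(self.start_urls[0]).netloc
#   for href in response.css('a::attr(href)').getall():
#       if urlparse(response.urljoin(href)).netloc == start_host:
#           yield response.follow(href, self.parse)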
def crawl_and_zip(start_url):
    if not start_url.startswith(('http://', 'https://')):
        raise gr.Error("The URL must start with http:// or https://")

    temp_dir = tempfile.mkdtemp()
    output_dir = os.path.join(temp_dir, "pages")
    os.makedirs(output_dir, exist_ok=True)

    try:
        process = CrawlerProcess(settings={
            "LOG_ENABLED": False,
            "USER_AGENT": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
            "ROBOTSTXT_OBEY": False,
            "DOWNLOAD_DELAY": 1,
        })
        process.crawl(LinkSpider, start_url=start_url, output_dir=output_dir, max_pages=50)
        process.start()  # blocks until the crawl finishes

        # Create ZIP file
        zip_path = os.path.join(temp_dir, "website_crawl.zip")
        with zipfile.ZipFile(zip_path, "w") as zipf:
            for root, _, files in os.walk(output_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, output_dir)
                    zipf.write(file_path, arcname=arcname)

        # Collect file info for the summary
        file_list = []
        for root, _, files in os.walk(output_dir):
            for file in files:
                rel_dir = os.path.relpath(root, output_dir)
                rel_file = os.path.join(rel_dir, file) if rel_dir != "." else file
                file_list.append(rel_file)

        file_count = len(file_list)
        file_structure = "\n".join(file_list)

        return zip_path, f"Crawl complete!\n\nNumber of files: {file_count}\n\nFile list:\n{file_structure}"
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise gr.Error(f"An error occurred during the crawl: {str(e)}")
    finally:
        # Clean up the raw pages (the ZIP file in temp_dir is served before this)
        shutil.rmtree(output_dir, ignore_errors=True)
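# Note: CrawlerProcess.start() runs a Twisted reactor, which cannot be restarted in
# the same process, so a second click on the button will typically fail with
# ReactorNotRestartable. A common workaround (sketch only, names are illustrative)
# is to run each crawl in a child process:
#
#   import multiprocessing
#
#   def _run_crawl(start_url, output_dir):
#       process = CrawlerProcess(settings={...})  # same settings as above
#       process.crawl(LinkSpider, start_url=start_url, output_dir=output_dir, max_pages=50)
#       process.start()
#
#   p = multiprocessing.Process(target=_run_crawl, args=(start_url, output_dir))
#   p.start()
#   p.join()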
# Gradio interface
with gr.Blocks(title="Web Crawler") as demo:
    gr.Markdown("## 🌐 Web Page Crawler")
    gr.Markdown("Starts from the given URL, crawls web pages, and downloads them as a ZIP file.")

    with gr.Row():
        url_input = gr.Textbox(
            label="Start URL",
            placeholder="https://example.com",
            max_lines=1,
        )
        run_button = gr.Button("Start crawl", variant="primary")

    with gr.Row():
        zip_output = gr.File(label="Download file")
        info_output = gr.Textbox(label="Crawl results", interactive=False)

    run_button.click(
        fn=crawl_and_zip,
        inputs=url_input,
        outputs=[zip_output, info_output],
    )

if __name__ == "__main__":
    demo.launch()