import os
import shutil
import tempfile
import zipfile
import gradio as gr
import scrapy
from scrapy.crawler import CrawlerProcess
from urllib.parse import urlparse
# Workaround for a gradio_client bug: get_type() in gradio_client/utils.py assumes
# every JSON schema is a dict and crashes on non-dict schemas (e.g. booleans), so
# this helper inserts a guard into the installed module file.
def patch_gradio_client():
    file_path = "/usr/local/lib/python3.10/site-packages/gradio_client/utils.py"
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Check if the patch is already applied
        if any("if not isinstance(schema, dict):" in line for line in lines):
            return

        new_lines = []
        inserted = False
        for line in lines:
            if not inserted and "def get_type(schema):" in line:
                new_lines.append(line)
                new_lines.append("    if not isinstance(schema, dict):\n")
                new_lines.append("        return str(type(schema))\n")
                inserted = True
            else:
                new_lines.append(line)

        # Write the patched file to a temporary location first
        with tempfile.NamedTemporaryFile(mode="w", delete=False, encoding="utf-8") as tmp_file:
            tmp_file.writelines(new_lines)

        # Replace the original file
        shutil.move(tmp_file.name, file_path)
        print("Successfully patched gradio_client")
    except Exception as e:
        print(f"Could not patch gradio_client: {e}")


# Apply the patch
patch_gradio_client()

class LinkSpider(scrapy.Spider):
    name = "link_spider"

    def __init__(self, start_url, output_dir, max_pages=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_urls = [start_url]
        self.output_dir = output_dir
        self.visited_urls = set()
        self.max_pages = max_pages
        self.page_count = 0

    def parse(self, response):
        if response.url in self.visited_urls or self.page_count >= self.max_pages:
            return
        self.visited_urls.add(response.url)
        self.page_count += 1

        # Derive a filename from the URL path and save the response body
        parsed_url = urlparse(response.url)
        path = parsed_url.path.strip("/")
        filename = "index.html" if not path else path.replace("/", "_") + ".html"

        # Keep only filesystem-safe characters (different URLs that sanitize to the
        # same name will overwrite each other)
        filename = "".join(c for c in filename if c.isalnum() or c in ("_", ".", "-"))
        if not filename.endswith(".html"):
            filename += ".html"

        file_path = os.path.join(self.output_dir, filename)
        try:
            with open(file_path, "wb") as f:
                f.write(response.body)
        except OSError:
            # If the filename is too long or otherwise invalid, fall back to a URL hash
            import hashlib
            filename = hashlib.md5(response.url.encode()).hexdigest() + ".html"
            file_path = os.path.join(self.output_dir, filename)
            with open(file_path, "wb") as f:
                f.write(response.body)

        # Follow absolute links until the page limit is reached; off-site links are
        # followed too (see the commented variant after the class)
        if self.page_count < self.max_pages:
            for href in response.css("a::attr(href)").getall():
                if href.startswith(("http://", "https://")):
                    yield response.follow(href, self.parse)
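

# The spider above follows every absolute link it finds, including off-site ones.
# A possible variant (a sketch, not part of the original app): restrict the crawl
# to the start host via Scrapy's allowed_domains, which the built-in offsite
# middleware enforces.
#
#     class DomainLinkSpider(LinkSpider):  # hypothetical subclass for illustration
#         def __init__(self, start_url, output_dir, max_pages=50, *args, **kwargs):
#             super().__init__(start_url, output_dir, max_pages, *args, **kwargs)
#             self.allowed_domains = [urlparse(start_url).netloc]
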
def crawl_and_zip(start_url):
    if not start_url.startswith(("http://", "https://")):
        raise gr.Error("The URL must start with http:// or https://")

    temp_dir = tempfile.mkdtemp()
    output_dir = os.path.join(temp_dir, "pages")
    os.makedirs(output_dir, exist_ok=True)

    try:
        process = CrawlerProcess(settings={
            "LOG_ENABLED": False,
            "USER_AGENT": "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
            "ROBOTSTXT_OBEY": False,
            "DOWNLOAD_DELAY": 1,
        })
        process.crawl(LinkSpider, start_url=start_url, output_dir=output_dir, max_pages=50)
        # Blocks until the crawl finishes (see the note after this function)
        process.start()

        # Create the ZIP file
        zip_path = os.path.join(temp_dir, "website_crawl.zip")
        with zipfile.ZipFile(zip_path, "w") as zipf:
            for root, _, files in os.walk(output_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, output_dir)
                    zipf.write(file_path, arcname=arcname)

        # Collect file info for the result summary
        file_list = []
        for root, _, files in os.walk(output_dir):
            for file in files:
                rel_dir = os.path.relpath(root, output_dir)
                rel_file = os.path.join(rel_dir, file) if rel_dir != "." else file
                file_list.append(rel_file)

        file_count = len(file_list)
        file_structure = "\n".join(file_list)

        return zip_path, f"Crawl finished!\n\nNumber of files: {file_count}\n\nFile list:\n{file_structure}"
    except Exception as e:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise gr.Error(f"An error occurred during the crawl: {str(e)}")
    finally:
        # Remove the crawled pages; the ZIP stays in temp_dir so Gradio can serve it
        shutil.rmtree(output_dir, ignore_errors=True)
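

# Note: CrawlerProcess runs on Twisted's reactor, which can only be started once
# per Python process, so crawl_and_zip supports one crawl per app process; a
# second run would typically fail with ReactorNotRestartable. One workaround
# (a sketch, not part of the original app) is to run each crawl in a child process:
#
#     import multiprocessing
#
#     def _crawl_worker(start_url, output_dir, max_pages=50):
#         process = CrawlerProcess(settings={"LOG_ENABLED": False})
#         process.crawl(LinkSpider, start_url=start_url,
#                       output_dir=output_dir, max_pages=max_pages)
#         process.start()
#
#     def run_crawl_once(start_url, output_dir, max_pages=50):
#         p = multiprocessing.Process(target=_crawl_worker,
#                                     args=(start_url, output_dir, max_pages))
#         p.start()
#         p.join()
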
# Gradio interface
with gr.Blocks(title="Web Crawler") as demo:
    gr.Markdown("## 🌐 Web Page Crawler")
    gr.Markdown("Starts from the given URL, crawls web pages, and downloads them as a ZIP file.")

    with gr.Row():
        url_input = gr.Textbox(
            label="Start URL",
            placeholder="https://example.com",
            max_lines=1,
        )
        run_button = gr.Button("Start crawl", variant="primary")

    with gr.Row():
        zip_output = gr.File(label="Download file")
        info_output = gr.Textbox(label="Crawl results", interactive=False)

    run_button.click(
        fn=crawl_and_zip,
        inputs=url_input,
        outputs=[zip_output, info_output],
    )


if __name__ == "__main__":
    demo.launch()