File size: 4,207 Bytes
ad68adf
ecf897a
 
 
 
 
 
 
 
 
 
 
ad68adf
ecf897a
 
 
 
ad68adf
 
ecf897a
 
 
 
 
 
 
 
 
ad68adf
ecf897a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad68adf
ecf897a
 
 
 
 
ad68adf
ecf897a
ad68adf
ecf897a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71140be
ecf897a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad68adf
ecf897a
 
 
 
71140be
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import gradio as gr
import os
import time
import urllib.request
from pathlib import Path
from urllib.parse import urlparse, parse_qs, unquote

# Number of bytes requested per read while streaming a download (1,600 KiB).
CHUNK_SIZE = 1638400
# File under the user's home directory where the API token is saved and read back.
TOKEN_FILE = Path.home() / '.civitai' / 'config'
# Browser-like User-Agent header value sent with the initial download request.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'

def get_token():
    """Return the API token saved in ``TOKEN_FILE``, or None if there is none.

    Surrounding whitespace (for example a trailing newline left by an editor)
    is stripped, because the token is later embedded in an HTTP header where
    such characters are not allowed.

    Returns:
        The stored token as a non-empty string, or None when the file is
        missing, unreadable, not valid text, or contains only whitespace.
    """
    try:
        with open(TOKEN_FILE, 'r', encoding='utf-8') as file:
            token = file.read().strip()
    except (OSError, UnicodeDecodeError):
        # A missing or unreadable token file simply means "no stored token".
        return None
    return token or None

def store_token(token: str):
    """Save *token* to ``TOKEN_FILE``, creating the parent directory if needed.

    The token is a credential, so the file is created readable and writable
    by the owner only.  Surrounding whitespace is removed before writing so
    the stored value can be used in an HTTP header as-is.

    Args:
        token: The API token to persist.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)

    # The mode passed to os.open only applies when the file is created here.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w', encoding='utf-8') as file:
        file.write(token.strip())

    try:
        # Tighten permissions on a file that already existed with a wider mode.
        os.chmod(TOKEN_FILE, 0o600)
    except OSError:
        # Best effort: some filesystems do not support changing permissions,
        # and the token itself has already been written successfully.
        pass
def _filename_from_redirect(redirect_url):
    """Extract the download's file name from a redirect URL.

    The name is read from the ``filename=`` part of the URL's
    ``response-content-disposition`` query parameter.  Directory components
    are dropped so the result is always a bare file name.

    Returns:
        The file name, or None when the URL carries no usable name.
    """
    if not redirect_url:
        return None

    query_params = parse_qs(urlparse(redirect_url).query)
    content_disposition = query_params.get('response-content-disposition', [None])[0]
    if not content_disposition or 'filename=' not in content_disposition:
        return None

    value = content_disposition.split('filename=', 1)[1].strip()
    if value.startswith('"'):
        # Quoted value: everything up to the closing quote.
        value = value[1:].split('"', 1)[0]
    else:
        # Unquoted value: stop at the next header parameter, if any.
        value = value.split(';', 1)[0].strip()

    # The name comes from the server; never let it point outside output_path.
    name = os.path.basename(unquote(value).replace('\\', '/'))
    if name in ('', '.', '..'):
        return None
    return name


def _format_duration(elapsed):
    """Format a duration in seconds as ``'1h 2m 3s'``, ``'2m 3s'`` or ``'3s'``."""
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)

    if hours > 0:
        return f'{int(hours)}h {int(minutes)}m {int(seconds)}s'
    if minutes > 0:
        return f'{int(minutes)}m {int(seconds)}s'
    return f'{int(seconds)}s'


def download_file(url: str, output_path: str, token: str, progress=None):
    """Download the file behind *url* into the directory *output_path*.

    The first request is sent with the bearer token and is expected to answer
    with a redirect.  The redirect is not followed automatically: its
    ``Location`` URL is inspected for the file name and then fetched with a
    separate, unauthenticated request whose body is streamed to disk in
    ``CHUNK_SIZE`` pieces.

    Args:
        url: Download URL to request.
        output_path: Directory to save the file in; created if missing.
        token: API token sent as ``Authorization: Bearer <token>``.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``
            after each chunk when the total size is known (a ``gr.Progress``
            instance fits this shape).  When None, no progress is reported.

    Returns:
        ``(path, message)`` on success, or ``(None, "ERROR: ...")`` when the
        server reports 404, sends no redirect, or the file name is unknown.

    Raises:
        OSError: For network failures (``urllib.error.URLError`` is an
            ``OSError``) and for errors creating or writing the output file.
        ValueError: If *url* is not a valid URL for ``urllib``.
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'User-Agent': USER_AGENT,
    }

    class NoRedirection(urllib.request.HTTPErrorProcessor):
        # Hand every response back untouched so 3xx answers reach this
        # function instead of being followed (or turned into exceptions).
        def http_response(self, request, response):
            return response
        https_response = http_response

    request = urllib.request.Request(url, headers=headers)
    opener = urllib.request.build_opener(NoRedirection)

    # Only the status and Location header are needed; close the connection
    # as soon as they have been read.
    with opener.open(request) as first_response:
        status = first_response.status
        redirect_url = first_response.getheader('Location')

    if status == 404:
        return None, "ERROR: File not found"
    if status not in (301, 302, 303, 307, 308):
        return None, "ERROR: No redirect found, something went wrong"

    filename = _filename_from_redirect(redirect_url)
    if not filename:
        return None, "ERROR: Unable to determine filename"

    if output_path:
        os.makedirs(output_path, exist_ok=True)
    output_file = os.path.join(output_path, filename)

    with urllib.request.urlopen(redirect_url) as response, open(output_file, 'wb') as f:
        total_size = response.getheader('Content-Length')
        if total_size is not None:
            total_size = int(total_size)

        downloaded = 0
        speed = 0.0  # Last measured speed in MB/s; stays 0 until a read takes measurable time.
        start_time = time.time()

        while True:
            chunk_start_time = time.time()
            buffer = response.read(CHUNK_SIZE)
            chunk_time = time.time() - chunk_start_time

            if not buffer:
                break

            downloaded += len(buffer)
            f.write(buffer)

            if chunk_time > 0:
                speed = len(buffer) / chunk_time / (1024 ** 2)  # Speed in MB/s

            if progress is not None and total_size:
                fraction = min(downloaded / total_size, 1.0)
                progress(
                    fraction,
                    desc=f'Downloading: {filename} [{fraction*100:.2f}%] - {speed:.2f} MB/s',
                )

    time_str = _format_duration(time.time() - start_time)
    return output_file, f"Download completed. File saved as: {filename}. Downloaded in {time_str}"


def run_downloader(url, output_path, token_input, progress=gr.Progress()):
    """Gradio click handler: resolve the API token and download the file.

    A token typed into the UI takes precedence over the stored one and is
    saved for later runs; otherwise the previously stored token is used.

    Args:
        url: Download URL entered by the user.
        output_path: Directory to save the file in.
        token_input: Token entered by the user; may be empty.
        progress: Progress tracker. Gradio supplies it because the default
            value is a ``gr.Progress`` instance; it is not a UI input.

    Returns:
        ``(file_path, message)``; ``file_path`` is None and ``message``
        starts with ``"ERROR:"`` when the download did not succeed.
    """
    entered_token = (token_input or '').strip()
    token = entered_token or (get_token() or '').strip()
    if not token:
        return None, "ERROR: Token not provided."

    try:
        if entered_token:
            # Only rewrite the token file when the user supplied a token.
            store_token(entered_token)
        return download_file(url, output_path, token, progress)
    except (OSError, ValueError) as exc:
        # Network errors, file errors and malformed URLs are reported in the
        # status box like every other failure instead of raising into Gradio.
        return None, f"ERROR: {exc}"

# Build the web UI: three text inputs in one row, a download button, and two
# output components (the downloaded file and a status message).
with gr.Blocks() as demo:
    gr.Markdown("# CivitAI Downloader")
    with gr.Row():
        url_input = gr.Textbox(label="CivitAI Download URL")
        output_path_input = gr.Textbox(label="Output Path", value="./")  # Default value set to current directory
        token_input = gr.Textbox(label="CivitAI API Token (Optional)")
    download_button = gr.Button("Download")
    output_file = gr.File(label="Downloaded Model")
    message_output = gr.Textbox(label="Status Message", interactive=False)
    
    # Clicking the button calls run_downloader with the three textbox values;
    # its two return values fill the file component and the status textbox.
    download_button.click(
        run_downloader, 
        inputs=[url_input, output_path_input, token_input],
        outputs=[output_file, message_output]
    )

# Start the app when this module is executed.
demo.launch()