import os
import requests
import json
import time
from threading import Thread, Event
from requests.exceptions import RequestException
from tqdm import tqdm
import logging

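# Module-level state shared within this process:
# CACHE_DIR is an optional fallback taken from the environment (each Instance
# normally receives its own cache_dir via the constructor), and
# download_progress maps a music_id to its download status/progress record.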
CACHE_DIR = os.getenv("CACHE_DIR")
download_progress = {}

class Instance:
    def __init__(self, id, url, cache_dir, token, repo, load_balancer_api, initial_delay=1):
        self.version = "0.0.1 Alpha"
        self.id = id
        self.url = url
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.MUSIC_STORE = {}
        self.download_threads = {}
        self.file_structure = None
        self.load_balancer_api = load_balancer_api
        self.initial_delay = initial_delay
        self.last_report_time = time.time()
        self.re_register_event = Event()

        os.makedirs(self.CACHE_DIR, exist_ok=True)

        self.register_to_load_balancer()
        self.reload_file_structure()
        registration_thread = Thread(target=self.monitor_registration)
        registration_thread.daemon = True
        registration_thread.start()

        indexer_thread = Thread(target=self.get_file_structure_periodically)
        indexer_thread.daemon = True
        indexer_thread.start()

    def reload_file_structure(self):
        self.file_structure = self.load_balancer_api.get_file_structure()
        logging.info("File structure reloaded successfully.")

    def get_file_structure_periodically(self):
        while True:
            time.sleep(60) # 1 min
            logging.info("Re-running indexer and reloading file structure.")
            self.reload_file_structure()

    def compile_report(self):
        self.last_report_time = time.time()
        cache_size = self.get_cache_size()
        report = {
            "instance_id": self.id,
            "instance_url": self.url,
            "music_store": self.MUSIC_STORE,
            "cache_size": cache_size
        }
        return report

    def monitor_registration(self):
        while True:
            if time.time() - self.last_report_time > 60:
                logging.info('1 minute passed since last report. Re-registering...')
                self.register_to_load_balancer()
                self.last_report_time = time.time()
            time.sleep(30)

    def get_cache_size(self):
        # Sum file sizes under this instance's own cache directory (use
        # self.CACHE_DIR rather than the module-level CACHE_DIR, which may be
        # unset in the environment).
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(self.CACHE_DIR):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                total_size += os.path.getsize(fp)
        return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"}

    @staticmethod
    def read_json(file_path):
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                return json.load(json_file)
        return {}

    def download_music(self, file_url, token, cache_path, music_id, title, chunk_size=100 * 1024 * 1024):
        logging.info(f"Downloading file from URL: {file_url} to {cache_path}")
        headers = {'Authorization': f'Bearer {token}'}
        # Create the progress entry before issuing the request so the exception
        # handlers and the finally block can update it even if the request fails.
        download_progress[music_id] = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        try:
            response = requests.get(file_url, headers=headers, stream=True)
            response.raise_for_status()

            total_size = int(response.headers.get('content-length', 0))
            download_progress[music_id]["total"] = total_size

            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    pbar.update(len(data))
                    download_progress[music_id]["downloaded"] += len(data)

            logging.info(f'File cached to {cache_path} successfully.')
            self.MUSIC_STORE[title] = cache_path
            download_progress[music_id]["status"] = "Completed"
        except RequestException as e:
            logging.error(f"Error downloading file: {e}")
            download_progress[music_id]["status"] = "Failed"
        except IOError as e:
            logging.error(f"Error writing file {cache_path}: {e}")
            download_progress[music_id]["status"] = "Failed"
        finally:
            if download_progress[music_id]["status"] != "Downloading":
                download_progress[music_id]["end_time"] = time.time()

    @staticmethod
    def get_download_progress(id):
        if id in download_progress:
            total = download_progress[id]["total"]
            downloaded = download_progress[id]["downloaded"]
            status = download_progress[id].get("status", "In Progress")
            progress = (downloaded / total) * 100 if total > 0 else 0

            eta = None
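            # Extrapolate linearly from the average rate so far:
            # estimated_total_time = elapsed * (total / downloaded),
            # so the remaining time (ETA) is estimated_total_time - elapsed.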
            if status == "Downloading" and downloaded > 0:
                elapsed_time = time.time() - download_progress[id]["start_time"]
                estimated_total_time = elapsed_time * (total / downloaded)
                eta = estimated_total_time - elapsed_time
            elif status == "Completed":
                eta = 0

            return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}
        return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}

    def load_json(self, file_path):
        with open(file_path, 'r') as file:
            return json.load(file)

    def find_music_path(self, title):
        """Find the path of a music file in the indexed data by matching the title (searches one directory level deep)."""
        for directory in self.file_structure:
            if directory['type'] == 'directory':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
                        return sub_directory['path']
        return None
    
    def get_music_id(self, title):
        return title.replace(" ", "_").lower()

    def bytes_to_human_readable(self, num, suffix="B"):
        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
            if abs(num) < 1024.0:
                return f"{num:3.1f} {unit}{suffix}"
            num /= 1024.0
        return f"{num:.1f} Y{suffix}"

    def register_to_load_balancer(self):
        retries = 0
        delay = self.initial_delay
        max_delay = 120

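        # Retry indefinitely, doubling the wait between attempts
        # (exponential backoff) up to max_delay seconds.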
        while True:
            try:
                result = self.load_balancer_api.register_instance(self.id, self.url)
                if result:
                    logging.info(f'Successfully registered instance {self.id} to load balancer.')
                    return result

            except Exception as e:
                logging.error(f'Error during registration: {e}')
            
            retries += 1
            logging.warning(f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...')
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
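

# --- Illustrative usage sketch ------------------------------------------------
# A minimal, hedged example of how an Instance might be wired up. The names
# below (DummyLoadBalancerAPI, the URL, token and repo strings) are placeholders
# invented for illustration; the real application supplies its own load balancer
# client and configuration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    class DummyLoadBalancerAPI:
        """Stand-in for the real load balancer client used by Instance."""

        def register_instance(self, instance_id, url):
            # The real client would call the load balancer; here we just
            # return a truthy value so registration "succeeds".
            return {"registered": instance_id, "url": url}

        def get_file_structure(self):
            # The real client returns the indexed directory tree.
            return []

    instance = Instance(
        id="instance-1",
        url="http://localhost:8000",
        cache_dir="./cache",
        token="placeholder-token",
        repo="placeholder/repo",
        load_balancer_api=DummyLoadBalancerAPI(),
    )
    print(instance.compile_report())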