File size: 7,377 Bytes
2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 dd2f10e 2e81e75 dd2f10e 2e81e75 dd2f10e 2e81e75 dd2f10e 2e81e75 dd2f10e 2e81e75 b3a5707 2e81e75 dd2f10e 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 dd2f10e 2e81e75 dd2f10e 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 2e81e75 18d2a16 78ec209 2e81e75 78ec209 2e81e75 78ec209 2e81e75 18d2a16 2e81e75 18d2a16 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import os
import requests
import json
import time
from threading import Thread, Event
from requests.exceptions import RequestException
from tqdm import tqdm
import logging
# Cache directory path taken from the CACHE_DIR environment variable;
# this is None when the variable is not set.
CACHE_DIR = os.environ.get("CACHE_DIR")

# Module-wide table of download progress records, keyed by music id.
# Entries are written by Instance.download_music and read by
# Instance.get_download_progress.
download_progress = {}
class Instance:
    """A node that registers with a load balancer and caches music files.

    On construction the instance registers itself with the load balancer,
    fetches the balancer's file structure, and starts two daemon threads:
    one that re-registers when no report has been compiled for a while and
    one that refreshes the file structure periodically.
    """

    def __init__(
        self,
        id,
        url,
        cache_dir,
        token,
        repo,
        load_balancer_api,
        initial_delay=1,
    ):
        """Set up state, register with the load balancer and start workers.

        Args:
            id: Identifier reported to the load balancer for this instance.
            url: URL reported to the load balancer for this instance.
            cache_dir: Directory used for cached files; created if missing.
            token: Stored as ``self.TOKEN`` (not used inside this class).
            repo: Stored as ``self.REPO`` (not used inside this class).
            load_balancer_api: Object providing ``register_instance(id, url)``
                and ``get_file_structure()``.
            initial_delay: First retry delay, in seconds, for registration.

        Note:
            Registration retries until it succeeds, so construction blocks
            for as long as the load balancer keeps rejecting the instance.
        """
        self.version = "0.0.1 Alpha"
        self.id = id
        self.url = url
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.MUSIC_STORE = {}
        self.download_threads = {}
        self.file_structure = None
        self.load_balancer_api = load_balancer_api
        self.initial_delay = initial_delay
        self.last_report_time = time.time()
        self.re_register_event = Event()

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        self.register_to_load_balancer()
        self.reload_file_structure()

        # Daemon threads so they never keep the process alive on shutdown.
        Thread(target=self.monitor_registration, daemon=True).start()
        Thread(target=self.get_file_structure_periodically, daemon=True).start()

    def reload_file_structure(self):
        """Replace ``self.file_structure`` with the load balancer's copy."""
        self.file_structure = self.load_balancer_api.get_file_structure()
        logging.info("File structure reloaded successfully.")

    def get_file_structure_periodically(self):
        """Reload the file structure once a minute, forever.

        A failed reload is logged and retried on the next cycle instead of
        terminating the background thread.
        """
        while True:
            time.sleep(60)  # 1 min
            logging.info("Re-running indexer and reloading file structure.")
            try:
                self.reload_file_structure()
            except Exception:
                # Thread top-level boundary: log and keep the loop alive.
                logging.exception(
                    "Failed to reload file structure; retrying on the next cycle."
                )

    def compile_report(self):
        """Build the status report and record the time it was compiled.

        Returns:
            dict with ``instance_id``, ``instance_url``, ``music_store`` and
            ``cache_size`` (the dict returned by :meth:`get_cache_size`).
        """
        self.last_report_time = time.time()
        return {
            "instance_id": self.id,
            "instance_url": self.url,
            "music_store": self.MUSIC_STORE,
            "cache_size": self.get_cache_size(),
        }

    def monitor_registration(self):
        """Re-register when no report was compiled for more than a minute.

        Runs forever, checking every 30 seconds.
        """
        while True:
            if time.time() - self.last_report_time > 60:
                logging.info('1 minute passed since last report. Re-registering...')
                self.register_to_load_balancer()
                self.last_report_time = time.time()
            time.sleep(30)

    def get_cache_size(self):
        """Return the total size of this instance's cache directory.

        Walks ``self.CACHE_DIR`` (the directory given to the constructor),
        not the module-level environment value, which may be unset.

        Returns:
            ``{"cache_size": "<size> GB"}`` with the size formatted to two
            decimal places.
        """
        total_size = 0
        for dirpath, _dirnames, filenames in os.walk(self.CACHE_DIR):
            for name in filenames:
                try:
                    total_size += os.path.getsize(os.path.join(dirpath, name))
                except OSError:
                    # File vanished or is unreadable (e.g. broken symlink).
                    continue
        return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"}

    @staticmethod
    def read_json(file_path):
        """Return the parsed JSON at ``file_path``, or ``{}`` if it is missing."""
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                return json.load(json_file)
        return {}

    def download_music(
        self,
        file_url,
        token,
        cache_path,
        music_id,
        title,
        chunk_size=100 * 1024 * 1024,
    ):
        """Stream ``file_url`` into ``cache_path`` while tracking progress.

        Progress is published in the module-level ``download_progress`` table
        under ``music_id``. On success the file is recorded in
        ``self.MUSIC_STORE`` under ``title``. Request and I/O errors are
        reported and mark the entry as ``"Failed"`` rather than propagating.

        Args:
            file_url: URL to download.
            token: Bearer token sent in the ``Authorization`` header.
            cache_path: Destination file path.
            music_id: Key for the progress entry.
            title: Key for the ``MUSIC_STORE`` entry.
            chunk_size: Number of bytes requested per streamed chunk.
        """
        print(f"Downloading file from URL: {file_url} to {cache_path}")
        headers = {'Authorization': f'Bearer {token}'}

        # Create the progress entry before any network call so the error
        # handlers below always have an entry to update.
        progress = {
            "total": 0,
            "downloaded": 0,
            "status": "Downloading",
            "start_time": time.time(),
        }
        download_progress[music_id] = progress

        try:
            # The context manager releases the connection even on failure.
            with requests.get(file_url, headers=headers, stream=True) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                progress["total"] = total_size
                # Restart the clock so ETA measures transfer time only.
                progress["start_time"] = time.time()

                parent_dir = os.path.dirname(cache_path)
                if parent_dir:  # makedirs('') would raise for bare filenames
                    os.makedirs(parent_dir, exist_ok=True)

                with open(cache_path, 'wb') as file, tqdm(
                    total=total_size, unit='B', unit_scale=True, desc=cache_path
                ) as pbar:
                    for data in response.iter_content(chunk_size=chunk_size):
                        file.write(data)
                        pbar.update(len(data))
                        progress["downloaded"] += len(data)

            print(f'File cached to {cache_path} successfully.')
            self.MUSIC_STORE[title] = cache_path
            progress["status"] = "Completed"
        except RequestException as e:
            print(f"Error downloading file: {e}")
            progress["status"] = "Failed"
        except IOError as e:
            print(f"Error writing file {cache_path}: {e}")
            progress["status"] = "Failed"
        finally:
            # An unexpected exception must not leave the entry "Downloading".
            if progress["status"] == "Downloading":
                progress["status"] = "Failed"
            progress["end_time"] = time.time()

    @staticmethod
    def get_download_progress(id):
        """Return a progress snapshot for the download stored under ``id``.

        Returns:
            dict with ``total``, ``downloaded``, ``progress`` (percent),
            ``status`` and ``eta`` (seconds remaining). ``eta`` is ``None``
            when it cannot be estimated, including when the total size is
            unknown. Unknown ids yield a ``"Not Found"`` record.
        """
        entry = download_progress.get(id)
        if entry is None:
            return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}

        total = entry["total"]
        downloaded = entry["downloaded"]
        status = entry.get("status", "In Progress")
        progress = (downloaded / total) * 100 if total > 0 else 0

        eta = None
        if status == "Downloading" and downloaded > 0 and total > 0:
            elapsed_time = time.time() - entry["start_time"]
            estimated_total_time = elapsed_time * (total / downloaded)
            # Clamp: more bytes than announced must not give a negative ETA.
            eta = max(0.0, estimated_total_time - elapsed_time)
        elif status == "Completed":
            eta = 0

        return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}

    def load_json(self, file_path):
        """Return the parsed JSON at ``file_path``; raises if it is missing."""
        with open(file_path, 'r') as file:
            return json.load(file)

    def find_music_path(self, title):
        """Find the path of the music in the indexed data based on the title.

        Only files listed directly inside a top-level directory entry are
        searched; matching is a case-insensitive substring test on the path.

        Returns:
            The first matching path, or ``None`` when nothing matches or no
            file structure has been loaded.
        """
        needle = title.lower()
        for directory in self.file_structure or []:
            if directory['type'] != 'directory':
                continue
            for entry in directory['contents']:
                if entry['type'] == 'file' and needle in entry['path'].lower():
                    return entry['path']
        return None

    def get_music_id(self, title):
        """Return ``title`` lower-cased with spaces replaced by underscores."""
        return title.replace(" ", "_").lower()

    def bytes_to_human_readable(self, num, suffix="B"):
        """Format ``num`` using 1024-based units, e.g. ``1536`` -> ``'1.5 KB'``."""
        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
            if abs(num) < 1024.0:
                return f"{num:3.1f} {unit}{suffix}"
            num /= 1024.0
        return f"{num:.1f} Y{suffix}"

    def register_to_load_balancer(self):
        """Register with the load balancer, retrying until it succeeds.

        Retries use exponential backoff starting at ``self.initial_delay``
        seconds and capped at 120 seconds. Both a falsy result and an
        exception from the API count as a failed attempt.

        Returns:
            The truthy value returned by ``register_instance``.
        """
        retries = 0
        delay = self.initial_delay
        max_delay = 120

        while True:
            try:
                result = self.load_balancer_api.register_instance(self.id, self.url)
                if result:
                    logging.info(
                        'Successfully registered instance %s to load balancer.',
                        self.id,
                    )
                    return result
            except Exception as e:
                # Retry boundary: any API failure is logged and retried.
                logging.error('Error during registration: %s', e)

            retries += 1
            logging.warning(
                'Attempt %s to register instance %s failed. Retrying in %s seconds...',
                retries,
                self.id,
                delay,
            )
            time.sleep(delay)
            delay = min(delay * 2, max_delay)
|