import asyncio
import logging
import os
import time
from threading import Event, Thread
from urllib.parse import quote

from indexer import indexer
from utils import convert_to_gb
from api import InstancesAPI

CACHE_DIR = os.getenv("CACHE_DIR")


class LoadBalancer:
    """Track registered instances and route music downloads between them.

    On construction the balancer indexes the file structure once, starts a
    daemon thread that polls the registered instances for reports, and
    schedules a periodic re-index of the file structure.
    """

    # Value stored as the "total" space of every instance in instances_health.
    INSTANCE_TOTAL_SPACE = "50 GB"

    def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Set up state, index the file structure and start background work.

        Args:
            cache_dir: Directory that is created if it does not exist yet.
            token: Stored on the instance as ``TOKEN``.
            repo: Stored on the instance as ``REPO``.
            polling_interval: Seconds to wait between two polling cycles.
            max_retries: Stored on the instance; not used by this class itself.
            initial_delay: Stored on the instance; not used by this class itself.
        """
        self.version = "0.0.2 Alpha"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        # The API client receives the very same list object that
        # register_instance/remove_instance mutate.
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.MUSIC_STORE = {}
        self.file_structure = None
        self.category_files_map = {}

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        # Build the initial file structure and category map.
        self.update_file_structure()

        # Poll the instances from a daemon thread so it never blocks shutdown.
        polling_thread = Thread(target=self.start_polling)
        polling_thread.daemon = True
        polling_thread.start()

        # Schedule the periodic re-index.
        self._periodic_task = None
        self._periodic_thread = None
        self._start_periodic_tasks()

    def _start_periodic_tasks(self):
        """Schedule run_periodic_tasks on the running loop, or on a thread if there is none."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Constructed outside of a coroutine: there is no loop to attach
            # a task to, so drive the coroutine from its own daemon thread.
            self._periodic_thread = Thread(target=self._run_periodic_tasks_blocking)
            self._periodic_thread.daemon = True
            self._periodic_thread.start()
        else:
            # Keep a reference: the event loop only holds weak references to
            # tasks, so an unreferenced task may be garbage-collected mid-run.
            self._periodic_task = loop.create_task(self.run_periodic_tasks())

    def _run_periodic_tasks_blocking(self):
        """Thread entry point that runs run_periodic_tasks on a private event loop."""
        asyncio.run(self.run_periodic_tasks())

    @staticmethod
    def _iter_directories(structure):
        """Yield ``(directory_entry, file_paths)`` for each directory entry.

        ``structure`` is expected to be an iterable of dicts carrying a
        'type' key, with directory entries holding their children under
        'contents' (this is how the rest of the class reads the indexer
        output). ``None`` and missing/empty 'contents' are treated as empty.
        """
        for entry in structure or []:
            if entry['type'] != 'directory':
                continue
            file_paths = [
                item['path']
                for item in entry.get('contents') or []
                if item['type'] == 'file'
            ]
            yield entry, file_paths

    def update_file_structure(self):
        """Re-run the indexer and rebuild the category-to-files map."""
        structure = indexer()  # Assumed to re-fetch the file structure.
        # Build the new map on the side and publish both attributes at the
        # end, so concurrent readers never see a half-filled map.
        category_map = {
            directory['path']: file_paths
            for directory, file_paths in self._iter_directories(structure)
        }
        self.file_structure = structure
        self.category_files_map = category_map

    async def run_periodic_tasks(self, interval=300):
        """Refresh the file structure repeatedly until stop_event is set.

        Args:
            interval: Seconds to sleep between two refreshes (default: 5 minutes).
        """
        loop = asyncio.get_running_loop()
        while not self.stop_event.is_set():
            try:
                # The indexer call is synchronous; run it in the default
                # executor so the event loop stays responsive meanwhile.
                await loop.run_in_executor(None, self.update_file_structure)
            except asyncio.CancelledError:
                raise
            except Exception:
                # One failed refresh must not end the periodic task.
                logging.exception("Periodic file structure refresh failed.")
            await asyncio.sleep(interval)

    def get_reports(self):
        """Fetch reports from all instances and rebuild MUSIC_STORE from them.

        Instances for which no report came back are removed.
        """
        reports = self.instances_api.fetch_reports()
        temp_music_store = {}
        # Iterate over a copy: remove_instance mutates self.instances.
        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info("Report from %s: %s", instance_url, report)
                self.process_report(instance_url, report, temp_music_store)
            else:
                logging.error("Failed to get report from %s. Removing instance.", instance_url)
                self.remove_instance(instance_url)
        self.MUSIC_STORE = temp_music_store

    def process_report(self, instance_url, report, temp_music_store):
        """Merge one instance report into ``temp_music_store`` and record its health.

        Args:
            instance_url: Base URL of the reporting instance.
            report: Mapping read via its 'music_store' and 'cache_size' keys.
            temp_music_store: Dict updated in place with ``title -> URL`` entries.
        """
        music_store = report.get('music_store', {})
        cache_size = report.get('cache_size')
        logging.info("Processing report from %s", instance_url)
        # Only the titles are needed; the reported values are not used here.
        for title in music_store:
            # Percent-encode the whole title, not just spaces, so characters
            # such as '#', '?' or '%' cannot corrupt the URL.
            temp_music_store[title] = f"{instance_url}/api/get/music/{quote(title)}"
        logging.info("Music Store processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Poll the instances every ``polling_interval`` seconds until stopped."""
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            try:
                self.get_reports()
            except Exception:
                # Keep the polling thread alive across a failed cycle.
                logging.exception("Polling cycle failed; retrying after the polling interval.")
            # Unlike time.sleep, this returns as soon as stop_polling() is called.
            self.stop_event.wait(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal the background loops to stop."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def register_instance(self, instance_url):
        """Add ``instance_url`` to the known instances unless already present."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info("Registered instance %s", instance_url)
        else:
            logging.info("Instance %s is already registered.", instance_url)

    def remove_instance(self, instance_url):
        """Forget ``instance_url`` and its recorded health, if it is known."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info("Removed instance %s", instance_url)
        else:
            logging.info("Instance %s not found for removal.", instance_url)

    def update_instances_health(self, instance, cache_size):
        """Record the used/total space of ``instance``.

        Args:
            instance: Instance URL used as the key in ``instances_health``.
            cache_size: Either a mapping holding the used size under the
                'cache_size' key, or the used size itself. When no size is
                available the health entry is left untouched.
        """
        if isinstance(cache_size, dict):
            used = cache_size.get("cache_size")
        else:
            used = cache_size
        if used is None:
            logging.warning("No cache size reported by instance %s; health not updated.", instance)
            return
        self.instances_health[instance] = {"used": used, "total": self.INSTANCE_TOTAL_SPACE}
        logging.info("Updated instance %s with cache size %s", instance, cache_size)

    def download_music_to_best_instance(self, file_name):
        """Download a music file to the instance with the most free space.

        Free space is computed from ``self.instances_health``.

        Args:
            file_name: Name of the music file, forwarded to the instance API.

        Returns:
            A dict with 'music_id', 'status' and 'progress_url' on success,
            or a dict with 'error' (and usually 'details') on failure.
        """
        best_instance = None
        max_free_space = -1

        # Pick the instance with the largest (total - used) space.
        for instance_url, space_info in self.instances_health.items():
            total_space = convert_to_gb(space_info.get('total', 0))
            used_space = convert_to_gb(space_info.get('used', 0))
            free_space = total_space - used_space
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url

        if not best_instance:
            logging.error("No suitable instance found for downloading the music.")
            return {"error": "No suitable instance found for downloading the music."}

        try:
            result = self.instances_api.download_music(best_instance, file_name)

            # The response must carry both keys to be considered valid.
            if not result or "music_id" not in result or "status" not in result:
                logging.error("Unexpected response from instance %s: %s", best_instance, result)
                return {
                    "error": "Failed to retrieve valid download data from the instance.",
                    "details": result if result else "Empty response",
                }

            # NOTE(review): the ID is derived locally from file_name and the
            # 'music_id' returned by the instance is only checked for
            # presence -- confirm that both always agree.
            music_id = self.get_music_id(file_name)
            return {
                "music_id": music_id,
                "status": result["status"],
                "progress_url": f'{best_instance}/api/get/progress/{music_id}',
            }
        except Exception as e:
            # Boundary handler: callers receive an error dict, not an exception.
            logging.error("Error downloading music to %s: %s", best_instance, e)
            return {
                "error": "Error occurred during music download.",
                "details": str(e),
            }

    def find_music_path(self, title):
        """Return the first indexed file path containing ``title`` (case-insensitive), or None."""
        needle = title.lower()
        for _, file_paths in self._iter_directories(self.file_structure):
            for path in file_paths:
                if needle in path.lower():
                    return path
        return None

    def get_music_id(self, title):
        """Derive a music ID from ``title``: spaces become underscores, then lower-cased."""
        return title.replace(" ", "_").lower()

    def get_all_music(self):
        """Return the paths of all files found in the indexed directories."""
        return [
            path
            for _, file_paths in self._iter_directories(self.file_structure)
            for path in file_paths
        ]

    def get_all_categories(self):
        """Return a list of all category folders."""
        return list(self.category_files_map)

    def get_files_from_category(self, category):
        """Return the files of ``category``, or an empty list if it is unknown."""
        return self.category_files_map.get(category, [])