# Spaces: Sleeping (page-status text captured with this file; not part of the program)
import asyncio
import logging
import os
import time
from threading import Event, Thread
from urllib.parse import quote

from api import InstancesAPI
from indexer import indexer
from utils import convert_to_gb
# Cache directory taken from the CACHE_DIR environment variable (None when unset).
CACHE_DIR = os.environ.get("CACHE_DIR")
class LoadBalancer:
    """Coordinate registered instances and the indexed music library.

    The balancer keeps a list of instance URLs, polls them for reports through
    ``InstancesAPI`` on a background thread, records each instance's reported
    cache usage, builds a title -> URL music store from the reports, and keeps
    an index of files per category obtained from ``indexer()``.
    """

    # Seconds between background refreshes of the indexed file structure.
    INDEX_REFRESH_INTERVAL = 300
    # Capacity recorded for every instance in ``instances_health``.
    TOTAL_CACHE_SIZE = "50 GB"

    def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Create the balancer, index the library and start background work.

        Args:
            cache_dir: Directory used as cache; created when it does not exist.
            token: Stored as ``self.TOKEN``; not otherwise used by this class.
            repo: Stored as ``self.REPO``; not otherwise used by this class.
            polling_interval: Seconds to wait between two polling cycles.
            max_retries: Stored as ``self.max_retries``; not used by this class.
            initial_delay: Stored as ``self.initial_delay``; not used by this class.
        """
        self.version = "0.0.2 Alpha"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        # InstancesAPI receives the very list object that register/remove mutate.
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.MUSIC_STORE = {}
        self.file_structure = None
        self.category_files_map = {}
        self._periodic_task = None

        # Ensure CACHE_DIR exists (exist_ok avoids a check-then-create race).
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        # Build the initial file index before any background work starts.
        self.update_file_structure()

        # Poll the instances on a daemon thread so it never blocks shutdown.
        polling_thread = Thread(target=self.start_polling)
        polling_thread.daemon = True
        polling_thread.start()

        # Keep the file index fresh in the background.
        self._start_periodic_refresh()

    def _start_periodic_refresh(self):
        """Schedule the periodic index refresh on the running loop, else on a thread."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Constructed from synchronous code: there is no loop to host the
            # coroutine, so refresh from a daemon thread instead of failing.
            refresher = Thread(target=self._refresh_index_in_thread)
            refresher.daemon = True
            refresher.start()
            return
        # Hold a reference: the event loop only keeps weak references to tasks.
        self._periodic_task = loop.create_task(self.run_periodic_tasks())

    def _refresh_index_in_thread(self):
        """Refresh the file index every INDEX_REFRESH_INTERVAL seconds until stopped."""
        # __init__ has just indexed, so wait first; wait() returns True once
        # stop_event is set, which ends the loop promptly.
        while not self.stop_event.wait(self.INDEX_REFRESH_INTERVAL):
            try:
                self.update_file_structure()
            except Exception:
                logging.exception("Failed to refresh the file structure.")

    @staticmethod
    def _iter_directories(structure):
        """Yield the entries of *structure* whose type is 'directory' (none if empty/None)."""
        for entry in structure or []:
            if entry.get('type') == 'directory':
                yield entry

    @staticmethod
    def _file_paths(directory):
        """Return the paths of the direct 'file' entries inside *directory*."""
        return [
            item['path']
            for item in directory.get('contents') or []
            if item.get('type') == 'file'
        ]

    def update_file_structure(self):
        """Re-run the indexer and rebuild the category -> files map.

        The new map is built locally and published with a single assignment so
        that readers on other threads never observe a half-filled map.
        """
        structure = indexer()
        category_map = {
            directory['path']: self._file_paths(directory)
            for directory in self._iter_directories(structure)
        }
        self.file_structure = structure
        self.category_files_map = category_map

    async def run_periodic_tasks(self):
        """Re-run the indexer every INDEX_REFRESH_INTERVAL seconds until stop_event is set."""
        loop = asyncio.get_running_loop()
        while not self.stop_event.is_set():
            try:
                # The indexer is a blocking call; run it in the default
                # executor so the event loop stays responsive.
                await loop.run_in_executor(None, self.update_file_structure)
            except asyncio.CancelledError:
                raise
            except Exception:
                # One failed refresh must not end the periodic task.
                logging.exception("Failed to refresh the file structure.")
            await asyncio.sleep(self.INDEX_REFRESH_INTERVAL)

    def get_reports(self):
        """Fetch reports from all instances and rebuild ``MUSIC_STORE``.

        Instances missing from the fetched reports are removed. The music
        store is assembled in a temporary dict and swapped in at the end.
        """
        reports = self.instances_api.fetch_reports()
        temp_music_store = {}
        # Iterate over a copy: remove_instance mutates self.instances.
        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_music_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)
        self.MUSIC_STORE = temp_music_store

    def process_report(self, instance_url, report, temp_music_store):
        """Merge one instance report into *temp_music_store* and update its health.

        Args:
            instance_url: Base URL of the reporting instance.
            report: Mapping read for the keys 'music_store' and 'cache_size'.
            temp_music_store: Dict filled in place with title -> download URL.
        """
        music_store = report.get('music_store') or {}
        cache_size = report.get('cache_size')
        logging.info(f"Processing report from {instance_url}")
        # Update temporary music store. Titles are percent-encoded so that
        # characters such as '#', '?' or '%' cannot corrupt the URL.
        for title in music_store:
            temp_music_store[title] = f"{instance_url}/api/get/music/{quote(title)}"
        logging.info("Music Store processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Poll the instances every ``polling_interval`` seconds until stopped."""
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            try:
                self.get_reports()
            except Exception:
                # Top-level boundary of the polling thread: log and keep
                # polling instead of letting the thread die silently.
                logging.exception("Polling cycle failed; retrying after the polling interval.")
            # Unlike time.sleep, this returns as soon as stop_polling() is called.
            self.stop_event.wait(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal the background loops to stop."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def register_instance(self, instance_url):
        """Add *instance_url* to the instance list unless it is already present."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        """Remove *instance_url* and its health record, if it is registered."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def update_instances_health(self, instance, cache_size):
        """Record the cache usage reported by *instance*.

        Args:
            instance: URL of the instance.
            cache_size: Either a dict carrying the usage under the key
                'cache_size', or the usage value itself. When no usage is
                available the existing record is left untouched.
        """
        used = cache_size.get("cache_size") if isinstance(cache_size, dict) else cache_size
        if used is None:
            logging.warning(f"No cache size reported by instance {instance}; health not updated.")
            return
        self.instances_health[instance] = {"used": used, "total": self.TOTAL_CACHE_SIZE}
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def download_music_to_best_instance(self, file_name):
        """Ask the instance with the most free space to download *file_name*.

        Returns:
            On success a dict with 'music_id', 'status' and 'progress_url';
            otherwise a dict with 'error' (and 'details' when available).
        """
        best_instance = None
        max_free_space = -1
        # Determine the instance with the most free space. Iterate over a
        # snapshot because the polling thread adds and removes entries.
        for instance_url, space_info in list(self.instances_health.items()):
            total_space = convert_to_gb(space_info.get('total', 0))
            used_space = convert_to_gb(space_info.get('used', 0))
            free_space = total_space - used_space
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url
        if not best_instance:
            logging.error("No suitable instance found for downloading the music.")
            return {"error": "No suitable instance found for downloading the music."}
        # Attempt to download music to the best instance
        try:
            result = self.instances_api.download_music(best_instance, file_name)
            # Check if the response is as expected
            if not result or "music_id" not in result or "status" not in result:
                logging.error(f"Unexpected response from instance {best_instance}: {result}")
                return {
                    "error": "Failed to retrieve valid download data from the instance.",
                    "details": result if result else "Empty response"
                }
            # Prepare response with download progress URL
            music_id = self.get_music_id(file_name)
            return {
                "music_id": music_id,
                "status": result["status"],
                "progress_url": f'{best_instance}/api/get/progress/{music_id}'
            }
        except Exception as e:
            # Log network or API call-related errors
            logging.error(f"Error downloading music to {best_instance}: {str(e)}")
            return {
                "error": "Error occurred during music download.",
                "details": str(e)
            }

    def find_music_path(self, title):
        """Return the first indexed file path containing *title* (case-insensitive), or None."""
        needle = title.lower()
        for directory in self._iter_directories(self.file_structure):
            for path in self._file_paths(directory):
                if needle in path.lower():
                    return path
        return None

    def get_music_id(self, title):
        """Derive a music ID from *title*: spaces become underscores, then lowercase."""
        return title.replace(" ", "_").lower()

    def get_all_music(self):
        """Return the paths of all files directly inside the indexed directories."""
        music_files = []
        for directory in self._iter_directories(self.file_structure):
            music_files.extend(self._file_paths(directory))
        return music_files

    def get_all_categories(self):
        """Return the list of category (directory) paths."""
        return list(self.category_files_map)

    def get_files_from_category(self, category):
        """Return the file paths of *category*, or an empty list if it is unknown."""
        return self.category_files_map.get(category, [])