Spaces:
Runtime error
Runtime error
import threading | |
import queue | |
import time | |
from typing import Callable, Dict, List | |
import socket | |
from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo | |
import logging | |
import requests | |
import subprocess | |
import psutil | |
import GPUtil | |
from processor_manager import ResourceMonitor | |
from peer_discovery import PORT | |
logging.basicConfig(level=logging.INFO) | |
# ─────────────── Device Manager (general) ───────────────
class DeviceManager:
    """Detect auxiliary hardware on this host and report a rough load per device type.

    ``devices`` maps a device-type name ("GPU", "DSP", "NIC", "STORAGE",
    "CAPTURE", "ACCELERATOR") to the list of devices found when the manager
    was constructed.  Detection is best-effort: a probe that fails yields an
    empty list instead of raising.
    """

    def __init__(self):
        self.devices = {
            "GPU": self._detect_gpus(),
            "DSP": self._detect_dsps(),
            "NIC": self._detect_nics(),
            "STORAGE": self._detect_storage(),
            "CAPTURE": self._detect_capture(),
            "ACCELERATOR": self._detect_accelerators(),
        }

    # ── Device discovery ──────────────────────────────────────────────

    @staticmethod
    def _command_output(command):
        """Run *command* and return its decoded stdout, or "" if it fails.

        Failures (missing binary, non-zero exit status, undecodable output)
        are logged at debug level.  ``except Exception`` rather than a bare
        ``except`` lets KeyboardInterrupt and SystemExit propagate.
        """
        try:
            return subprocess.check_output(
                command, stderr=subprocess.DEVNULL
            ).decode()
        except Exception as exc:
            logging.debug("Device probe %s failed: %s", command, exc)
            return ""

    def _detect_gpus(self):
        """Return whatever ``GPUtil.getGPUs()`` reports, or [] on failure."""
        try:
            return GPUtil.getGPUs()
        except Exception as exc:
            logging.debug("GPU detection failed: %s", exc)
            return []

    def _detect_dsps(self):
        """Return ["DSP_Audio"] when ``aplay -l`` mentions a card, else []."""
        output = self._command_output(["aplay", "-l"])
        return ["DSP_Audio"] if "card" in output.lower() else []

    def _detect_nics(self):
        """Return the network interface names known to psutil, or []."""
        try:
            return list(psutil.net_if_addrs().keys())
        except Exception as exc:
            logging.debug("NIC detection failed: %s", exc)
            return []

    def _detect_storage(self):
        """Return block-device names reported by ``lsblk``, or [].

        ``-n`` suppresses the "NAME" header and ``-l`` selects the flat list
        format, so the result holds bare device names only (no header entry
        and no tree-drawing prefixes on partitions).
        """
        output = self._command_output(["lsblk", "-n", "-l", "-o", "NAME"])
        return output.split()

    def _detect_capture(self):
        """Return capture-device names listed by ``v4l2-ctl --list-devices``.

        Each device is printed as an unindented "Name (bus-info):" line
        followed by indented /dev nodes; only the unindented header lines
        are kept, without the trailing colon.  Splitting the whole output on
        ":" (as was done before) cut names apart at the colons inside the
        bus path.
        """
        output = self._command_output(["v4l2-ctl", "--list-devices"])
        return [
            line.strip().rstrip(":")
            for line in output.splitlines()
            if line.strip() and not line[0].isspace()
        ]

    def _detect_accelerators(self):
        """Return a fixed placeholder list; no real detection is done here."""
        return ["Accelerator_Device"]

    # ── Load measurement ──────────────────────────────────────────────

    def get_device_load(self, device_type, index=0):
        """Return a load figure for *device_type*, or 0 when unavailable.

        Args:
            device_type: One of the keys of ``self.devices``.  Any other
                value yields 0.
            index: Position within the detected GPU list; only used for
                "GPU".

        Returns:
            A number compared against the 30/70 thresholds used by
            ``can_receive`` and ``should_offload``.  DSP, CAPTURE and
            ACCELERATOR return fixed placeholder values.  Any error while
            measuring (including an out-of-range *index*) yields 0.
        """
        try:
            if device_type == "GPU" and self.devices["GPU"]:
                # NOTE(review): the GPU objects were captured in __init__;
                # this probably reads the load recorded at that moment —
                # confirm against GPUtil.
                return self.devices["GPU"][index].load * 100
            elif device_type == "DSP" and self.devices["DSP"]:
                return 10  # placeholder
            elif device_type == "NIC" and self.devices["NIC"]:
                # NOTE(review): this is total MB sent as reported by psutil,
                # which looks like a cumulative counter rather than a
                # percentage — confirm it is meaningful for the thresholds.
                return psutil.net_io_counters().bytes_sent / (1024 * 1024)
            elif device_type == "STORAGE" and self.devices["STORAGE"]:
                return psutil.disk_usage('/').percent
            elif device_type == "CAPTURE" and self.devices["CAPTURE"]:
                return 20  # placeholder
            elif device_type == "ACCELERATOR" and self.devices["ACCELERATOR"]:
                return 15  # placeholder
            return 0
        except Exception as exc:
            # Best-effort: a failed measurement counts as "no load".
            logging.debug("Load query for %s failed: %s", device_type, exc)
            return 0

    # ── Receive / offload policy ──────────────────────────────────────

    def can_receive(self, device_type, index=0):
        """Return True when the device load is at most 30."""
        return self.get_device_load(device_type, index) <= 30

    def should_offload(self, device_type, index=0):
        """Return True when the device load is at least 70."""
        return self.get_device_load(device_type, index) >= 70
# ─────────────── Peer Registry ───────────────
class PeerRegistry:
    """Advertise this node over zeroconf and discover other task-node peers."""

    # Service type shared by registration and discovery.
    SERVICE_TYPE = "_tasknode._tcp.local."

    def __init__(self):
        self._peers = {}
        self._zeroconf = Zeroconf()
        self.local_node_id = socket.gethostname()

    def register_service(self, name: str, port: int, load: float = 0.0):
        """Register this node as ``<name>._tasknode._tcp.local.``.

        Args:
            name: Instance name; also used to build the ``<name>.local.``
                server name.
            port: TCP port advertised for the service.
            load: Load figure published in the service's ``load`` property.
        """
        # Resolve the address once so the advertised and logged IP agree.
        local_ip = self._get_local_ip()
        service_info = ServiceInfo(
            self.SERVICE_TYPE,
            f"{name}.{self.SERVICE_TYPE}",
            addresses=[socket.inet_aton(local_ip)],
            port=port,
            properties={
                b'load': str(load).encode(),
                b'node_id': self.local_node_id.encode(),
            },
            server=f"{name}.local.",
        )
        self._zeroconf.register_service(service_info)
        logging.info(f"โ Service registered: {name} @ {local_ip}:{port}")

    def discover_peers(self, timeout: int = 3) -> List[Dict]:
        """Browse for task-node services for *timeout* seconds.

        Returns:
            One dict per discovered peer with keys ``ip``, ``port``,
            ``load``, ``node_id`` and ``last_seen``, sorted by ascending
            ``load``.  A peer announced or updated several times within the
            window appears once, with its most recent data.
        """

        class Listener:
            """Collects service announcements, keyed by peer identity."""

            def __init__(self):
                # (node_id, ip, port) -> latest record.  Keying on identity
                # avoids duplicates: whole-record comparison never matches
                # because ``last_seen`` differs on every announcement.
                self.peers = {}

            def add_service(self, zc, type_, name):
                try:
                    info = zc.get_service_info(type_, name, timeout=3000)
                    if not info:
                        logging.warning(f"โ ๏ธ ูู ูุชู ุงูุนุซูุฑ ุนูู ู ุนููู ุงุช ุงูุฎุฏู ุฉ: {name}")
                        return
                    ip = socket.inet_ntoa(info.addresses[0])
                    peer_data = {
                        'ip': ip,
                        'port': info.port,
                        'load': float(info.properties.get(b'load', b'0')),
                        'node_id': info.properties.get(b'node_id', b'unknown').decode(),
                        'last_seen': time.time(),
                    }
                    key = (peer_data['node_id'], ip, info.port)
                    is_new = key not in self.peers
                    self.peers[key] = peer_data
                    if is_new:
                        logging.info(f"โ ุชู ุช ุฅุถุงูุฉ ูุธูุฑ ุฌุฏูุฏ: {peer_data}")
                except Exception as e:
                    logging.error(f"โ ุฎุทุฃ ุฃุซูุงุก ุฌูุจ ู ุนููู ุงุช ุงูุฎุฏู ุฉ {name}: {e}")

            def update_service(self, zc, type_, name):
                # An update is handled like a fresh announcement so the
                # stored load / last_seen are refreshed.
                self.add_service(zc, type_, name)

            def remove_service(self, zc, type_, name):
                pass

        listener = Listener()
        browser = ServiceBrowser(self._zeroconf, self.SERVICE_TYPE, listener)
        try:
            time.sleep(timeout)
        finally:
            # Stop the browser; otherwise every call leaves another browser
            # running on the shared Zeroconf instance.
            browser.cancel()
        # NOTE(review): this node's own registration is not filtered out
        # here — confirm whether callers expect to see themselves.
        return sorted(list(listener.peers.values()), key=lambda peer: peer['load'])

    def close(self):
        """Shut down the underlying Zeroconf instance."""
        self._zeroconf.close()

    def _get_local_ip(self) -> str:
        """Return the local IPv4 address of the default route, or 127.0.0.1.

        Connecting a UDP socket sends no packet; it only makes the OS pick
        the outgoing interface, whose address is then read back.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            try:
                probe.connect(('10.255.255.255', 1))
                return probe.getsockname()[0]
            except Exception:
                return '127.0.0.1'
# ─────────────── Distributed Executor ───────────────
class DistributedExecutor:
    """Run tasks locally or hand them to a discovered peer, based on load.

    A daemon thread keeps ``available_peers`` refreshed through the peer
    registry.  ``submit`` is the entry point for callers.
    """

    def __init__(self, shared_secret: str):
        self.peer_registry = PeerRegistry()
        # NOTE(review): stored but not used by any method in this class.
        self.shared_secret = shared_secret
        self.task_queue = queue.PriorityQueue()
        self.result_cache = {}
        self.available_peers = []
        self.devices = DeviceManager()
        self._init_peer_discovery()

    def _init_peer_discovery(self):
        """Start the background thread that refreshes ``available_peers``."""

        def discovery_loop():
            while True:
                try:
                    self.available_peers = self.peer_registry.discover_peers()
                    logging.info(f"โ Discovered peers: {self.available_peers}")
                except Exception:
                    # Keep the loop alive; an uncaught error would end the
                    # thread and freeze the peer list for good.
                    logging.exception("Peer discovery failed; will retry")
                time.sleep(10)

        threading.Thread(target=discovery_loop, daemon=True).start()

    def submit(self, task_func: Callable, *args, task_type=None, **kwargs):
        """Execute *task_func* locally or on a peer and return its result.

        Args:
            task_func: Callable to run.
            *args, **kwargs: Forwarded to *task_func*.
            task_type: Optional device-type name (case-insensitive) whose
                load is consulted; defaults to "CPU".

        Returns:
            The local return value of *task_func*, or — when the task is
            offloaded — whatever ``_offload_task`` returns.
        """
        monitor = ResourceMonitor().current_load()
        averages = monitor["average"]
        avg_cpu = averages["cpu"]
        avg_mem = averages["mem_percent"] if "mem_percent" in averages else 0

        device_type = task_type.upper() if task_type else "CPU"

        if avg_cpu > 0.6 or avg_mem > 85 or self.devices.should_offload(device_type):
            logging.info(f"โ ๏ธ ุงูุญู ู ู ุฑุชูุน ุนูู {device_type} - ุฅุฑุณุงู ุงูู ูู ุฉ ููุฃูุฑุงู")
            # Propagate the offloaded result instead of dropping it.
            return self._offload_task(task_func, *args, **kwargs)

        if avg_cpu <= 0.3 and self.devices.can_receive(device_type):
            logging.info(f"โ ุงูุญู ู ู ูุฎูุถ ุนูู {device_type} - ุชูููุฐ ุงูู ูู ุฉ ู ุญููุงู")
        else:
            logging.info(f"โน๏ธ ุงูุญู ู ู ุชูุณุท ุนูู {device_type} - ุชูููุฐ ุงูู ูู ุฉ ู ุญููุงู")
        return task_func(*args, **kwargs)

    def _offload_task(self, task_func: Callable, *args, **kwargs):
        """Send the task to the least-loaded peer, preferring LAN peers.

        Returns:
            The peer's decoded JSON response (``None`` if the request
            failed), or the local return value of *task_func* when no peers
            are known.
        """
        task_id = f"{task_func.__name__}_{time.time()}"
        task = {
            'task_id': task_id,
            'function': task_func.__name__,
            'args': args,
            'kwargs': kwargs,
            'sender_id': self.peer_registry.local_node_id,
        }

        # Snapshot: the discovery thread rebinds ``available_peers``.
        peers = list(self.available_peers)
        if not peers:
            logging.warning("โ ๏ธ ูุง ุชูุฌุฏ ุฃุฌูุฒุฉ ู ุชุงุญุฉ - ุณูุชู ุชูููุฐ ุงูู ูู ุฉ ู ุญููุงู")
            return task_func(*args, **kwargs)

        lan_peers = [p for p in peers if self._is_local_ip(p['ip'])]
        # With no LAN peer, every known peer is by definition a WAN peer.
        scope, candidates = ("LAN", lan_peers) if lan_peers else ("WAN", peers)
        peer = min(candidates, key=lambda p: p['load'])
        logging.info(f"โ Sending task {task_id} to {scope} peer {peer['node_id']}")
        return self._send_to_peer(peer, task)

    def _is_local_ip(self, ip: str) -> bool:
        """Return True for loopback and RFC 1918 private IPv4 addresses.

        Only 172.16.0.0/12 counts as private within 172.*; a plain
        ``startswith('172.')`` also matched public addresses.  Strings that
        are not valid IP addresses yield False.
        """
        import ipaddress

        try:
            address = ipaddress.ip_address(ip)
        except ValueError:
            return False
        local_networks = (
            ipaddress.ip_network("10.0.0.0/8"),
            ipaddress.ip_network("172.16.0.0/12"),
            ipaddress.ip_network("192.168.0.0/16"),
            ipaddress.ip_network("127.0.0.0/8"),
        )
        return any(address in network for network in local_networks)

    def _send_to_peer(self, peer: Dict, task: Dict):
        """POST *task* as JSON to the peer's ``/run`` endpoint.

        Returns:
            The decoded JSON response, or ``None`` if the request or the
            decoding fails (the error is logged).
        """
        try:
            url = f"http://{peer['ip']}:{peer['port']}/run"
            response = requests.post(url, json=task, timeout=10)
            response.raise_for_status()
            logging.info(f"โ Response from peer: {response.text}")
            return response.json()
        except Exception as e:
            logging.error(f"โ ูุดู ุฅุฑุณุงู ุงูู ูู ุฉ ูู {peer['node_id']}: {e}")
            return None
# ─────────────── Main entry point ───────────────
if __name__ == "__main__":
    # Demo: start an executor, advertise it as "node1", run one sample task.

    def example_task(x):
        """Return x multiplied by itself; stands in for a real workload."""
        return x * x

    executor = DistributedExecutor("my_secret_key")
    registry = executor.peer_registry
    registry.register_service("node1", PORT, load=0.1)

    print("โ ูุธุงู ุชูุฒูุน ุงูู ูุงู ุฌุงูุฒ...")

    executor.submit(example_task, 5, task_type="GPU")