""" internet_scanner.py - ماسح للبحث عن أجهزة DTS على الإنترنت """ import requests import threading import time import socket from concurrent.futures import ThreadPoolExecutor, as_completed import logging from peer_discovery import PORT class InternetScanner: def __init__(self): self.discovered_peers = set() self.scan_ranges = [ # نطاقات IP شائعة للخوادم العامة "8.8.8.0/24", # Google DNS range "1.1.1.0/24", # Cloudflare range "208.67.222.0/24", # OpenDNS range ] def scan_ip_range(self, ip_range: str, port: int = PORT): """مسح نطاق IP للبحث عن خوادم DTS""" import ipaddress try: network = ipaddress.ip_network(ip_range, strict=False) active_peers = [] with ThreadPoolExecutor(max_workers=50) as executor: futures = [] for ip in network.hosts(): future = executor.submit(self.check_dts_node, str(ip), port) futures.append(future) for future in as_completed(futures, timeout=30): try: result = future.result() if result: active_peers.append(result) except: continue return active_peers except Exception as e: logging.error(f"خطأ في مسح النطاق {ip_range}: {e}") return [] def check_dts_node(self, ip: str, port: int = PORT) -> str: """فحص IP معين للتأكد من وجود خادم DTS مع المشروع""" try: # فحص صفحة الصحة العامة health_url = f"http://{ip}:{port}/health" response = requests.get(health_url, timeout=2) if response.status_code == 200: # فحص وجود المشروع الصحيح run_url = f"http://{ip}:{port}/run" # اختبار مهمة من المشروع للتأكد test_payload = { "func": "matrix_multiply", "args": [2], "kwargs": {} } test_response = requests.post(run_url, json=test_payload, timeout=3) # فحص إضافي للتأكد من هوية المشروع project_check = requests.get(f"http://{ip}:{port}/project_info", timeout=2) if (test_response.status_code in [200, 404] and project_check.status_code == 200): project_data = project_check.json() # التحقق من معرف المشروع الصحيح if (project_data.get("project_name") == "distributed-task-system" and project_data.get("version") == "1.0"): logging.info(f"✅ اكتُشف خادم DTS صحيح: {ip}:{port}") return run_url else: logging.warning(f"⚠️ خادم على {ip}:{port} لكن مشروع مختلف") except: pass return None def scan_public_repositories(self): """البحث في المستودعات العامة عن عناوين خوادم DTS""" try: # البحث في GitHub عن مشاريع DTS github_api = "https://api.github.com/search/repositories" params = { "q": "distributed task system port:PORT", "sort": "updated", "per_page": 10 } response = requests.get(github_api, params=params, timeout=10) if response.status_code == 200: repos = response.json().get("items", []) for repo in repos: # محاولة استخراج IPs من وصف المشروع أو README if repo.get("description"): self.extract_ips_from_text(repo["description"]) except Exception as e: logging.warning(f"خطأ في البحث في المستودعات: {e}") def extract_ips_from_text(self, text: str): """استخراج عناوين IP من النص""" import re ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b' ips = re.findall(ip_pattern, text) for ip in ips: try: # التحقق من صحة IP socket.inet_aton(ip) peer_url = f"http://{ip}:PORT/run" # فحص سريع if self.check_dts_node(ip): self.discovered_peers.add(peer_url) except: continue def start_continuous_scan(self): """بدء المسح المستمر""" def scan_loop(): while True: try: # مسح النطاقات المحددة for ip_range in self.scan_ranges: peers = self.scan_ip_range(ip_range) for peer in peers: self.discovered_peers.add(peer) # البحث في المستودعات العامة self.scan_public_repositories() logging.info(f"اكتُشف {len(self.discovered_peers)} خادم على الإنترنت") except Exception as e: logging.error(f"خطأ في المسح المستمر: {e}") # انتظار 30 دقيقة قبل المسح التالي time.sleep(1800) thread = threading.Thread(target=scan_loop, daemon=True) thread.start() logging.info("🔍 بدء المسح المستمر للإنترنت") def get_discovered_peers(self): """الحصول على قائمة الأجهزة المكتشفة""" return list(self.discovered_peers) # إنشاء مثيل عام internet_scanner = InternetScanner()