""" CroxyProxy Rotating Proxy API - HuggingFace Spaces 90 servers: Paris, LA, Dallas, Warsaw, Amsterdam... GET /health - Status GET /servers - List 90 servers POST /proxy/fetch - Rotating proxy POST /proxy/random - Random server POST /proxy/batch - Multiple URLs """ import json, base64, re, random, time, threading from datetime import datetime, timezone from flask import Flask, request, jsonify from bs4 import BeautifulSoup import cloudscraper from html import unescape import warnings warnings.filterwarnings("ignore") BASE = "https://www.croxyproxy.com" app = Flask(__name__) KEEP_HEADERS = { "content-type", "content-length", "content-encoding", "server", "date", "connection", "access-control-allow-origin", "access-control-allow-credentials", "cache-control", "etag", "last-modified", "x-ratelimit-limit", "x-ratelimit-remaining", "x-request-id", "location", "retry-after", } DROP_HEADERS = { "set-cookie", "__cph", "__cpc", "content-security-policy", "strict-transport-security", "referrer-policy", "access-control-allow-headers", "x-frame-options", "x-content-type-options", "permissions-policy", "cross-origin-opener-policy", "cross-origin-embedder-policy", } class S: servers = [] idx = 0 lock = threading.Lock() last = None stats = {"req": 0, "ok": 0, "fail": 0} def dec(e): try: return json.loads(bytes.fromhex(base64.b64decode(e).decode()).decode()) except Exception: return None def filter_headers(raw_headers, include_all=False): if include_all: return dict(raw_headers) cleaned = {} for k, v in raw_headers.items(): kl = k.lower() if kl in DROP_HEADERS: continue if kl in KEEP_HEADERS: cleaned[k] = v return cleaned def parse_body(text, content_type=""): if not text: return None if "json" in content_type.lower() or text.strip().startswith(("{", "[")): try: return json.loads(text) except (json.JSONDecodeError, ValueError): pass if "html" in content_type.lower() or text.strip().startswith("<"): return { "_type": "html", "_length": len(text), "content": text, "_preview": 
text[:300].strip() + ("..." if len(text) > 300 else ""), } if len(text) > 2000: return { "_type": "text", "_length": len(text), "content": text, "_preview": text[:500].strip() + "...", } return text def extract_ip(url_str): return (url_str or "").replace("https://", "").replace("http://", "").split("/")[0] def format_result(raw, include_raw_headers=False): if not raw.get("success"): return { "success": False, "error": raw.get("error"), "server": raw.get("server"), } ct = "" if raw.get("headers"): ct = raw["headers"].get("Content-Type", raw["headers"].get("content-type", "")) result = { "success": True, "status": raw.get("status"), "url": raw.get("url"), "body": parse_body(raw.get("body", ""), ct), "proxy": raw.get("proxy"), "servers_available": raw.get("servers_available"), } if raw.get("headers"): result["headers"] = filter_headers(raw["headers"], include_all=include_raw_headers) return result def fetch_raw(url, sid=None): sc = cloudscraper.create_scraper( browser={"browser": "chrome", "platform": "windows", "desktop": True} ) S.stats["req"] += 1 try: r1 = sc.get(BASE, timeout=30) if r1.status_code != 200: S.stats["fail"] += 1 return {"success": False, "error": f"Homepage {r1.status_code}"} s1 = BeautifulSoup(r1.text, "lxml") ci = s1.find("input", {"name": "csrf"}) if not ci: S.stats["fail"] += 1 return {"success": False, "error": "No CSRF"} r2 = sc.post( f"{BASE}/servers", data={ "url": url, "proxyServerId": "274", "csrf": ci["value"], "demo": "0", "frontOrigin": BASE, }, headers={ "Content-Type": "application/x-www-form-urlencoded", "Origin": BASE, "Referer": BASE + "/", }, allow_redirects=True, timeout=30, ) if r2.status_code != 200: S.stats["fail"] += 1 return {"success": False, "error": f"Servers {r2.status_code}"} s2 = BeautifulSoup(r2.text, "lxml") sel = s2.find("script", {"id": "serverSelectorScript"}) if not sel: S.stats["fail"] += 1 return {"success": False, "error": "No selector"} ss = [ x for x in (dec(i) for i in json.loads(unescape(sel.get("data-ss", 
"")))) if x and x.get("id") ] csrf2 = unescape(sel.get("data-csrf", "")).strip('"') fo = unescape(sel.get("data-fo", "")).strip('"') if not ss: S.stats["fail"] += 1 return {"success": False, "error": "No servers"} S.servers = ss S.last = datetime.now(timezone.utc).isoformat() ch = None if sid: ch = next((x for x in ss if x["id"] == sid), None) if not ch: with S.lock: ch = ss[S.idx % len(ss)] S.idx += 1 r3 = sc.post( f"{BASE}/requests?fso=", data={ "url": url, "proxyServerId": str(ch["id"]), "csrf": csrf2, "demo": "0", "frontOrigin": fo, }, headers={ "Content-Type": "application/x-www-form-urlencoded", "Origin": BASE, "Referer": f"{BASE}/servers", }, allow_redirects=False, timeout=30, ) loc = r3.headers.get("Location") or r3.headers.get("location") if not loc: S.stats["fail"] += 1 return { "success": False, "error": f"No redirect ({r3.status_code})", "server": ch.get("name"), } r4 = sc.get(loc, timeout=30, allow_redirects=True) dr = re.search(r'data-r="([^"]+)"', r4.text) if not dr: S.stats["fail"] += 1 return {"success": False, "error": "No data-r", "server": ch.get("name")} final = base64.b64decode(dr.group(1)).decode() r5 = sc.get(final, timeout=30, allow_redirects=True) S.stats["ok"] += 1 return { "success": True, "status": r5.status_code, "headers": dict(r5.headers), "body": r5.text, "url": url, "proxy": { "server_id": ch["id"], "server_name": ch.get("name"), "ip": extract_ip(ch.get("url", "")), }, "servers_available": len(ss), } except Exception as e: S.stats["fail"] += 1 return {"success": False, "error": str(e)} def warmup(): """Populate server list on startup.""" print("Warming up — populating server list...") result = fetch_raw("https://httpbin.org/ip") if result.get("success"): print(f"✓ {len(S.servers)} servers loaded") else: print(f"✗ Warm-up failed: {result.get('error')}") # Gunicorn hook — runs in each worker after fork def post_fork(server, worker): warmup() # Runs at import time — works for both gunicorn and direct python warmup() # 
# ═══════════════════════════════════════════════
# ROUTES
# ═══════════════════════════════════════════════


@app.route("/")
def index():
    """API root: describe the available endpoints and request options."""
    return jsonify({
        "name": "CroxyProxy Rotating Proxy API",
        "version": "2.0",
        "endpoints": {
            "GET /health": "Status + stats",
            "GET /servers": "List all servers",
            "POST /proxy/fetch": "Rotating proxy {url, server_id?, raw_headers?}",
            "POST /proxy/random": "Random server {url, raw_headers?}",
            "POST /proxy/batch": "Multiple URLs {urls: [...], raw_headers?}",
        },
        "notes": {
            "raw_headers": "Set to true to get ALL response headers (default: filtered)",
            "body": "JSON bodies are auto-parsed. HTML is truncated with preview.",
        },
    })


@app.route("/health")
def health():
    """Liveness/status probe: server count, last refresh, request counters."""
    return jsonify({
        "status": "ready",
        "servers": len(S.servers),
        "last_refresh": S.last,
        "stats": S.stats,
    })


@app.route("/servers")
def servers():
    """List the proxy servers cached from the most recent fetch."""
    return jsonify({
        "count": len(S.servers),
        "servers": [
            {
                "id": s.get("id"),
                "name": s.get("name"),
                "ip": extract_ip(s.get("url", "")),
            }
            for s in S.servers
        ],
    })


@app.route("/proxy/fetch", methods=["POST"])
def proxy_fetch():
    """Fetch one URL via round-robin rotation (or an explicit server_id).

    Body: {url, server_id?, raw_headers?}. Returns the formatted proxy result.
    """
    # silent=True: a missing or malformed JSON body yields None -> {} so we
    # answer with our own JSON 400 below instead of Flask's HTML error page.
    d = request.get_json(silent=True) or {}
    if not d.get("url"):
        return jsonify({"error": "url required"}), 400
    raw = fetch_raw(d["url"], d.get("server_id"))
    return jsonify(format_result(raw, include_raw_headers=d.get("raw_headers", False)))


@app.route("/proxy/random", methods=["POST"])
def proxy_random():
    """Fetch one URL via a randomly chosen server from the cached list.

    Body: {url, raw_headers?}. Falls back to rotation if no servers are
    cached yet (sid=None).
    """
    d = request.get_json(silent=True) or {}
    if not d.get("url"):
        return jsonify({"error": "url required"}), 400
    sid = random.choice(S.servers)["id"] if S.servers else None
    raw = fetch_raw(d["url"], sid)
    return jsonify(format_result(raw, include_raw_headers=d.get("raw_headers", False)))


@app.route("/proxy/batch", methods=["POST"])
def proxy_batch():
    """Fetch several URLs sequentially with a 0.5 s pause between requests.

    Body: {urls: [...], raw_headers?}. Returns per-URL results plus counts.
    """
    d = request.get_json(silent=True) or {}
    urls = d.get("urls", [])
    if not urls:
        return jsonify({"error": "urls required"}), 400
    include_raw = d.get("raw_headers", False)
    results = []
    for i, u in enumerate(urls):
        raw = fetch_raw(u)
        results.append(format_result(raw, include_raw_headers=include_raw))
        # Throttle between requests only — no pointless sleep after the last.
        if i < len(urls) - 1:
            time.sleep(0.5)
    return jsonify({
        "count": len(results),
        "success_count": sum(1 for r in results if r.get("success")),
        "results": results,
    })


if __name__ == "__main__":
    # 7860 is the standard HuggingFace Spaces port.
    app.run(host="0.0.0.0", port=7860)