# app.py # Single-file Flask app: # - Поиск в YouTube Music (ytmusicapi) # - Встроенный интерфейс + плеер (играет прямо в интерфейсе) # - Прокси-стрим через сервер (/play/youtube/) с поддержкой Range (progressive streaming) # - Скачивание трека (/download/youtube/) -> attachment # # Требования: # pip install flask ytmusicapi requests mutagen # yt-dlp и ffmpeg должны лежать в той же папке, что и app.py (или быть в PATH). # # Запуск: # python app.py # Открой http://localhost:5100 from flask import ( Flask, request, jsonify, render_template_string, send_file, after_this_request, Response, stream_with_context ) from ytmusicapi import YTMusic import os, subprocess, tempfile, pathlib, logging, re, shutil, requests from werkzeug.utils import secure_filename app = Flask(__name__) app.logger.setLevel(logging.INFO) BASE_DIR = pathlib.Path(__file__).resolve().parent TEMP_DIR = pathlib.Path(tempfile.gettempdir()) / "ytmusic_downloader" TEMP_DIR.mkdir(parents=True, exist_ok=True) def find_executable(name: str) -> str: exe_win = BASE_DIR / f"{name}.exe" exe_nix = BASE_DIR / name if exe_win.exists() and os.access(exe_win, os.X_OK): return str(exe_win) if exe_nix.exists() and os.access(exe_nix, os.X_OK): return str(exe_nix) return name YTDLP = find_executable("yt-dlp") FFMPEG = find_executable("ffmpeg") def clean_filename(s: str) -> str: if not s: return "" s = re.sub(r'[\/:*?"<>|]', ' ', s) s = re.sub(r'\s+', ' ', s) return s.strip() def run_subprocess(cmd, cwd=None, timeout=None): app.logger.info("Run: %s", " ".join(cmd) if isinstance(cmd, (list,tuple)) else str(cmd)) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd) try: stdout, stderr = proc.communicate(timeout=timeout) except subprocess.TimeoutExpired: proc.kill() stdout, stderr = proc.communicate() return proc.returncode, stdout.decode(errors='ignore'), stderr.decode(errors='ignore') # --- HTML шаблон (single file) --- HTML = """ YouTube Music — Поиск и встроенный плеер

🔎 YouTube Music — Поиск и встроенный плеер

Интерфейс в одном файле. yt-dlp и ffmpeg должны быть в той же папке, что и сервер.
""" @app.route("/") def index(): return render_template_string(HTML) # Поиск @app.route('/search/youtube', methods=['GET']) def search_youtube_music(): query = request.args.get('query', '').strip() limit = int(request.args.get('limit', 10)) if not query: return jsonify({'success': False, 'error': 'Поисковый запрос не указан'}), 400 try: ytmusic = YTMusic() results = ytmusic.search(query, filter='songs', limit=limit) formatted = [] for r in results: if r.get('resultType') != 'song': continue title = r.get('title') or '' artists = r.get('artists') or [] artist_name = artists[0]['name'] if artists and isinstance(artists, list) and 'name' in artists[0] else '' duration = r.get('duration', '') videoId = r.get('videoId', '') thumbnails = r.get('thumbnails') or [] thumb = thumbnails[-1]['url'] if thumbnails else '' formatted.append({ 'title': title, 'artist': artist_name, 'duration': duration, 'videoId': videoId, 'thumbnail': thumb }) return jsonify({'success': True, 'results': formatted}) except Exception as e: app.logger.exception("Ошибка при поиске в YTMusic") return jsonify({'success': False, 'error': str(e)}), 500 # Прокси-стрим с поддержкой Range и низкоуровневой передачей @app.route('/play/youtube/', methods=['GET', 'HEAD']) def play_proxy(video_id): if not video_id: return ("video_id не указан", 400) try: # Получаем прямой URL аудиопотока через yt-dlp -g cmd = [YTDLP, '-f', 'bestaudio', '-g', f'https://www.youtube.com/watch?v={video_id}'] rc, out, err = run_subprocess(cmd, cwd=str(BASE_DIR), timeout=20) if rc != 0: app.logger.error("yt-dlp -g error: %s", err) return (f"Ошибка получения аудиопотока: {err}", 500) audio_url = out.strip().splitlines()[0] if out else '' if not audio_url: app.logger.error("Не получили URL из yt-dlp -g") return ("Не удалось получить URL аудиопотока", 500) # Пробрасываем Range если пришёл headers = {"User-Agent": request.headers.get("User-Agent", "ytmusic-downloader-proxy/1.0")} range_header = request.headers.get('Range') if range_header: headers['Range'] = range_header # Делаем запрос к upstream (stream=True) и используем raw для чтения upstream = requests.get(audio_url, headers=headers, stream=True, allow_redirects=True, timeout=15) app.logger.info("Upstream headers: %s", dict(upstream.headers)) if upstream.status_code not in (200, 206): app.logger.error("Upstream returned status %s", upstream.status_code) return (f"Upstream returned {upstream.status_code}", 500) # Формируем заголовки ответа на основе upstream resp_headers = {} content_type = upstream.headers.get('Content-Type', 'audio/mpeg') resp_headers['Content-Type'] = content_type if 'Content-Length' in upstream.headers: resp_headers['Content-Length'] = upstream.headers['Content-Length'] if 'Content-Range' in upstream.headers: resp_headers['Content-Range'] = upstream.headers['Content-Range'] resp_headers['Accept-Ranges'] = upstream.headers.get('Accept-Ranges', 'bytes') resp_headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' if request.method == 'HEAD': return Response(status=upstream.status_code, headers=resp_headers) # Низкоуровневая генерация байтов — upstream.raw.read upstream.raw.decode_content = True def generate(): try: while True: chunk = upstream.raw.read(16384) if not chunk: break yield chunk finally: try: upstream.close() except Exception: pass status_code = upstream.status_code if upstream.status_code in (200,206) else 200 return Response(stream_with_context(generate()), headers=resp_headers, status=status_code) except requests.exceptions.RequestException as e: app.logger.exception("Ошибка запроса к upstream") return (f"Ошибка запроса к upstream: {str(e)}", 500) except Exception as e: app.logger.exception("Ошибка stream proxy") return (f"Ошибка сервера: {str(e)}", 500) # favicon чтобы убрать 404 @app.route('/favicon.ico') def favicon(): return ('', 204) # Скачать трек и вернуть как attachment @app.route('/download/youtube/', methods=['GET','POST']) def download_youtube(video_id): if not video_id: return jsonify({'success': False, 'error': 'video_id не указан'}), 400 try: title = None artist = None try: ytmusic = YTMusic() info = ytmusic.get_song(video_id) if info: title = info.get('videoDetails', {}).get('title') or title author = info.get('videoDetails', {}).get('author') if author: artist = author for k in ('artists','artist'): if k in info and isinstance(info[k], list): names = [] for a in info[k]: if isinstance(a, dict) and a.get('name'): names.append(a.get('name')) elif isinstance(a, str): names.append(a) if names: artist = ', '.join(names) except Exception: app.logger.info("YTMusic info failed, fallback to id") safe_title = clean_filename(title) if title else video_id safe_artist = clean_filename(artist) if artist else "unknown" final_name = f"{safe_artist} - {safe_title}.mp3" final_name = secure_filename(final_name) temp_template = TEMP_DIR / f"temp_{video_id}_{os.getpid()}.%(ext)s" temp_template_str = str(temp_template) music_url = f"https://music.youtube.com/watch?v={video_id}" cmd = [ YTDLP, '--no-playlist', '--extract-audio', '--audio-format', 'mp3', '--audio-quality', '0', '--add-metadata', '--embed-thumbnail', '--no-keep-video', '--no-overwrites', '--prefer-ffmpeg', '--output', temp_template_str, music_url ] rc, out, err = run_subprocess(cmd, cwd=str(BASE_DIR)) if rc != 0: app.logger.error("yt-dlp error: %s", err) return jsonify({'success': False, 'error': err}), 500 created_files = list(TEMP_DIR.glob(f"temp_{video_id}_*.*")) created = None if created_files: created = sorted(created_files, key=lambda p: p.stat().st_mtime, reverse=True)[0] else: mp3s = sorted(TEMP_DIR.glob("*.mp3"), key=lambda p: p.stat().st_mtime, reverse=True) if mp3s: created = mp3s[0] if not created or not created.exists(): app.logger.error("mp3 not found after yt-dlp") return jsonify({'success': False, 'error': 'mp3 not found after yt-dlp'}), 500 send_path = TEMP_DIR / final_name try: created.replace(send_path) except Exception: shutil.copy2(created, send_path) @after_this_request def cleanup(response): try: if send_path.exists(): send_path.unlink() if created.exists(): try: created.unlink() except Exception: pass except Exception: app.logger.exception("cleanup error") return response return send_file(str(send_path), as_attachment=True, download_name=final_name) except Exception as e: app.logger.exception("download error") return jsonify({'success': False, 'error': str(e)}), 500 if __name__ == "__main__": TEMP_DIR.mkdir(parents=True, exist_ok=True) app.run(host="0.0.0.0", port=5100, debug=True)