|
|
|
|
|
import requests |
|
|
import feedparser |
|
|
import yt_dlp |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
def search_podcast_series(query: str): |
|
|
base_url = "https://itunes.apple.com/search" |
|
|
params = {"term": query, "media": "podcast", "entity": "podcast", "limit": 10} |
|
|
try: |
|
|
response = requests.get(base_url, params=params, timeout=15) |
|
|
response.raise_for_status() |
|
|
data = response.json() |
|
|
series_list = [] |
|
|
for series in data.get("results", []): |
|
|
series_list.append({ |
|
|
"id": series.get("collectionId"), |
|
|
"title": series.get("collectionName", "Untitled"), |
|
|
"artist": series.get("artistName", "Unknown"), |
|
|
"feed_url": series.get("feedUrl", ""), |
|
|
"thumbnail": series.get("artworkUrl600", ""), |
|
|
"episode_count": series.get("trackCount", 0) |
|
|
}) |
|
|
return series_list |
|
|
except Exception as e: |
|
|
print(f"Series search error: {str(e)}") |
|
|
return [] |
|
|
|
|
|
def fetch_episodes(feed_url: str): |
|
|
try: |
|
|
feed = feedparser.parse(feed_url) |
|
|
episodes = [] |
|
|
for entry in feed.entries: |
|
|
audio_url = None |
|
|
for link in entry.get('links', []): |
|
|
if link.get('type', '').startswith('audio/'): |
|
|
audio_url = link.href |
|
|
break |
|
|
if not audio_url: |
|
|
continue |
|
|
episodes.append({ |
|
|
"title": entry.get("title", "Untitled Episode"), |
|
|
"published": entry.get("published", ""), |
|
|
"audio_url": audio_url, |
|
|
"description": entry.get("description", ""), |
|
|
"duration": entry.get("itunes_duration", "0") |
|
|
}) |
|
|
return episodes |
|
|
except Exception as e: |
|
|
print(f"RSS parse error: {str(e)}") |
|
|
return [] |
|
|
|
|
|
def download_podcast_audio(audio_url: str, title: str, status: str) -> tuple: |
|
|
if not audio_url: |
|
|
return None, f"{status}\n❌ No audio URL available" |
|
|
new_status = status + f"\n⏬ Downloading: {title}" |
|
|
temp_dir = tempfile.mkdtemp() |
|
|
ydl_opts = { |
|
|
"format": "bestaudio/best", |
|
|
"outtmpl": os.path.join(temp_dir, "%(title)s.%(ext)s"), |
|
|
"postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "mp3"}], |
|
|
"quiet": True, |
|
|
"socket_timeout": 60, |
|
|
"retries": 5 |
|
|
} |
|
|
try: |
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
|
ydl.download([audio_url]) |
|
|
mp3_files = [f for f in os.listdir(temp_dir) if f.endswith('.mp3')] |
|
|
if mp3_files: |
|
|
audio_path = os.path.join(temp_dir, mp3_files[0]) |
|
|
new_status += f"\n✅ Downloaded: {mp3_files[0]}" |
|
|
return audio_path, new_status |
|
|
raise FileNotFoundError("MP3 file not found") |
|
|
except Exception as e: |
|
|
new_status += f"\n❌ Download error: {str(e)}" |
|
|
return None, new_status |
|
|
|
|
|
def fetch_audio(youtube_url: str, status: str) -> tuple: |
|
|
try: |
|
|
new_status = status + "\n⏬ Downloading audio from URL..." |
|
|
temp_dir = tempfile.mkdtemp() |
|
|
filename = f"yt_audio_{os.urandom(8).hex()}" |
|
|
temp_path = os.path.join(temp_dir, filename) |
|
|
ydl_opts = { |
|
|
'format': 'bestaudio/best', |
|
|
'outtmpl': temp_path, |
|
|
'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}], |
|
|
'quiet': True, |
|
|
} |
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
|
ydl.download([youtube_url]) |
|
|
audio_path = temp_path + '.mp3' |
|
|
if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0: |
|
|
raise ValueError("Downloaded file is empty") |
|
|
new_status += "\n✅ Downloaded audio" |
|
|
return audio_path, new_status |
|
|
except Exception as e: |
|
|
new_status += f"\n❌ Error: {str(e)}" |
|
|
return None, new_status |