import base64
import contextlib
import functools
import io
import itertools
import os
import sys
import tempfile
import time
from typing import List, Optional

import av
import openai
import requests
import wikipediaapi
from openai import OpenAI
from PIL import Image
from pytube import YouTube
from smolagents import tool
from tqdm import tqdm
from yt_dlp import YoutubeDL

model_id = "gpt-4.1"

# (connect, read) timeouts in seconds for every outbound HTTP request, so a
# stalled server cannot hang a tool call forever.
_HTTP_TIMEOUT = (10, 60)


@tool
def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.

    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    client = OpenAI()
    response = client.responses.create(
        model=model_id,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": query},
                    {
                        "type": "input_image",
                        "image_url": img_url,
                    },
                ],
            }
        ],
    )
    return response.output_text


@tool
def read_code(file_url: str) -> str:
    """
    Read the contents of a code file such as py file instead of executing it. Use this tool to analyze a code snippet.

    Args:
        file_url (str): The URL of the code file to retrieve.

    Returns:
        str: The content of the file as a string.
    """
    response = requests.get(file_url, timeout=_HTTP_TIMEOUT)
    response.raise_for_status()
    return response.text


@tool
def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using transcription model.

    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine format.

    Returns:
        str: The transcribed text from the audio file.
    """
    response = requests.get(file_url, timeout=_HTTP_TIMEOUT)
    response.raise_for_status()

    # splitext yields "" when the name has no extension, so the "mp3"
    # fallback actually applies (split(".")[-1] returned the whole name).
    extension = os.path.splitext(file_name)[1].lstrip(".").lower() or "mp3"

    audio_file = io.BytesIO(response.content)
    # The in-memory file is given a name carrying the extension.
    audio_file.name = f"audio.{extension}"

    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )
    return transcription.text


def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    """
    Download the highest-resolution progressive MP4 for `url` with PyTube.

    Best-effort: any failure is logged to stderr and reported as None.
    """
    try:
        from pytube import YouTube

        yt = YouTube(url)
        stream = (
            yt.streams.filter(progressive=True, file_extension="mp4")
            .order_by("resolution")
            .desc()
            .first()
        )
        if stream is None:
            raise RuntimeError("No MP4 with audio found")
        buf = io.BytesIO()
        stream.stream_to_buffer(buf)
        buf.seek(0)
        return buf
    except Exception as e:
        print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr)
        return None


def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing some MP4 video stream for `url`.
    Works whether YouTube serves a progressive file or separate A/V.

    Raises:
        RuntimeError: If yt-dlp yields no usable video stream URL.
        requests.HTTPError: If downloading a stream URL fails.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            # Playlist-style result: use the first entry.
            info = info["entries"][0]

    if "url" in info:
        video_urls = [info["url"]]
    elif "requested_formats" in info:
        video_urls = [
            fmt["url"]
            for fmt in info["requested_formats"]
            if fmt.get("vcodec") != "none"
        ]
        if not video_urls:
            raise RuntimeError("yt-dlp returned audio-only formats")
    else:
        raise RuntimeError("yt-dlp could not extract a stream URL")

    buf = io.BytesIO()
    for direct_url in video_urls:
        with requests.get(direct_url, stream=True, timeout=_HTTP_TIMEOUT) as r:
            r.raise_for_status()
            for chunk in r.iter_content(chunk_size=1 << 16):
                buf.write(chunk)
    buf.seek(0)
    return buf


@functools.lru_cache(maxsize=8)
def _download_progressive_mp4(url: str) -> bytes:
    """
    Download a progressive MP4 for `url` and return its raw bytes.

    The cache stores immutable bytes, so cached entries can never be
    affected by how a caller reads or seeks its buffer.

    Raises:
        RuntimeError: If yt-dlp reports no direct URL for the selection.
        requests.HTTPError: If the media download fails.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": (
            "best[ext=mp4][vcodec^=avc1][acodec!=none]"
            "/best[ext=mp4][acodec!=none]"
        ),
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            # Playlist-style result: use the first entry.
            info = info["entries"][0]

    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")

    buf = io.BytesIO()
    with requests.get(direct_url, stream=True, timeout=_HTTP_TIMEOUT) as r:
        r.raise_for_status()
        for chunk in r.iter_content(chunk_size=1 << 17):
            buf.write(chunk)
    return buf.getvalue()


def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4 (H.264 + AAC) –
    the safest thing PyAV can open everywhere.

    Downloads are cached per URL (up to 8), but every call gets its own
    buffer positioned at offset 0, so callers never share a read position.

    Raises:
        RuntimeError: If yt-dlp could not find a progressive MP4 track.
        requests.HTTPError: If the media download fails.
    """
    return io.BytesIO(_download_progressive_mp4(url))


# Keep the lru_cache helpers reachable under the public name for any caller
# that relied on them when this function was decorated directly.
youtube_to_buffer.cache_clear = _download_progressive_mp4.cache_clear
youtube_to_buffer.cache_info = _download_progressive_mp4.cache_info


def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """
    Decode `n_frames` uniformly spaced RGB frames as PIL images.

    Args:
        video_bytes: Buffer holding the encoded video.
        n_frames: Maximum number of frames to return; values <= 0 yield [].

    Returns:
        Up to `n_frames` frames taken from the first video stream.
    """
    if n_frames <= 0:
        # Avoids dividing by zero below; nothing was requested.
        return []

    container = av.open(video_bytes, metadata_errors="ignore")
    try:
        video = container.streams.video[0]
        total = video.frames or 0
        # When the stream reports no frame count, fall back to every 30th frame.
        step = max(1, total // n_frames) if total else 30

        frames: list[Image.Image] = []
        for i, frame in enumerate(container.decode(video=0)):
            if i % step == 0:
                frames.append(frame.to_image())
                if len(frames) >= n_frames:
                    break
        return frames
    finally:
        # Release the demuxer even if decoding raised.
        container.close()


def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    """Encode `img` as a JPEG and return it as a base64 `data:` URL."""
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/jpeg;base64,{b64}"


def _close_quietly(container) -> None:
    """Close a PyAV container, logging instead of raising on failure."""
    if container is None:
        return
    try:
        container.close()
    except Exception as e:
        print(f"Error closing media container: {e}", file=sys.stderr)


def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extracts the audio stream from video_bytes, saves it as a temporary WAV file,
    and returns the path to the file.
    Returns None if no audio stream is found or an error occurs.

    The caller owns the returned file and is responsible for deleting it.
    """
    temp_audio_file_path = None
    input_container = None
    output_container = None
    try:
        video_bytes.seek(0)
        input_container = av.open(video_bytes, metadata_errors="ignore")

        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None

        input_audio_stream = input_container.streams.audio[0]

        # Only reserve a path here; PyAV reopens the file by name below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name

        output_container = av.open(temp_audio_file_path, mode="w", format="wav")

        # Prefer the source layout; otherwise mono for 1 channel, else stereo.
        channel_layout = "stereo"
        if (
            hasattr(input_audio_stream.codec_context, "layout")
            and input_audio_stream.codec_context.layout
        ):
            channel_layout = input_audio_stream.codec_context.layout.name
        elif (
            hasattr(input_audio_stream.codec_context, "channels")
            and input_audio_stream.codec_context.channels == 1
        ):
            channel_layout = "mono"

        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=input_audio_stream.codec_context.sample_rate,
            layout=channel_layout,
        )

        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)

        # Flush whatever the encoder still buffers.
        for packet in output_audio_stream.encode():
            output_container.mux(packet)

        output_container.close()
        output_container = None  # closed cleanly; nothing left to clean up
        return temp_audio_file_path

    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        # Close the writer before deleting the partial file it holds open.
        _close_quietly(output_container)
        if temp_audio_file_path and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None
    finally:
        _close_quietly(input_container)


def _transcribe_video_audio(video_bytes: io.BytesIO) -> str:
    """
    Return a transcript of the audio in `video_bytes`.

    Never raises: on failure a bracketed placeholder describing the problem
    is returned so it can still be embedded in the prompt.
    """
    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(video_bytes)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
            transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.",
                file=sys.stderr,
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        # The temporary WAV is only needed for the upload above.
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)
    return transcript


@tool
def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from url and return an answer to a natural-language query using the video.

    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    n_frames = 4

    # youtube_to_buffer signals failure by raising, never by returning None,
    # so the error message has to be produced from the exception path.
    try:
        buff = youtube_to_buffer(url)
    except Exception as e:
        print(f"Error downloading video: {e}", file=sys.stderr)
        return "Error: Could not download or buffer the video."

    frames = sample_frames(buff, n_frames=n_frames)
    buff.seek(0)

    transcript = _transcribe_video_audio(buff)

    prompt_text = f"Original Query: {query}\n\nVideo Transcript:\n{transcript}\n\nKey Visual Frames (analyze these along with the transcript to answer the query):"

    content = [{"type": "text", "text": prompt_text}]
    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )

    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,
        )
        # content can be None, so guard before stripping.
        result = (resp.choices[0].message.content or "").strip()
    except Exception as e:
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"

    return result


@tool
def search_wikipedia(query: str) -> str:
    """
    get the contents of wikipedia page retrieved by search query.

    Args:
        query (str): A search term to search within wikipedia. Ideally it should be one word or a group of few words.

    Returns:
        str: The text content of wikipedia page
    """
    get_wiki = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    page_content = get_wiki.page(query)
    text_content = page_content.text

    # Keep at most `cutoff` space-separated tokens of the page text.
    cutoff = 25000
    text_content = " ".join(text_content.split(" ")[:cutoff])
    return text_content