import base64
import functools
import io
import os
import sys
import tempfile
from typing import List, Optional

import av
import openai
import requests
import wikipediaapi
from openai import OpenAI
from PIL import Image
from smolagents import tool
from yt_dlp import YoutubeDL

model_id = "gpt-4.1"
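
# NOTE: every OpenAI call in this module (vision, transcription, chat) relies
# on the OPENAI_API_KEY environment variable being set; no key is passed
# explicitly.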


@tool
def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.
    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.
    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    client = OpenAI()
    response = client.responses.create(
        model=model_id,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": query},
                    {
                        "type": "input_image",
                        "image_url": img_url,
                    },
                ],
            }
        ],
    )
    return response.output_text


@tool
def read_code(file_url: str) -> str:
    """
    Read the contents of a code file (e.g., a .py file) instead of executing it. Use this tool to analyze a code snippet.
    Args:
        file_url (str): The URL of the code file to retrieve.
    Returns:
        str: The content of the file as a string.
    """
    response = requests.get(file_url)
    response.raise_for_status()
    return response.text


@tool
def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using a transcription model.
    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine format.
    Returns:
        str: The transcribed text from the audio file.
    """
    response = requests.get(file_url)
    response.raise_for_status()

    extension = file_name.rsplit(".", 1)[-1].lower() if "." in file_name else "mp3"

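    # Give the in-memory file a name so the transcription endpoint can infer
    # the audio format from the file extension.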
    audio_file = io.BytesIO(response.content)
    audio_file.name = f"audio.{extension}"

    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )

    return transcription.text


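# NOTE: _pytube_buffer and _ytdlp_buffer are standalone download helpers; they
# are not called by youtube_to_buffer or run_video below.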
def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    try:
        from pytube import YouTube

        yt = YouTube(url)
        stream = (
            yt.streams.filter(progressive=True, file_extension="mp4")
            .order_by("resolution")
            .desc()
            .first()
        )
        if stream is None:
            raise RuntimeError("No MP4 with audio found")
        buf = io.BytesIO()
        stream.stream_to_buffer(buf)
        buf.seek(0)
        return buf
    except Exception as e:
        print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr)
        return None


def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing an MP4 video stream for `url`.
    Works whether YouTube serves a progressive file or separate A/V streams.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]

    if "url" in info:
        video_urls = [info["url"]]

    elif "requested_formats" in info:
        video_urls = [
            fmt["url"]
            for fmt in info["requested_formats"]
            if fmt.get("vcodec") != "none"
        ]
        if not video_urls:
            raise RuntimeError("yt-dlp returned audio-only formats")

    else:
        raise RuntimeError("yt-dlp could not extract a stream URL")

    buf = io.BytesIO()
    for direct_url in video_urls:
        with requests.get(direct_url, stream=True) as r:
            r.raise_for_status()
            for chunk in r.iter_content(chunk_size=1 << 16):
                buf.write(chunk)

    buf.seek(0)
    return buf


@functools.lru_cache(maxsize=8)
def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4
    (H.264 + AAC) – the safest thing PyAV can open everywhere.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "best[ext=mp4][vcodec^=avc1][acodec!=none]/best[ext=mp4][acodec!=none]",
    }

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]

    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")

    buf = io.BytesIO()
    with requests.get(direct_url, stream=True) as r:
        r.raise_for_status()
        for chunk in r.iter_content(chunk_size=1 << 17):
            buf.write(chunk)

    buf.seek(0)
    return buf


def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """Decode `n_frames` uniformly spaced RGB frames as PIL images."""
    container = av.open(video_bytes, metadata_errors="ignore")
    video = container.streams.video[0]
    total = video.frames or 0

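    # When the container does not report a frame count, fall back to taking
    # every 30th decoded frame.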
    step = max(1, total // n_frames) if total else 30

    frames: list[Image.Image] = []
    for i, frame in enumerate(container.decode(video=0)):
        if i % step == 0:
            frames.append(frame.to_image())
        if len(frames) >= n_frames:
            break
    container.close()
    return frames


def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/jpeg;base64,{b64}"


def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extracts the audio stream from video_bytes, saves it as a temporary WAV file,
    and returns the path to the file.
    Returns None if no audio stream is found or an error occurs.
    """
    try:
        video_bytes.seek(0)
        input_container = av.open(video_bytes, metadata_errors="ignore")

        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None
        input_audio_stream = input_container.streams.audio[0]
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name

        output_container = av.open(temp_audio_file_path, mode="w", format="wav")

        channel_layout = "stereo"
        if (
            hasattr(input_audio_stream.codec_context, "layout")
            and input_audio_stream.codec_context.layout
        ):
            channel_layout = input_audio_stream.codec_context.layout.name
        elif (
            hasattr(input_audio_stream.codec_context, "channels")
            and input_audio_stream.codec_context.channels == 1
        ):
            channel_layout = "mono"

        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=input_audio_stream.codec_context.sample_rate,
            layout=channel_layout,
        )

        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)

        for packet in output_audio_stream.encode():
            output_container.mux(packet)

        output_container.close()
        input_container.close()
        return temp_audio_file_path

    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        if "temp_audio_file_path" in locals() and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None


@tool
def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from a URL and return an answer to a natural-language query about the video.
    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual or audio content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.
    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    n_frames = 4
    buff = youtube_to_buffer(url)
    if buff is None:
        return "Error: Could not download or buffer the video."

    frames = sample_frames(buff, n_frames=n_frames)
    buff.seek(0)

    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(buff)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
                transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.", file=sys.stderr
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)

    prompt_text = f"Original Query: {query}\n\nVideo Transcript:\n{transcript}\n\nKey Visual Frames (analyze these along with the transcript to answer the query):"

    content = [{"type": "text", "text": prompt_text}]

    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )

    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,
        )
        result = resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"

    return result


@tool
def search_wikipedia(query: str) -> str:
    """
    Get the text contents of the Wikipedia page retrieved by a search query.
    Args:
        query (str): A search term to look up on Wikipedia; ideally a single word or a short phrase.
    Returns:
        str: The text content of the Wikipedia page.
    """
    get_wiki = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    page_content = get_wiki.page(query)
    text_content = page_content.text

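    # Truncate to the first 25,000 whitespace-separated words so very long
    # pages stay within a manageable size.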
    cutoff = 25000
    text_content = " ".join(text_content.split(" ")[:cutoff])
    return text_content
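

# Minimal usage sketch: one way these tools could be attached to a smolagents
# agent. The CodeAgent / OpenAIServerModel wiring and the example question are
# illustrative assumptions, not something the tools above require; adapt them
# to your own agent setup.
if __name__ == "__main__":
    from smolagents import CodeAgent, OpenAIServerModel

    agent = CodeAgent(
        tools=[read_image, read_code, transcribe_audio, run_video, search_wikipedia],
        model=OpenAIServerModel(model_id=model_id),
    )
    # Hypothetical query; any task that needs one of the tools above would do.
    print(agent.run("Summarize the Wikipedia page about the Eiffel Tower."))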