|
import base64
import functools
import io
import os
import sys
import tempfile
from typing import Any, Dict, List, Optional

import av
import openai
import pandas as pd
import requests
import wikipediaapi
from openai import OpenAI
from PIL import Image
from smolagents import tool
from yt_dlp import YoutubeDL

model_id = "gpt-4.1"
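# Note: every OpenAI call below (OpenAI(), openai.chat..., openai.audio...)
# assumes the OPENAI_API_KEY environment variable is set, e.g.:
#   export OPENAI_API_KEY="sk-..."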
|
|
|
|
|
@tool
def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.

    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    client = OpenAI()
    response = client.responses.create(
        model=model_id,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": query},
                    {
                        "type": "input_image",
                        "image_url": img_url,
                    },
                ],
            }
        ],
    )
    return response.output_text
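# Usage sketch (hypothetical URL; requires OPENAI_API_KEY):
#   answer = read_image("What is written on the sign?", "https://example.com/sign.jpg")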
|
|
|
|
|
@tool
def read_code(file_url: str) -> str:
    """
    Read the contents of a code file such as a .py file instead of executing it. Use this tool to analyze a code snippet.

    Args:
        file_url (str): The URL of the code file to retrieve.

    Returns:
        str: The content of the file as a string.
    """
    response = requests.get(file_url, timeout=30)
    response.raise_for_status()
    return response.text
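# Usage sketch (hypothetical URL):
#   source = read_code("https://example.com/files/solution.py")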
|
|
|
|
|
@tool
def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using a transcription model.

    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine format.

    Returns:
        str: The transcribed text from the audio file.
    """
    response = requests.get(file_url, timeout=30)
    response.raise_for_status()

    # Fall back to mp3 when the filename carries no extension. (The previous
    # `split(".")[-1] or "mp3"` never fell back: split always returns a
    # non-empty last element for names without a dot.)
    extension = file_name.rsplit(".", 1)[-1].lower() if "." in file_name else "mp3"

    # The OpenAI SDK infers the audio format from the file's name, so give
    # the in-memory buffer one.
    audio_file = io.BytesIO(response.content)
    audio_file.name = f"audio.{extension}"

    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )
    return transcription.text
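# Usage sketch (hypothetical URL; requires OPENAI_API_KEY):
#   text = transcribe_audio("https://example.com/files/interview.mp3", "interview.mp3")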
|
|
|
|
|
|
|
def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    """Download a progressive MP4 via pytube; return None on any failure."""
    try:
        from pytube import YouTube

        yt = YouTube(url)
        # Progressive streams bundle audio + video in one file; take the
        # highest-resolution MP4 available.
        stream = (
            yt.streams.filter(progressive=True, file_extension="mp4")
            .order_by("resolution")
            .desc()
            .first()
        )
        if stream is None:
            raise RuntimeError("No MP4 with audio found")
        buf = io.BytesIO()
        stream.stream_to_buffer(buf)
        buf.seek(0)
        return buf
    except Exception as e:
        print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr)
        return None
|
|
|
|
|
def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing some MP4 video stream for `url`.
    Works whether YouTube serves a progressive file or separate A/V.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:  # playlist → take the first entry
            info = info["entries"][0]

    if "url" in info:
        # Progressive format: a single direct URL.
        video_urls = [info["url"]]
    elif "requested_formats" in info:
        # Separate audio/video tracks: keep only the video ones.
        video_urls = [
            fmt["url"]
            for fmt in info["requested_formats"]
            if fmt.get("vcodec") != "none"
        ]
        if not video_urls:
            raise RuntimeError("yt-dlp returned audio-only formats")
    else:
        raise RuntimeError("yt-dlp could not extract a stream URL")

    buf = io.BytesIO()
    for direct_url in video_urls:
        with requests.get(direct_url, stream=True) as r:
            r.raise_for_status()
            for chunk in r.iter_content(chunk_size=1 << 16):  # 64 KiB chunks
                buf.write(chunk)

    buf.seek(0)
    return buf
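# The two helpers above are alternate download paths (pytube, then yt-dlp
# without the progressive constraint); the cached youtube_to_buffer below is
# what run_video actually calls.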
|
|
|
|
|
@functools.lru_cache(maxsize=8)
def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4
    (H.264 + AAC) – the safest thing PyAV can open everywhere.

    Note: lru_cache hands back the *same* BytesIO object for repeated URLs,
    so callers must seek(0) before re-reading it.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        # Prefer an H.264 progressive MP4 (video and audio in one stream).
        "format": (
            "best[ext=mp4][vcodec^=avc1][acodec!=none]"
            "/best[ext=mp4][acodec!=none]"
        ),
    }

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:  # playlist → take the first entry
            info = info["entries"][0]

    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")

    buf = io.BytesIO()
    with requests.get(direct_url, stream=True) as r:
        r.raise_for_status()
        for chunk in r.iter_content(chunk_size=1 << 17):  # 128 KiB chunks
            buf.write(chunk)

    buf.seek(0)
    return buf
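# Usage sketch (hypothetical video id):
#   buf = youtube_to_buffer("https://www.youtube.com/watch?v=VIDEO_ID")
#   frames = sample_frames(buf, n_frames=4)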
|
|
|
|
|
def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """Decode `n_frames` uniformly spaced RGB frames as PIL images."""
    container = av.open(video_bytes, metadata_errors="ignore")
    video = container.streams.video[0]
    total = video.frames or 0  # 0 when the container does not report a frame count

    # When the frame count is unknown, fall back to sampling every 30th frame.
    step = max(1, total // n_frames) if total else 30

    frames: List[Image.Image] = []
    for i, frame in enumerate(container.decode(video=0)):
        if i % step == 0:
            frames.append(frame.to_image())
        if len(frames) >= n_frames:
            break
    container.close()
    return frames
|
|
|
|
|
def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    """Encode a PIL image as a base64 JPEG data URL."""
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality, optimize=True)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/jpeg;base64,{b64}"
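# A frame encoded this way yields a string like "data:image/jpeg;base64,/9j/..."
# that vision models accept inline in place of an image URL.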
|
|
|
|
|
def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extracts the audio stream from video_bytes, saves it as a temporary WAV file,
    and returns the path to the file.
    Returns None if no audio stream is found or an error occurs.
    """
    try:
        video_bytes.seek(0)
        input_container = av.open(video_bytes, metadata_errors="ignore")

        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None
        input_audio_stream = input_container.streams.audio[0]

        # Reserve a temp file path; PyAV reopens it for writing below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name

        output_container = av.open(temp_audio_file_path, mode="w", format="wav")

        # Mirror the source channel layout; default to stereo when unknown.
        channel_layout = "stereo"
        if (
            hasattr(input_audio_stream.codec_context, "layout")
            and input_audio_stream.codec_context.layout
        ):
            channel_layout = input_audio_stream.codec_context.layout.name
        elif (
            hasattr(input_audio_stream.codec_context, "channels")
            and input_audio_stream.codec_context.channels == 1
        ):
            channel_layout = "mono"

        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=input_audio_stream.codec_context.sample_rate,
            layout=channel_layout,
        )

        # Decode source audio frames and re-encode them as 16-bit PCM.
        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)

        # Flush any frames still buffered inside the encoder.
        for packet in output_audio_stream.encode():
            output_container.mux(packet)

        output_container.close()
        input_container.close()
        return temp_audio_file_path

    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        # Clean up the partially written file, if any.
        if "temp_audio_file_path" in locals() and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None
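# Usage sketch:
#   wav_path = save_audio_stream_to_temp_wav_file(video_buffer)
#   if wav_path:
#       ...  # transcribe, then os.remove(wav_path)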
|
|
|
|
|
@tool
def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from url and return an answer to a natural-language query using the video.

    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    n_frames = 4
    buff = youtube_to_buffer(url)
    if buff is None:  # defensive; youtube_to_buffer raises rather than returning None
        return "Error: Could not download or buffer the video."

    frames = sample_frames(buff, n_frames=n_frames)
    buff.seek(0)  # rewind so the audio extractor reads from the start

    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(buff)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
                transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.", file=sys.stderr
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)

    # Combine the query, the transcript, and the sampled frames into one
    # multimodal chat message.
    prompt_text = (
        f"Original Query: {query}\n\n"
        f"Video Transcript:\n{transcript}\n\n"
        "Key Visual Frames (analyze these along with the transcript to answer the query):"
    )
    content = [{"type": "text", "text": prompt_text}]
    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )

    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,
        )
        result = resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"

    return result
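# Usage sketch (hypothetical video id; requires OPENAI_API_KEY and network access):
#   answer = run_video(
#       "What instrument is being played?",
#       "https://www.youtube.com/watch?v=VIDEO_ID",
#   )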


def process_image(response, filename, content_type):
    """Process image files - convert to base64 data URL for vision models"""
    img_data = base64.b64encode(response.content).decode("utf-8")
    data_url = f"data:{content_type};base64,{img_data}"

    return {
        "file_type": "image",
        "filename": filename,
        "content_type": content_type,
        "data_url": data_url,
    }
|
|
|
|
|
def process_audio(response, filename, content_type):
    """Process audio files - either return data URL or save to temp file for processing"""
    audio_data = base64.b64encode(response.content).decode("utf-8")
    data_url = f"data:{content_type};base64,{audio_data}"

    # Also expose a named in-memory buffer ready for the transcription API.
    audio_file = io.BytesIO(response.content)
    extension = os.path.splitext(filename)[1].lower() or ".mp3"
    audio_file.name = f"audio{extension}"

    return {
        "file_type": "audio",
        "filename": filename,
        "content_type": content_type,
        "data_url": data_url,
        "audio_buffer": audio_file,
    }
|
|
|
|
|
def process_video(response, filename, content_type):
    """Process video files - save to buffer and extract frames"""
    video_buffer = io.BytesIO(response.content)

    try:
        frames = sample_frames(video_buffer, n_frames=4)
        frame_urls = [pil_to_data_url(img) for img in frames]
        frame_extraction_success = True
    except Exception:
        frame_urls = []
        frame_extraction_success = False

    return {
        "file_type": "video",
        "filename": filename,
        "content_type": content_type,
        "video_buffer": video_buffer,
        "frame_urls": frame_urls,
        "frames_extracted": frame_extraction_success,
    }
|
|
|
|
|
def process_tabular(response, filename, content_type):
    """Process spreadsheet files using pandas"""
    excel_buffer = io.BytesIO(response.content)

    try:
        if filename.lower().endswith(".csv"):
            df = pd.read_csv(excel_buffer)
        else:
            df = pd.read_excel(excel_buffer)

        return {
            "file_type": "tabular",
            "filename": filename,
            "content_type": content_type,
            "data": df.to_dict(orient="records"),
            "columns": df.columns.tolist(),
            "shape": df.shape,
        }
    except Exception as e:
        # Fall back to raw bytes when pandas cannot parse the file.
        return {
            "file_type": "tabular",
            "filename": filename,
            "content_type": content_type,
            "error": f"Failed to parse tabular data: {e}",
            "raw_data": base64.b64encode(response.content).decode("utf-8"),
        }
|
|
|
|
|
def process_text(response, filename, content_type):
    """Process text files (code, plain text, etc.)"""
    try:
        text_content = response.text
        return {
            "file_type": "text",
            "filename": filename,
            "content_type": content_type,
            "content": text_content,
            "extension": os.path.splitext(filename)[1],
        }
    except Exception as e:
        return {
            "file_type": "text",
            "filename": filename,
            "content_type": content_type,
            "error": f"Failed to decode text: {e}",
            "raw_data": base64.b64encode(response.content).decode("utf-8"),
        }
|
|
|
|
|
def process_json(response, filename, content_type):
    """Process JSON data"""
    try:
        json_data = response.json()
        return {
            "file_type": "json",
            "filename": filename,
            "content_type": content_type,
            "data": json_data,
        }
    except Exception:
        # Invalid JSON: treat it as plain text instead.
        return process_text(response, filename, content_type)
|
|
|
|
|
def process_pdf(response, filename, content_type):
    """Process PDF files - return as binary with metadata"""
    pdf_data = base64.b64encode(response.content).decode("utf-8")

    return {
        "file_type": "pdf",
        "filename": filename,
        "content_type": content_type,
        "data": pdf_data,
    }
|
|
|
|
|
def process_binary(response, filename, content_type):
    """Process other binary files (fallback handler)"""
    binary_data = base64.b64encode(response.content).decode("utf-8")

    return {
        "file_type": "binary",
        "filename": filename,
        "content_type": content_type,
        "data": binary_data,
    }
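# fetch_task_files below dispatches to one of the process_* handlers above,
# checking the Content-Type header first and the filename extension second.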
|
|
|
|
|
@tool
def fetch_task_files(task_id: str) -> Dict[str, Any]:
    """
    Download files associated with a specific task from the API.

    Args:
        task_id (str): The task ID to download files for.

    Returns:
        dict: A dictionary containing file information and data in an appropriate format for the file type.
    """
    api_base_url: str = "https://agents-course-unit4-scoring.hf.space"
    files_url = f"{api_base_url}/files/{task_id}"

    try:
        response = requests.get(files_url, timeout=15)
        response.raise_for_status()

        # Derive the filename from the Content-Disposition header, falling
        # back to "<task_id>.bin" when the header is absent.
        content_type = response.headers.get("Content-Type", "").lower()
        filename = response.headers.get("content-disposition", "")
        if "filename=" in filename:
            filename = filename.split("filename=")[-1].strip('"')
        else:
            filename = f"{task_id}.bin"

        print(f"Received file: {filename}, type: {content_type}")

        if "image/" in content_type or any(
            filename.lower().endswith(ext) for ext in [".png", ".jpg", ".jpeg", ".gif"]
        ):
            return process_image(response, filename, content_type)

        elif "audio/" in content_type or any(
            filename.lower().endswith(ext) for ext in [".mp3", ".wav", ".ogg"]
        ):
            return process_audio(response, filename, content_type)

        elif "video/" in content_type or any(
            filename.lower().endswith(ext) for ext in [".mp4", ".avi", ".mov"]
        ):
            return process_video(response, filename, content_type)

        elif (
            "spreadsheet" in content_type
            or "excel" in content_type
            or any(filename.lower().endswith(ext) for ext in [".xlsx", ".xls", ".csv"])
        ):
            return process_tabular(response, filename, content_type)

        elif (
            "text/" in content_type
            or "code" in content_type
            or any(
                filename.lower().endswith(ext)
                for ext in [".txt", ".py", ".js", ".html", ".md"]
            )
        ):
            return process_text(response, filename, content_type)

        elif "application/json" in content_type or filename.lower().endswith(".json"):
            return process_json(response, filename, content_type)

        elif "application/pdf" in content_type or filename.lower().endswith(".pdf"):
            return process_pdf(response, filename, content_type)

        else:
            return process_binary(response, filename, content_type)

    except requests.exceptions.RequestException as e:
        print(f"Error fetching files for task {task_id}: {e}")
        return {"error": f"Error fetching files: {e}"}
    except Exception as e:
        print(f"An unexpected error occurred fetching files for task {task_id}: {e}")
        return {"error": f"An unexpected error occurred: {e}"}
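# Usage sketch (hypothetical task id):
#   info = fetch_task_files("00000000-0000-0000-0000-000000000000")
#   if info.get("file_type") == "tabular":
#       rows = info["data"]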
|
|
|
|
|
@tool
def search_wikipedia(query: str) -> str:
    """
    Get the contents of the Wikipedia page retrieved by a search query.

    Args:
        query (str): A search term to look up on Wikipedia. Ideally it should be one word or a group of few words.

    Returns:
        str: The text content of the Wikipedia page.
    """
    get_wiki = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    # Note: wikipediaapi resolves the query as a page title rather than
    # running a full-text search.
    page_content = get_wiki.page(query)
    text_content = page_content.text

    # Truncate to the first 25,000 whitespace-separated tokens to keep the
    # result within a reasonable context size.
    cutoff = 25000
    text_content = " ".join(text_content.split(" ")[:cutoff])
    return text_content
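# Usage sketch:
#   article = search_wikipedia("Alan Turing")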
|
|
|
|
|
if __name__ == "__main__":
    # Smoke-test fetch_task_files against a few known task IDs.
    task_ids = [
        "cca530fc-4052-43b2-b130-b30968d8aa44",
        "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3",
        "7bd855d8-463d-4ed5-93ca-5fe35145f733",
    ]
    for task_id in task_ids:
        print(
            "=" * 20
            + f" Testing fetch_task_files with task_id: {task_id} "
            + "=" * 20
        )
        result = fetch_task_files(task_id)
        print(f"File type: {result.get('file_type')}")
        print(f"Filename: {result.get('filename')}")
|
|