Spaces:
Running
Running
File size: 10,274 Bytes
e6d549b e879014 545e8ad e6d549b e879014 e6d549b ce76bed e6d549b 545e8ad e6d549b 545e8ad e6d549b e3677e5 ce76bed e6d549b e879014 e3677e5 e6d549b e879014 e6d549b 6339e96 e6d549b 6339e96 e6d549b 6339e96 e6d549b 6339e96 e6d549b e3677e5 e6d549b e3677e5 e6d549b ce76bed e6d549b 545e8ad e6d549b 545e8ad e6d549b 545e8ad 962c619 545e8ad e6d549b e3677e5 e6d549b e879014 e3677e5 e879014 e3677e5 e879014 962c619 e879014 962c619 e879014 e6d549b e879014 e6d549b e879014 e6d549b e3677e5 e879014 ce76bed e879014 ce76bed e879014 962c619 ce76bed 962c619 ce76bed e879014 962c619 ce76bed 962c619 ce76bed e3677e5 ce76bed e6d549b 545e8ad e879014 e6d549b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
import ast
import json
import operator
import os
import re
import subprocess
import sys
from base64 import b64encode
from functools import lru_cache
from io import BytesIO
from tempfile import NamedTemporaryFile

import numpy as np
import pandas as pd
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from youtube_transcript_api import YouTubeTranscriptApi

from helpers import get_prompt, print_debug_trace
# --------------------------------------------------------------------------- #
# ARITHMETIC (SAFE CALCULATOR) #
# --------------------------------------------------------------------------- #
_ALLOWED_AST_OPS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.USub: operator.neg,
}
def _safe_eval(node: ast.AST) -> float | int | complex:
"""Recursively evaluate a *restricted* AST expression tree."""
if isinstance(node, ast.Constant):
return node.n
if isinstance(node, ast.UnaryOp) and type(node.op) in _ALLOWED_AST_OPS:
return _ALLOWED_AST_OPS[type(node.op)](_safe_eval(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_AST_OPS:
return _ALLOWED_AST_OPS[type(node.op)](
_safe_eval(node.left), _safe_eval(node.right)
)
raise ValueError("Unsafe or unsupported expression")
@tool
def calculator(expression: str) -> str:
    """Safely evaluate basic arithmetic expressions (no variables, functions)."""
    try:
        # Parse to an expression AST, then walk it with the restricted
        # evaluator instead of calling eval() on raw user input.
        parsed = ast.parse(expression, mode="eval")
        return str(_safe_eval(parsed.body))
    except Exception as exc:
        print_debug_trace(exc, "Calculator")
        return f"calc_error:{exc}"
# --------------------------------------------------------------------------- #
# WEB & WIKI SEARCH #
# --------------------------------------------------------------------------- #
@lru_cache(maxsize=256)
def _ddg_search(query: str, k: int = 6) -> list[dict[str, str]]:
    """Cached DuckDuckGo JSON search."""
    # NOTE(review): lru_cache hands back the *same* list object for a
    # repeated (query, k) pair — callers should treat it as read-only.
    raw_hits = DuckDuckGoSearchAPIWrapper(max_results=k).results(query)
    formatted = []
    for hit in raw_hits[:k]:
        formatted.append(
            {
                # Truncate each field to keep the payload context-friendly.
                "title": hit.get("title", "")[:500],
                "snippet": hit.get("snippet", "")[:750],
                "link": hit.get("link", "")[:300],
            }
        )
    return formatted
@tool
def web_multi_search(query: str, k: int = 6) -> str:
    """Run DuckDuckGo → Tavily fallback search. Returns JSON list[dict]."""
    # Primary engine: cached DuckDuckGo lookup.  A failure — or an empty
    # result set — silently falls through to the Tavily backend below.
    try:
        ddg_hits = _ddg_search(query, k)
    except Exception:
        ddg_hits = []
    if ddg_hits:
        return json.dumps(ddg_hits, ensure_ascii=False)
    try:
        tavily = TavilySearchResults(
            max_results=5,
            # include_answer=True,
            # search_depth="advanced",
        )
        raw = tavily.invoke({"query": query})
        print(
            f"[TOOL] TAVILY search is triggered with following response: {raw}"
        )
        # Normalize Tavily's schema to the same shape DuckDuckGo produces.
        hits = [
            {
                "title": item.get("title", "")[:500],
                "snippet": item.get("content", "")[:750],
                "link": item.get("url", "")[:300],
            }
            for item in raw
        ]
        return json.dumps(hits, ensure_ascii=False)
    except Exception as exc:
        print_debug_trace(exc, "Multi Search")
        return f"search_error:{exc}"
@tool
def wiki_search(query: str, max_pages: int = 2) -> str:
    """Lightweight wrapper on WikipediaLoader; returns concatenated page texts."""
    print(f"[TOOL] wiki_search called with query: {query}")
    pages = WikipediaLoader(query=query, load_max_docs=max_pages).load()
    merged = "\n\n---\n\n".join(page.page_content for page in pages)
    # Simple guardrail — stay within the model's context window.
    return merged[:8_000]
# --------------------------------------------------------------------------- #
# YOUTUBE TRANSCRIPT #
# --------------------------------------------------------------------------- #
@tool
def youtube_transcript(url: str, chars: int = 10_000) -> str:
    """Fetch full YouTube transcript (first *chars* characters).

    Accepts the common URL shapes — ``watch?v=<id>``, ``youtu.be/<id>``,
    ``/embed/<id>`` and ``/shorts/<id>`` — not just ``?v=`` links.
    Returns ``yt_error:...`` strings on failure instead of raising.
    """
    # A YouTube video id is always exactly 11 URL-safe characters.
    video_id_match = re.search(
        r"(?:[?&]v=|youtu\.be/|/embed/|/shorts/)([A-Za-z0-9_\-]{11})", url
    )
    if not video_id_match:
        return "yt_error:id_not_found"
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id_match.group(1))
        text = " ".join(piece["text"] for piece in transcript)
        return text[:chars]
    except Exception as exc:
        print_debug_trace(exc, "YouTube")
        return f"yt_error:{exc}"
# --------------------------------------------------------------------------- #
# IMAGE DESCRIPTION #
# --------------------------------------------------------------------------- #
# Instantiate a lightweight CLIP‑based zero‑shot image classifier (runs on CPU)
### The model 'openai/clip-vit-base-patch32' is a vision transformer (ViT) model trained as part of OpenAI’s CLIP project.
### It performs zero-shot image classification by mapping images and labels into the same embedding space.
# _image_pipe = pipeline(
# "image-classification", model="openai/clip-vit-base-patch32", device="cpu"
# )
# @tool
# def image_describe(img_bytes: bytes, top_k: int = 3) -> str:
# """Return the top-k CLIP labels for an image supplied as raw bytes.
# typical result for a random cat photo can be:
# [
# {'label': 'tabby, tabby cat', 'score': 0.41},
# {'label': 'tiger cat', 'score': 0.24},
# {'label': 'Egyptian cat', 'score': 0.22}
# ]
# """
# try:
# labels = _image_pipe(BytesIO(img_bytes))[:top_k]
# return ", ".join(f"{d['label']} (score={d['score']:.2f})" for d in labels)
# except Exception as exc:
# return f"img_error:{exc}"
@tool
def vision_task(img_bytes: bytes, question: str) -> str:
    """
    Pass the user's question AND the referenced image to a multimodal LLM and
    return its reply text as the answer. No domain assumptions made.
    """
    # Deterministic, short-output model config; needs OPENAI_API_KEY in env.
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=64)
    try:
        # Images travel inline as a base64 data URL in the message payload.
        encoded = b64encode(img_bytes).decode()
        payload = [
            {"type": "text", "text": question.strip()},
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{encoded}"},
            },
        ]
        messages = [
            SystemMessage(content=get_prompt(prompt_key="vision_system")),
            HumanMessage(content=payload),
        ]
        return llm.invoke(messages).content.strip()
    except Exception as exc:
        print_debug_trace(exc, "vision")
        return f"img_error:{exc}"
# --------------------------------------------------------------------------- #
# FILE UTILS #
# --------------------------------------------------------------------------- #
@tool
def run_py(code: str) -> str:
    """Execute Python code in a sandboxed subprocess and return last stdout line.

    The snippet is written to a temporary ``.py`` file and run with a
    45-second timeout.  Returns the last stdout line (empty string if the
    script printed nothing), or a ``py_error:`` string on failure.
    """
    path = None
    try:
        with NamedTemporaryFile(delete=False, suffix=".py", mode="w") as f:
            f.write(code)
            path = f.name
        # sys.executable guarantees the same interpreter/venv as this
        # process; a bare "python" may resolve to a different install.
        proc = subprocess.run(
            [sys.executable, path], capture_output=True, text=True, timeout=45
        )
        out = proc.stdout.strip().splitlines()
        return out[-1] if out else ""
    except Exception as exc:
        print_debug_trace(exc, "run_py")
        return f"py_error:{exc}"
    finally:
        # delete=False keeps the file past the with-block; remove it here
        # so repeated calls don't leak temp files.
        if path:
            try:
                os.unlink(path)
            except OSError:
                pass
@tool
def transcribe_via_whisper(audio_bytes: bytes) -> str:
    """Transcribe audio with Whisper (CPU).

    The raw bytes are spooled to a temporary ``.mp3`` file because Whisper
    takes a file path.  Returns the transcript text, or an ``asr_error:``
    string on failure.
    """
    with NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        f.write(audio_bytes)
        path = f.name
    try:
        import whisper  # openai-whisper

        model = whisper.load_model("base")
        output = model.transcribe(path)["text"].strip()
        print(f"[DEBUG] Whisper transcript (first 200 chars): {output[:200]}")
        return output
    except Exception as exc:
        print_debug_trace(exc, "Whisper")
        return f"asr_error:{exc}"
    finally:
        # delete=False leaked the temp file in the original; clean up here.
        try:
            os.unlink(path)
        except OSError:
            pass
@tool
def analyze_excel_file(xls_bytes: bytes, question: str) -> str:
    "Analyze Excel or CSV file by passing the data preview to LLM and getting the Python Pandas operation to run"
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=64)
    # Try Excel first, then fall back to parsing the same bytes as CSV.
    try:
        df = pd.read_excel(BytesIO(xls_bytes))
    except Exception:
        df = pd.read_csv(BytesIO(xls_bytes))
    # Promote every numeric column to float so downstream math is uniform.
    for col in df.select_dtypes(include="number").columns:
        df[col] = df[col].astype(float)
    # Ask the LLM for a single pandas expression over `df`, seeded with a
    # 5-row preview of the data.
    expr = llm.invoke(
        get_prompt(
            prompt_key="excel_system",
            question=question,
            preview=df.head(5).to_dict(orient="list"),
        )
    ).content.strip()
    try:
        # SECURITY: eval of model-generated code. Builtins are emptied and
        # only df/pd are exposed, but this is not a true sandbox.
        result = eval(expr, {"df": df, "pd": pd, "__builtins__": {}})
        if isinstance(result, np.generic):
            # numpy scalar → plain float, rendered with two decimals
            return f"{float(result):.2f}"
        if hasattr(result, "to_string"):
            # DataFrame / Series → single-line string
            return result.to_string(index=False)
        return str(result)
    except Exception as e:
        print_debug_trace(e, "Excel")
        return f"eval_error:{e}"
# Public tool surface of this module — the set of @tool callables the agent
# is expected to import.  (Commented-out image_describe is intentionally
# absent.)
__all__ = [
    "calculator",
    "web_multi_search",
    "wiki_search",
    "youtube_transcript",
    "vision_task",
    "run_py",
    "transcribe_via_whisper",
    "analyze_excel_file",
]
|