# Agent tool implementations: safe calculator, web/wiki search, YouTube
# transcripts, vision, code execution, audio transcription and spreadsheet QA.
import ast
import json
import operator
import os
import re
import subprocess
import sys
from base64 import b64encode
from functools import lru_cache
from io import BytesIO
from tempfile import NamedTemporaryFile

import numpy as np
import pandas as pd
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from youtube_transcript_api import YouTubeTranscriptApi

from helpers import get_prompt, print_debug_trace
# --------------------------------------------------------------------------- # | |
# ARITHMETIC (SAFE CALCULATOR) # | |
# --------------------------------------------------------------------------- # | |
_ALLOWED_AST_OPS = { | |
ast.Add: operator.add, | |
ast.Sub: operator.sub, | |
ast.Mult: operator.mul, | |
ast.Div: operator.truediv, | |
ast.Pow: operator.pow, | |
ast.USub: operator.neg, | |
} | |
def _safe_eval(node: ast.AST) -> float | int | complex: | |
"""Recursively evaluate a *restricted* AST expression tree.""" | |
if isinstance(node, ast.Constant): | |
return node.n | |
if isinstance(node, ast.UnaryOp) and type(node.op) in _ALLOWED_AST_OPS: | |
return _ALLOWED_AST_OPS[type(node.op)](_safe_eval(node.operand)) | |
if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_AST_OPS: | |
return _ALLOWED_AST_OPS[type(node.op)]( | |
_safe_eval(node.left), _safe_eval(node.right) | |
) | |
raise ValueError("Unsafe or unsupported expression") | |
def calculator(expression: str) -> str:
    """Safely evaluate basic arithmetic expressions (no variables, functions).

    Parses *expression* as a Python expression and walks the AST through
    the restricted evaluator, so only whitelisted arithmetic can run.
    Returns the result as a string, or an ``"calc_error:..."`` string on
    any parse/evaluation failure.
    """
    try:
        parsed = ast.parse(expression, mode="eval")
        result = _safe_eval(parsed.body)
    except Exception as exc:
        print_debug_trace(exc, "Calculator")
        return f"calc_error:{exc}"
    return str(result)
# --------------------------------------------------------------------------- # | |
# WEB & WIKI SEARCH # | |
# --------------------------------------------------------------------------- # | |
@lru_cache(maxsize=128)
def _ddg_search(query: str, k: int = 6) -> list[dict[str, str]]:
    """Cached DuckDuckGo JSON search.

    The docstring always claimed caching but none existed (``lru_cache``
    was imported yet unused) — repeated identical queries hit the network
    every time. Results are now memoised per ``(query, k)``.

    Note: the cached list is shared between calls; the sole caller
    (``web_multi_search``) only serialises it, never mutates it.

    Returns up to *k* dicts with truncated ``title``/``snippet``/``link``.
    """
    wrapper = DuckDuckGoSearchAPIWrapper(max_results=k)
    hits = wrapper.results(query)
    return [
        {
            "title": hit.get("title", "")[:500],
            "snippet": hit.get("snippet", "")[:750],
            "link": hit.get("link", "")[:300],
        }
        for hit in hits[:k]
    ]
def web_multi_search(query: str, k: int = 6) -> str:
    """Run DuckDuckGo → Tavily fallback search. Returns JSON list[dict].

    Args:
        query: Free-text search query.
        k: Maximum number of results to return (now honoured by both
           engines — the Tavily fallback was previously hard-coded to 5).

    Returns:
        JSON-encoded ``list[dict]`` with ``title``/``snippet``/``link``
        keys, or a ``"search_error:..."`` string when both engines fail.
    """
    try:
        hits = _ddg_search(query, k)
        if hits:
            return json.dumps(hits, ensure_ascii=False)
    except Exception:
        # DuckDuckGo is best-effort; any failure falls through to Tavily.
        pass
    try:
        tavily_results = TavilySearchResults(
            max_results=k,
            # include_answer=True,
            # search_depth="advanced",
        )
        search_result = tavily_results.invoke({"query": query})
        print(
            f"[TOOL] TAVILY search is triggered with following response: {search_result}"
        )
        # Normalise Tavily's result schema to the DDG shape used above.
        formatted = [
            {
                "title": d.get("title", "")[:500],
                "snippet": d.get("content", "")[:750],
                "link": d.get("url", "")[:300],
            }
            for d in search_result
        ]
        return json.dumps(formatted, ensure_ascii=False)
    except Exception as exc:
        print_debug_trace(exc, "Multi Search")
        return f"search_error:{exc}"
def wiki_search(query: str, max_pages: int = 2) -> str:
    """Lightweight wrapper on WikipediaLoader; returns concatenated page texts.

    Args:
        query: Wikipedia search query.
        max_pages: Maximum number of pages to load.

    Returns:
        Page texts joined by ``---`` separators (truncated to 8 000 chars),
        or a ``"wiki_error:..."`` string on failure — previously this was
        the only tool that let loader exceptions propagate to the caller.
    """
    print(f"[TOOL] wiki_search called with query: {query}")
    try:
        docs = WikipediaLoader(query=query, load_max_docs=max_pages).load()
    except Exception as exc:
        print_debug_trace(exc, "Wiki Search")
        return f"wiki_error:{exc}"
    joined = "\n\n---\n\n".join(d.page_content for d in docs)
    return joined[:8_000]  # simple guardrail – stay within context window
# --------------------------------------------------------------------------- # | |
# YOUTUBE TRANSCRIPT # | |
# --------------------------------------------------------------------------- # | |
def youtube_transcript(url: str, chars: int = 10_000) -> str:
    """Fetch full YouTube transcript (first *chars* characters).

    Args:
        url: Any common YouTube URL form — ``watch?v=ID``, ``youtu.be/ID``,
            ``/shorts/ID`` or ``/embed/ID``. (Previously only ``?v=`` links
            were recognised; short links returned ``id_not_found``.)
        chars: Transcript truncation length.

    Returns:
        The transcript text, or a ``"yt_error:..."`` string on failure.
    """
    video_id_match = re.search(
        r"(?:[?&]v=|youtu\.be/|/shorts/|/embed/)([A-Za-z0-9_\-]{11})", url
    )
    if not video_id_match:
        return "yt_error:id_not_found"
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id_match.group(1))
        text = " ".join(piece["text"] for piece in transcript)
        return text[:chars]
    except Exception as exc:
        print_debug_trace(exc, "YouTube")
        return f"yt_error:{exc}"
# --------------------------------------------------------------------------- # | |
# IMAGE DESCRIPTION # | |
# --------------------------------------------------------------------------- # | |
# Instantiate a lightweight CLIP‑based zero‑shot image classifier (runs on CPU) | |
### The model 'openai/clip-vit-base-patch32' is a vision transformer (ViT) model trained as part of OpenAI’s CLIP project. | |
### It performs zero-shot image classification by mapping images and labels into the same embedding space. | |
# _image_pipe = pipeline( | |
# "image-classification", model="openai/clip-vit-base-patch32", device="cpu" | |
# ) | |
# @tool | |
# def image_describe(img_bytes: bytes, top_k: int = 3) -> str: | |
# """Return the top-k CLIP labels for an image supplied as raw bytes. | |
# typical result for a random cat photo can be: | |
# [ | |
# {'label': 'tabby, tabby cat', 'score': 0.41}, | |
# {'label': 'tiger cat', 'score': 0.24}, | |
# {'label': 'Egyptian cat', 'score': 0.22} | |
# ] | |
# """ | |
# try: | |
# labels = _image_pipe(BytesIO(img_bytes))[:top_k] | |
# return ", ".join(f"{d['label']} (score={d['score']:.2f})" for d in labels) | |
# except Exception as exc: | |
# return f"img_error:{exc}" | |
def vision_task(img_bytes: bytes, question: str) -> str:
    """
    Pass the user's question AND the referenced image to a multimodal LLM
    and return its reply text. No domain assumptions made.

    The image is inlined as a base64 data-URL alongside the question; on
    failure an ``"img_error:..."`` string is returned instead of raising.
    """
    vision_llm = ChatOpenAI(
        model="gpt-4o-mini",  # set OPENAI_API_KEY in env
        temperature=0,
        max_tokens=64,
    )
    try:
        encoded = b64encode(img_bytes).decode()
        text_part = {"type": "text", "text": question.strip()}
        image_part = {
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{encoded}"},
        }
        messages = [
            SystemMessage(content=get_prompt(prompt_key="vision_system")),
            HumanMessage(content=[text_part, image_part]),
        ]
        return vision_llm.invoke(messages).content.strip()
    except Exception as exc:
        print_debug_trace(exc, "vision")
        return f"img_error:{exc}"
# --------------------------------------------------------------------------- # | |
# FILE UTILS # | |
# --------------------------------------------------------------------------- # | |
def run_py(code: str) -> str:
    """Execute Python code in a subprocess and return the last stdout line.

    The snippet is written to a temporary file, run with the same
    interpreter as this process (``sys.executable`` — a bare ``"python"``
    may resolve to a different interpreter or be missing from PATH), and
    limited to a 45-second timeout. Note: a subprocess is isolation, not
    a real sandbox — the code runs with this process's privileges.

    Returns:
        The last line of stdout ("" if there was no output), or a
        ``"py_error:..."`` string on failure/timeout.
    """
    path = None
    try:
        with NamedTemporaryFile(delete=False, suffix=".py", mode="w") as f:
            f.write(code)
            path = f.name
        proc = subprocess.run(
            [sys.executable, path], capture_output=True, text=True, timeout=45
        )
        out = proc.stdout.strip().splitlines()
        return out[-1] if out else ""
    except Exception as exc:
        print_debug_trace(exc, "run_py")
        return f"py_error:{exc}"
    finally:
        # delete=False means we must remove the file ourselves; the
        # original leaked one temp file per call.
        if path is not None:
            try:
                os.unlink(path)
            except OSError:
                pass
def transcribe_via_whisper(audio_bytes: bytes) -> str:
    """Transcribe audio with Whisper (CPU).

    Writes *audio_bytes* to a temporary ``.mp3``, runs the local
    openai-whisper "base" model on it, and returns the stripped transcript
    (or an ``"asr_error:..."`` string on failure). The temp file is always
    removed — previously it leaked on every call.
    """
    with NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        f.write(audio_bytes)
        path = f.name
    try:
        import whisper  # openai-whisper; imported lazily — heavy dependency

        model = whisper.load_model("base")
        output = model.transcribe(path)["text"].strip()
        print(f"[DEBUG] Whisper transcript (first 200 chars): {output[:200]}")
        return output
    except Exception as exc:
        print_debug_trace(exc, "Whisper")
        return f"asr_error:{exc}"
    finally:
        # Created with delete=False so Whisper can reopen it by path;
        # clean it up ourselves.
        try:
            os.unlink(path)
        except OSError:
            pass
def analyze_excel_file(xls_bytes: bytes, question: str) -> str:
    """Analyze an Excel or CSV file: show the LLM a 5-row data preview, have
    it emit a single Pandas expression over ``df``, evaluate that expression,
    and return the result as a short string (``"eval_error:..."`` on failure).
    """
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=64)
    # Try Excel first; on any parse failure, reinterpret the bytes as CSV.
    try:
        df = pd.read_excel(BytesIO(xls_bytes))
    except Exception:
        df = pd.read_csv(BytesIO(xls_bytes))
    # Coerce all numeric columns to float so aggregations behave uniformly.
    for col in df.select_dtypes(include="number").columns:
        df[col] = df[col].astype(float)
    # Ask the LLM for a single expression
    prompt = get_prompt(
        prompt_key="excel_system",
        question=question,
        preview=df.head(5).to_dict(orient="list"),
    )
    expr = llm.invoke(prompt).content.strip()
    # Run generated Pandas' one-line expression
    # NOTE(review): eval() of LLM-generated text is a code-injection risk even
    # with empty __builtins__ (escapes via attribute access are well known) —
    # consider an expression allow-list or a stricter sandbox.
    try:
        result = eval(expr, {"df": df, "pd": pd, "__builtins__": {}})
        # Normalize scalars to string
        if isinstance(result, np.generic):
            result = float(result)  # → plain Python float
            return f"{result:.2f}"  # or str(result) if no decimals needed
        # DataFrame / Series → single-line string
        return (
            result.to_string(index=False)
            if hasattr(result, "to_string")
            else str(result)
        )
    except Exception as e:
        print_debug_trace(e, "Excel")
        return f"eval_error:{e}"
# Public API of this module — the tool functions exposed to the agent.
__all__ = [
    "calculator",
    "web_multi_search",
    "wiki_search",
    "youtube_transcript",
    "vision_task",
    "run_py",
    "transcribe_via_whisper",
    "analyze_excel_file",
]