# NOTE: removed Hugging Face Spaces page residue ("Spaces:" / "Sleeping") that
# was not valid Python and would break parsing of this file.
# app.py | |
import os | |
import logging | |
import asyncio | |
import nest_asyncio | |
from datetime import datetime | |
import uuid | |
import aiohttp | |
import gradio as gr | |
import requests | |
import xml.etree.ElementTree as ET | |
import json | |
from langfuse.llama_index import LlamaIndexInstrumentor | |
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec | |
from llama_index.tools.weather import OpenWeatherMapToolSpec | |
from llama_index.tools.playwright import PlaywrightToolSpec | |
from llama_index.core.tools import FunctionTool | |
from llama_index.core.agent.workflow import AgentWorkflow | |
from llama_index.core.workflow import Context | |
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI | |
from llama_index.core.memory import ChatMemoryBuffer | |
from llama_index.readers.web import RssReader, SimpleWebPageReader | |
from llama_index.core import SummaryIndex | |
# Import the event types for streaming | |
from llama_index.core.agent.workflow import AgentStream, ToolCall, ToolCallResult | |
import subprocess
# Install Playwright browser binaries at startup (Spaces containers start
# clean). NOTE(review): runs on every import and ignores a non-zero exit —
# consider check=True if the Playwright tools below are re-enabled.
subprocess.run(["playwright", "install"])
# allow nested loops in Spaces
nest_asyncio.apply()
# --- Langfuse ---
# Langfuse tracing: credentials come from env vars; the instrumentor ships
# all LlamaIndex spans to the configured Langfuse host.
instrumentor = LlamaIndexInstrumentor(
    public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
    host=os.environ.get("LANGFUSE_HOST"),
)
instrumentor.start()
# --- Secrets via env vars ---
HF_TOKEN = os.getenv("HF_TOKEN")  # Hugging Face Inference API token
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENWEATHERMAP_KEY = os.getenv("OPENWEATHERMAP_API_KEY")
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- LLMs ---
# Chat model served via the HF Inference API; streaming=True so the agent
# workflow can emit token deltas as they arrive.
llm = HuggingFaceInferenceAPI(
    model_name="Qwen/Qwen2.5-Coder-32B-Instruct",
    token=HF_TOKEN,
    task="conversational",
    streaming=True
)
# Conversation memory buffer. NOTE(review): not referenced anywhere else in
# this file — presumably reserved for future use; confirm before removing.
memory = ChatMemoryBuffer.from_defaults(token_limit=8192)
# Human-readable current date (e.g. "May 04, 2025"); also appears unused here.
today_str = datetime.now().strftime("%B %d, %Y")
# Anonymous user id for Langfuse traces: stable if ANON_USER_ID is set in the
# environment, otherwise random per process.
ANON_USER_ID = os.environ.get("ANON_USER_ID", uuid.uuid4().hex)
# # OpenAI for pure function-calling | |
# openai_llm = OpenAI( | |
# model="gpt-4o", | |
# api_key=OPENAI_API_KEY, | |
# temperature=0.0, | |
# streaming=False, | |
# ) | |
# --- Tools Setup --- | |
# DuckDuckGo | |
# duck_spec = DuckDuckGoSearchToolSpec() | |
# search_tool = FunctionTool.from_defaults(duck_spec.duckduckgo_full_search) | |
# Weather
# OpenWeatherMap-backed tools; requires OPENWEATHERMAP_API_KEY in the env.
openweather_api_key = OPENWEATHERMAP_KEY
weather_tool_spec = OpenWeatherMapToolSpec(key=openweather_api_key)
weather_tool = FunctionTool.from_defaults(
    weather_tool_spec.weather_at_location,
    name="current_weather",
    description="Get the current weather at a specific location (city, country)."
)
# NOTE(review): "tommorrow" looks misspelled but is presumably the method name
# defined by the OpenWeatherMap tool spec itself — confirm before renaming.
forecast_tool = FunctionTool.from_defaults(
    weather_tool_spec.forecast_tommorrow_at_location,
    name="weather_forecast",
    description="Get tomorrow's weather forecast for a specific location (city, country)."
)
# Playwright (synchronous start) | |
# async def _start_browser(): | |
# return await PlaywrightToolSpec.create_async_playwright_browser(headless=True) | |
# browser = asyncio.get_event_loop().run_until_complete(_start_browser()) | |
# playwright_tool_spec = PlaywrightToolSpec.from_async_browser(browser) | |
# navigate_tool = FunctionTool.from_defaults( | |
# playwright_tool_spec.navigate_to, | |
# name="web_navigate", | |
# description="Navigate to a specific URL." | |
# ) | |
# extract_text_tool = FunctionTool.from_defaults( | |
# playwright_tool_spec.extract_text, | |
# name="web_extract_text", | |
# description="Extract all text from the current page." | |
# ) | |
# extract_links_tool = FunctionTool.from_defaults( | |
# playwright_tool_spec.extract_hyperlinks, | |
# name="web_extract_links", | |
# description="Extract all hyperlinks from the current page." | |
# ) | |
# Google News RSS | |
# def fetch_google_news_rss(): | |
# docs = RssReader(html_to_text=True).load_data(["https://news.google.com/rss"]) | |
# return [{"title":d.metadata.get("title",""), "url":d.metadata.get("link","")} for d in docs] | |
# ----------------------------- | |
# Google News RSS | |
# ----------------------------- | |
def format_rss_items(items) -> str:
    """Format RSS ``<item>`` elements into the tool's plain-text layout.

    Args:
        items: Iterable of ``xml.etree`` Elements, each an RSS ``<item>``.

    Returns:
        "Title/Published/Link/Description" lines with a "---" separator per
        article, or "No news articles found." when *items* is empty.
    """
    formatted_news = []
    for item in items:
        # Element.findtext returns the default when the child tag is missing,
        # replacing the repeated find()/None checks of the original code.
        formatted_news.extend((
            f"Title: {item.findtext('title', 'N/A')}",
            f"Published: {item.findtext('pubDate', 'N/A')}",
            f"Link: {item.findtext('link', 'N/A')}",
            f"Description: {item.findtext('description', 'N/A')}",
            "---",
        ))
    return "\n".join(formatted_news) if formatted_news else "No news articles found."

def fetch_news_headlines() -> str:
    """Fetches the latest news from Google News RSS feed.

    Returns:
        A string containing up to 5 of the latest news articles from Google
        News, or an error message if the request or XML parsing fails.
    """
    url = "https://news.google.com/rss"
    try:
        # Fix: a timeout is required — without one a stalled connection
        # would block the calling agent indefinitely.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        # Parse the XML content and format the first 5 <item> entries.
        root = ET.fromstring(response.content)
        return format_rss_items(root.findall('.//item')[:5])
    except requests.exceptions.RequestException as e:
        return f"Error fetching news: {str(e)}"
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"
# Register the RSS fetcher as an agent-callable tool.
google_rss_tool = FunctionTool.from_defaults(
    fetch_news_headlines,
    name="fetch_google_news_rss",
    description="Fetch latest headlines.",
)
# ----------------------------- | |
# SERPER API | |
# ----------------------------- | |
def format_serper_articles(articles) -> str:
    """Format Serper news-article dicts into the tool's plain-text layout.

    Args:
        articles: Iterable of dicts with optional 'title', 'source', 'link'
            and 'snippet' keys.

    Returns:
        "Title/Source/Link/Snippet" lines with a "---" separator per article,
        or "No news articles found." when *articles* is empty.
    """
    formatted_news = []
    for article in articles:
        formatted_news.extend((
            f"Title: {article.get('title', 'N/A')}",
            f"Source: {article.get('source', 'N/A')}",
            f"Link: {article.get('link', 'N/A')}",
            f"Snippet: {article.get('snippet', 'N/A')}",
            "---",
        ))
    return "\n".join(formatted_news) if formatted_news else "No news articles found."

def fetch_news_topics(query: str) -> str:
    """Fetches news articles about a specific topic using the Serper API.

    Args:
        query: The topic to search for news about.

    Returns:
        A string containing up to 5 news articles found, or an error message
        if the request fails.
    """
    url = "https://google.serper.dev/news"
    headers = {
        # Key is read at call time so a missing SERPER_API_KEY only surfaces
        # when the tool is actually invoked.
        'X-API-KEY': os.getenv('SERPER_API_KEY'),
        'Content-Type': 'application/json'
    }
    try:
        # Fix: a timeout is required — without one a stalled connection
        # would block the calling agent indefinitely.
        response = requests.post(
            url, headers=headers, data=json.dumps({"q": query}), timeout=10
        )
        response.raise_for_status()
        news_data = response.json()
        return format_serper_articles(news_data.get('news', [])[:5])
    except requests.exceptions.RequestException as e:
        return f"Error fetching news: {str(e)}"
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"
# Register the Serper topic-search as an agent-callable tool.
serper_news_tool = FunctionTool.from_defaults(
    fn=fetch_news_topics,
    name="fetch_news_from_serper",
    description="Fetch news articles on a specific topic.",
)
# ----------------------------- | |
# WEB PAGE READER | |
# ----------------------------- | |
def summarize_webpage(url: str) -> str:
    """Fetches and summarizes the content of a web page."""
    try:
        # html_to_text=True requires the html2text package to be installed.
        docs = SimpleWebPageReader(html_to_text=True).load_data([url])
        if not docs:
            return "No content could be loaded from the provided URL."
        # Build a summary index over the page and query it once.
        engine = SummaryIndex.from_documents(docs).as_query_engine()
        summary = engine.query("Summarize the main points of this page.")
        return str(summary)
    except Exception as e:
        return f"An error occurred while summarizing the web page: {str(e)}"
# Register the page summarizer as an agent-callable tool.
webpage_reader_tool = FunctionTool.from_defaults(
    fn=summarize_webpage,
    name="summarize_webpage",
    description="Read and summarize the main points of a web page given its URL."
)
# Create the agent workflow
# Tool roster handed to the agent; the commented-out entries (search and
# Playwright browser tools) are currently disabled.
tools = [
    #search_tool,
    #navigate_tool,
    #extract_text_tool,
    #extract_links_tool,
    weather_tool,
    forecast_tool,
    google_rss_tool,
    serper_news_tool,
    webpage_reader_tool,
]
# Single-agent workflow: the LLM plans and invokes the tools above.
# NOTE: the system prompt is runtime behavior — the tool names and parameter
# formats it lists must match the registered tools; edit with care.
web_agent = AgentWorkflow.from_tools_or_functions(
    tools,
    llm=llm,
    system_prompt="""You are a helpful assistant with access to specialized tools for retrieving information about weather, and news.
AVAILABLE TOOLS:
1. current_weather - Get current weather conditions for a location
2. weather_forecast - Get tomorrow's weather forecast for a location
3. fetch_google_news_rss - Fetch the latest general news headlines
4. fetch_news_from_serper - Fetch news articles on a specific topic
5. summarize_webpage - Read and summarize the content of a web page
WHEN AND HOW TO USE EACH TOOL:
For weather information:
- Use current_weather when asked about present conditions
  EXAMPLE: User asks "What's the weather in Tokyo?"
  TOOL: current_weather
  PARAMETERS: {"location": "Tokyo, JP"}
- Use weather_forecast when asked about future weather
  EXAMPLE: User asks "What will the weather be like in Paris tomorrow?"
  TOOL: weather_forecast
  PARAMETERS: {"location": "Paris, FR"}
For news retrieval:
- Use fetch_google_news_rss for general headlines (requires NO parameters)
  EXAMPLE: User asks "What's happening in the news today?"
  TOOL: fetch_google_news_rss
  PARAMETERS: {}
- Use fetch_news_from_serper for specific news topics
  EXAMPLE: User asks "Any news about AI advancements?"
  TOOL: fetch_news_from_serper
  PARAMETERS: {"query": "artificial intelligence advancements"}
For web content:
- Use summarize_webpage to extract information from websites
  EXAMPLE: User asks "Can you summarize the content on hf.co/learn?"
  TOOL: summarize_webpage
  PARAMETERS: {"url": "https://hf.co/learn"}
IMPORTANT GUIDELINES:
- Always verify the format of parameters before submitting
- For locations, use the format "City, Country Code" (e.g., "Montreal, CA")
- For URLs, include the full address with http:// or https://
- When multiple tools are needed to answer a complex question, use them in sequence
- If possible, provide clickable links for your sources in your final answer.
When you use a tool, explain to the user that you're retrieving information. After receiving the tool's output, provide a helpful summary of the information.
"""
)
# Shared workflow context: keeps conversation state across agent runs.
ctx = Context(web_agent)
# Async helper to run agent queries (kept for compatibility)
def run_query_sync(query: str):
    """Helper to run async agent.run in sync context.

    NOTE(review): relies on nest_asyncio.apply() above to allow re-entrant
    run_until_complete; asyncio.get_event_loop() outside a running loop is
    deprecated since Python 3.10 — confirm before changing.
    """
    return asyncio.get_event_loop().run_until_complete(
        web_agent.run(query, ctx=ctx)
    )
# Updated run_query function to use stream_events
async def run_query(query: str):
    """Run the agent on *query* and yield response text incrementally.

    Yields AgentStream deltas (minus internal "Thought:/Action:/Answer:"
    lines), plus a one-line notice per distinct tool call. The whole run is
    traced in Langfuse; errors are yielded to the caller rather than raised.
    """
    # Unique trace id so each run appears separately in Langfuse.
    trace_id = f"agent-run-{uuid.uuid4().hex}"
    try:
        with instrumentor.observe(
            trace_id=trace_id,
            session_id="web-agent-session",
            user_id=ANON_USER_ID,
        ):
            # Start the handler
            handler = web_agent.run(query, ctx=ctx)
            # Keep track of what we're showing to avoid duplicates
            tool_calls_shown = set()
            # Stream content
            async for event in handler.stream_events():
                if isinstance(event, AgentStream):
                    # Filter out any lines starting with "Thought:" or "Action:"
                    if hasattr(event, 'delta') and event.delta:
                        delta = event.delta
                        # Filter out thought processes and internal reasoning
                        if not (delta.strip().startswith("Thought:") or
                                delta.strip().startswith("Action:") or
                                delta.strip().startswith("Answer:")):
                            yield delta
                elif isinstance(event, ToolCall):
                    # Attribute naming varies across llama_index versions, so
                    # try the known candidates in order.
                    tool_name = getattr(event, 'name', getattr(event, 'function_name', getattr(event, 'tool_name', "unknown tool")))
                    # Only show tool call message once per tool+call combo
                    tool_call_id = f"{tool_name}_{hash(str(getattr(event, 'args', '')))}"
                    if tool_call_id not in tool_calls_shown:
                        tool_calls_shown.add(tool_call_id)
                        yield f"\n\n🔧 Using tool: {tool_name}...\n"
                elif isinstance(event, ToolCallResult):
                    # We don't need to show the raw tool result to the user
                    # The agent will incorporate the results in its response
                    pass
    except Exception as e:
        yield f"\n\n❌ Error: {str(e)}\n"
        import traceback
        yield f"Traceback: {traceback.format_exc()}"
    finally:
        # Push buffered spans to Langfuse even on error/cancellation.
        instrumentor.flush()
# Bridge the agent's token stream into Gradio chat-history updates.
async def gradio_query(user_input, chat_history=None):
    """Append the user turn, then stream the assistant's reply into the last
    history entry, yielding (chatbot, state) updates as tokens arrive."""
    history = chat_history or []
    history.append({"role": "user", "content": user_input})
    # Placeholder assistant turn that the stream progressively overwrites.
    history.append({"role": "assistant", "content": "Processing..."})
    yield history, history
    accumulated = ""
    async for piece in run_query(user_input):
        if not piece:
            continue
        accumulated += piece
        history[-1]["content"] = accumulated
        yield history, history
# Build and launch Gradio app
grb = gr.Blocks()
with grb:
    gr.Markdown("## Perspicacity")
    gr.Markdown(
        """
This bot can check the news, tell you the weather, and even browse websites to answer follow-up questions — all powered by a team of tiny AI tools working behind the scenes.\n\n
🧪 Built for fun during the [AI Agents course](https://huggingface.co/learn/agents-course/unit0/introduction) — it's just a demo to show what agents can do.\n
🙌 Got ideas or improvements? PRs welcome!\n\n
👉 Try asking 'What's the weather in Montreal?' or 'What's in the news today?'
"""
    )
    # Chat display uses the "messages" format ({"role", "content"} dicts),
    # matching what gradio_query yields.
    chatbot = gr.Chatbot(type="messages")
    txt = gr.Textbox(placeholder="Ask me anything...", show_label=False)
    # Set up event handlers for streaming: Enter key submits the query.
    txt.submit(
        gradio_query,
        inputs=[txt, chatbot],
        outputs=[chatbot, chatbot]
    ).then(
        lambda: gr.update(value=""),  # Clear the textbox after submission
        None,
        [txt]
    )
    # Also update the button click handler (same pipeline as txt.submit).
    send_btn = gr.Button("Send")
    send_btn.click(
        gradio_query,
        [txt, chatbot],
        [chatbot, chatbot]
    ).then(
        lambda: gr.update(value=""),  # Clear the textbox after submission
        None,
        [txt]
    )

# Launch the UI when executed directly (HF Spaces runs this file as a script).
if __name__ == "__main__":
    grb.launch()