|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
import re |
|
import json |
|
import sys |
|
import os |
|
import random |
|
from io import StringIO |
|
from typing import List, Dict, Tuple, Annotated |
|
|
|
import gradio as gr |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from markdownify import markdownify as md |
|
from readability import Document |
|
from urllib.parse import urljoin, urldefrag, urlparse |
|
from duckduckgo_search import DDGS |
|
from PIL import Image |
|
from huggingface_hub import InferenceClient |
|
import time |
|
|
|
|
|
import numpy as np |
|
try: |
|
import torch |
|
except Exception: |
|
torch = None |
|
try: |
|
from kokoro import KModel, KPipeline |
|
except Exception: |
|
KModel = None |
|
KPipeline = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
def _http_get(url: str) -> requests.Response: |
|
""" |
|
Download the page politely with a short timeout and realistic headers. |
|
(Layman's terms: grab the web page like a normal browser would, but quickly.) |
|
""" |
|
headers = { |
|
"User-Agent": "Mozilla/5.0 (compatible; WebMCP/1.0; +https://example.com)", |
|
"Accept-Language": "en-US,en;q=0.9", |
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
|
} |
|
return requests.get(url, headers=headers, timeout=15) |
|
|
|
|
|
def _normalize_whitespace(text: str) -> str: |
|
""" |
|
Squeeze extra spaces and blank lines to keep things compact. |
|
(Layman's terms: tidy up the text so it’s not full of weird spacing.) |
|
""" |
|
text = re.sub(r"[ \t\u00A0]+", " ", text) |
|
text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip()) |
|
return text.strip() |
|
|
|
|
|
def _truncate(text: str, max_chars: int) -> Tuple[str, bool]: |
|
""" |
|
Cut text if it gets too long; return the text and whether we trimmed. |
|
(Layman's terms: shorten long text and tell us if we had to cut it.) |
|
""" |
|
if max_chars is None or max_chars <= 0 or len(text) <= max_chars: |
|
return text, False |
|
return text[:max_chars].rstrip() + " …", True |
|
|
|
|
|
def _shorten(text: str, limit: int) -> str: |
|
""" |
|
Hard cap a string with an ellipsis to keep tokens small. |
|
(Layman's terms: force a string to a max length with an ellipsis.) |
|
""" |
|
if limit <= 0 or len(text) <= limit: |
|
return text |
|
return text[: max(0, limit - 1)].rstrip() + "…" |
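
# Illustrative behavior of the two truncation helpers (values shown are what
# the code above produces, not a test suite):
#   _truncate("a" * 10, 5) -> ("aaaaa …", True)
#   _shorten("abcdef", 4)  -> "abc…"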
|
|
|
|
|
def _domain_of(url: str) -> str: |
|
""" |
|
Show a friendly site name like "example.com". |
|
(Layman's terms: pull the website's domain.) |
|
""" |
|
try: |
|
return urlparse(url).netloc or "" |
|
except Exception: |
|
return "" |
|
|
|
|
|
def _meta(soup: BeautifulSoup, name: str) -> str | None: |
|
tag = soup.find("meta", attrs={"name": name}) |
|
return tag.get("content") if tag and tag.has_attr("content") else None |
|
|
|
|
|
def _og(soup: BeautifulSoup, prop: str) -> str | None: |
|
tag = soup.find("meta", attrs={"property": prop}) |
|
return tag.get("content") if tag and tag.has_attr("content") else None |
|
|
|
|
|
def _extract_metadata(soup: BeautifulSoup, final_url: str) -> Dict[str, str]: |
|
""" |
|
Pull the useful bits: title, description, site name, canonical URL, language, etc. |
|
(Layman's terms: gather page basics like title/description/address.) |
|
""" |
|
meta: Dict[str, str] = {} |
|
|
|
|
|
title_candidates = [ |
|
(soup.title.string if soup.title and soup.title.string else None), |
|
_og(soup, "og:title"), |
|
_meta(soup, "twitter:title"), |
|
] |
|
meta["title"] = next((t.strip() for t in title_candidates if t and t.strip()), "") |
|
|
|
|
|
desc_candidates = [ |
|
_meta(soup, "description"), |
|
_og(soup, "og:description"), |
|
_meta(soup, "twitter:description"), |
|
] |
|
meta["description"] = next((d.strip() for d in desc_candidates if d and d.strip()), "") |
|
|
|
|
|
link_canonical = soup.find("link", rel=lambda v: v and "canonical" in v) |
|
meta["canonical"] = (link_canonical.get("href") or "").strip() if link_canonical else "" |
|
|
|
|
|
meta["site_name"] = (_og(soup, "og:site_name") or "").strip() |
|
html_tag = soup.find("html") |
|
meta["lang"] = (html_tag.get("lang") or "").strip() if html_tag else "" |
|
|
|
|
|
meta["fetched_url"] = final_url |
|
meta["domain"] = _domain_of(final_url) |
|
|
|
return meta |
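
# Illustrative shape of the returned dict for a typical article page
# (values are placeholders, not real output):
#   {"title": "…", "description": "…", "canonical": "https://…",
#    "site_name": "…", "lang": "en", "fetched_url": "https://…",
#    "domain": "example.com"}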
|
|
|
|
|
def _extract_main_text(html: str) -> Tuple[str, BeautifulSoup]: |
|
""" |
|
Use Readability to isolate the main article and turn it into clean text. |
|
Returns (clean_text, soup_of_readable_html). |
|
(Layman's terms: find the real article text and clean it.) |
|
""" |
|
|
|
doc = Document(html) |
|
readable_html = doc.summary(html_partial=True) |
|
|
|
|
|
s = BeautifulSoup(readable_html, "lxml") |
|
|
|
|
|
for sel in ["script", "style", "noscript", "iframe", "svg"]: |
|
for tag in s.select(sel): |
|
tag.decompose() |
|
|
|
|
|
text_parts: List[str] = [] |
|
for p in s.find_all(["p", "li", "h2", "h3", "h4", "blockquote"]): |
|
chunk = p.get_text(" ", strip=True) |
|
if chunk: |
|
text_parts.append(chunk) |
|
|
|
clean_text = _normalize_whitespace("\n\n".join(text_parts)) |
|
return clean_text, s |
|
|
|
|
|
def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str) -> str: |
|
""" |
|
Convert the page's main content (or body fallback) to Markdown, similar to |
|
web-scraper's Content Scraper tool, but without any file download side-effects. |
|
|
|
Steps: |
|
- Remove noisy elements (script/style/nav/footer/header/aside) |
|
- Prefer <main>, <article>, or common content containers; fallback to <body> |
|
- Convert to Markdown with ATX headings |
|
- Clean up excessive newlines, empty links, and whitespace |
|
    - Prepend a title header when available

    Note: ``base_url`` is accepted for API symmetry but links are left as they

    appear in the page; they are not resolved against it.

    """
|
|
|
for element in full_soup.select("script, style, nav, footer, header, aside"): |
|
element.decompose() |
|
|
|
|
|
main = ( |
|
full_soup.find("main") |
|
or full_soup.find("article") |
|
or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I)) |
|
or full_soup.find("body") |
|
) |
|
|
|
if not main: |
|
return "No main content found on the webpage." |
|
|
|
|
|
markdown_text = md(str(main), heading_style="ATX") |
|
|
|
|
|
markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text) |
|
markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text) |
|
markdown_text = re.sub(r"[ \t]+", " ", markdown_text) |
|
markdown_text = markdown_text.strip() |
|
|
|
|
|
title = full_soup.find("title") |
|
if title and title.get_text(strip=True): |
|
markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}" |
|
|
|
return markdown_text or "No content could be extracted." |
|
|
|
|
|
def _extract_links(readable_soup: BeautifulSoup, base_url: str, max_links: int) -> List[Tuple[str, str]]: |
|
""" |
|
Collect clean, unique, absolute links from the readable section only. |
|
(Layman's terms: pull a tidy list of links from the article body.) |
|
""" |
|
seen = set() |
|
links: List[Tuple[str, str]] = [] |
|
|
|
for a in readable_soup.find_all("a", href=True): |
|
href = a.get("href").strip() |
|
|
|
if not href or href.startswith("#") or href.startswith("mailto:") or href.startswith("javascript:"): |
|
continue |
|
|
|
|
|
absolute = urljoin(base_url, href) |
|
absolute, _ = urldefrag(absolute) |
|
|
|
if absolute in seen: |
|
continue |
|
seen.add(absolute) |
|
|
|
text = a.get_text(" ", strip=True) |
|
if len(text) > 120: |
|
text = text[:117] + "…" |
|
|
|
links.append((text or absolute, absolute)) |
|
|
|
if len(links) >= max_links > 0: |
|
break |
|
|
|
return links |
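
# Illustrative resolution behavior: with base_url "https://example.com/a",
# an anchor <a href="/b#frag">B</a> yields ("B", "https://example.com/b"):
# the URL is made absolute, the fragment is stripped, and repeats are skipped.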
|
|
|
|
|
def _format_markdown( |
|
meta: Dict[str, str], |
|
body: str, |
|
body_truncated: bool, |
|
links: List[Tuple[str, str]], |
|
include_text: bool, |
|
include_metadata: bool, |
|
include_links: bool, |
|
verbosity: str, |
|
) -> str: |
|
""" |
|
Assemble a compact Markdown summary with optional sections. |
|
(Layman's terms: build the final markdown output with options.) |
|
""" |
|
lines: List[str] = [] |
|
|
|
|
|
title = meta.get("title") or meta.get("domain") or "Untitled" |
|
lines.append(f"# {title}") |
|
|
|
|
|
if include_metadata: |
|
        # Named meta_lines to avoid shadowing the module-level markdownify import (md).

        meta_lines: List[str] = []

        if meta.get("description"):

            meta_lines.append(f"- **Description:** {meta['description']}")

        if meta.get("site_name"):

            meta_lines.append(f"- **Site:** {meta['site_name']}")

        if meta.get("canonical"):

            meta_lines.append(f"- **Canonical:** {meta['canonical']}")

        if meta.get("lang"):

            meta_lines.append(f"- **Language:** {meta['lang']}")

        if meta.get("fetched_url"):

            meta_lines.append(f"- **Fetched From:** {meta['fetched_url']}")

        if meta_lines:

            lines.append("## Metadata")

            lines.extend(meta_lines)
|
|
|
|
|
if include_text and body: |
|
if verbosity == "Brief": |
|
brief, was_more = _truncate(body, 800) |
|
lines.append("## Text") |
|
lines.append(brief) |
|
if was_more or body_truncated: |
|
lines.append("\n> (Trimmed for brevity)") |
|
else: |
|
lines.append("## Text") |
|
lines.append(body) |
|
if body_truncated: |
|
lines.append("\n> (Trimmed for brevity)") |
|
|
|
|
|
if include_links and links: |
|
lines.append(f"## Links ({len(links)})") |
|
for text, url in links: |
|
lines.append(f"- [{text}]({url})") |
|
|
|
return "\n\n".join(lines).strip() |
|
|
|
|
|
def Fetch_Webpage( |
|
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
|
verbosity: Annotated[str, "Controls body length: one of 'Brief', 'Standard', or 'Full'."] = "Standard", |
|
include_metadata: Annotated[bool, "Include a Metadata section (description, site name, canonical, lang, fetched URL)."] = True, |
|
include_text: Annotated[bool, "Include the readable main text extracted with Readability."] = True, |
|
include_links: Annotated[bool, "Include outbound links discovered in the readable section."] = True, |
|
max_chars: Annotated[int, "Hard cap for body characters after the verbosity preset. Use 0 to disable the cap."] = 3000, |
|
max_links: Annotated[int, "Maximum number of links to include from the readable content. Set 0 to omit links."] = 20, |
|
full_page_markdown: Annotated[bool, "If true, return the page as full Markdown (Content Scraper mode) instead of a compact summary."] = False, |
|
) -> str: |
|
""" |
|
Fetch a web page and return a compact Markdown summary containing title, key |
|
metadata, readable main text, and outbound links. |
|
|
|
Args: |
|
url: The absolute URL to fetch (must return HTML). |
|
verbosity: Controls body length: one of 'Brief', 'Standard', or 'Full'. |
|
include_metadata: Include a Metadata section (description, site name, canonical, lang, fetched URL). |
|
include_text: Include the readable main text extracted with Readability. |
|
include_links: Include outbound links discovered in the readable section. |
|
max_chars: Hard cap for body characters after the verbosity preset. Use 0 to disable the cap. |
|
max_links: Maximum number of links to include from the readable content. Set 0 to omit links. |
|
full_page_markdown: If True, return the page converted to full Markdown (Content Scraper mode) |
|
instead of the compact summary. This ignores verbosity/include_* and max_* limits and |
|
attempts to convert the main content area to Markdown with headings preserved. |
|
|
|
Returns: |
|
str: Markdown that may contain the following sections: |
|
- Title (H1) |
|
- Metadata (optional) |
|
- Text (optional, may be trimmed) |
|
- Links (optional, deduped and absolute) |
|
|
|
Special mode: |
|
If full_page_markdown=True, the function returns the page converted to Markdown, |
|
similar to the "Content Scraper" tool, ignoring verbosity/include_* limits. |
|
""" |
|
if not url or not url.strip(): |
|
return "Please enter a valid URL." |
|
|
|
try: |
|
resp = _http_get(url) |
|
resp.raise_for_status() |
|
except requests.exceptions.RequestException as e: |
|
return f"An error occurred: {e}" |
|
|
|
final_url = str(resp.url) |
|
ctype = resp.headers.get("Content-Type", "") |
|
if "html" not in ctype.lower(): |
|
return f"Unsupported content type for extraction: {ctype or 'unknown'}" |
|
|
|
|
|
resp.encoding = resp.encoding or resp.apparent_encoding |
|
html = resp.text |
|
|
|
|
|
full_soup = BeautifulSoup(html, "lxml") |
|
meta = _extract_metadata(full_soup, final_url) |
|
|
|
|
|
if full_page_markdown: |
|
return _fullpage_markdown_from_soup(full_soup, final_url) |
|
|
|
|
|
body_text, readable_soup = _extract_main_text(html) |
|
if not body_text: |
|
|
|
fallback_text = full_soup.get_text(" ", strip=True) |
|
body_text = _normalize_whitespace(fallback_text) |
|
|
|
|
|
preset_caps = {"Brief": 1200, "Standard": 3000, "Full": 999_999} |
|
target_cap = preset_caps.get(verbosity, 3000) |
|
cap = min(max_chars if max_chars > 0 else target_cap, target_cap) |
|
body_text, truncated = _truncate(body_text, cap) if include_text else ("", False) |
|
|
|
|
|
links = _extract_links(readable_soup, final_url, max_links=max_links if include_links else 0) |
|
|
|
|
|
    summary = _format_markdown(
|
meta=meta, |
|
body=body_text, |
|
body_truncated=truncated, |
|
links=links, |
|
include_text=include_text, |
|
include_metadata=include_metadata, |
|
include_links=include_links, |
|
verbosity=verbosity, |
|
) |
|
    return summary or "No content could be extracted."
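
# Example call (illustrative; the URL is a placeholder):
#   Fetch_Webpage("https://example.com/article", verbosity="Brief", max_links=5)
# returns a Markdown string with Title, Metadata, Text, and Links sections.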
|
|
|
|
|
|
|
|
|
|
|
|
|
def Search_DuckDuckGo( |
|
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
|
max_results: Annotated[int, "Number of results to return (1–20)."] = 5, |
|
include_snippets: Annotated[bool, "Include a short snippet for each result (adds tokens)."] = False, |
|
max_snippet_chars: Annotated[int, "Character cap applied to each snippet when included."] = 80, |
|
dedupe_domains: Annotated[bool, "If true, only the first result from each domain is kept."] = True, |
|
title_chars: Annotated[int, "Character cap applied to titles."] = 80, |
|
) -> str: |
|
""" |
|
Run a DuckDuckGo search and return ultra-compact JSONL with short keys to |
|
minimize tokens. |
|
|
|
Args: |
|
query: The search query (supports operators like site:, quotes, OR). |
|
max_results: Number of results to return (1–20). |
|
include_snippets: Include a short snippet for each result (adds tokens). |
|
max_snippet_chars: Character cap applied to each snippet when included. |
|
dedupe_domains: If true, only the first result from each domain is kept. |
|
title_chars: Character cap applied to titles. |
|
|
|
Returns: |
|
str: Newline-delimited JSON (JSONL). Each line has: |
|
{"t": "title", "u": "url"[, "s": "snippet"]} |
|
""" |
|
if not query or not query.strip(): |
|
return "" |
|
|
|
try: |
|
with DDGS() as ddgs: |
|
raw = ddgs.text(query, max_results=max_results) |
|
except Exception as e: |
|
return json.dumps({"error": str(e)[:120]}, ensure_ascii=False, separators=(",", ":")) |
|
|
|
seen_domains = set() |
|
lines: List[str] = [] |
|
|
|
for r in raw or []: |
|
title = _shorten((r.get("title") or "").strip(), title_chars) |
|
url = (r.get("href") or r.get("link") or "").strip() |
|
body = (r.get("body") or r.get("snippet") or "").strip() |
|
|
|
if not url: |
|
continue |
|
|
|
if dedupe_domains: |
|
dom = _domain_of(url) |
|
if dom in seen_domains: |
|
continue |
|
seen_domains.add(dom) |
|
|
|
obj = {"t": title or _domain_of(url), "u": url} |
|
|
|
if include_snippets and body: |
|
obj["s"] = _shorten(body, max_snippet_chars) |
|
|
|
|
|
lines.append(json.dumps(obj, ensure_ascii=False, separators=(",", ":"))) |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
|
|
|
|
|
def Execute_Python(code: Annotated[str, "Python source code to run; stdout is captured and returned."]) -> str: |
|
""" |
|
Execute arbitrary Python code and return captured stdout or an error message. |
|
|
|
Args: |
|
code: Python source code to run; stdout is captured and returned. |
|
|
|
Returns: |
|
        str: Combined stdout produced by the code, or the exception type and

        message if execution failed.

    Caution:

        The code runs via ``exec`` in the server process with no sandboxing;

        only expose this tool in trusted environments.

    """
|
if code is None: |
|
return "No code provided." |
|
|
|
old_stdout = sys.stdout |
|
redirected_output = sys.stdout = StringIO() |
|
try: |
|
exec(code) |
|
return redirected_output.getvalue() |
|
except Exception as e: |
|
        return f"{type(e).__name__}: {e}"
|
finally: |
|
sys.stdout = old_stdout |
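
# Example call (illustrative):
#   Execute_Python("print(2 + 2)")  -> "4\n"
#   Execute_Python("1 / 0")         -> "ZeroDivisionError: division by zero"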
|
|
|
|
|
|
|
|
|
|
|
|
|
_KOKORO_STATE = { |
|
"initialized": False, |
|
"device": "cpu", |
|
"model": None, |
|
"pipelines": {}, |
|
} |
|
|
|
|
|
def get_kokoro_voices(): |
|
"""Get comprehensive list of available Kokoro voice IDs (54 total).""" |
|
try: |
|
from huggingface_hub import list_repo_files |
|
|
|
files = list_repo_files('hexgrad/Kokoro-82M') |
|
voice_files = [f for f in files if f.endswith('.pt') and f.startswith('voices/')] |
|
voices = [f.replace('voices/', '').replace('.pt', '') for f in voice_files] |
|
return sorted(voices) if voices else _get_fallback_voices() |
|
except Exception: |
|
return _get_fallback_voices() |
|
|
|
|
|
def _get_fallback_voices(): |
|
"""Return comprehensive fallback list of known Kokoro voices (54 total).""" |
|
    return [

        # American English, female

        "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica",

        "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",

        # American English, male

        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam",

        "am_michael", "am_onyx", "am_puck", "am_santa",

        # British English, female

        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",

        # British English, male

        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",

        # Spanish, female/male

        "ef_dora", "em_alex", "em_santa",

        # French, female

        "ff_siwis",

        # Hindi, female/male

        "hf_alpha", "hf_beta", "hm_omega", "hm_psi",

        # Italian, female/male

        "if_sara", "im_nicola",

        # Japanese, female/male

        "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo",

        # Portuguese, female/male

        "pf_dora", "pm_alex", "pm_santa",

        # Chinese, female/male

        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",

        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",

    ]
|
|
|
|
|
def _init_kokoro() -> None: |
|
"""Lazy-initialize Kokoro model and pipelines on first use. |
|
|
|
Tries CUDA if torch is present and available; falls back to CPU. Keeps a |
|
minimal English pipeline and custom lexicon tweak for the word "kokoro". |
|
""" |
|
if _KOKORO_STATE["initialized"]: |
|
return |
|
|
|
if KModel is None or KPipeline is None: |
|
raise RuntimeError( |
|
"Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4)." |
|
) |
|
|
|
device = "cpu" |
|
if torch is not None: |
|
try: |
|
if torch.cuda.is_available(): |
|
device = "cuda" |
|
except Exception: |
|
device = "cpu" |
|
|
|
model = KModel().to(device).eval() |
|
pipelines = {"a": KPipeline(lang_code="a", model=False)} |
|
|
|
try: |
|
pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO" |
|
except Exception: |
|
pass |
|
|
|
_KOKORO_STATE.update( |
|
{ |
|
"initialized": True, |
|
"device": device, |
|
"model": model, |
|
"pipelines": pipelines, |
|
} |
|
) |
|
|
|
|
|
def List_Kokoro_Voices() -> List[str]: |
|
""" |
|
Get a list of all available Kokoro voice identifiers. |
|
|
|
This MCP tool helps clients discover the 54 available voice options |
|
for the Generate_Speech tool. |
|
|
|
Returns: |
|
List[str]: A list of voice identifiers (e.g., ["af_heart", "am_adam", "bf_alice", ...]) |
|
|
|
Voice naming convention: |
|
- First 2 letters: Language/Region (af=American Female, am=American Male, bf=British Female, etc.) |
|
- Following letters: Voice name (heart, adam, alice, etc.) |
|
|
|
Available categories: |
|
- American Female/Male (20 voices) |
|
- British Female/Male (8 voices) |
|
    - Spanish Female/Male (3 voices)
|
- French Female (1 voice) |
|
- Hindi Female/Male (4 voices) |
|
- Italian Female/Male (2 voices) |
|
- Japanese Female/Male (5 voices) |
|
- Portuguese Female/Male (3 voices) |
|
- Chinese Female/Male (8 voices) |
|
""" |
|
return get_kokoro_voices() |
|
|
|
|
|
def Generate_Speech( |
|
text: Annotated[str, "The text to synthesize (English)."], |
|
speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25, |
|
voice: Annotated[str, "Voice identifier from 54 available options. Use List_Kokoro_Voices() to see all choices. Examples: 'af_heart' (US female), 'am_adam' (US male), 'bf_alice' (British female), 'jf_alpha' (Japanese female)."] = "af_heart", |
|
) -> Tuple[int, np.ndarray]: |
|
""" |
|
Synthesize speech from text using the Kokoro-82M model with 54 voice options. |
|
|
|
This function returns raw audio suitable for a Gradio Audio component and is |
|
also exposed as an MCP tool. It supports 54 different voices across multiple |
|
languages and accents including American, British, European, Hindi, Italian, |
|
Japanese, Portuguese, and Chinese speakers. |
|
|
|
Enhanced for longer audio generation: |
|
- Processes ALL text segments (not just the first one) |
|
- Can generate audio of any length based on input text |
|
- Concatenates multiple segments for seamless longer audio |
|
|
|
Default behavior: |
|
- Speed defaults to 1.25 (slightly brisk cadence) for clearer, snappier delivery. |
|
- Voice defaults to "af_heart" (American Female, Heart voice) |
|
|
|
Args: |
|
text: The text to synthesize. Works best with English but supports multiple languages. |
|
speed: Speech speed multiplier in 0.5–2.0; 1.0 = normal speed. Default: 1.25 (slightly brisk). |
|
voice: Voice identifier from 54 available options. Use List_Kokoro_Voices() to see all choices. Default: 'af_heart'. |
|
|
|
Returns: |
|
A tuple of (sample_rate_hz, audio_waveform) where: |
|
- sample_rate_hz: int sample rate in Hz (24_000) |
|
- audio_waveform: numpy.ndarray float32 mono waveform in range [-1, 1] |
|
|
|
Notes: |
|
- Requires the 'kokoro' package (>=0.9.4). If unavailable, an error is raised. |
|
- Runs on CUDA if available; otherwise CPU. |
|
- Supports 54 voices across 9 language/accent categories. |
|
- Can generate audio of any length - no 30 second limit! |
|
- Use List_Kokoro_Voices() MCP tool to discover all available voice options. |
|
""" |
|
if not text or not text.strip(): |
|
raise gr.Error("Please provide non-empty text to synthesize.") |
|
|
|
_init_kokoro() |
|
model = _KOKORO_STATE["model"] |
|
pipelines = _KOKORO_STATE["pipelines"] |
|
|
|
pipeline = pipelines.get("a") |
|
if pipeline is None: |
|
raise gr.Error("Kokoro English pipeline not initialized.") |
|
|
|
|
|
    audio_segments = []

    try:

        pack = pipeline.load_voice(voice)

    except Exception as e:

        raise gr.Error(f"Unknown or unavailable voice '{voice}': {e}")
|
|
|
try: |
|
|
|
        # Materialize all pipeline segments up front so progress can be reported.

        segments = list(pipeline(text, voice, speed))
|
total_segments = len(segments) |
|
|
|
|
|
for segment_idx, (text_chunk, ps, _) in enumerate(segments): |
|
            # Pick the reference style vector matching this segment's phoneme length.

            ref_s = pack[len(ps) - 1]
|
try: |
|
audio = model(ps, ref_s, float(speed)) |
|
audio_segments.append(audio.detach().cpu().numpy()) |
|
|
|
|
|
if total_segments > 10 and (segment_idx + 1) % 5 == 0: |
|
print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...") |
|
|
|
except Exception as e: |
|
raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {str(e)}") |
|
|
|
if not audio_segments: |
|
raise gr.Error("No audio was generated (empty synthesis result).") |
|
|
|
|
|
if len(audio_segments) == 1: |
|
final_audio = audio_segments[0] |
|
else: |
|
final_audio = np.concatenate(audio_segments, axis=0) |
|
|
|
duration = len(final_audio) / 24_000 |
|
if total_segments > 1: |
|
print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio") |
|
|
|
|
|
return 24_000, final_audio |
|
|
|
except gr.Error: |
|
raise |
|
except Exception as e: |
|
raise gr.Error(f"Error during speech generation: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fetch_interface = gr.Interface( |
|
fn=Fetch_Webpage, |
|
inputs=[ |
|
gr.Textbox(label="URL", placeholder="https://example.com/article"), |
|
gr.Dropdown(label="Verbosity", choices=["Brief", "Standard", "Full"], value="Standard"), |
|
gr.Checkbox(value=True, label="Include Metadata"), |
|
gr.Checkbox(value=True, label="Include Main Text"), |
|
gr.Checkbox(value=True, label="Include Links"), |
|
gr.Slider(400, 12000, value=3000, step=100, label="Max Characters (body text)"), |
|
gr.Slider(0, 100, value=20, step=1, label="Max Links"), |
|
gr.Checkbox(value=False, label="Full-page Markdown (Content Scraper mode)"), |
|
], |
|
outputs=gr.Markdown(label="Extracted Summary"), |
|
title="Fetch Webpage", |
|
description=( |
|
"<div style=\"text-align:center\">Extract title, key metadata, readable text, and links from webpages — or toggle full-page Markdown.</div>" |
|
), |
|
api_description=( |
|
"Fetch a web page and return a compact Markdown summary with title, key " |
|
"metadata, readable body text, and outbound links. Or, enable the " |
|
"'Full-page Markdown (Content Scraper mode)' option to return the page " |
|
"converted to Markdown." |
|
), |
|
allow_flagging="never", |
|
) |
|
|
|
|
|
concise_interface = gr.Interface( |
|
fn=Search_DuckDuckGo, |
|
inputs=[ |
|
gr.Textbox(label="Query", placeholder="topic OR site:example.com"), |
|
gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"), |
|
gr.Checkbox(value=False, label="Include snippets (adds tokens)"), |
|
gr.Slider(minimum=20, maximum=200, value=80, step=5, label="Max snippet chars"), |
|
gr.Checkbox(value=True, label="Dedupe by domain"), |
|
gr.Slider(minimum=20, maximum=120, value=80, step=5, label="Max title chars"), |
|
], |
|
outputs=gr.Textbox(label="Results (JSONL)", interactive=False), |
|
title="DuckDuckGo Search", |
|
description=( |
|
"<div style=\"text-align:center\">Very concise web search to avoid unnecessary context. Emits JSONL with short keys (t,u[,s]). Defaults avoid snippets and duplicate domains.</div>" |
|
), |
|
api_description=( |
|
"Run a DuckDuckGo search and return newline-delimited JSON with short keys: " |
|
"t=title, u=url, optional s=snippet. Options control result count, " |
|
"snippet inclusion and length, domain deduping, and title length." |
|
), |
|
allow_flagging="never", |
|
submit_btn="Search", |
|
) |
|
|
|
|
|
|
|
|
|
code_interface = gr.Interface( |
|
fn=Execute_Python, |
|
inputs=gr.Code(label="Python Code", language="python"), |
|
outputs=gr.Textbox(label="Output"), |
|
title="Python Code Executor", |
|
description=( |
|
"<div style=\"text-align:center\">Execute Python code and see the output.</div>" |
|
), |
|
api_description=( |
|
"Execute arbitrary Python code and return captured stdout or an error message.\n\n" |
|
"Parameters:\n" |
|
"- code (string): The Python source code to run.\n\n" |
|
"Returns:\n" |
|
"- string: Combined stdout produced by the code, or the exception text if execution failed." |
|
), |
|
allow_flagging="never", |
|
) |
|
|
|
CSS_STYLES = """ |
|
.gradio-container h1 { |
|
text-align: center; |
|
/* Ensure main title appears first, then our two subtitle lines */ |
|
display: grid; |
|
justify-items: center; |
|
} |
|
/* Place bold tools list on line 2, normal auth note on line 3 (below title) */ |
|
.gradio-container h1::before { |
|
grid-row: 2; |
|
content: "Fetch Webpage | Search DuckDuckGo | Code Interpreter | Kokoro TTS (54 voices) | Image Generation | Video Generation"; |
|
display: block; |
|
font-size: 1rem; |
|
font-weight: 700; |
|
opacity: 0.9; |
|
margin-top: 6px; |
|
white-space: pre-wrap; |
|
} |
|
.gradio-container h1::after { |
|
grid-row: 3; |
|
content: "Authentication is optional but Image/Video Generation require a `HF_READ_TOKEN` in env variables. They are hidden otherwise."; |
|
display: block; |
|
font-size: 1rem; |
|
font-weight: 400; |
|
opacity: 0.9; |
|
margin-top: 2px; |
|
white-space: pre-wrap; |
|
} |
|
|
|
/* Remove inside tab panels so it doesn't duplicate under each tool title */ |
|
.gradio-container [role=\"tabpanel\"] h1::before, |
|
.gradio-container [role=\"tabpanel\"] h1::after { |
|
content: none !important; |
|
} |
|
""" |
|
|
|
|
|
available_voices = get_kokoro_voices() |
|
kokoro_interface = gr.Interface( |
|
fn=Generate_Speech, |
|
inputs=[ |
|
gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4), |
|
gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"), |
|
gr.Dropdown( |
|
label="Voice", |
|
choices=available_voices, |
|
value="af_heart", |
|
info="Select from 54 available voices across multiple languages and accents" |
|
), |
|
], |
|
outputs=gr.Audio(label="Audio", type="numpy"), |
|
title="Kokoro TTS", |
|
description=( |
|
"<div style=\"text-align:center\">Generate speech with Kokoro-82M using 54 different voices. Supports multiple languages and accents. Can generate audio of any length! Runs on CPU or CUDA if available.</div>" |
|
), |
|
api_description=( |
|
"Synthesize speech from text using Kokoro-82M with 54 voice options. Returns (sample_rate, waveform) suitable for playback. " |
|
"Parameters: text (str), speed (float 0.5–2.0, default 1.25x), voice (str from 54 available options). " |
|
"Default voice: `af_heart`. " |
|
"Can generate audio of unlimited length by processing all text segments. " |
|
"Return the generated media to the user in this format ``" |
|
), |
|
allow_flagging="never", |
|
) |
|
|
|
|
|
|
|
|
|
|
|
HF_API_TOKEN = os.getenv("HF_READ_TOKEN") |
|
|
|
|
|
def Generate_Image( |
|
prompt: Annotated[str, "Text description of the image to generate."], |
|
model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name' (e.g., black-forest-labs/FLUX.1-Krea-dev)."] = "black-forest-labs/FLUX.1-Krea-dev", |
|
    negative_prompt: Annotated[str, "What should NOT appear in the image."] = (
|
"(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, " |
|
"missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, " |
|
"mutated, ugly, disgusting, blurry, amputation, misspellings, typos" |
|
), |
|
steps: Annotated[int, "Number of denoising steps (1–100). Higher = slower, potentially higher quality."] = 35, |
|
cfg_scale: Annotated[float, "Classifier-free guidance scale (1–20). Higher = follow the prompt more closely."] = 7.0, |
|
sampler: Annotated[str, "Sampling method label (UI only). Common options: 'DPM++ 2M Karras', 'DPM++ SDE Karras', 'Euler', 'Euler a', 'Heun', 'DDIM'."] = "DPM++ 2M Karras", |
|
seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1, |
|
width: Annotated[int, "Output width in pixels (64–1216, multiple of 32 recommended)."] = 1024, |
|
height: Annotated[int, "Output height in pixels (64–1216, multiple of 32 recommended)."] = 1024, |
|
) -> Image.Image: |
|
""" |
|
Generate a single image from a text prompt using a Hugging Face model via |
|
serverless Inference. Returns a PIL image. By default, the model is |
|
black-forest-labs/FLUX.1-Krea-dev. |
|
|
|
Notes (MCP): |
|
- Per the latest Gradio MCP docs, images returned from tools are handled by the server and |
|
converted to file URLs automatically for MCP clients. Ensure type hints and this docstring |
|
"Args:" block are present so the tool schema is accurate. |
|
|
|
Args: |
|
prompt (str): Text description of the image to generate. |
|
model_id (str): The Hugging Face model id (creator/model-name). Defaults to "black-forest-labs/FLUX.1-Krea-dev". |
|
negative_prompt (str): What should NOT appear in the image. |
|
steps (int): Number of denoising steps (1–100). Higher can improve quality. |
|
cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely. |
|
sampler (str): Sampling method label for UI; not all providers expose this control. |
|
seed (int): Random seed. Use -1 to randomize on each call. |
|
width (int): Output width in pixels (64–1216; multiples of 32 recommended). |
|
height (int): Output height in pixels (64–1216; multiples of 32 recommended). |
|
|
|
Returns: |
|
PIL.Image.Image: The generated image. |
|
|
|
Error modes: |
|
- Raises gr.Error with a user-friendly message on auth/model/load errors. |
|
""" |
|
if not prompt or not prompt.strip(): |
|
raise gr.Error("Please provide a non-empty prompt.") |
|
|
|
|
|
enhanced_prompt = f"{prompt} | ultra detail, ultra elaboration, ultra quality, perfect." |
|
|
|
|
|
providers = ["auto", "replicate", "fal-ai"] |
|
last_error: Exception | None = None |
|
|
|
for provider in providers: |
|
try: |
|
client = InferenceClient(api_key=HF_API_TOKEN, provider=provider) |
|
image = client.text_to_image( |
|
prompt=enhanced_prompt, |
|
negative_prompt=negative_prompt, |
|
model=model_id, |
|
width=width, |
|
height=height, |
|
num_inference_steps=steps, |
|
guidance_scale=cfg_scale, |
|
seed=seed if seed != -1 else random.randint(1, 1_000_000_000), |
|
) |
|
return image |
|
except Exception as e: |
|
last_error = e |
|
continue |
|
|
|
|
|
msg = str(last_error) if last_error else "Unknown error" |
|
if "404" in msg: |
|
raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.") |
|
if "503" in msg: |
|
raise gr.Error("The model is warming up. Please try again shortly.") |
|
if "401" in msg or "403" in msg: |
|
raise gr.Error("Authentication failed. Set HF_READ_TOKEN environment variable with access to the model.") |
|
raise gr.Error(f"Image generation failed: {msg}") |
|
|
|
|
|
image_generation_interface = gr.Interface( |
|
fn=Generate_Image, |
|
inputs=[ |
|
gr.Textbox(label="Prompt", placeholder="Enter a prompt", lines=2), |
|
gr.Textbox(label="Model", value="black-forest-labs/FLUX.1-Krea-dev", placeholder="creator/model-name"), |
|
gr.Textbox( |
|
label="Negative Prompt", |
|
value=( |
|
"(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, " |
|
"missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, " |
|
"mutated, ugly, disgusting, blurry, amputation, misspellings, typos" |
|
), |
|
lines=2, |
|
), |
|
gr.Slider(minimum=1, maximum=100, value=35, step=1, label="Steps"), |
|
gr.Slider(minimum=1.0, maximum=20.0, value=7.0, step=0.1, label="CFG Scale"), |
|
gr.Radio(label="Sampler", value="DPM++ 2M Karras", choices=[ |
|
"DPM++ 2M Karras", "DPM++ SDE Karras", "Euler", "Euler a", "Heun", "DDIM" |
|
]), |
|
gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"), |
|
gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Width"), |
|
gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Height"), |
|
], |
|
outputs=gr.Image(label="Generated Image"), |
|
title="Image Generation", |
|
description=( |
|
"<div style=\"text-align:center\">Generate images via Hugging Face Inference. " |
|
"Default model is FLUX.1-Krea</div>" |
|
), |
|
api_description=( |
|
"Generate a single image from a text prompt using a Hugging Face model (serverless Inference). " |
|
"Parameters: prompt (str), model_id (str, creator/model-name), negative_prompt (str), steps (int, 1–100), cfg_scale (float, 1–20), " |
|
"sampler (str, label only), seed (int, -1=random), width/height (int, 64–1216). Returns a PIL.Image. " |
|
"Return the generated media to the user in this format ``" |
|
), |
|
allow_flagging="never", |
|
) |
|
|
|
|
|
|
|
|
|
|
|
def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str: |
|
"""Write video bytes or iterable of bytes to a temporary file and return its path.""" |
|
os.makedirs("outputs", exist_ok=True) |
|
fname = f"outputs/video_{int(time.time())}_{random.randint(1000,9999)}{suffix}" |
|
mode = "wb" |
|
with open(fname, mode) as f: |
|
|
|
if isinstance(data_iter_or_bytes, (bytes, bytearray)): |
|
f.write(data_iter_or_bytes) |
|
|
|
elif hasattr(data_iter_or_bytes, "read"): |
|
f.write(data_iter_or_bytes.read()) |
|
|
|
elif hasattr(data_iter_or_bytes, "content"): |
|
f.write(data_iter_or_bytes.content) |
|
|
|
elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)): |
|
for chunk in data_iter_or_bytes: |
|
if chunk: |
|
f.write(chunk) |
|
else: |
|
raise gr.Error("Unsupported video data type returned by provider.") |
|
return fname |
|
|
|
|
|
HF_VIDEO_TOKEN = os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN") |
|
|
|
|
|
def Generate_Video( |
|
prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."], |
|
model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B", |
|
negative_prompt: Annotated[str, "What should NOT appear in the video."] = "", |
|
steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25, |
|
cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5, |
|
seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1, |
|
width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768, |
|
height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768, |
|
fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24, |
|
duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0, |
|
) -> str: |
|
""" |
|
Generate a short video from a text prompt using Hugging Face Inference Providers (Serverless Inference). |
|
|
|
This tool follows the latest MCP guidance for Gradio-based MCP servers: clear type hints and |
|
docstrings define the tool schema automatically. The returned file path will be converted to a file URL |
|
for MCP clients. |
|
|
|
Args: |
|
prompt (str): Text description of the video to generate. |
|
model_id (str): The Hugging Face model id (creator/model-name). Defaults to "Wan-AI/Wan2.2-T2V-A14B". |
|
negative_prompt (str): What should NOT appear in the video. |
|
steps (int): Number of denoising steps (1–100). Higher can improve quality but is slower. |
|
cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely. |
|
seed (int): Random seed. Use -1 to randomize on each call. |
|
width (int): Output width in pixels. |
|
height (int): Output height in pixels. |
|
fps (int): Frames per second. |
|
duration (float): Target duration in seconds. |
|
|
|
Returns: |
|
str: Path to an MP4 file on disk (Gradio will serve this file; MCP converts it to a file URL). |
|
|
|
Error modes: |
|
- Raises gr.Error with a user-friendly message on auth/model/load errors or unsupported parameters. |
|
""" |
|
if not prompt or not prompt.strip(): |
|
raise gr.Error("Please provide a non-empty prompt.") |
|
|
|
    # A missing token is not fatal here: some providers accept anonymous calls,

    # and auth failures surface as a friendly gr.Error below.
|
|
|
providers = ["auto", "replicate", "fal-ai"] |
|
last_error: Exception | None = None |
|
|
|
|
|
parameters = { |
|
"negative_prompt": negative_prompt or None, |
|
"num_inference_steps": steps, |
|
"guidance_scale": cfg_scale, |
|
"seed": seed if seed != -1 else random.randint(1, 1_000_000_000), |
|
"width": width, |
|
"height": height, |
|
"fps": fps, |
|
|
|
|
|
"duration": duration, |
|
} |
|
|
|
for provider in providers: |
|
try: |
|
client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider) |
|
|
|
if hasattr(client, "text_to_video"): |
|
|
|
num_frames = int(duration * fps) if duration and fps else None |
|
|
|
|
|
extra_body = {} |
|
if width: |
|
extra_body["width"] = width |
|
if height: |
|
extra_body["height"] = height |
|
if fps: |
|
extra_body["fps"] = fps |
|
if duration: |
|
extra_body["duration"] = duration |
|
|
|
result = client.text_to_video( |
|
prompt=prompt, |
|
model=model_id, |
|
guidance_scale=cfg_scale, |
|
negative_prompt=[negative_prompt] if negative_prompt else None, |
|
num_frames=num_frames, |
|
num_inference_steps=steps, |
|
seed=parameters["seed"], |
|
extra_body=extra_body if extra_body else None, |
|
) |
|
else: |
|
|
|
                # Fallback for huggingface_hub versions without text_to_video.

                result = client.post(
|
model=model_id, |
|
json={ |
|
"inputs": prompt, |
|
"parameters": {k: v for k, v in parameters.items() if v is not None}, |
|
}, |
|
) |
|
|
|
|
|
path = _write_video_tmp(result, suffix=".mp4") |
|
return path |
|
except Exception as e: |
|
last_error = e |
|
continue |
|
|
|
msg = str(last_error) if last_error else "Unknown error" |
|
if "404" in msg: |
|
raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.") |
|
if "503" in msg: |
|
raise gr.Error("The model is warming up. Please try again shortly.") |
|
if "401" in msg or "403" in msg: |
|
raise gr.Error("Authentication failed or not permitted. Set HF_READ_TOKEN/HF_TOKEN with inference access.") |
|
raise gr.Error(f"Video generation failed: {msg}") |
|
|
|
|
|
video_generation_interface = gr.Interface( |
|
fn=Generate_Video, |
|
inputs=[ |
|
gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2), |
|
gr.Textbox(label="Model", value="Wan-AI/Wan2.2-T2V-A14B", placeholder="creator/model-name"), |
|
gr.Textbox(label="Negative Prompt", value="", lines=2), |
|
gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"), |
|
gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"), |
|
gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"), |
|
gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"), |
|
gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"), |
|
gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"), |
|
gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"), |
|
], |
|
outputs=gr.Video(label="Generated Video"), |
|
title="Video Generation", |
|
description=( |
|
"<div style=\"text-align:center\">Generate short videos via Hugging Face Inference Providers. " |
|
"Default model is Wan2.2-T2V-A14B.</div>" |
|
), |
|
api_description=( |
|
"Generate a short video from a text prompt using a Hugging Face model (Serverless Inference). " |
|
"Parameters: prompt (str), model_id (str), negative_prompt (str), steps (int), cfg_scale (float), seed (int), " |
|
"width/height (int), fps (int), duration (float). Return the generated media to the user in this format ``" |
|
), |
|
allow_flagging="never", |
|
) |
|
|
|
|
|
HAS_HF_TOKEN = bool(HF_API_TOKEN or HF_VIDEO_TOKEN) |
|
|
|
_interfaces = [ |
|
fetch_interface, |
|
concise_interface, |
|
code_interface, |
|
kokoro_interface, |
|
] |
|
_tab_names = [ |
|
"Fetch Webpage", |
|
"DuckDuckGo Search", |
|
"Python Code Executor", |
|
"Kokoro TTS", |
|
] |
|
|
|
if HAS_HF_TOKEN: |
|
_interfaces.extend([image_generation_interface, video_generation_interface]) |
|
_tab_names.extend(["Image Generation", "Video Generation"]) |
|
|
|
demo = gr.TabbedInterface( |
|
interface_list=_interfaces, |
|
tab_names=_tab_names, |
|
title="Tools MCP", |
|
theme="Nymbo/Nymbo_Theme", |
|
css=CSS_STYLES, |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
demo.launch(mcp_server=True) |