Spaces:
Sleeping
Sleeping
import asyncio | |
import json | |
import gradio as gr | |
from mcp import ClientSession | |
from mcp.client.sse import sse_client | |
from contextlib import AsyncExitStack | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
class SimpleMCPClient: | |
def __init__(self): | |
self.session = None | |
self.connected = False | |
self.tools = [] | |
self.exit_stack = None | |
self.server_url = "https://chris4k-weather.hf.space/gradio_api/mcp/sse" | |
def connect(self) -> str: | |
"""Connect to the hardcoded MCP server""" | |
return loop.run_until_complete(self._connect()) | |
async def _connect(self) -> str: | |
try: | |
# Clean up previous connection | |
if self.exit_stack: | |
await self.exit_stack.aclose() | |
self.exit_stack = AsyncExitStack() | |
# Connect to SSE MCP server | |
sse_transport = await self.exit_stack.enter_async_context( | |
sse_client(self.server_url) | |
) | |
read_stream, write_callable = sse_transport | |
self.session = await self.exit_stack.enter_async_context( | |
ClientSession(read_stream, write_callable) | |
) | |
await self.session.initialize() | |
# Get available tools | |
response = await self.session.list_tools() | |
self.tools = response.tools | |
self.connected = True | |
tool_names = [tool.name for tool in self.tools] | |
return f"β Connected to weather server!\nAvailable tools: {', '.join(tool_names)}" | |
except Exception as e: | |
self.connected = False | |
return f"β Connection failed: {str(e)}" | |
def get_weather(self, location: str) -> str: | |
"""Get weather for a location (city, country format)""" | |
if not self.connected: | |
return "β Not connected to server. Click Connect first." | |
if not location.strip(): | |
return "β Please enter a location (e.g., 'Berlin, Germany')" | |
return loop.run_until_complete(self._get_weather(location)) | |
async def _get_weather(self, location: str) -> str: | |
try: | |
# Parse location | |
if ',' in location: | |
city, country = [part.strip() for part in location.split(',', 1)] | |
else: | |
city = location.strip() | |
country = "" | |
# Find the weather tool | |
weather_tool = next((tool for tool in self.tools if 'weather' in tool.name.lower()), None) | |
if not weather_tool: | |
return "β Weather tool not found on server" | |
# Call the tool | |
params = {"city": city, "country": country} | |
result = await self.session.call_tool(weather_tool.name, params) | |
# Extract content properly | |
content_text = "" | |
if hasattr(result, 'content') and result.content: | |
if isinstance(result.content, list): | |
for content_item in result.content: | |
if hasattr(content_item, 'text'): | |
content_text += content_item.text | |
elif hasattr(content_item, 'content'): | |
content_text += str(content_item.content) | |
else: | |
content_text += str(content_item) | |
elif hasattr(result.content, 'text'): | |
content_text = result.content.text | |
else: | |
content_text = str(result.content) | |
if not content_text: | |
return "β No content received from server" | |
try: | |
# Try to parse as JSON | |
parsed = json.loads(content_text) | |
if isinstance(parsed, dict): | |
if 'error' in parsed: | |
return f"β Error: {parsed['error']}" | |
# Format weather data nicely | |
if 'current_weather' in parsed: | |
weather = parsed['current_weather'] | |
formatted = f"π **{parsed.get('location', 'Unknown')}**\n\n" | |
formatted += f"π‘οΈ Temperature: {weather.get('temperature_celsius', 'N/A')}Β°C\n" | |
formatted += f"π€οΈ Conditions: {weather.get('weather_description', 'N/A')}\n" | |
formatted += f"π¨ Wind: {weather.get('wind_speed_kmh', 'N/A')} km/h\n" | |
formatted += f"π§ Humidity: {weather.get('humidity_percent', 'N/A')}%\n" | |
return formatted | |
elif 'temperature (Β°C)' in parsed: | |
# Handle the original format from your server | |
formatted = f"π **{parsed.get('location', 'Unknown')}**\n\n" | |
formatted += f"π‘οΈ Temperature: {parsed.get('temperature (Β°C)', 'N/A')}Β°C\n" | |
formatted += f"π€οΈ Weather Code: {parsed.get('weather_code', 'N/A')}\n" | |
formatted += f"π Timezone: {parsed.get('timezone', 'N/A')}\n" | |
formatted += f"π Local Time: {parsed.get('local_time', 'N/A')}\n" | |
return formatted | |
else: | |
return f"β Weather data:\n```json\n{json.dumps(parsed, indent=2)}\n```" | |
except json.JSONDecodeError: | |
# If not JSON, return as text | |
return f"β Weather data:\n```\n{content_text}\n```" | |
return f"β Raw result:\n{content_text}" | |
except Exception as e: | |
return f"β Failed to get weather: {str(e)}" | |
# Global client | |
client = SimpleMCPClient() | |
def create_interface(): | |
with gr.Blocks(title="Weather MCP Test", theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# π€οΈ Weather MCP Test Client") | |
gr.Markdown("Simple client to test the weather MCP server") | |
# Connection | |
with gr.Row(): | |
connect_btn = gr.Button("π Connect to Weather Server", variant="primary") | |
status = gr.Textbox( | |
label="Status", | |
value="Click Connect to start", | |
interactive=False, | |
scale=2 | |
) | |
# Weather query | |
with gr.Group(): | |
gr.Markdown("### Get Weather") | |
with gr.Row(): | |
location_input = gr.Textbox( | |
label="Location", | |
placeholder="e.g., Berlin, Germany", | |
value="Berlin, Germany", | |
scale=3 | |
) | |
weather_btn = gr.Button("π‘οΈ Get Weather", scale=1) | |
weather_result = gr.Textbox( | |
label="Weather Result", | |
interactive=False, | |
lines=8, | |
placeholder="Weather information will appear here..." | |
) | |
# Examples | |
with gr.Group(): | |
gr.Markdown("### π Examples") | |
examples = gr.Examples( | |
examples=[ | |
["Berlin, Germany"], | |
["Tokyo, Japan"], | |
["New York, USA"], | |
["London, UK"], | |
["Sydney, Australia"] | |
], | |
inputs=[location_input] | |
) | |
# Event handlers | |
connect_btn.click( | |
client.connect, | |
outputs=[status] | |
) | |
weather_btn.click( | |
client.get_weather, | |
inputs=[location_input], | |
outputs=[weather_result] | |
) | |
location_input.submit( | |
client.get_weather, | |
inputs=[location_input], | |
outputs=[weather_result] | |
) | |
return demo | |
if __name__ == "__main__": | |
interface = create_interface() | |
interface.launch(debug=True, share=True, mcp_server=True) |