#!/usr/bin/env python3
"""
Real MCP with Gradio Agent - Stock Analysis Platform

Comprehensive implementation with MCP server and Gradio interface
"""

import asyncio
import json
import logging
import math
import os
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import traceback

# MCP and async imports
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import mcp.types as types

# Data analysis imports
import yfinance as yf
import pandas as pd
import numpy as np
from dataclasses import dataclass

# Gradio for web interface
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Bar colours used by the comparison chart; cycled when there are more bars.
_BAR_COLORS = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']


def _to_jsonable(value: Any) -> Any:
    """Recursively convert *value* into plain types that ``json.dumps`` accepts.

    numpy integers/booleans become ``int``/``bool``; floats (including numpy
    floats) are kept unless they are NaN or infinite, in which case they
    become ``None`` so the emitted document is strict JSON.
    """
    if isinstance(value, dict):
        return {str(key): _to_jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_jsonable(item) for item in value]
    if isinstance(value, (bool, np.bool_)):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, (float, np.floating)):
        number = float(value)
        return number if math.isfinite(number) else None
    return value


def _text_response(payload: Any) -> List[types.TextContent]:
    """Wrap *payload* as a single MCP text item (dicts are serialised to JSON)."""
    if not isinstance(payload, str):
        payload = json.dumps(_to_jsonable(payload), indent=2)
    return [TextContent(type="text", text=payload)]


def _clean_symbols(raw: Any) -> List[str]:
    """Return the non-blank symbols in *raw*, stripped and upper-cased.

    Accepts either a list of strings or one comma-separated string.
    """
    if isinstance(raw, str):
        raw = raw.split(',')
    return [item.strip().upper() for item in (raw or [])
            if isinstance(item, str) and item.strip()]


def _format_ratio(value: Any) -> str:
    """Format a ratio with one decimal, or ``'N/A'`` when it is missing/zero."""
    return f"{value:.1f}" if value else "N/A"


@dataclass
class StockAnalysis:
    """Data class for stock analysis results"""
    symbol: str
    company_name: str
    current_price: float
    ytd_return: float
    volatility: float
    investment_score: int
    recommendation: str
    risk_level: str
    sector: str
    market_cap: int


class StockAnalyzer:
    """Advanced stock analysis engine"""

    def __init__(self):
        # Maps "<symbol>_<period>" -> (DataFrame, fetch time).
        self.cache = {}
        self.cache_timeout = 300  # 5 minutes

    def get_stock_data(self, symbol: str, period: str = "1y") -> Optional[pd.DataFrame]:
        """Return price history for *symbol*, served from the cache when fresh.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.
            period: History period string passed to ``Ticker.history``.

        Returns:
            The history DataFrame (possibly empty), or ``None`` when the
            download raised.
        """
        cache_key = f"{symbol}_{period}"
        current_time = datetime.now()

        cached = self.cache.get(cache_key)
        if cached is not None:
            data, timestamp = cached
            # total_seconds(): ``.seconds`` wraps every 24h and would make a
            # day-old entry look fresh again.
            if (current_time - timestamp).total_seconds() < self.cache_timeout:
                return data

        try:
            data = yf.Ticker(symbol).history(period=period)
        except Exception as e:
            logger.error(f"Error fetching data for {symbol}: {e}")
            return None

        self.cache[cache_key] = (data, current_time)
        return data

    def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict:
        """Compute the latest RSI, MACD, MACD signal, MA20, MA50 and close.

        The input frame is not modified (it is usually the cached frame
        returned by ``get_stock_data``).

        Returns:
            ``{}`` for missing/empty input; otherwise a dict with keys
            ``rsi``, ``macd``, ``macd_signal``, ``ma20``, ``ma50`` and
            ``current_price``. Values are NaN when there are too few rows
            for the corresponding rolling window.
        """
        if data is None or data.empty:
            return {}

        close = data['Close']

        # Moving averages
        ma20 = close.rolling(window=20).mean()
        ma50 = close.rolling(window=50).mean()

        # RSI (14-period simple moving average of gains and losses)
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rsi = 100 - (100 / (1 + gain / loss))

        # MACD
        macd = close.ewm(span=12).mean() - close.ewm(span=26).mean()
        macd_signal = macd.ewm(span=9).mean()

        return {
            'rsi': float(rsi.iloc[-1]),
            'macd': float(macd.iloc[-1]),
            'macd_signal': float(macd_signal.iloc[-1]),
            'ma20': float(ma20.iloc[-1]),
            'ma50': float(ma50.iloc[-1]),
            'current_price': float(close.iloc[-1]),
        }

    @staticmethod
    def _compute_score(ytd_return: float, technical: Dict, pe_ratio: float,
                       revenue_growth: float, profit_margin: float,
                       volatility: float, max_drawdown: float) -> int:
        """Combine the individual metrics into a 0-100 investment score."""
        score = 50  # Base score

        # YTD performance
        if ytd_return > 25:
            score += 25
        elif ytd_return > 15:
            score += 20
        elif ytd_return > 5:
            score += 15
        elif ytd_return > 0:
            score += 10
        elif ytd_return > -10:
            score += 5
        else:
            score -= 15

        # RSI: a NaN value matches none of the branches and is ignored.
        rsi = technical.get('rsi', 50)
        if 30 <= rsi <= 70:  # Not oversold or overbought
            score += 12
        elif rsi < 30:  # Oversold - potential buy
            score += 8
        elif rsi > 70:  # Overbought - caution
            score -= 5

        # MACD signal
        if technical.get('macd', 0) > technical.get('macd_signal', 0):
            score += 8  # Bullish signal
        else:
            score -= 3

        # Valuation. A negative P/E means losses, not a bargain, so the
        # "very undervalued" bonus requires a positive ratio.
        if 0 < pe_ratio < 8:
            score += 20
        elif 8 <= pe_ratio < 20:
            score += 15
        elif 20 <= pe_ratio < 30:
            score += 5
        elif pe_ratio > 35:
            score -= 10

        # Growth and profitability
        if revenue_growth > 0.20:
            score += 15
        elif revenue_growth > 0.10:
            score += 10
        elif revenue_growth > 0.05:
            score += 5

        if profit_margin > 0.15:
            score += 5
        elif profit_margin > 0.10:
            score += 3

        # Risk adjustment
        if volatility < 15:
            score += 5
        elif volatility > 35:
            score -= 10

        if max_drawdown > -15:
            score += 5
        elif max_drawdown < -30:
            score -= 8

        return max(0, min(100, score))

    @staticmethod
    def _risk_level(volatility: float) -> str:
        """Map annualised volatility (percent) to Low / Medium / High."""
        if volatility < 15:
            return "Low"
        if volatility < 25:
            return "Medium"
        return "High"

    @staticmethod
    def _recommendation(score: int) -> str:
        """Map an investment score to a recommendation label."""
        if score >= 80:
            return "Strong Buy"
        if score >= 70:
            return "Buy"
        if score >= 60:
            return "Hold"
        if score >= 50:
            return "Weak Hold"
        return "Sell"

    def calculate_investment_score(self, symbol: str) -> Dict:
        """Calculate a comprehensive investment score for *symbol*.

        Returns:
            A dict of metrics (``symbol``, ``investment_score``,
            ``recommendation``, ``ytd_return``, ...), or ``{'error': msg}``
            when no year-to-date data exists or anything raises.
        """
        try:
            stock = yf.Ticker(symbol)
            info = stock.info or {}

            # Year-to-date window starts on 1 January of the *current* year.
            ytd_start = datetime(datetime.now().year, 1, 1)
            ytd_data = stock.history(start=ytd_start.strftime("%Y-%m-%d"))
            if ytd_data.empty:
                return {'error': f'No YTD data available for {symbol}'}

            first_close = float(ytd_data['Close'].iloc[0])
            last_close = float(ytd_data['Close'].iloc[-1])
            ytd_return = ((last_close - first_close) / first_close) * 100

            # One-year history drives volatility, drawdown and technicals.
            year_data = self.get_stock_data(symbol, "1y")
            volatility = 0
            max_drawdown = 0
            technical = {}
            if year_data is not None and not year_data.empty:
                returns = year_data['Close'].pct_change().dropna()
                volatility = float(returns.std() * np.sqrt(252) * 100)  # Annualized

                rolling_max = year_data['Close'].expanding().max()
                drawdown = (year_data['Close'] - rolling_max) / rolling_max
                max_drawdown = float(drawdown.min() * 100)

                technical = self.calculate_technical_indicators(year_data)

            # Fundamental metrics ("or 0" also covers keys present as None)
            pe_ratio = info.get('trailingPE', 0) or 0
            forward_pe = info.get('forwardPE', 0) or 0
            peg_ratio = info.get('pegRatio', 0) or 0
            roe = info.get('returnOnEquity', 0) or 0
            profit_margin = info.get('profitMargins', 0) or 0
            revenue_growth = info.get('revenueGrowth', 0) or 0

            score = self._compute_score(
                ytd_return, technical, pe_ratio, revenue_growth,
                profit_margin, volatility, max_drawdown,
            )

            return {
                'symbol': symbol.upper(),
                'company_name': info.get('longName') or 'N/A',
                'current_price': last_close,
                'ytd_return': ytd_return,
                'volatility': volatility,
                'max_drawdown': max_drawdown,
                'pe_ratio': pe_ratio,
                'forward_pe': forward_pe,
                'peg_ratio': peg_ratio,
                'roe': roe * 100 if roe else 0,
                'profit_margin': profit_margin * 100 if profit_margin else 0,
                'revenue_growth': revenue_growth * 100 if revenue_growth else 0,
                'investment_score': score,
                'recommendation': self._recommendation(score),
                'risk_level': self._risk_level(volatility),
                'sector': info.get('sector') or 'N/A',
                'industry': info.get('industry') or 'N/A',
                'market_cap': info.get('marketCap', 0) or 0,
                'technical_indicators': technical,
                'analysis_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
        except Exception as e:
            logger.error(f"Error calculating investment score for {symbol}: {e}")
            return {'error': f'Error analyzing {symbol}: {str(e)}'}


# Initialize the stock analyzer
analyzer = StockAnalyzer()

# MCP Server Setup
server = Server("stock-analysis-mcp")


def _single_symbol_schema() -> Dict[str, Any]:
    """Input schema for tools that take one ``symbol`` string."""
    return {
        "type": "object",
        "properties": {
            "symbol": {"type": "string", "description": "Stock symbol (e.g., AAPL)"}
        },
        "required": ["symbol"],
    }


def _symbol_list_schema(description: str) -> Dict[str, Any]:
    """Input schema for tools that take a ``symbols`` string array."""
    return {
        "type": "object",
        "properties": {
            "symbols": {
                "type": "array",
                "items": {"type": "string"},
                "description": description,
            }
        },
        "required": ["symbols"],
    }


@server.list_tools()
async def handle_list_tools() -> List[Tool]:
    """List available MCP tools"""
    return [
        Tool(
            name="get_stock_price",
            description="Get current stock price and basic info",
            inputSchema=_single_symbol_schema(),
        ),
        Tool(
            name="analyze_stock_comprehensive",
            description="Comprehensive stock analysis with technical and fundamental metrics",
            inputSchema=_single_symbol_schema(),
        ),
        Tool(
            name="compare_stocks_ytd",
            description="Compare multiple stocks for year-to-date (YTD) performance",
            inputSchema=_symbol_list_schema("List of stock symbols to compare"),
        ),
        Tool(
            name="get_market_sector_analysis",
            description="Analyze stocks by sector performance",
            inputSchema=_symbol_list_schema("List of stock symbols to analyze by sector"),
        ),
    ]


def _quote_snapshot(symbol: str) -> Any:
    """Build the ``get_stock_price`` payload, or an error string."""
    stock = yf.Ticker(symbol)
    info = stock.info or {}
    hist = stock.history(period="2d")
    if hist.empty:
        return f"Error: No data found for {symbol}"

    current_price = float(hist['Close'].iloc[-1])
    prev_close = float(hist['Close'].iloc[-2]) if len(hist) > 1 else current_price
    change = current_price - prev_close
    # Guard against a zero previous close instead of emitting inf/NaN.
    change_percent = (change / prev_close) * 100 if prev_close else 0.0

    return {
        "symbol": symbol,
        "company_name": info.get('longName') or 'N/A',
        "current_price": round(current_price, 2),
        "change": round(change, 2),
        "change_percent": round(change_percent, 2),
        "previous_close": round(prev_close, 2),
        "market_cap": info.get('marketCap', 0) or 0,
        # numpy int64 is not JSON serialisable; convert explicitly.
        "volume": int(hist['Volume'].iloc[-1]),
        "sector": info.get('sector') or 'N/A',
    }


def _analyze_many(symbols: List[str]) -> List[Dict]:
    """Analyse each symbol and keep only the successful results."""
    results = (analyzer.calculate_investment_score(symbol) for symbol in symbols)
    return [analysis for analysis in results if 'error' not in analysis]


@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
    """Handle MCP tool calls.

    Args:
        name: Tool name as advertised by ``handle_list_tools``.
        arguments: Tool arguments (``symbol`` or ``symbols``).

    Returns:
        A one-element list whose text is a JSON document on success or an
        ``Error: ...`` message otherwise. Exceptions are logged and reported
        as text rather than propagated.
    """
    arguments = arguments or {}
    try:
        if name in ("get_stock_price", "analyze_stock_comprehensive"):
            symbol = str(arguments.get("symbol", "") or "").strip().upper()
            if not symbol:
                return _text_response("Error: Symbol is required")
            if name == "get_stock_price":
                return _text_response(_quote_snapshot(symbol))
            return _text_response(analyzer.calculate_investment_score(symbol))

        if name in ("compare_stocks_ytd", "get_market_sector_analysis"):
            symbols = _clean_symbols(arguments.get("symbols", []))
            if not symbols:
                return _text_response("Error: Symbols list is required")
            analyses = _analyze_many(symbols)
            now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            if name == "compare_stocks_ytd":
                # Sort by investment score
                analyses.sort(key=lambda item: item.get('investment_score', 0), reverse=True)
                return _text_response({
                    "comparison_results": analyses,
                    "winner": analyses[0] if analyses else None,
                    "analysis_date": now_text,
                })

            sector_data: Dict[str, List[Dict]] = {}
            for analysis in analyses:
                sector_data.setdefault(analysis.get('sector', 'Unknown'), []).append(analysis)

            # Calculate sector averages
            sector_summary = {}
            for sector, stocks in sector_data.items():
                count = len(stocks)
                sector_summary[sector] = {
                    "average_score": round(sum(s['investment_score'] for s in stocks) / count, 1),
                    "average_ytd_return": round(sum(s['ytd_return'] for s in stocks) / count, 2),
                    "stock_count": count,
                    "stocks": [s['symbol'] for s in stocks],
                }
            return _text_response({
                "sector_analysis": sector_summary,
                "detailed_stocks": sector_data,
                "analysis_date": now_text,
            })

        return _text_response(f"Error: Unknown tool '{name}'")
    except Exception as e:
        error_msg = f"Error executing tool '{name}': {str(e)}"
        logger.error(error_msg)
        return _text_response(error_msg)


def _run_mcp_tool(tool_name: str, params: str, annotate: bool = False) -> Dict[str, Any]:
    """Invoke ``handle_call_tool`` synchronously for the Gradio UI.

    Args:
        tool_name: One of the four tool names.
        params: A symbol, or comma-separated symbols for the list tools.
        annotate: When true, add ``_mcp_tool`` and ``_execution_time`` keys
            to parsed, raw-text and exception responses.

    Returns:
        The parsed JSON response, ``{"response": text}`` for non-JSON text,
        or ``{"error": ...}``.
    """
    def stamp(payload: Dict[str, Any]) -> Dict[str, Any]:
        if annotate:
            payload["_mcp_tool"] = tool_name
            payload["_execution_time"] = datetime.now().isoformat()
        return payload

    try:
        text = params or ""
        if tool_name in ("get_stock_price", "analyze_stock_comprehensive"):
            arguments = {"symbol": text.strip()}
        elif tool_name in ("compare_stocks_ytd", "get_market_sector_analysis"):
            arguments = {"symbols": [s.strip() for s in text.split(',') if s.strip()]}
        else:
            return {"error": f"Unknown tool: {tool_name}"}

        # asyncio.run creates the loop and always closes it, even on error.
        result = asyncio.run(handle_call_tool(tool_name, arguments))
        if not result:
            return {"error": "No response from MCP tool"}

        response_text = result[0].text
        try:
            parsed = json.loads(response_text)
        except json.JSONDecodeError:
            return stamp({"response": response_text})
        if not isinstance(parsed, dict):
            parsed = {"response": parsed}
        return stamp(parsed)
    except Exception as e:
        return stamp({"error": f"Error executing MCP tool: {str(e)}"})


# Gradio Interface Functions
def create_stock_chart(symbol: str):
    """Create an interactive candlestick + volume chart for *symbol*.

    Returns:
        A Plotly figure built from six months of history, or ``None`` when
        no data is available or chart construction fails.
    """
    try:
        data = analyzer.get_stock_data(symbol, "6mo")
        if data is None or data.empty:
            return None

        # row_heights is applied top-to-bottom, so the price panel gets 70%.
        # (The deprecated row_width is applied bottom-to-top.)
        fig = make_subplots(
            rows=2, cols=1,
            shared_xaxes=True,
            vertical_spacing=0.1,
            subplot_titles=(f'{symbol.upper()} Stock Price', 'Volume'),
            row_heights=[0.7, 0.3],
        )

        # Candlestick chart
        fig.add_trace(
            go.Candlestick(
                x=data.index,
                open=data['Open'],
                high=data['High'],
                low=data['Low'],
                close=data['Close'],
                name="Price",
            ),
            row=1, col=1,
        )

        # Moving averages, computed locally so the cached frame stays untouched.
        for window, label, color in ((20, 'MA20', 'orange'), (50, 'MA50', 'blue')):
            if len(data) >= window:
                average = data['Close'].rolling(window=window).mean()
                fig.add_trace(
                    go.Scatter(x=data.index, y=average, name=label, line=dict(color=color)),
                    row=1, col=1,
                )

        # Volume
        fig.add_trace(
            go.Bar(x=data.index, y=data['Volume'], name='Volume', marker_color='lightblue'),
            row=2, col=1,
        )

        fig.update_layout(
            title=f'{symbol.upper()} - Stock Analysis',
            xaxis_rangeslider_visible=False,
            height=600,
            showlegend=True,
        )
        return fig
    except Exception as e:
        logger.error(f"Error creating chart for {symbol}: {e}")
        return None


def analyze_single_stock(symbol: str) -> tuple:
    """Analyze a single stock.

    Returns:
        ``(markdown_text, chart_or_None, metrics_dataframe_or_None)``. On
        failure the first element is the error message and the others are
        ``None``.
    """
    symbol = (symbol or "").strip().upper()
    if not symbol:
        return "Please enter a stock symbol", None, None

    try:
        analysis = analyzer.calculate_investment_score(symbol)
        if 'error' in analysis:
            return f"Error: {analysis['error']}", None, None

        technical = analysis['technical_indicators']
        market_cap_billions = (analysis['market_cap'] or 0) / 1e9

        # Create formatted analysis text
        analysis_text = "\n".join([
            f"# 📊 Stock Analysis for {analysis['symbol']}",
            "",
            "## 🏢 Company Information",
            f"- **Company**: {analysis['company_name']}",
            f"- **Sector**: {analysis['sector']}",
            f"- **Industry**: {analysis['industry']}",
            f"- **Market Cap**: ${market_cap_billions:.2f}B",
            "",
            "## 💰 Current Performance",
            f"- **Current Price**: ${analysis['current_price']:.2f}",
            f"- **YTD Return**: {analysis['ytd_return']:+.2f}%",
            f"- **Investment Score**: {analysis['investment_score']}/100",
            "",
            "## 📈 Investment Recommendation",
            f"- **Recommendation**: {analysis['recommendation']}",
            f"- **Risk Level**: {analysis['risk_level']}",
            f"- **Volatility**: {analysis['volatility']:.1f}%",
            "",
            "## 🔍 Fundamental Metrics",
            # The ratio text is chosen before formatting; a conditional
            # cannot live inside an f-string format spec.
            f"- **P/E Ratio**: {_format_ratio(analysis['pe_ratio'])}",
            f"- **Forward P/E**: {_format_ratio(analysis['forward_pe'])}",
            f"- **ROE**: {analysis['roe']:.1f}%",
            f"- **Profit Margin**: {analysis['profit_margin']:.1f}%",
            f"- **Revenue Growth**: {analysis['revenue_growth']:.1f}%",
            "",
            "## 📊 Technical Indicators",
            f"- **RSI**: {technical.get('rsi', 0):.1f}",
            f"- **MACD**: {technical.get('macd', 0):.3f}",
            "",
            "---",
            f"*Analysis Date: {analysis['analysis_date']}*",
            "",
        ])

        # Create chart
        chart = create_stock_chart(symbol)

        # Create comparison data for table
        ytd_return = analysis['ytd_return']
        if ytd_return > 10:
            ytd_label = 'Strong'
        elif ytd_return > 0:
            ytd_label = 'Moderate'
        else:
            ytd_label = 'Weak'

        comparison_df = pd.DataFrame([
            {
                'Metric': 'Investment Score',
                'Value': f"{analysis['investment_score']}/100",
                'Interpretation': analysis['recommendation'],
            },
            {
                'Metric': 'YTD Return',
                'Value': f"{ytd_return:+.2f}%",
                'Interpretation': ytd_label,
            },
            {
                'Metric': 'Risk Level',
                'Value': analysis['risk_level'],
                'Interpretation': f"Volatility: {analysis['volatility']:.1f}%",
            },
        ])

        return analysis_text, chart, comparison_df
    except Exception as e:
        error_msg = f"Error analyzing {symbol}: {str(e)}"
        logger.error(error_msg)
        return error_msg, None, None


def compare_multiple_stocks(symbols_input: str) -> tuple:
    """Compare multiple stocks given as a comma-separated string.

    Returns:
        ``(markdown_text, bar_chart_or_None, dataframe_or_None)``; on
        failure the first element is the message and the others ``None``.
    """
    if not symbols_input:
        return "Please enter stock symbols separated by commas", None, None

    try:
        symbols = _clean_symbols(symbols_input)
        if len(symbols) < 2:
            return "Please enter at least 2 stock symbols for comparison", None, None

        comparisons = _analyze_many(symbols)
        if not comparisons:
            return "No valid stock data found for the provided symbols", None, None

        # Sort by investment score
        comparisons.sort(key=lambda item: item['investment_score'], reverse=True)

        # Create comparison text (top 5 only)
        rank_emojis = ["🥇", "🥈", "🥉", "4️⃣", "5️⃣"]
        lines = [
            "# 🏆 Stock Comparison Results",
            "",
            f"**Analysis of {len(comparisons)} stocks:**",
            "",
        ]
        for rank_emoji, stock in zip(rank_emojis, comparisons):
            lines.extend([
                "",
                f"## {rank_emoji} {stock['symbol']} - {stock['company_name']}",
                f"- **Score**: {stock['investment_score']}/100",
                f"- **Recommendation**: {stock['recommendation']}",
                f"- **YTD Return**: {stock['ytd_return']:+.2f}%",
                f"- **Current Price**: ${stock['current_price']:.2f}",
                f"- **Sector**: {stock['sector']}",
                f"- **Risk Level**: {stock['risk_level']}",
                "",
            ])
        comparison_text = "\n".join(lines)

        # Create comparison DataFrame
        rows = []
        for rank, stock in enumerate(comparisons, start=1):
            company = stock['company_name']
            if len(company) > 30:
                company = company[:30] + '...'
            rows.append({
                'Rank': rank,
                'Symbol': stock['symbol'],
                'Company': company,
                'Score': stock['investment_score'],
                'YTD Return %': f"{stock['ytd_return']:+.2f}",
                'Price': f"${stock['current_price']:.2f}",
                'Recommendation': stock['recommendation'],
                'Sector': stock['sector'],
            })
        comparison_df = pd.DataFrame(rows)

        # Create comparison chart; cycle the palette so every bar has a colour.
        bar_colors = [_BAR_COLORS[i % len(_BAR_COLORS)] for i in range(len(comparisons))]
        fig = go.Figure()
        fig.add_trace(go.Bar(
            x=[s['symbol'] for s in comparisons],
            y=[s['investment_score'] for s in comparisons],
            text=[f"{s['investment_score']}" for s in comparisons],
            textposition='auto',
            marker_color=bar_colors,
        ))
        fig.update_layout(
            title='Investment Score Comparison',
            xaxis_title='Stock Symbol',
            yaxis_title='Investment Score (0-100)',
            height=400,
        )

        return comparison_text, fig, comparison_df
    except Exception as e:
        error_msg = f"Error comparing stocks: {str(e)}"
        logger.error(error_msg)
        return error_msg, None, None


# Create Gradio Interface
def create_gradio_app():
    """Create the basic Gradio web interface (analysis, comparison, MCP tools)."""
    with gr.Blocks(title="🚀 MCP Stock Analysis Agent", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🚀 Real MCP with Gradio Agent - Stock Analysis Platform

        Advanced stock analysis powered by MCP (Model Context Protocol) with comprehensive technical and fundamental analysis.

        ## Features:
        - 📊 Real-time stock data analysis
        - 🎯 AI-powered investment scoring
        - 📈 Technical indicator analysis
        - 🏆 Multi-stock comparison
        - 📉 Interactive charts and visualizations
        """)

        with gr.Tabs():
            # Single Stock Analysis Tab
            with gr.Tab("📊 Single Stock Analysis"):
                with gr.Row():
                    with gr.Column(scale=1):
                        stock_input = gr.Textbox(
                            label="Stock Symbol",
                            placeholder="Enter symbol (e.g., AAPL, MSFT, GOOGL)",
                            value="AAPL",
                        )
                        analyze_btn = gr.Button("🔍 Analyze Stock", variant="primary")

                with gr.Row():
                    with gr.Column(scale=2):
                        analysis_output = gr.Markdown(label="Analysis Results")
                    with gr.Column(scale=1):
                        metrics_table = gr.Dataframe(
                            label="Key Metrics",
                            headers=["Metric", "Value", "Interpretation"],
                        )

                stock_chart = gr.Plot(label="Stock Chart")

            # Stock Comparison Tab
            with gr.Tab("🏆 Stock Comparison"):
                with gr.Row():
                    with gr.Column():
                        stocks_input = gr.Textbox(
                            label="Stock Symbols (comma-separated)",
                            placeholder="Enter symbols (e.g., AAPL, MSFT, GOOGL, TSLA)",
                            value="AAPL, MSFT, GOOGL",
                        )
                        compare_btn = gr.Button("🔍 Compare Stocks", variant="primary")

                comparison_output = gr.Markdown(label="Comparison Results")
                comparison_chart = gr.Plot(label="Comparison Chart")
                comparison_table = gr.Dataframe(
                    label="Detailed Comparison",
                    headers=["Rank", "Symbol", "Company", "Score", "YTD Return %",
                             "Price", "Recommendation", "Sector"],
                )

            # MCP Tools Tab
            with gr.Tab("🛠️ MCP Tools"):
                gr.Markdown("""
                ## Available MCP Tools:

                1. **get_stock_price** - Get current stock price and basic info
                2. **analyze_stock_comprehensive** - Comprehensive analysis with scoring
                3. **compare_stocks_ytd** - Compare multiple stocks for YTD performance
                4. **get_market_sector_analysis** - Analyze stocks by sector

                These tools can be called programmatically via the MCP protocol.
                """)

                with gr.Row():
                    mcp_tool_select = gr.Dropdown(
                        choices=["get_stock_price", "analyze_stock_comprehensive",
                                 "compare_stocks_ytd", "get_market_sector_analysis"],
                        label="Select MCP Tool",
                        value="get_stock_price",
                    )
                    mcp_symbol_input = gr.Textbox(
                        label="Symbol/Parameters",
                        placeholder="AAPL or AAPL,MSFT,GOOGL for comparison",
                        value="AAPL",
                    )

                mcp_execute_btn = gr.Button("⚡ Execute MCP Tool", variant="secondary")
                mcp_output = gr.JSON(label="MCP Tool Response")

        # Event handlers
        analyze_btn.click(
            fn=analyze_single_stock,
            inputs=[stock_input],
            outputs=[analysis_output, stock_chart, metrics_table],
        )

        compare_btn.click(
            fn=compare_multiple_stocks,
            inputs=[stocks_input],
            outputs=[comparison_output, comparison_chart, comparison_table],
        )

        def execute_mcp_tool(tool_name, params):
            """Execute MCP tool from Gradio interface"""
            return _run_mcp_tool(tool_name, params, annotate=False)

        mcp_execute_btn.click(
            fn=execute_mcp_tool,
            inputs=[mcp_tool_select, mcp_symbol_input],
            outputs=[mcp_output],
        )

        # Add footer
        gr.Markdown("""
        ---
        ### 🔧 Technical Details:
        - **MCP Protocol**: Model Context Protocol for tool integration
        - **Data Source**: Yahoo Finance API via yfinance
        - **Analysis Engine**: Custom investment scoring algorithm
        - **Visualization**: Plotly interactive charts
        - **Interface**: Gradio web framework

        *This platform provides educational analysis and should not be considered financial advice.*
        """)

    return app


# MCP Server Runner
async def run_mcp_server():
    """Run the MCP server over stdio until the streams close."""
    logger.info("Starting MCP Stock Analysis Server...")

    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="stock-analysis-mcp",
                server_version="1.0.0",
                # get_capabilities requires both arguments.
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


# Enhanced Portfolio Analysis
class PortfolioAnalyzer:
    """Advanced portfolio analysis with risk metrics"""

    def __init__(self):
        # Shares the module-level analyzer (and therefore its cache).
        self.analyzer = analyzer

    def calculate_portfolio_metrics(self, symbols: List[str],
                                    weights: Optional[List[float]] = None) -> Dict:
        """Calculate comprehensive portfolio metrics.

        Symbols without price data are dropped and the remaining weights are
        renormalised to sum to 1, so every weight stays attached to its own
        symbol.

        Args:
            symbols: Ticker symbols in the portfolio.
            weights: Optional weights, one per symbol; equal weights when
                omitted or empty.

        Returns:
            A dict with return, volatility, Sharpe ratio, drawdown, VaR,
            weighted score, correlation matrix, per-stock analyses and the
            effective ``weights`` mapping, or ``{'error': msg}``.
        """
        try:
            symbols = list(symbols or [])
            if not symbols:
                return {'error': 'No symbols provided for portfolio analysis'}

            if not weights:
                weights = [1.0 / len(symbols)] * len(symbols)  # Equal weights
            elif len(weights) != len(symbols):
                return {'error': 'Number of weights must match number of symbols'}

            returns_by_symbol: Dict[str, pd.Series] = {}
            raw_weights: Dict[str, float] = {}
            scored = []  # (symbol, analysis) pairs with a valid score

            for symbol, weight in zip(symbols, weights):
                if symbol in returns_by_symbol:
                    # Repeated symbol: merge its weight, do not re-fetch.
                    raw_weights[symbol] += weight
                    continue

                data = self.analyzer.get_stock_data(symbol, "1y")
                if data is None or data.empty:
                    continue

                returns_by_symbol[symbol] = data['Close'].pct_change().dropna()
                raw_weights[symbol] = weight

                # Individual stock analysis
                analysis = self.analyzer.calculate_investment_score(symbol)
                if 'error' not in analysis:
                    scored.append((symbol, analysis))

            if not returns_by_symbol:
                return {'error': 'No valid data for portfolio analysis'}

            total_weight = sum(raw_weights.values())
            if total_weight <= 0:
                return {'error': 'Portfolio weights must sum to a positive value'}
            normalized = {sym: w / total_weight for sym, w in raw_weights.items()}

            # Columns are labelled by symbol; rows align on the date index.
            portfolio_returns = pd.DataFrame(returns_by_symbol).sort_index()

            # Portfolio daily returns (the weight Series aligns on column names)
            weighted_returns = (portfolio_returns * pd.Series(normalized)).sum(axis=1)
            if weighted_returns.empty:
                return {'error': 'No valid data for portfolio analysis'}

            # Portfolio metrics
            portfolio_return = weighted_returns.mean() * 252 * 100  # Annualized return
            portfolio_volatility = weighted_returns.std() * np.sqrt(252) * 100
            sharpe_ratio = (portfolio_return / portfolio_volatility
                            if portfolio_volatility > 0 else 0)

            # Portfolio max drawdown
            cumulative_returns = (1 + weighted_returns).cumprod()
            rolling_max = cumulative_returns.expanding().max()
            drawdown = (cumulative_returns - rolling_max) / rolling_max
            max_drawdown = drawdown.min() * 100

            # Risk metrics
            var_95 = np.percentile(weighted_returns, 5) * 100  # 5% VaR

            # Correlation matrix
            correlation_matrix = portfolio_returns.corr().to_dict()

            # Weighted portfolio score over the stocks that could be scored.
            scored_weight = sum(normalized[sym] for sym, _ in scored)
            if scored_weight > 0:
                portfolio_score = sum(
                    item['investment_score'] * normalized[sym] for sym, item in scored
                ) / scored_weight
            else:
                portfolio_score = 0.0

            return {
                'portfolio_return': portfolio_return,
                'portfolio_volatility': portfolio_volatility,
                'sharpe_ratio': sharpe_ratio,
                'max_drawdown': max_drawdown,
                'var_95': var_95,
                'portfolio_score': portfolio_score,
                'correlation_matrix': correlation_matrix,
                'individual_stocks': [item for _, item in scored],
                'weights': normalized,
                'analysis_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
        except Exception as e:
            logger.error(f"Error in portfolio analysis: {e}")
            return {'error': f'Portfolio analysis error: {str(e)}'}


# Enhanced Gradio Interface with Portfolio Analysis
def create_enhanced_gradio_app():
    """Create enhanced Gradio interface with portfolio analysis"""
    portfolio_analyzer = PortfolioAnalyzer()

    def analyze_portfolio(symbols_input: str, weights_input: str = "") -> tuple:
        """Analyze a portfolio of stocks.

        Returns:
            ``(markdown, composition_fig, performance_fig, metrics_df)``; on
            failure the first element is the message and the rest ``None``.
        """
        try:
            symbols = _clean_symbols(symbols_input or "")
            if not symbols:
                return "Please enter stock symbols", None, None, None

            # Parse weights if provided
            weights = None
            weights_text = (weights_input or "").strip()
            if weights_text:
                try:
                    weights = [float(w.strip()) for w in weights_text.split(',')]
                except ValueError:
                    return ("Invalid weights format. Use comma-separated numbers "
                            "(e.g., 0.4, 0.3, 0.3)"), None, None, None
                if len(weights) != len(symbols):
                    return "Number of weights must match number of symbols", None, None, None

            # Analyze portfolio
            portfolio_analysis = portfolio_analyzer.calculate_portfolio_metrics(symbols, weights)
            if 'error' in portfolio_analysis:
                return f"Error: {portfolio_analysis['error']}", None, None, None

            holdings = portfolio_analysis['weights']
            stocks_data = portfolio_analysis['individual_stocks']

            # Create analysis text
            lines = [
                "# 📊 Portfolio Analysis Results",
                "",
                "## 🏦 Portfolio Overview",
                f"- **Number of Holdings**: {len(holdings)}",
                f"- **Portfolio Score**: {portfolio_analysis['portfolio_score']:.1f}/100",
                f"- **Analysis Date**: {portfolio_analysis['analysis_date']}",
                "",
                "## 📈 Performance Metrics",
                f"- **Expected Annual Return**: {portfolio_analysis['portfolio_return']:+.2f}%",
                f"- **Annual Volatility**: {portfolio_analysis['portfolio_volatility']:.2f}%",
                f"- **Sharpe Ratio**: {portfolio_analysis['sharpe_ratio']:.2f}",
                f"- **Maximum Drawdown**: {portfolio_analysis['max_drawdown']:.2f}%",
                f"- **Value at Risk (95%)**: {portfolio_analysis['var_95']:.2f}%",
                "",
                "## 🏭 Portfolio Composition",
            ]
            lines.extend(f"- **{symbol}**: {weight:.1%}" for symbol, weight in holdings.items())
            lines.extend(["", "## 📊 Individual Stock Performance"])
            for stock in stocks_data:
                weight = holdings.get(stock['symbol'], 0)
                lines.extend([
                    "",
                    f"### {stock['symbol']} - {stock['company_name']} ({weight:.1%})",
                    f"- **Score**: {stock['investment_score']}/100 | "
                    f"**YTD**: {stock['ytd_return']:+.2f}%",
                    f"- **Price**: ${stock['current_price']:.2f} | "
                    f"**Sector**: {stock['sector']}",
                ])
            analysis_text = "\n".join(lines) + "\n"

            # Create portfolio composition chart
            fig_composition = go.Figure(data=[go.Pie(
                labels=list(holdings.keys()),
                values=list(holdings.values()),
                hole=0.3,
            )])
            fig_composition.update_layout(title="Portfolio Composition", height=400)

            # Create performance comparison chart
            fig_performance = go.Figure()
            fig_performance.add_trace(go.Bar(
                x=[s['symbol'] for s in stocks_data],
                y=[s['ytd_return'] for s in stocks_data],
                name='YTD Return %',
                text=[f"{s['ytd_return']:+.1f}%" for s in stocks_data],
                textposition='auto',
            ))
            fig_performance.update_layout(
                title='Individual Stock YTD Performance',
                xaxis_title='Stock Symbol',
                yaxis_title='YTD Return (%)',
                height=400,
            )

            # Create portfolio metrics table
            metrics_df = pd.DataFrame([
                {'Metric': 'Portfolio Score',
                 'Value': f"{portfolio_analysis['portfolio_score']:.1f}/100"},
                {'Metric': 'Expected Return',
                 'Value': f"{portfolio_analysis['portfolio_return']:+.2f}%"},
                {'Metric': 'Volatility',
                 'Value': f"{portfolio_analysis['portfolio_volatility']:.2f}%"},
                {'Metric': 'Sharpe Ratio',
                 'Value': f"{portfolio_analysis['sharpe_ratio']:.2f}"},
                {'Metric': 'Max Drawdown',
                 'Value': f"{portfolio_analysis['max_drawdown']:.2f}%"},
                {'Metric': 'VaR (95%)',
                 'Value': f"{portfolio_analysis['var_95']:.2f}%"},
            ])

            return analysis_text, fig_composition, fig_performance, metrics_df
        except Exception as e:
            error_msg = f"Error analyzing portfolio: {str(e)}"
            logger.error(error_msg)
            return error_msg, None, None, None

    with gr.Blocks(title="🚀 Advanced MCP Stock Analysis Agent", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🚀 Advanced MCP Stock Analysis Agent

        **Real Model Context Protocol (MCP) implementation with comprehensive stock analysis**

        Features: Real-time data • AI scoring • Technical analysis • Portfolio optimization • Risk metrics
        """)

        with gr.Tabs():
            # Single Stock Analysis Tab
            with gr.Tab("📊 Stock Analysis"):
                with gr.Row():
                    with gr.Column(scale=1):
                        stock_input = gr.Textbox(
                            label="📈 Stock Symbol",
                            placeholder="AAPL, MSFT, GOOGL, etc.",
                            value="AAPL",
                        )
                        analyze_btn = gr.Button("🔍 Analyze Stock", variant="primary", size="lg")

                with gr.Row():
                    with gr.Column(scale=2):
                        analysis_output = gr.Markdown()
                    with gr.Column(scale=1):
                        metrics_table = gr.Dataframe(label="📊 Key Metrics")

                stock_chart = gr.Plot(label="📈 Interactive Chart")

            # Portfolio Analysis Tab
            with gr.Tab("🏦 Portfolio Analysis"):
                with gr.Row():
                    with gr.Column():
                        portfolio_symbols = gr.Textbox(
                            label="📊 Portfolio Symbols (comma-separated)",
                            placeholder="AAPL, MSFT, GOOGL, TSLA, NVDA",
                            value="AAPL, MSFT, GOOGL",
                        )
                        portfolio_weights = gr.Textbox(
                            label="⚖️ Weights (optional, comma-separated)",
                            placeholder="0.4, 0.3, 0.3 (leave empty for equal weights)",
                            value="",
                        )
                        portfolio_btn = gr.Button("🔍 Analyze Portfolio", variant="primary", size="lg")

                portfolio_output = gr.Markdown()

                with gr.Row():
                    portfolio_composition = gr.Plot(label="🥧 Portfolio Composition")
                    portfolio_performance = gr.Plot(label="📊 Performance Comparison")

                portfolio_metrics = gr.Dataframe(label="📈 Portfolio Metrics")

            # Stock Comparison Tab
            with gr.Tab("🏆 Stock Comparison"):
                with gr.Row():
                    with gr.Column():
                        stocks_input = gr.Textbox(
                            label="🔍 Stock Symbols (comma-separated)",
                            placeholder="AAPL, MSFT, GOOGL, TSLA, NVDA",
                            value="AAPL, MSFT, GOOGL",
                        )
                        compare_btn = gr.Button("⚡ Compare Stocks", variant="primary", size="lg")

                comparison_output = gr.Markdown()
                comparison_chart = gr.Plot(label="📊 Comparison Chart")
                comparison_table = gr.Dataframe(label="📋 Detailed Comparison")

            # MCP Server Tools Tab
            with gr.Tab("🛠️ MCP Server"):
                gr.Markdown("""
                ## 🔧 MCP (Model Context Protocol) Tools

                This tab demonstrates the MCP server capabilities:
                """)

                with gr.Row():
                    with gr.Column():
                        mcp_tool_select = gr.Dropdown(
                            choices=[
                                "get_stock_price",
                                "analyze_stock_comprehensive",
                                "compare_stocks_ytd",
                                "get_market_sector_analysis",
                            ],
                            label="🛠️ Select MCP Tool",
                            value="analyze_stock_comprehensive",
                        )
                        mcp_symbol_input = gr.Textbox(
                            label="📊 Parameters",
                            placeholder="AAPL or AAPL,MSFT,GOOGL",
                            value="AAPL",
                        )
                        mcp_execute_btn = gr.Button("⚡ Execute MCP Tool", variant="secondary")

                mcp_output = gr.JSON(label="📋 MCP Response")

                gr.Markdown("""
                ### 📡 MCP Server Information:
                - **Server Name**: stock-analysis-mcp
                - **Version**: 1.0.0
                - **Protocol**: stdio
                - **Tools**: 4 available tools for stock analysis
                """)

        # Event handlers
        analyze_btn.click(
            fn=analyze_single_stock,
            inputs=[stock_input],
            outputs=[analysis_output, stock_chart, metrics_table],
        )

        portfolio_btn.click(
            fn=analyze_portfolio,
            inputs=[portfolio_symbols, portfolio_weights],
            outputs=[portfolio_output, portfolio_composition,
                     portfolio_performance, portfolio_metrics],
        )

        compare_btn.click(
            fn=compare_multiple_stocks,
            inputs=[stocks_input],
            outputs=[comparison_output, comparison_chart, comparison_table],
        )

        def execute_mcp_tool(tool_name, params):
            """Execute MCP tool from Gradio interface"""
            return _run_mcp_tool(tool_name, params, annotate=True)

        mcp_execute_btn.click(
            fn=execute_mcp_tool,
            inputs=[mcp_tool_select, mcp_symbol_input],
            outputs=[mcp_output],
        )

        # Footer
        gr.Markdown("""
        ---
        ## 🚀 System Architecture

        **MCP Server**: Implements Model Context Protocol for tool integration
        **Analysis Engine**: Advanced scoring algorithm with 15+ metrics
        **Data Pipeline**: Real-time Yahoo Finance integration
        **Risk Engine**: Portfolio optimization and risk analytics
        **Visualization**: Interactive Plotly charts and dashboards

        *Educational platform - not financial advice. Always consult professionals.*
        """)

    return app


# Main execution functions
def main():
    """Parse command-line options and start the MCP server or the Gradio UI.

    ``--mode mcp`` runs the stdio MCP server. ``--mode gradio`` and
    ``--mode both`` launch the enhanced Gradio app; in ``both`` mode the MCP
    tool handlers are called in-process by the UI rather than through a
    separately started stdio server.
    """
    import argparse

    parser = argparse.ArgumentParser(description="MCP Stock Analysis Agent")
    parser.add_argument("--mode", choices=["mcp", "gradio", "both"], default="both",
                        help="Run mode: mcp (server only), gradio (web interface), or both")
    parser.add_argument("--port", type=int, default=7860, help="Gradio server port")
    parser.add_argument("--share", action="store_true", help="Share Gradio interface publicly")
    args = parser.parse_args()

    if args.mode == "mcp":
        # Run MCP server only
        asyncio.run(run_mcp_server())
        return

    if args.mode == "both":
        print("🚀 Starting MCP Stock Analysis Agent...")
        print("📊 MCP Server will run in background")
        print(f"🌐 Gradio Interface will be available at http://localhost:{args.port}")

    # Start Gradio interface (MCP tools are invoked on demand by the UI)
    app = create_enhanced_gradio_app()
    app.launch(server_port=args.port, share=args.share)


if __name__ == "__main__":
    main()