# crypto_tools.py - CrewAI Native Version import os import requests from crewai.tools import BaseTool from typing import Type from pydantic import BaseModel, Field class CryptoPriceInput(BaseModel): """Input schema for CryptoPriceTool.""" crypto_name: str = Field(..., description="Name of the cryptocurrency to get price for") class CryptoPriceTool(BaseTool): name: str = "Get Crypto Price" description: str = "Gets the current price of a cryptocurrency from CoinGecko API" args_schema: Type[BaseModel] = CryptoPriceInput def _run(self, crypto_name: str) -> str: try: crypto_id = crypto_name.lower().replace(' ', '-') url = "https://api.coingecko.com/api/v3/simple/price" params = { 'ids': crypto_id, 'vs_currencies': 'usd', 'include_market_cap': 'true', 'include_24hr_change': 'true' } response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() if crypto_id in data: price = data[crypto_id]['usd'] change_24h = data[crypto_id].get('usd_24h_change', 0) return f"${price:,.2f} (24h change: {change_24h:+.2f}%)" else: # Try common alternatives alternatives = { 'btc': 'bitcoin', 'eth': 'ethereum', 'ada': 'cardano', 'dot': 'polkadot', 'sol': 'solana' } if crypto_name.lower() in alternatives: return self._run(alternatives[crypto_name.lower()]) return f"Cryptocurrency '{crypto_name}' not found. Try using full name like 'bitcoin' instead of 'btc'." except Exception as e: return f"Error fetching price data: {str(e)}" class MarketCapInput(BaseModel): """Input schema for MarketCapTool.""" crypto_name: str = Field(..., description="Name of the cryptocurrency to get market cap for") class MarketCapTool(BaseTool): name: str = "Get Market Cap" description: str = "Gets the market capitalization of a cryptocurrency" args_schema: Type[BaseModel] = MarketCapInput def _run(self, crypto_name: str) -> str: try: crypto_id = crypto_name.lower().replace(' ', '-') url = "https://api.coingecko.com/api/v3/simple/price" params = { 'ids': crypto_id, 'vs_currencies': 'usd', 'include_market_cap': 'true' } response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() if crypto_id in data: market_cap = data[crypto_id].get('usd_market_cap', 0) return f"${market_cap:,.0f}" else: return f"Market cap data not found for '{crypto_name}'" except Exception as e: return f"Error fetching market cap: {str(e)}" class RSIInput(BaseModel): """Input schema for RSITool.""" crypto_name: str = Field(..., description="Name of the cryptocurrency to calculate RSI for") class RSITool(BaseTool): name: str = "Calculate RSI" description: str = "Calculates the RSI (Relative Strength Index) for a cryptocurrency" args_schema: Type[BaseModel] = RSIInput def _run(self, crypto_name: str) -> str: try: # Mock RSI for demo (in production, you'd calculate from historical data) import random mock_rsi = round(random.uniform(20, 80), 2) if mock_rsi > 70: signal = "overbought" elif mock_rsi < 30: signal = "oversold" else: signal = "neutral" return f"RSI: {mock_rsi}, Signal: {signal}" except Exception as e: return f"Error calculating RSI: {str(e)}" class MovingAverageInput(BaseModel): """Input schema for MovingAverageTool.""" crypto_name: str = Field(..., description="Name of the cryptocurrency to calculate moving average for") class MovingAverageTool(BaseTool): name: str = "Calculate Moving Average" description: str = "Calculates the 7-day moving average for a cryptocurrency" args_schema: Type[BaseModel] = MovingAverageInput def _run(self, crypto_name: str) -> str: try: # Get current price first price_tool = CryptoPriceTool() price_result = price_tool._run(crypto_name) # Extract price from result if "$" in price_result: try: current_price = float(price_result.split("$")[1].split(" ")[0].replace(",", "")) # Mock 7-day MA import random ma_7 = current_price * random.uniform(0.95, 1.05) return f"7-day MA: ${ma_7:,.2f}" except: return "Unable to calculate moving average" else: return "Unable to get price for moving average calculation" except Exception as e: return f"Error calculating moving average: {str(e)}"