"""Telegram financial-news bot served through a FastAPI webhook application."""

import html
import logging
import os
from typing import Any

import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes

from api.finnhub.financial_news_requester import fetch_comp_financial_news

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load variables from a local .env file (if any) before reading the token.
load_dotenv()

BOT_TOKEN = os.getenv("BOT_TOKEN") or os.getenv("TELEGRAM_TOKEN")
if not BOT_TOKEN:
    logger.error("BOT_TOKEN not found! Please add it in Space Settings > Repository secrets")
    raise ValueError("BOT_TOKEN not found in environment variables")

SPACE_ID = os.environ.get('SPACE_ID', 'researchengineering-news_sentiment_analyzer')
PORT = int(os.environ.get('PORT', 7860))

# Messages are split once they exceed this many characters; Telegram's hard
# limit is 4096, the margin leaves room for safety.
MESSAGE_CHUNK_LIMIT = 4000

# Set by init_telegram_app() during FastAPI startup; None until then.
application = None


def _webhook_url() -> str:
    """Return the full webhook URL (contains the bot token; never log it raw)."""
    return f"https://{SPACE_ID}.hf.space/{BOT_TOKEN}"


def _redact_token(text: str) -> str:
    """Replace every occurrence of the bot token in *text* with a masked form.

    The masked form is the first ten characters followed by ``...``, which is
    the same shape the health-check endpoint has always exposed.
    """
    return text.replace(BOT_TOKEN, f"{BOT_TOKEN[:10]}...")


def _escape_html(value: Any) -> str:
    """Escape ``<``, ``>`` and ``&`` so *value* is safe under parse_mode='HTML'."""
    return html.escape(str(value), quote=False)


def _chunk_message(text: str, limit: int = MESSAGE_CHUNK_LIMIT) -> list[str]:
    """Split *text* into chunks of roughly *limit* characters on blank lines.

    Text that already fits is returned as a single chunk. Splitting happens
    between news items (separated by a blank line), never inside one, so a
    single item longer than *limit* is still emitted as one chunk.
    Empty or whitespace-only chunks are never produced, because Telegram
    rejects messages without text.
    """
    if len(text) <= limit:
        return [text]

    chunks: list[str] = []
    current = ""
    for item in text.split('\n\n'):
        # Only flush when there is something to flush; otherwise an oversized
        # first item would trigger sending an empty message.
        if current and len(current) + len(item) + 2 > limit:
            chunks.append(current)
            current = ""
        current += item + "\n\n"
    # The trailing separator of the input yields an empty last item, which can
    # leave a whitespace-only remainder; skip it.
    if current.strip():
        chunks.append(current)
    return chunks


def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str:
    """Render news items as one Telegram message body.

    Each item contributes its ``headline``, ``summary`` and ``source`` values
    (with fallbacks when a key is missing). The values are HTML-escaped
    because the result is sent with ``parse_mode='HTML'``.

    Returns:
        The concatenated entries, or ``"No news available."`` for empty input.
    """
    # NOTE(review): "Read more" carries no link; if the feed items provide a
    # URL field it was presumably meant to be used here - confirm.
    entries = [
        f"📰 {_escape_html(item.get('headline', 'No headline'))}\n"
        f"📝 {_escape_html(item.get('summary', 'No summary'))}\n"
        f"🏷️ Source: {_escape_html(item.get('source', 'Unknown'))}\n"
        "🔗 Read more\n\n"
        for item in news_json
    ]
    return "".join(entries) or "No news available."


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start command"""
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "Hello! I'm your Financial News Bot. \nUse /run to get latest news."
    )


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help command"""
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "🤖 Available commands:\n"
        "/start - Start the bot\n"
        "/help - Show this help\n"
        "/run - Get latest financial news"
    )


async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /run command: fetch the news feed and reply with it.

    Long output is split into several messages. Any failure is logged and
    reported back to the user as a plain-text error reply.
    """
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("Fetching latest financial news...")
    try:
        # NOTE(review): this call is synchronous and runs on the event loop;
        # if it performs network I/O it blocks other updates meanwhile.
        feed = fetch_comp_financial_news()
        logger.info(f"Processed: {len(feed)} news items")

        formatted_news = format_news_for_telegram(feed)

        # Split by news items, not by character count, when too long.
        for chunk in _chunk_message(formatted_news):
            await message.reply_text(chunk, parse_mode='HTML')
    except Exception as e:
        logger.error(f"Error in run_crew: {e}")
        await message.reply_text(f"Sorry, there was an error fetching news: {str(e)}")


# Initialize the Telegram application
async def init_telegram_app():
    """Build the Telegram application, register command handlers, initialize it."""
    global application
    application = Application.builder().token(BOT_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("run", run_crew))

    # Initialize the application
    await application.initialize()
    logger.info("Telegram application initialized successfully")


# Create FastAPI app
app = FastAPI(title="Financial News Bot", description="Telegram bot for financial news")


@app.on_event("startup")
async def startup_event() -> None:
    """Initialize Telegram bot on FastAPI startup"""
    await init_telegram_app()


@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Cleanup on shutdown"""
    if application:
        await application.shutdown()


# The token is part of the path so that only Telegram knows the endpoint.
@app.post(f"/{BOT_TOKEN}")
async def webhook(request: Request) -> dict[str, Any]:
    """Handle incoming webhook from Telegram

    Errors are logged and reported in the response body rather than raised.
    """
    try:
        # Get the update from Telegram
        json_data = await request.json()
        update = Update.de_json(json_data, application.bot)

        # Process the update
        await application.process_update(update)

        return {"status": "ok"}
    except Exception as e:
        logger.error(f"Error processing update: {e}")
        return {"status": "error", "message": str(e)}


@app.get("/")
async def root() -> dict[str, Any]:
    """Health check endpoint"""
    return {
        "status": "Financial News Bot is running!",
        "webhook_url": _redact_token(_webhook_url()),
        "space_id": SPACE_ID,
        "available_endpoints": ["/", "/set_webhook", "/webhook_info", "/health"],
        "bot_initialized": application is not None,
    }


@app.get("/set_webhook")
async def set_webhook() -> dict[str, Any]:
    """Manually set the webhook (call this once after deployment)

    The webhook URL embeds the bot token, so it is masked in both the log
    output and the returned payload.
    """
    # NOTE(review): this is an unauthenticated GET with side effects
    # (drop_pending_updates); consider protecting it.
    try:
        webhook_url = _webhook_url()
        masked_url = _redact_token(webhook_url)
        set_webhook_url = f"https://api.telegram.org/bot{BOT_TOKEN}/setWebhook"

        logger.info(f"Setting webhook to: {masked_url}")

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(set_webhook_url, json={
                "url": webhook_url,
                "drop_pending_updates": True
            })
            result = response.json()

        if result.get("ok"):
            logger.info("Webhook set successfully!")
            return {"status": "success", "webhook_url": masked_url, "result": result}

        logger.error(f"Failed to set webhook: {result}")
        return {"status": "error", "result": result}
    except Exception as e:
        # Exception text may quote the request URL, which contains the token.
        detail = _redact_token(str(e))
        logger.error(f"Error setting webhook: {detail}")
        return {"status": "error", "message": detail}


@app.get("/webhook_info")
async def webhook_info() -> dict[str, Any]:
    """Get current webhook information

    The registered webhook URL contains the bot token, so the ``url`` field
    of the Telegram response is masked before it is returned.
    """
    try:
        info_url = f"https://api.telegram.org/bot{BOT_TOKEN}/getWebhookInfo"

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(info_url)
            result = response.json()

        info = result.get("result") if isinstance(result, dict) else None
        if isinstance(info, dict) and isinstance(info.get("url"), str):
            info["url"] = _redact_token(info["url"])
        return result
    except Exception as e:
        detail = _redact_token(str(e))
        logger.error(f"Error getting webhook info: {detail}")
        return {"status": "error", "message": detail}


@app.get("/health")
async def health() -> dict[str, Any]:
    """Additional health check"""
    return {"status": "healthy", "bot_token_set": bool(BOT_TOKEN)}


def main() -> None:
    """Log the startup configuration and serve the FastAPI app with uvicorn."""
    logger.info(f"Starting Financial News Bot on port {PORT}")
    logger.info(f"Bot token: {BOT_TOKEN[:10]}..." if BOT_TOKEN else "No token set")
    logger.info(f"Space ID: {SPACE_ID}")
    logger.info(f"Webhook URL will be: {_redact_token(_webhook_url())}")
    logger.info("After deployment, visit /set_webhook to configure the webhook")

    uvicorn.run(app, host="0.0.0.0", port=PORT)
    # application.run_polling()