Spaces:
Runtime error
Runtime error
import logging | |
import os | |
from typing import Any | |
from telegram import Update | |
from telegram.ext import Application, CommandHandler, ContextTypes | |
from dotenv import load_dotenv | |
from api.finnhub.financial_news_requester import fetch_comp_financial_news | |
from fastapi import FastAPI, Request | |
import uvicorn | |
import httpx | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
load_dotenv() | |
BOT_TOKEN = os.getenv("BOT_TOKEN") or os.getenv("TELEGRAM_TOKEN") | |
if not BOT_TOKEN: | |
logger.error("BOT_TOKEN not found! Please add it in Space Settings > Repository secrets") | |
raise ValueError("BOT_TOKEN not found in environment variables") | |
SPACE_ID = os.environ.get('SPACE_ID', 'researchengineering-news_sentiment_analyzer') | |
PORT = int(os.environ.get('PORT', 7860)) | |
application = None | |
def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str: | |
message = "" | |
for item in news_json: | |
message += ( | |
f"π° <b>{item.get('headline', 'No headline')}</b>\n" | |
f"π {item.get('summary', 'No summary')}\n" | |
f"π·οΈ Source: {item.get('source', 'Unknown')}\n" | |
f"π <a href=\"{item.get('url', '#')}\">Read more</a>\n\n" | |
) | |
return message if message else "No news available." | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
await update.message.reply_text("Hello! I'm your Financial News Bot. Use /run to get latest news.") | |
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle /help command""" | |
await update.message.reply_text( | |
"π€ Available commands:\n" | |
"/start - Start the bot\n" | |
"/help - Show this help\n" | |
"/run - Get latest financial news" | |
) | |
async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
await update.message.reply_text("Fetching latest financial news...") | |
try: | |
feed = fetch_comp_financial_news() | |
logger.info(f"Processed: {len(feed)} news items") | |
formatted_news = format_news_for_telegram(feed) | |
# Split message if too long (Telegram limit is 4096 characters) | |
if len(formatted_news) > 4000: | |
# Split by news items, not by character count | |
items = formatted_news.split('\n\n') | |
chunk = "" | |
for item in items: | |
if len(chunk) + len(item) + 2 > 4000: | |
await update.message.reply_text(chunk, parse_mode='HTML') | |
chunk = "" | |
chunk += item + "\n\n" | |
if chunk: | |
await update.message.reply_text(chunk, parse_mode='HTML') | |
else: | |
await update.message.reply_text(formatted_news, parse_mode='HTML') | |
except Exception as e: | |
logger.error(f"Error in run_crew: {e}") | |
await update.message.reply_text(f"Sorry, there was an error fetching news: {str(e)}") | |
# Initialize the Telegram application | |
async def init_telegram_app(): | |
global application | |
application = Application.builder().token(BOT_TOKEN).build() | |
# Add handlers | |
application.add_handler(CommandHandler("start", start)) | |
application.add_handler(CommandHandler("help", help_command)) | |
application.add_handler(CommandHandler("run", run_crew)) | |
# Initialize the application | |
await application.initialize() | |
logger.info("Telegram application initialized successfully") | |
# Create FastAPI app | |
app = FastAPI(title="Financial News Bot", description="Telegram bot for financial news") | |
async def startup_event() -> None: | |
"""Initialize Telegram bot on FastAPI startup""" | |
await init_telegram_app() | |
async def shutdown_event() -> None: | |
"""Cleanup on shutdown""" | |
global application | |
if application: | |
await application.shutdown() | |
async def webhook(request: Request) -> dict[str, Any]: | |
"""Handle incoming webhook from Telegram""" | |
try: | |
# Get the update from Telegram | |
json_data = await request.json() | |
update = Update.de_json(json_data, application.bot) | |
# Process the update | |
await application.process_update(update) | |
return {"status": "ok"} | |
except Exception as e: | |
logger.error(f"Error processing update: {e}") | |
return {"status": "error", "message": str(e)} | |
async def root() -> dict[str, Any]: | |
"""Health check endpoint""" | |
return { | |
"status": "Financial News Bot is running!", | |
"webhook_url": f"https://{SPACE_ID}.hf.space/{BOT_TOKEN[:10]}...", | |
"space_id": SPACE_ID, | |
"available_endpoints": ["/", "/set_webhook", "/webhook_info", "/health"], | |
"bot_initialized": application is not None | |
} | |
async def set_webhook() -> dict[str, Any]: | |
"""Manually set the webhook (call this once after deployment)""" | |
try: | |
webhook_url = f"https://{SPACE_ID}.hf.space/{BOT_TOKEN}" | |
set_webhook_url = f"https://api.telegram.org/bot{BOT_TOKEN}/setWebhook" | |
logger.info(f"Setting webhook to: {webhook_url}") | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
response = await client.post(set_webhook_url, json={ | |
"url": webhook_url, | |
"drop_pending_updates": True | |
}) | |
result = response.json() | |
if result.get("ok"): | |
logger.info("Webhook set successfully!") | |
return {"status": "success", "webhook_url": webhook_url, "result": result} | |
else: | |
logger.error(f"Failed to set webhook: {result}") | |
return {"status": "error", "result": result} | |
except Exception as e: | |
logger.error(f"Error setting webhook: {e}") | |
return {"status": "error", "message": str(e)} | |
async def webhook_info() -> dict[str, Any]: | |
"""Get current webhook information""" | |
try: | |
info_url = f"https://api.telegram.org/bot{BOT_TOKEN}/getWebhookInfo" | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
response = await client.get(info_url) | |
result = response.json() | |
return result | |
except Exception as e: | |
logger.error(f"Error getting webhook info: {e}") | |
return {"status": "error", "message": str(e)} | |
async def health() -> dict[str, Any]: | |
"""Additional health check""" | |
return {"status": "healthy", "bot_token_set": bool(BOT_TOKEN)} | |
def main() -> None: | |
logger.info(f"Starting Financial News Bot on port {PORT}") | |
logger.info(f"Bot token: {BOT_TOKEN[:10]}..." if BOT_TOKEN else "No token set") | |
logger.info(f"Space ID: {SPACE_ID}") | |
logger.info(f"Webhook URL will be: https://{SPACE_ID}.hf.space/{BOT_TOKEN[:10]}...") | |
logger.info("After deployment, visit /set_webhook to configure the webhook") | |
uvicorn.run(app, host="0.0.0.0", port=PORT) | |
#application.run_polling() | |