import asyncio
import html
import logging
import os
from typing import Any

import httpx
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes

from api.finnhub.financial_news_requester import fetch_comp_financial_news
# Logging is configured first so the checks below can report problems.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment configuration through python-dotenv before reading it.
load_dotenv()

# The bot credential may be supplied under either name; BOT_TOKEN wins.
BOT_TOKEN = os.environ.get("BOT_TOKEN") or os.environ.get("TELEGRAM_TOKEN")
if not BOT_TOKEN:
    # Without a token nothing else in this module can work, so fail at import.
    logger.error("BOT_TOKEN not found! Please add it in Space Settings > Repository secrets")
    raise ValueError("BOT_TOKEN not found in environment variables")

# Sub-domain used to build the public "https://<SPACE_ID>.hf.space/..." URLs.
SPACE_ID = os.getenv("SPACE_ID", "researchengineering-news_sentiment_analyzer")

# TCP port handed to uvicorn in main().
PORT = int(os.getenv("PORT", 7860))

# Telegram Application instance; assigned by init_telegram_app().
application = None
def format_news_for_telegram(news_json: list[dict[str, Any]]) -> str:
    """Render news items as one Telegram message body (HTML parse mode).

    The handlers in this module send the result with ``parse_mode='HTML'``,
    so every value taken from the feed is HTML-escaped; otherwise a stray
    ``<`` or ``&`` in a headline would make Telegram reject the message.

    Args:
        news_json: News items as dictionaries. The keys ``headline``,
            ``summary`` and ``source`` are read; a missing or ``None`` value
            is replaced by a placeholder. ``None`` is treated as an empty
            feed.

    Returns:
        The concatenated entries, each terminated by a blank line, or
        ``"No news available."`` when there is nothing to show.
    """

    def _field(item: dict[str, Any], key: str, fallback: str) -> str:
        # Fall back on absent *and* null values, then neutralise HTML markup.
        value = item.get(key)
        if value is None:
            value = fallback
        return html.escape(str(value), quote=False)

    # NOTE(review): "Read more" is emitted without any link target --
    # presumably a URL from the feed was intended here; confirm the key name.
    entries = [
        (
            f"📰 {_field(item, 'headline', 'No headline')}\n"
            f"📝 {_field(item, 'summary', 'No summary')}\n"
            f"🏷️ Source: {_field(item, 'source', 'Unknown')}\n"
            "🔗 Read more\n\n"
        )
        for item in (news_json or [])
    ]
    message = "".join(entries)
    return message if message else "No news available."
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start by replying with a short greeting.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused).
    """
    # effective_message also covers edited messages, for which
    # update.message is None and the original attribute access would fail.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text("Hello! I'm your Financial News Bot. Use /run to get latest news.")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help by replying with the list of supported commands.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused).
    """
    # effective_message also covers edited messages, for which
    # update.message is None and the original attribute access would fail.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "🤖 Available commands:\n"
        "/start - Start the bot\n"
        "/help - Show this help\n"
        "/run - Get latest financial news"
    )
# Maximum size of one outgoing message; kept below Telegram's
# 4096-character limit to leave some headroom.
_TELEGRAM_CHUNK_LIMIT = 4000


def _split_for_telegram(text: str, limit: int = _TELEGRAM_CHUNK_LIMIT) -> list[str]:
    """Split *text* into pieces of at most *limit* characters.

    Pieces are cut on blank lines (the separator between news items). An
    item that cannot fit into a single piece is wrapped at the last space
    before the limit, or hard-cut when it contains no space at all.
    Whitespace-only pieces are never produced.

    Raises:
        ValueError: If *limit* is too small to hold an item plus separator.
    """
    if limit < 3:
        raise ValueError("limit must be at least 3")
    if len(text) <= limit:
        return [text]

    chunks: list[str] = []
    current = ""
    for item in text.split("\n\n"):
        if not item.strip():
            continue  # trailing separator or consecutive blank lines

        # Wrap a single item that is too long for one message.
        while len(item) + 2 > limit:
            if current:
                chunks.append(current)
                current = ""
            cut = item.rfind(" ", 0, limit)
            if cut <= 0:
                cut = limit  # no usable space: fall back to a hard cut
            piece = item[:cut]
            if piece.strip():
                chunks.append(piece)
            item = item[cut:].lstrip()
        if not item:
            continue

        # Flush only a non-empty buffer, so an empty message is never sent.
        if current and len(current) + len(item) + 2 > limit:
            chunks.append(current)
            current = ""
        current += item + "\n\n"

    if current:
        chunks.append(current)
    return chunks


async def run_crew(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /run: fetch the latest financial news and send it to the chat.

    The formatted news is sent as one or more HTML-mode messages, split so
    that each stays within the message size limit. Any failure is logged
    with its traceback and reported back to the user.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused).
    """
    # effective_message also covers edited messages, for which
    # update.message is None and the original attribute access would fail.
    message = update.effective_message
    if message is None:
        return

    await message.reply_text("Fetching latest financial news...")
    try:
        # The fetcher is a plain synchronous call; run it in a worker thread
        # so the event loop stays responsive while it waits.
        feed = await asyncio.to_thread(fetch_comp_financial_news)
        logger.info(f"Processed: {len(feed)} news items")
        formatted_news = format_news_for_telegram(feed)
        for chunk in _split_for_telegram(formatted_news):
            await message.reply_text(chunk, parse_mode='HTML')
    except Exception as e:
        # Top-level boundary for this command: keep the traceback in the log.
        logger.exception(f"Error in run_crew: {e}")
        await message.reply_text(f"Sorry, there was an error fetching news: {str(e)}")
# Build and initialize the Telegram application
async def init_telegram_app():
    """Create the global Telegram ``application`` and register its commands.

    Builds the Application from ``BOT_TOKEN``, attaches the /start, /help
    and /run command handlers, then awaits ``initialize()``.
    """
    global application
    application = Application.builder().token(BOT_TOKEN).build()

    # Command name -> callback, registered in this order.
    command_table = (
        ("start", start),
        ("help", help_command),
        ("run", run_crew),
    )
    for command, callback in command_table:
        application.add_handler(CommandHandler(command, callback))

    await application.initialize()
    logger.info("Telegram application initialized successfully")
# FastAPI application object; the route handlers below are registered on it.
app = FastAPI(
    title="Financial News Bot",
    description="Telegram bot for financial news",
)
@app.on_event("startup")
async def startup_event() -> None:
    """Run on FastAPI startup: build and initialize the Telegram application."""
    await init_telegram_app()
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Run on FastAPI shutdown: shut the Telegram application down, if any."""
    # Nothing to clean up when the bot was never created.
    if not application:
        return
    await application.shutdown()
@app.post(f"/{BOT_TOKEN}")
async def webhook(request: Request) -> dict[str, Any]:
    """Receive one update from Telegram and dispatch it to the bot handlers.

    Args:
        request: Incoming HTTP request whose JSON body is the Telegram update.

    Returns:
        ``{"status": "ok"}`` on success. On failure the error is reported in
        the response body as ``{"status": "error", "message": ...}`` rather
        than raised.
    """
    # Report a missing bot explicitly instead of through an opaque
    # AttributeError on None.
    if application is None:
        logger.error("Webhook called before the Telegram application was initialized")
        return {"status": "error", "message": "Telegram application is not initialized"}

    try:
        # Decode the payload into an Update bound to this bot, then process it.
        json_data = await request.json()
        update = Update.de_json(json_data, application.bot)
        await application.process_update(update)
        return {"status": "ok"}
    except Exception as e:
        # Top-level boundary for this endpoint: keep the traceback in the log.
        logger.exception(f"Error processing update: {e}")
        return {"status": "error", "message": str(e)}
@app.get("/")
async def root() -> dict[str, Any]:
    """Return a status summary of the service.

    The webhook URL in the response shows only the first ten characters of
    the bot token.
    """
    masked_webhook_url = f"https://{SPACE_ID}.hf.space/{BOT_TOKEN[:10]}..."
    endpoints = ["/", "/set_webhook", "/webhook_info", "/health"]
    bot_ready = application is not None
    return {
        "status": "Financial News Bot is running!",
        "webhook_url": masked_webhook_url,
        "space_id": SPACE_ID,
        "available_endpoints": endpoints,
        "bot_initialized": bot_ready,
    }
@app.get("/set_webhook")
async def set_webhook() -> dict[str, Any]:
    """Register this service's webhook URL with Telegram.

    Intended to be called once after deployment. Pending updates are
    dropped as part of the registration.

    The webhook URL contains the bot token, so only a masked form (first ten
    characters of the token) is logged or returned, and the token is scrubbed
    from any error text.

    Returns:
        ``{"status": "success", "webhook_url": <masked>, "result": ...}`` when
        Telegram accepts the webhook, ``{"status": "error", "result": ...}``
        when it refuses, or ``{"status": "error", "message": ...}`` when the
        request itself fails.
    """
    token_preview = f"{BOT_TOKEN[:10]}..."
    webhook_url = f"https://{SPACE_ID}.hf.space/{BOT_TOKEN}"
    masked_webhook_url = f"https://{SPACE_ID}.hf.space/{token_preview}"
    set_webhook_url = f"https://api.telegram.org/bot{BOT_TOKEN}/setWebhook"

    try:
        logger.info(f"Setting webhook to: {masked_webhook_url}")
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(set_webhook_url, json={
                "url": webhook_url,
                "drop_pending_updates": True
            })
            result = response.json()

        if result.get("ok"):
            logger.info("Webhook set successfully!")
            # Never hand the full token back to an arbitrary HTTP caller.
            return {"status": "success", "webhook_url": masked_webhook_url, "result": result}

        logger.error(f"Failed to set webhook: {result}")
        return {"status": "error", "result": result}
    except Exception as e:
        # Exception text may embed the request URL, which carries the token.
        safe_message = str(e).replace(BOT_TOKEN, token_preview)
        logger.error(f"Error setting webhook: {safe_message}")
        return {"status": "error", "message": safe_message}
@app.get("/webhook_info")
async def webhook_info() -> dict[str, Any]:
    """Return Telegram's current webhook information for this bot.

    The webhook URL registered by this service contains the bot token, so
    the token is masked (first ten characters kept) in the returned ``url``
    field and in any error text.

    Returns:
        Telegram's JSON reply with the token redacted, or
        ``{"status": "error", "message": ...}`` when the request fails.
    """
    token_preview = f"{BOT_TOKEN[:10]}..."
    try:
        info_url = f"https://api.telegram.org/bot{BOT_TOKEN}/getWebhookInfo"
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(info_url)
            result = response.json()

        # Redact the token from the reported webhook address, if present.
        details = result.get("result") if isinstance(result, dict) else None
        if isinstance(details, dict) and isinstance(details.get("url"), str):
            details["url"] = details["url"].replace(BOT_TOKEN, token_preview)
        return result
    except Exception as e:
        # Exception text may embed the request URL, which carries the token.
        safe_message = str(e).replace(BOT_TOKEN, token_preview)
        logger.error(f"Error getting webhook info: {safe_message}")
        return {"status": "error", "message": safe_message}
@app.get("/health")
async def health() -> dict[str, Any]:
    """Report that the service is up and whether a bot token is configured."""
    token_present = True if BOT_TOKEN else False
    return {"status": "healthy", "bot_token_set": token_present}
def main() -> None:
    """Log the startup configuration, then serve ``app`` with uvicorn.

    Only the first ten characters of the bot token are written to the log.
    """
    logger.info(f"Starting Financial News Bot on port {PORT}")
    if BOT_TOKEN:
        logger.info(f"Bot token: {BOT_TOKEN[:10]}...")
    else:
        logger.info("No token set")
    logger.info(f"Space ID: {SPACE_ID}")
    logger.info(f"Webhook URL will be: https://{SPACE_ID}.hf.space/{BOT_TOKEN[:10]}...")
    logger.info("After deployment, visit /set_webhook to configure the webhook")

    # Listen on all interfaces on the configured port.
    uvicorn.run(app, host="0.0.0.0", port=PORT)
#application.run_polling()