"""Telegram bot that gates access behind a shared password.

Authenticated users can send text questions, which are forwarded to a backend
HTTP API, and Excel files, which are uploaded to the same API.
"""

import asyncio
import functools
import hmac
import logging
import os

import aiohttp
import requests
from telegram import Update
from telegram.ext import Application, CallbackContext, CommandHandler, MessageHandler, filters

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Load environment variables (e.g. from Hugging Face Secrets)
BOT_TOKEN = os.getenv("BOT_TOKEN")
BASE_URL = os.getenv("BASE_URL")
API_USERNAME = os.getenv("API_USERNAME")
API_PASSWORD = os.getenv("API_PASSWORD")

# Shared password users must send before they may use the bot. It can be
# supplied through the SECRET_PASSWORD environment variable; the historical
# hard-coded value is kept only as a fallback so existing deployments keep
# working. NOTE(review): set the environment variable in production.
SECRET_PASSWORD = os.getenv("SECRET_PASSWORD") or "secure123"

# Upper bound, in seconds, for each blocking HTTP call made with `requests`.
REQUEST_TIMEOUT = 30

# In-memory sets of Telegram user ids (lost on restart).
AUTHENTICATED_USERS = set()  # users who already sent the correct password
AWAITING_PASSWORD = set()  # users whose next text message is a password attempt

_NOT_AUTHENTICATED_TEXT = "❌ You are not authenticated. Please enter the password first."
_API_AUTH_FAILED_TEXT = "Authentication failed. Please try again later."


class TelegramBot:
    """A Telegram bot with password-based authentication."""

    def __init__(self, bot_token, base_url, username, password):
        """Build the Telegram application, register handlers and log in to the API.

        Args:
            bot_token: Telegram bot API token.
            base_url: Root URL of the backend API; a trailing slash is ignored.
            username: Username sent to the backend login endpoint.
            password: Password sent to the backend login endpoint.
        """
        self.bot_token = bot_token
        self.base_url = base_url
        self.username = username
        self.password = password
        self.auth_token = None

        # API endpoints. Strip a trailing slash so the joined URLs do not
        # contain a double slash.
        root = base_url.rstrip("/") if isinstance(base_url, str) else base_url
        self.login_url = f"{root}/api/v1/auth/login"
        self.ai_url = f"{root}/api/v1/questions/text"
        self.excel_url = f"{root}/api/v1/questions/excel"

        # Start Telegram bot
        self.app = Application.builder().token(self.bot_token).build()
        self.setup_handlers()

        # Authenticate with API. A failure here is only logged; the handlers
        # try again lazily when a token is needed.
        logger.info("Authenticating with API...")
        self.authenticate()

    def authenticate(self):
        """Log in to the backend API and store the access token.

        This call blocks on network I/O; async code should go through
        ``_ensure_token`` instead of calling it directly.

        Returns:
            True if a token was obtained and stored in ``self.auth_token``,
            False otherwise (the previous token, if any, is left untouched).
        """
        payload = {"username": self.username, "password": self.password}
        headers = {"Content-Type": "application/json", "accept": "application/json"}

        try:
            response = requests.post(
                self.login_url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
            )
        except requests.RequestException as e:
            logger.error("Authentication Error: %s", e)
            return False

        if response.status_code != 200:
            logger.error("Authentication failed: %s - %s", response.status_code, response.text)
            return False

        try:
            token = response.json().get("access_token")
        except (ValueError, AttributeError) as e:
            # Body was not JSON, or was JSON but not an object.
            logger.error("Authentication Error: %s", e)
            return False

        if not token:
            logger.error("Authentication failed: response contained no access token")
            return False

        self.auth_token = token
        logger.info("Successfully authenticated with API")
        return True

    @staticmethod
    def _get_message(update):
        """Return ``update.message`` if it exists and has a sender, else None."""
        message = update.message
        if message is None or message.from_user is None:
            return None
        return message

    async def _ensure_token(self):
        """Make sure an API token is available, logging in off-loop if needed.

        Returns:
            True if ``self.auth_token`` is set after the call.
        """
        if not self.auth_token:
            # authenticate() uses blocking `requests`; run it in the default
            # executor so the event loop keeps serving other work.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.authenticate)
        return bool(self.auth_token)

    async def _post(self, url, **kwargs):
        """Run ``requests.post(url, **kwargs)`` in a worker thread with a timeout."""
        loop = asyncio.get_running_loop()
        call = functools.partial(requests.post, url, timeout=REQUEST_TIMEOUT, **kwargs)
        return await loop.run_in_executor(None, call)

    async def start_command(self, update: Update, context: CallbackContext):
        """Handle /start: greet authenticated users, otherwise ask for the password."""
        message = self._get_message(update)
        if message is None:
            return
        user_id = message.from_user.id

        if user_id in AUTHENTICATED_USERS:
            await message.reply_text("✅ You are already authenticated! You can start chatting.")
        else:
            AWAITING_PASSWORD.add(user_id)
            await message.reply_text("🔑 Please enter the secret password to access the bot.")

    async def handle_message(self, update: Update, context: CallbackContext):
        """Route a text message to the password check or to the AI API."""
        message = self._get_message(update)
        if message is None:
            return
        user_id = message.from_user.id

        # If user is waiting to enter a password, validate it
        if user_id in AWAITING_PASSWORD:
            await self.check_password(update, context)
            return

        # If user is authenticated, process AI request
        if user_id in AUTHENTICATED_USERS:
            await self.chat_with_ai(update, context)
        else:
            # The reply asks for the password, so the next message must be
            # treated as a password attempt; otherwise the user could never
            # get in without first sending /start.
            AWAITING_PASSWORD.add(user_id)
            await message.reply_text(_NOT_AUTHENTICATED_TEXT)

    async def check_password(self, update: Update, context: CallbackContext):
        """Compare the message text with the shared password and record success."""
        message = self._get_message(update)
        if message is None:
            return
        user_id = message.from_user.id
        user_message = (message.text or "").strip()

        if user_id in AUTHENTICATED_USERS:
            AWAITING_PASSWORD.discard(user_id)
            await message.reply_text("✅ You are already authenticated!")
            return

        # Compare as bytes: compare_digest rejects non-ASCII str operands, and
        # a constant-time comparison avoids leaking how much of the guess matched.
        is_correct = hmac.compare_digest(
            user_message.encode("utf-8"), SECRET_PASSWORD.encode("utf-8")
        )
        if is_correct:
            AUTHENTICATED_USERS.add(user_id)  # Save authentication status
            AWAITING_PASSWORD.discard(user_id)  # Remove user from the waiting list
            logger.info("User %s authenticated successfully.", user_id)
            await message.reply_text("✅ Authentication successful! You can now use the bot.")
        else:
            await message.reply_text("❌ Wrong password. Try again.")

    async def _ask_ai(self, question):
        """Send ``question`` to the text endpoint and return the reply text.

        The question is posted as JSON first and, if the API answers 422,
        once more as form data. A 401 triggers exactly one re-login and retry.
        """
        payload = {"question": question}
        retried = False

        while True:
            headers = {
                "Authorization": f"Bearer {self.auth_token}",
                "accept": "application/json",
            }

            logger.info("Sending payload as JSON: %s", payload)
            response = await self._post(
                self.ai_url,
                headers={**headers, "Content-Type": "application/json"},
                json=payload,
            )

            if response.status_code == 422:
                logger.warning("JSON format rejected. Retrying with form-data...")
                response = await self._post(
                    self.ai_url,
                    headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
                    data=payload,
                )

            logger.info("Response status: %s", response.status_code)
            logger.info("Response content: %s", response.text)

            if response.status_code != 401 or retried:
                break

            # Token rejected: drop it and log in again, but only once so a
            # persistent 401 cannot loop forever.
            logger.warning("Authorization expired. Re-authenticating...")
            retried = True
            self.auth_token = None
            if not await self._ensure_token():
                return _API_AUTH_FAILED_TEXT

        if response.status_code == 200:
            return response.json().get("answer", "I didn't understand that.")
        if response.status_code == 422:
            return "Error: The API rejected the request. Check payload format."
        return f"Error: {response.status_code} - {response.text}"

    async def chat_with_ai(self, update: Update, context: CallbackContext):
        """Forward an authenticated user's message to the AI API and reply."""
        message = self._get_message(update)
        if message is None:
            return

        if message.from_user.id not in AUTHENTICATED_USERS:
            await message.reply_text(_NOT_AUTHENTICATED_TEXT)
            return

        if not await self._ensure_token():
            await message.reply_text(_API_AUTH_FAILED_TEXT)
            return

        try:
            bot_reply = await self._ask_ai(message.text)
        except Exception as e:  # boundary: always answer the user
            bot_reply = f"Connection error: {e}"

        await message.reply_text(bot_reply)

    async def _upload_excel(self, context, document):
        """Download ``document`` from Telegram, upload it, and return the reply text.

        A 401 from the API triggers exactly one re-login and retry; the file
        is downloaded from Telegram only once.
        """
        # Get file from Telegram
        file = await context.bot.get_file(document.file_id)
        file_bytes = await file.download_as_bytearray()
        retried = False

        async with aiohttp.ClientSession() as session:
            while True:
                headers = {
                    "Authorization": f"Bearer {self.auth_token}",
                    "accept": "application/json",
                }

                # Build the form on every attempt: an aiohttp FormData
                # instance cannot be sent a second time.
                form_data = aiohttp.FormData()
                form_data.add_field(
                    "file",
                    file_bytes,
                    filename=document.file_name,
                    content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                )

                async with session.post(self.excel_url, headers=headers, data=form_data) as response:
                    if response.status == 200:
                        response_json = await response.json()
                        return response_json.get("message", "Excel file processed successfully!")
                    if response.status != 401 or retried:
                        error_text = await response.text()
                        return f"Error processing Excel file: {error_text}"

                # 401 on the first attempt; the response is released before
                # re-authenticating.
                logger.warning("Authorization expired. Re-authenticating...")
                retried = True
                self.auth_token = None
                if not await self._ensure_token():
                    return _API_AUTH_FAILED_TEXT

    async def handle_excel(self, update: Update, context: CallbackContext):
        """Upload an authenticated user's Excel document to the API and reply."""
        message = self._get_message(update)
        if message is None:
            return

        if message.from_user.id not in AUTHENTICATED_USERS:
            await message.reply_text(_NOT_AUTHENTICATED_TEXT)
            return

        if not await self._ensure_token():
            await message.reply_text(_API_AUTH_FAILED_TEXT)
            return

        try:
            reply = await self._upload_excel(context, message.document)
        except Exception as e:  # boundary: always answer the user
            logger.error("Error handling Excel file: %s", e)
            reply = f"Error processing Excel file: {str(e)}"

        await message.reply_text(reply)

    def setup_handlers(self):
        """Set up Telegram command and message handlers."""
        self.app.add_handler(CommandHandler("start", self.start_command))
        self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        # Add handler for document messages
        self.app.add_handler(
            MessageHandler(
                filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
                self.handle_excel,
            )
        )

    def run(self):
        """Start the bot and listen for messages."""
        logger.info("Starting Telegram bot...")
        asyncio.set_event_loop(asyncio.new_event_loop())
        try:
            self.app.run_polling()
        except Exception as e:
            logger.error("Bot failed to start: %s", e)


def main():
    """Validate required configuration, then build and run the bot."""
    required = (("BOT_TOKEN", BOT_TOKEN), ("BASE_URL", BASE_URL))
    missing = [name for name, value in required if not value]
    if missing:
        # Fail fast with a clear message instead of building "None/api/..." URLs.
        raise SystemExit(f"Missing required environment variable(s): {', '.join(missing)}")

    bot = TelegramBot(
        bot_token=BOT_TOKEN,
        base_url=BASE_URL,
        username=API_USERNAME,
        password=API_PASSWORD,
    )
    bot.run()


if __name__ == "__main__":
    main()