rfp_telegram_bot_old / bot_telegram.py
baha-99's picture
Update bot_telegram.py
9d3abf7 verified
raw
history blame
9.87 kB
import os
import logging
import asyncio
import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
import aiohttp
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables from Hugging Face Secrets
BOT_TOKEN = os.getenv("BOT_TOKEN")
BASE_URL = os.getenv("BASE_URL")
API_USERNAME = os.getenv("API_USERNAME")
API_PASSWORD = os.getenv("API_PASSWORD")

# Shared password a Telegram user must send before the bot will serve them.
# It is read from the SECRET_PASSWORD environment variable so the real value
# does not have to live in the source; the previous hard-coded value is kept
# only as a fallback so existing deployments keep working unchanged.
_DEFAULT_SECRET_PASSWORD = "secure123"
SECRET_PASSWORD = os.getenv("SECRET_PASSWORD") or _DEFAULT_SECRET_PASSWORD
if SECRET_PASSWORD == _DEFAULT_SECRET_PASSWORD:
    logging.warning("SECRET_PASSWORD is not set; falling back to the built-in default password.")

# Telegram user IDs that have already supplied the correct password.
# Kept in memory only, so the set is empty again after every restart.
AUTHENTICATED_USERS = set()
# Telegram user IDs whose next text message is treated as a password attempt.
AWAITING_PASSWORD = set()
class TelegramBot:
    """Telegram front-end for an HTTP question-answering API, gated by a shared password.

    A user unlocks the bot by sending ``SECRET_PASSWORD``. Text messages from
    unlocked users are posted to the ``questions/text`` endpoint and
    ``.xlsx``/``.xls`` documents to the ``questions/excel`` endpoint. Both
    calls carry a bearer token obtained from the ``auth/login`` endpoint.
    """

    # (connect, read) timeout in seconds for every blocking ``requests`` call,
    # so an unresponsive API cannot stall a handler forever.
    REQUEST_TIMEOUT = (10, 120)

    # Total tries per API request. The second try happens after a fresh login
    # when the first one was answered with HTTP 401.
    MAX_API_ATTEMPTS = 2

    def __init__(self, bot_token, base_url, username, password):
        """Build the Telegram application, register handlers and log in to the API.

        Args:
            bot_token: Telegram bot token passed to the application builder.
            base_url: Base URL of the API; endpoint paths are appended to it.
            username: API account name used for the login request.
            password: API account password used for the login request.
        """
        self.bot_token = bot_token
        self.base_url = base_url
        self.username = username
        self.password = password
        self.auth_token = None

        # API Endpoints
        self.login_url = f"{self.base_url}/api/v1/auth/login"
        self.ai_url = f"{self.base_url}/api/v1/questions/text"
        self.excel_url = f"{self.base_url}/api/v1/questions/excel"

        # Start Telegram Bot
        self.app = Application.builder().token(self.bot_token).build()
        self.setup_handlers()

        # Authenticate with API
        logging.info("Authenticating with API...")
        self.authenticate()

    def authenticate(self):
        """Log in to the API and store the bearer token in ``self.auth_token``.

        This is a blocking call. Failures are logged and never raised; after
        a failed login ``self.auth_token`` is ``None``, so a token the API has
        already rejected is not reused.
        """
        payload = {"username": self.username, "password": self.password}
        headers = {"Content-Type": "application/json", "accept": "application/json"}
        token = None
        try:
            response = requests.post(
                self.login_url,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                token = response.json().get("access_token")
                if token:
                    logging.info("Successfully authenticated with API")
                else:
                    logging.error("Authentication failed: login response contained no access_token")
            else:
                logging.error(f"Authentication failed: {response.status_code} - {response.text}")
        except Exception as e:
            logging.error(f"Authentication Error: {e}")
        self.auth_token = token

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable in the default thread pool and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def _ensure_auth_token(self):
        """Return True if an API token is available, logging in first when there is none."""
        if not self.auth_token:
            await self._run_blocking(self.authenticate)
        return bool(self.auth_token)

    @staticmethod
    def _message_and_user_id(update):
        """Return ``(message, user_id)``, or ``(None, None)`` when the update has no new message or sender.

        Updates such as edited messages leave ``update.message`` unset; the
        handlers ignore those instead of failing on a ``None`` attribute.
        """
        message = update.message
        if message is None or message.from_user is None:
            return None, None
        return message, message.from_user.id

    @staticmethod
    async def _prompt_for_password(message, user_id):
        """Ask an unauthenticated user for the password and treat their next text as the attempt."""
        # Without this registration the password the user sends next would be
        # answered with the same "not authenticated" message again.
        AWAITING_PASSWORD.add(user_id)
        await message.reply_text("❌ You are not authenticated. Please enter the password first.")

    async def start_command(self, update: Update, context: CallbackContext):
        """Handle /start: greet an authenticated user or ask for the password."""
        message, user_id = self._message_and_user_id(update)
        if message is None:
            return

        if user_id in AUTHENTICATED_USERS:
            await message.reply_text("✅ You are already authenticated! You can start chatting.")
        else:
            AWAITING_PASSWORD.add(user_id)
            await message.reply_text("🔑 Please enter the secret password to access the bot.")

    async def handle_message(self, update: Update, context: CallbackContext):
        """Route a text message to the password check, the AI chat, or the password prompt."""
        message, user_id = self._message_and_user_id(update)
        if message is None:
            return

        if user_id in AWAITING_PASSWORD:
            # The user was asked for the password, so this text is the attempt.
            await self.check_password(update, context)
        elif user_id in AUTHENTICATED_USERS:
            await self.chat_with_ai(update, context)
        else:
            await self._prompt_for_password(message, user_id)

    async def check_password(self, update: Update, context: CallbackContext):
        """Compare the message text with ``SECRET_PASSWORD`` and authenticate the sender on a match."""
        message, user_id = self._message_and_user_id(update)
        if message is None:
            return

        if user_id in AUTHENTICATED_USERS:
            # Make sure an already authenticated user cannot stay stuck in the waiting set.
            AWAITING_PASSWORD.discard(user_id)
            await message.reply_text("✅ You are already authenticated!")
            return

        attempt = (message.text or "").strip()
        if attempt == SECRET_PASSWORD:
            AUTHENTICATED_USERS.add(user_id)
            AWAITING_PASSWORD.discard(user_id)
            logging.info(f"User {user_id} authenticated successfully.")
            await message.reply_text("✅ Authentication successful! You can now use the bot.")
        else:
            await message.reply_text("❌ Wrong password. Try again.")

    def _post_question(self, question):
        """Post ``question`` to the AI endpoint and return the HTTP response (blocking).

        The question is sent as JSON first; if the API answers 422 it is sent
        once more as form data.
        """
        headers = {
            "Authorization": f"Bearer {self.auth_token}",
            "accept": "application/json",
        }
        payload = {"question": question}

        logging.info(f"Sending payload as JSON: {payload}")
        response = requests.post(
            self.ai_url,
            headers={**headers, "Content-Type": "application/json"},
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code == 422:
            logging.warning("JSON format rejected. Retrying with form-data...")
            response = requests.post(
                self.ai_url,
                headers={**headers, "Content-Type": "application/x-www-form-urlencoded"},
                data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
        return response

    @staticmethod
    def _reply_for_ai_response(response):
        """Translate the AI endpoint's HTTP response into the text sent back to the user."""
        if response.status_code == 200:
            # ``or`` also covers an empty or null answer, which cannot be sent as a message.
            return response.json().get("answer") or "I didn't understand that."
        if response.status_code == 422:
            return "Error: The API rejected the request. Check payload format."
        return f"Error: {response.status_code} - {response.text}"

    async def chat_with_ai(self, update: Update, context: CallbackContext):
        """Forward the user's text to the AI API and reply with its answer.

        An HTTP 401 triggers one fresh login and one retry; the number of
        tries is capped by ``MAX_API_ATTEMPTS`` so a persistently rejected
        token cannot cause endless retries.
        """
        message, user_id = self._message_and_user_id(update)
        if message is None:
            return

        if user_id not in AUTHENTICATED_USERS:
            await self._prompt_for_password(message, user_id)
            return

        question = message.text
        # Stays in place whenever no usable API token can be obtained.
        bot_reply = "Authentication failed. Please try again later."
        try:
            for attempts_left in range(self.MAX_API_ATTEMPTS - 1, -1, -1):
                if not await self._ensure_auth_token():
                    break

                # The HTTP call is blocking, so it runs off the event loop.
                response = await self._run_blocking(self._post_question, question)
                logging.info(f"Response status: {response.status_code}")
                logging.info(f"Response content: {response.text}")

                if response.status_code == 401 and attempts_left:
                    logging.warning("Authorization expired. Re-authenticating...")
                    # Dropping the token makes the next iteration log in again.
                    self.auth_token = None
                    continue

                bot_reply = self._reply_for_ai_response(response)
                break
        except Exception as e:
            bot_reply = f"Connection error: {e}"

        await message.reply_text(bot_reply)

    async def handle_excel(self, update: Update, context: CallbackContext):
        """Upload an Excel document sent by the user to the API and report the outcome.

        The file is downloaded from Telegram once. An HTTP 401 triggers one
        fresh login and one retry, capped by ``MAX_API_ATTEMPTS``.
        """
        message, user_id = self._message_and_user_id(update)
        if message is None or message.document is None:
            return

        if user_id not in AUTHENTICATED_USERS:
            await self._prompt_for_password(message, user_id)
            return

        if not await self._ensure_auth_token():
            await message.reply_text("Authentication failed. Please try again later.")
            return

        try:
            # Get file from Telegram
            telegram_file = await context.bot.get_file(message.document.file_id)
            file_bytes = await telegram_file.download_as_bytearray()

            reply = "Authentication failed. Please try again later."
            for attempts_left in range(self.MAX_API_ATTEMPTS - 1, -1, -1):
                if not await self._ensure_auth_token():
                    break

                headers = {
                    "Authorization": f"Bearer {self.auth_token}",
                    "accept": "application/json",
                }
                # The form is built fresh for each attempt rather than
                # re-sending one that was already used for a request.
                form_data = aiohttp.FormData()
                form_data.add_field(
                    'file',
                    file_bytes,
                    filename=message.document.file_name,
                    content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
                )

                async with aiohttp.ClientSession() as session:
                    async with session.post(self.excel_url, headers=headers, data=form_data) as response:
                        status = response.status
                        if status == 200:
                            response_json = await response.json()
                            reply = response_json.get("message", "Excel file processed successfully!")
                        elif status != 401 or not attempts_left:
                            error_text = await response.text()
                            reply = f"Error processing Excel file: {error_text}"

                if status == 401 and attempts_left:
                    logging.warning("Authorization expired. Re-authenticating...")
                    # Dropping the token makes the next iteration log in again.
                    self.auth_token = None
                    continue
                break

            await message.reply_text(reply)
        except Exception as e:
            logging.error(f"Error handling Excel file: {e}")
            await message.reply_text(f"Error processing Excel file: {str(e)}")

    def setup_handlers(self):
        """Register the /start command, plain-text and Excel-document handlers."""
        self.app.add_handler(CommandHandler("start", self.start_command))
        self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        # Add handler for document messages
        self.app.add_handler(
            MessageHandler(
                filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"),
                self.handle_excel,
            )
        )

    def run(self):
        """Start polling Telegram for updates; errors raised by polling are logged, not propagated."""
        logging.info("Starting Telegram bot...")
        # Install a fresh event loop for the polling call to use.
        asyncio.set_event_loop(asyncio.new_event_loop())
        try:
            self.app.run_polling()
        except Exception as e:
            logging.error(f"Bot failed to start: {e}")
if __name__ == "__main__":
    # Fail fast with a readable message when configuration is missing.
    # Otherwise the bot would be built with ``None`` credentials and URLs
    # such as "None/api/v1/auth/login".
    required_settings = {
        "BOT_TOKEN": BOT_TOKEN,
        "BASE_URL": BASE_URL,
        "API_USERNAME": API_USERNAME,
        "API_PASSWORD": API_PASSWORD,
    }
    missing_settings = [name for name, value in required_settings.items() if not value]
    if missing_settings:
        raise SystemExit(
            f"Missing required environment variables: {', '.join(missing_settings)}"
        )

    bot = TelegramBot(
        bot_token=BOT_TOKEN,
        base_url=BASE_URL,
        username=API_USERNAME,
        password=API_PASSWORD,
    )
    bot.run()