|
import os |
|
import logging |
|
import asyncio |
|
import requests |
|
from telegram import Update |
|
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext |
|
import aiohttp |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") |
|
|
|
|
|
BOT_TOKEN = os.getenv("BOT_TOKEN") |
|
BASE_URL = os.getenv("BASE_URL") |
|
API_USERNAME = os.getenv("API_USERNAME") |
|
API_PASSWORD = os.getenv("API_PASSWORD") |
|
|
|
|
|
SECRET_PASSWORD = "secure123" |
|
|
|
|
|
AUTHENTICATED_USERS = set() |
|
AWAITING_PASSWORD = set() |
|
|
|
class TelegramBot: |
|
"""A Telegram bot with password-based authentication.""" |
|
|
|
def __init__(self, bot_token, base_url, username, password): |
|
"""Initialize the bot with Telegram API token, API credentials, and authentication.""" |
|
self.bot_token = bot_token |
|
self.base_url = base_url |
|
self.username = username |
|
self.password = password |
|
self.auth_token = None |
|
|
|
|
|
self.login_url = f"{self.base_url}/api/v1/auth/login" |
|
self.ai_url = f"{self.base_url}/api/v1/questions/text" |
|
self.excel_url = f"{self.base_url}/api/v1/questions/excel" |
|
|
|
|
|
self.app = Application.builder().token(self.bot_token).build() |
|
self.setup_handlers() |
|
|
|
|
|
logging.info("Authenticating with API...") |
|
self.authenticate() |
|
|
|
def authenticate(self): |
|
"""Authenticate with the API and retrieve an access token.""" |
|
payload = {"username": self.username, "password": self.password} |
|
headers = {"Content-Type": "application/json", "accept": "application/json"} |
|
|
|
try: |
|
response = requests.post(self.login_url, headers=headers, json=payload) |
|
|
|
if response.status_code == 200: |
|
self.auth_token = response.json().get("access_token") |
|
logging.info("Successfully authenticated with API") |
|
else: |
|
logging.error(f"Authentication failed: {response.status_code} - {response.text}") |
|
|
|
except Exception as e: |
|
logging.error(f"Authentication Error: {e}") |
|
|
|
async def start_command(self, update: Update, context: CallbackContext): |
|
"""Handles the /start command and asks for a password if the user is not authenticated.""" |
|
user_id = update.message.from_user.id |
|
|
|
if user_id in AUTHENTICATED_USERS: |
|
await update.message.reply_text("β
You are already authenticated! You can start chatting.") |
|
else: |
|
AWAITING_PASSWORD.add(user_id) |
|
await update.message.reply_text("π Please enter the secret password to access the bot.") |
|
|
|
async def handle_message(self, update: Update, context: CallbackContext): |
|
"""Handles all incoming messages.""" |
|
user_id = update.message.from_user.id |
|
user_message = update.message.text.strip() |
|
|
|
|
|
if user_id in AWAITING_PASSWORD: |
|
await self.check_password(update, context) |
|
return |
|
|
|
|
|
if user_id in AUTHENTICATED_USERS: |
|
await self.chat_with_ai(update, context) |
|
else: |
|
await update.message.reply_text("β You are not authenticated. Please enter the password first.") |
|
|
|
async def check_password(self, update: Update, context: CallbackContext): |
|
"""Checks if the password is correct and authenticates the user.""" |
|
user_id = update.message.from_user.id |
|
user_message = update.message.text.strip() |
|
|
|
if user_id in AUTHENTICATED_USERS: |
|
await update.message.reply_text("β
You are already authenticated!") |
|
return |
|
|
|
if user_message == SECRET_PASSWORD: |
|
AUTHENTICATED_USERS.add(user_id) |
|
AWAITING_PASSWORD.discard(user_id) |
|
logging.info(f"User {user_id} authenticated successfully.") |
|
await update.message.reply_text("β
Authentication successful! You can now use the bot.") |
|
else: |
|
await update.message.reply_text("β Wrong password. Try again.") |
|
|
|
async def chat_with_ai(self, update: Update, context: CallbackContext): |
|
"""Handles messages and sends them to the AI API.""" |
|
user_id = update.message.from_user.id |
|
|
|
if user_id not in AUTHENTICATED_USERS: |
|
await update.message.reply_text("β You are not authenticated. Please enter the password first.") |
|
return |
|
|
|
if not self.auth_token: |
|
self.authenticate() |
|
|
|
if not self.auth_token: |
|
await update.message.reply_text("Authentication failed. Please try again later.") |
|
return |
|
|
|
user_message = update.message.text |
|
|
|
headers = { |
|
"Authorization": f"Bearer {self.auth_token}", |
|
"accept": "application/json" |
|
} |
|
|
|
json_payload = {"question": user_message} |
|
form_payload = {"question": user_message} |
|
|
|
try: |
|
logging.info(f"Sending payload as JSON: {json_payload}") |
|
response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/json"}, json=json_payload) |
|
|
|
if response.status_code == 422: |
|
logging.warning("JSON format rejected. Retrying with form-data...") |
|
response = requests.post(self.ai_url, headers={**headers, "Content-Type": "application/x-www-form-urlencoded"}, data=form_payload) |
|
|
|
logging.info(f"Response status: {response.status_code}") |
|
logging.info(f"Response content: {response.text}") |
|
|
|
if response.status_code == 200: |
|
bot_reply = response.json().get("answer", "I didn't understand that.") |
|
elif response.status_code == 401: |
|
logging.warning("Authorization expired. Re-authenticating...") |
|
self.authenticate() |
|
await self.chat_with_ai(update, context) |
|
return |
|
elif response.status_code == 422: |
|
bot_reply = "Error: The API rejected the request. Check payload format." |
|
else: |
|
bot_reply = f"Error: {response.status_code} - {response.text}" |
|
|
|
except Exception as e: |
|
bot_reply = f"Connection error: {e}" |
|
|
|
await update.message.reply_text(bot_reply) |
|
|
|
async def handle_excel(self, update: Update, context: CallbackContext): |
|
"""Handles Excel file uploads.""" |
|
user_id = update.message.from_user.id |
|
|
|
if user_id not in AUTHENTICATED_USERS: |
|
await update.message.reply_text("β You are not authenticated. Please enter the password first.") |
|
return |
|
|
|
if not self.auth_token: |
|
self.authenticate() |
|
|
|
if not self.auth_token: |
|
await update.message.reply_text("Authentication failed. Please try again later.") |
|
return |
|
|
|
try: |
|
|
|
file = await context.bot.get_file(update.message.document.file_id) |
|
file_bytes = await file.download_as_bytearray() |
|
|
|
|
|
headers = { |
|
"Authorization": f"Bearer {self.auth_token}", |
|
"accept": "application/json" |
|
} |
|
|
|
|
|
form_data = aiohttp.FormData() |
|
form_data.add_field('file', |
|
file_bytes, |
|
filename=update.message.document.file_name, |
|
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.post(self.excel_url, headers=headers, data=form_data) as response: |
|
if response.status == 200: |
|
response_json = await response.json() |
|
await update.message.reply_text(response_json.get("message", "Excel file processed successfully!")) |
|
elif response.status == 401: |
|
logging.warning("Authorization expired. Re-authenticating...") |
|
self.authenticate() |
|
await self.handle_excel(update, context) |
|
else: |
|
error_text = await response.text() |
|
await update.message.reply_text(f"Error processing Excel file: {error_text}") |
|
|
|
except Exception as e: |
|
logging.error(f"Error handling Excel file: {e}") |
|
await update.message.reply_text(f"Error processing Excel file: {str(e)}") |
|
|
|
def setup_handlers(self): |
|
"""Set up Telegram command and message handlers.""" |
|
self.app.add_handler(CommandHandler("start", self.start_command)) |
|
self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) |
|
|
|
self.app.add_handler(MessageHandler(filters.Document.FileExtension("xlsx") | filters.Document.FileExtension("xls"), self.handle_excel)) |
|
|
|
def run(self): |
|
"""Start the bot and listen for messages.""" |
|
logging.info("Starting Telegram bot...") |
|
asyncio.set_event_loop(asyncio.new_event_loop()) |
|
try: |
|
self.app.run_polling() |
|
except Exception as e: |
|
logging.error(f"Bot failed to start: {e}") |
|
|
|
if __name__ == "__main__": |
|
bot = TelegramBot( |
|
bot_token=BOT_TOKEN, |
|
base_url=BASE_URL, |
|
username=API_USERNAME, |
|
password=API_PASSWORD |
|
) |
|
bot.run() |
|
|