Spaces:
Paused
Paused
import os | |
import logging | |
import requests | |
from telegram import Update | |
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext | |
import aiohttp | |
import io | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
import time | |
import pandas as pd | |
from dotenv import load_dotenv | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
# Load environment variables | |
load_dotenv() | |
# Load environment variables from .env file | |
BOT_SECRET_PASSWORD = os.getenv("BOT_SECRET_PASSWORD") | |
BOT_TOKEN = os.getenv("BOT_TOKEN") | |
BASE_URL = os.getenv("BASE_URL") | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
FILE_RFP_EXCEL_COUNT = int(os.getenv("FILE_RFP_EXCEL_COUNT", "100")) # Default to 100 if not set | |
# API_USERNAME = os.getenv("API_USERNAME") | |
# API_PASSWORD = os.getenv("API_PASSWORD") | |
# Set the secret password for authentication | |
SECRET_PASSWORD = BOT_SECRET_PASSWORD | |
# Dictionary to store authenticated users | |
AUTHENTICATED_USERS = set() | |
AWAITING_PASSWORD = set() | |
logging.info("Bot starting... v0.1.0") # Add version number | |
# Add debug logging | |
logging.info(f"BOT_TOKEN loaded: {'yes' if BOT_TOKEN else 'no'}") | |
logging.info(f"BASE_URL loaded: {'yes' if BASE_URL else 'no'}") | |
logging.info(f"HF_TOKEN loaded: {'yes' if HF_TOKEN else 'no'}") | |
logging.info(f"BOT_SECRET_PASSWORD loaded: {'yes' if BOT_SECRET_PASSWORD else 'no'}") | |
class TelegramBot: | |
"""A Telegram bot with password-based authentication.""" | |
def __init__(self, bot_token, base_url, application=None): | |
"""Initialize the bot with Telegram API token, API credentials, and authentication.""" | |
self.bot_token = bot_token | |
self.base_url = base_url | |
# self.username = username | |
# self.password = password | |
# self.auth_token = None | |
# API Endpoints | |
self.login_url = f"{self.base_url}/api/v1/auth/login" | |
self.ai_url = f"{self.base_url}/api/v1/questions/text" | |
self.excel_url = f"{self.base_url}/api/v1/questions/excel" | |
# Executors | |
self.text_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="text_worker") | |
self.excel_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="excel_worker") | |
self.excel_semaphore = asyncio.Semaphore(10) | |
# Track active processes | |
self.active_requests = {} | |
self.active_excel_files = {} | |
# Start Telegram Bot | |
if application: | |
self.app = application | |
else: | |
self.app = Application.builder().token(self.bot_token).build() | |
self.setup_handlers() | |
# Authenticate with API | |
logging.info("Authenticating with API...") | |
# self.authenticate() | |
# Create a ThreadPoolExecutor for handling concurrent requests | |
self.executor = ThreadPoolExecutor(max_workers=10) | |
# Increase Excel workers for more concurrent processing | |
self.excel_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="excel_worker") | |
self.excel_semaphore = asyncio.Semaphore(10) # Allow 10 concurrent Excel processes | |
self.active_excel_files = {} | |
# def authenticate(self): | |
# """Authenticate with the API and retrieve an access token.""" | |
# payload = {"username": self.username, "password": self.password} | |
# headers = {"Content-Type": "application/json", "accept": "application/json"} | |
# | |
# try: | |
# response = requests.post(self.login_url, headers=headers, json=payload) | |
# | |
# if response.status_code == 200: | |
# self.auth_token = response.json().get("access_token") | |
# logging.info("Successfully authenticated with API") | |
# else: | |
# logging.error(f"Authentication failed: {response.status_code} - {response.text}") | |
# | |
# except Exception as e: | |
# logging.error(f"Authentication Error: {e}") | |
async def start_command(self, update: Update, context: CallbackContext): | |
"""Handles the /start command and asks for a password if the user is not authenticated.""" | |
user_id = update.message.from_user.id | |
if user_id in AUTHENTICATED_USERS: | |
await update.message.reply_text( | |
"β You are already authenticated!\n\n" | |
"You can :\n" | |
"1. Send me any question as text\n" | |
"2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n" | |
"Note: Excel files must contain no more than 200 questions.\n\n" | |
"type '/status' to check the status of the request" | |
) | |
else: | |
AWAITING_PASSWORD.add(user_id) | |
await update.message.reply_text("π Please enter the secret password to access the bot.") | |
async def handle_message(self, update: Update, context: CallbackContext): | |
"""Handles all incoming messages concurrently""" | |
user_id = update.message.from_user.id | |
user_message = update.message.text.strip() | |
message_id = update.message.message_id | |
# If user is waiting to enter a password, validate it | |
if user_id in AWAITING_PASSWORD: | |
await self.check_password(update, context) | |
return | |
# If user is authenticated, process AI request | |
if user_id in AUTHENTICATED_USERS: | |
# Create task for processing | |
asyncio.create_task(self.chat_with_ai(update, context)) | |
else: | |
await update.message.reply_text("β You are not authenticated. Please type /start to authenticate and then enter the password.") | |
async def check_password(self, update: Update, context: CallbackContext): | |
"""Checks if the password is correct and authenticates the user.""" | |
user_id = update.message.from_user.id | |
user_message = update.message.text.strip() | |
if user_message == SECRET_PASSWORD: | |
AUTHENTICATED_USERS.add(user_id) | |
AWAITING_PASSWORD.discard(user_id) | |
logging.info(f"User {user_id} authenticated successfully.") | |
await update.message.reply_text( | |
"β Authentication successful!\n\n" | |
"You can:\n" | |
"1. Send me any question as text\n" | |
"2. Send me an Excel file with questions (must have a 'question' column in 'rfp' sheet)\n\n" | |
"Note: Excel files must contain no more than 200 questions." | |
) | |
else: | |
await update.message.reply_text("β Wrong password. Try again.") | |
async def chat_with_ai(self, update: Update, context: CallbackContext): | |
"""Process text messages asynchronously""" | |
message_id = update.message.message_id | |
user_message = update.message.text | |
try: | |
# Send immediate acknowledgment | |
processing_msg = await update.message.reply_text( | |
f"π€ Processing your request...\n" | |
f"Request ID: #{message_id}" | |
) | |
# Track this request | |
self.active_requests[message_id] = { | |
'type': 'text', | |
'status': 'processing', | |
'start_time': time.time() | |
} | |
# Process in thread pool | |
response = await asyncio.get_event_loop().run_in_executor( | |
self.text_executor, | |
self._make_api_request, | |
user_message | |
) | |
# Update with response | |
await processing_msg.edit_text( | |
f"β Response for #{message_id}:\n{response}" | |
) | |
except Exception as e: | |
logging.error(f"Error processing text request: {e}") | |
await processing_msg.edit_text( | |
f"β Error processing request #{message_id}: {str(e)}" | |
) | |
finally: | |
if message_id in self.active_requests: | |
self.active_requests[message_id]['status'] = 'completed' | |
self.active_requests[message_id]['end_time'] = time.time() | |
def _make_api_request(self, user_message): | |
"""Make API request""" | |
try: | |
headers = { | |
"Authorization": f"Bearer {HF_TOKEN}", | |
"accept": "application/json" | |
} | |
json_payload = {"question": user_message} | |
form_payload = {"question": user_message} | |
response = requests.post( | |
self.ai_url, | |
headers={**headers, "Content-Type": "application/json"}, | |
json=json_payload | |
) | |
if response.status_code == 422: | |
response = requests.post( | |
self.ai_url, | |
headers={**headers, "Content-Type": "application/x-www-form-urlencoded"}, | |
data=form_payload | |
) | |
if response.status_code == 200: | |
return response.json().get("answer", "I didn't understand that.") | |
else: | |
return f"Error: {response.status_code}" | |
except Exception as e: | |
return f"Connection error: {e}" | |
async def handle_excel(self, update: Update, context: CallbackContext): | |
"""Handle Excel files concurrently""" | |
logging.info("=== Starting handle_excel function ===") | |
# Add authentication check | |
user_id = update.message.from_user.id | |
if user_id not in AUTHENTICATED_USERS: | |
logging.info(f"Unauthorized access attempt from user {user_id}") | |
await update.message.reply_text( | |
"β You are not authenticated.\n" | |
"Please type /start to authenticate and enter the password first." | |
) | |
return | |
logging.info(f"Authenticated user {user_id} uploaded file: {update.message.document.file_name}") | |
logging.info(f"Received file: {update.message.document.file_name}") | |
try: | |
document = update.message.document | |
message_id = update.message.message_id | |
logging.info(f"Processing document with ID: {message_id}") | |
# First download the file | |
logging.info("Downloading file...") | |
file = await context.bot.get_file(document.file_id) | |
file_bytes = await file.download_as_bytearray() | |
logging.info("File downloaded successfully") | |
# Excel'i oku ve soru sayΔ±sΔ±nΔ± hesapla | |
try: | |
logging.info(f"Starting to read file: {document.file_name}") | |
# Check file extension | |
if document.file_name.lower().endswith('.csv'): | |
df = pd.read_csv(io.BytesIO(file_bytes)) | |
logging.info("Reading as CSV file") | |
else: | |
df = pd.read_excel(io.BytesIO(file_bytes), sheet_name='rfp') | |
logging.info("Reading as Excel file") | |
logging.info(f"File read successfully. Columns found: {df.columns.tolist()}") | |
# Add debug logging | |
logging.info(f"Excel columns found: {df.columns.tolist()}") | |
# Check for empty dataframe first | |
if df.empty: | |
await update.message.reply_text( | |
"β Error: Excel file is empty." | |
) | |
return | |
# Check for question column and make it case-insensitive | |
columns_lower = [col.lower() for col in df.columns] | |
if 'question' not in columns_lower: | |
logging.info(f"'question' column not found in columns: {df.columns.tolist()}") | |
await update.message.reply_text( | |
"β Error: Excel file does not have 'question' column.\n" | |
"Please make sure your Excel file has a column named 'question'.\n" | |
f"Found columns: {', '.join(df.columns.tolist())}" | |
) | |
return | |
except Exception as e: | |
logging.error(f"Error reading Excel: {str(e)}") | |
await update.message.reply_text( | |
f"β Error reading Excel file: {str(e)}\n" | |
f"Please make sure the file has 'rfp' sheet with 'question' column." | |
) | |
return | |
#print("debug 1") | |
#print("FILE_RFP_EXCEL_COUNT: ",FILE_RFP_EXCEL_COUNT) | |
#print("type: ",type(FILE_RFP_EXCEL_COUNT)) | |
# Add debug logging before the check | |
question_count = df['question'].count() | |
logging.info(f"Number of questions found: {question_count}") | |
logging.info(f"Question limit (FILE_RFP_EXCEL_COUNT): {FILE_RFP_EXCEL_COUNT}") | |
if question_count > FILE_RFP_EXCEL_COUNT: | |
logging.info(f"Exceeded question limit: {question_count} > {FILE_RFP_EXCEL_COUNT}") | |
await update.message.reply_text( | |
"β Error: Too many questions in Excel file!\n" | |
f"Your file has {question_count} questions.\n" | |
f"Maximum allowed is {FILE_RFP_EXCEL_COUNT} questions.\n" | |
"Please reduce the number of questions and try again." | |
) | |
logging.info("Sent error message to user about exceeding question limit") | |
return | |
# If we get here, the question count is okay | |
logging.info("Question count is within limits, proceeding with processing") | |
num_questions = len(df['question']) | |
# Tahmini sΓΌreyi hesapla | |
estimated_seconds = num_questions * 30 # Her soru 30 saniye | |
estimated_minutes = estimated_seconds / 60 # Dakikaya Γ§evir | |
# Hemen tahmini sΓΌreyi gΓΆster | |
await update.message.reply_text( | |
f"π Excel file received!\n" | |
f"File: {document.file_name}\n" | |
f"Number of questions: {num_questions}\n" | |
f"Estimated processing time: {estimated_minutes:.1f} minutes\n\n" | |
f"Starting processing..." | |
) | |
# Εimdi asΔ±l iΕleme baΕla | |
asyncio.create_task(self._process_excel_file(update, context, message_id, file_bytes)) | |
except Exception as e: | |
await update.message.reply_text( | |
f"β Error reading Excel file: {str(e)}\n" | |
) | |
async def _process_excel_file(self, update, context, message_id, file_bytes): | |
"""Process Excel file with progress updates""" | |
try: | |
document = update.message.document | |
# Send processing message | |
processing_msg = await update.message.reply_text( | |
f"βοΈ Processing Excel file...\n" | |
f"File: {document.file_name}\n" | |
f"Request ID: #{message_id}" | |
) | |
# Update status to processing | |
self.active_excel_files[message_id] = { | |
'filename': document.file_name, | |
'status': 'processing', | |
'start_time': time.time() | |
} | |
# Process in thread pool | |
result = await asyncio.get_event_loop().run_in_executor( | |
self.excel_executor, | |
self._process_excel_sync, | |
file_bytes, | |
document.file_name | |
) | |
if result is None: | |
await processing_msg.edit_text( | |
f"β Failed to process file\n" | |
f"File: {document.file_name}\n" | |
f"Please check the file format and try again." | |
) | |
return | |
# Send processed file | |
await context.bot.send_document( | |
chat_id=update.effective_chat.id, | |
document=io.BytesIO(result), | |
filename=f'processed_{document.file_name}', | |
caption=f"β Excel processing completed!\nRequest ID: #{message_id}" | |
) | |
await processing_msg.delete() | |
except Exception as e: | |
logging.error(f"Error processing file: {e}") | |
await processing_msg.edit_text( | |
f"β Error processing file\n" | |
f"File: {document.file_name}\n" | |
f"Error: {str(e)}" | |
) | |
finally: | |
if message_id in self.active_excel_files: | |
self.active_excel_files[message_id]['status'] = 'completed' | |
self.active_excel_files[message_id]['end_time'] = time.time() | |
def _process_excel_sync(self, file_bytes, filename): | |
"""Synchronous function to process Excel file""" | |
try: | |
headers = { | |
"Authorization": f"Bearer {HF_TOKEN}", | |
"accept": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" | |
} | |
files = { | |
'file': ( | |
filename, | |
file_bytes, | |
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' | |
) | |
} | |
response = requests.post( | |
self.excel_url, | |
headers=headers, | |
files=files | |
) | |
if response.status_code == 200: | |
return response.content | |
else: | |
logging.error(f"Excel API Error: {response.status_code} - {response.text}") | |
return None | |
except Exception as e: | |
logging.error(f"Excel processing error: {e}") | |
return None | |
async def _update_progress(self, message, message_id, filename): | |
"""Update progress message periodically""" | |
try: | |
while message_id in self.active_excel_files: | |
elapsed_time = time.time() - self.active_excel_files[message_id]['start_time'] | |
hours = int(elapsed_time // 3600) | |
minutes = int((elapsed_time % 3600) // 60) | |
await message.edit_text( | |
f"βοΈ Processing Excel file...\n" | |
f"File: {filename}\n" | |
f"Request ID: #{message_id}\n" | |
f"Time elapsed: {hours}h {minutes}m\n" | |
f"Status: {self.active_excel_files[message_id]['status']}" | |
) | |
# Update every 5 minutes | |
await asyncio.sleep(300) | |
except Exception as e: | |
logging.error(f"Error updating progress: {e}") | |
async def status_command(self, update: Update, context: CallbackContext): | |
"""Show status of all active processes""" | |
status_message = "Current Status:\n\n" | |
# Text requests status | |
if self.active_requests: | |
status_message += "π Text Requests:\n" | |
for msg_id, info in self.active_requests.items(): | |
current_time = time.time() | |
processing_time = current_time - info['start_time'] | |
status_message += ( | |
f"Request #{msg_id}:\n" | |
f"ββ Status: {info['status']}\n" | |
f"ββ Time: {processing_time:.1f}s\n\n" | |
) | |
# Excel files status | |
if self.active_excel_files: | |
status_message += "π Excel Files:\n" | |
for msg_id, info in self.active_excel_files.items(): | |
current_time = time.time() | |
processing_time = current_time - info['start_time'] | |
status_message += ( | |
f"File #{msg_id}:\n" | |
f"ββ Name: {info['filename']}\n" | |
f"ββ Status: {info['status']}\n" | |
f"ββ Time: {processing_time:.1f}s\n\n" | |
) | |
if not self.active_requests and not self.active_excel_files: | |
status_message += "No active processes" | |
await update.message.reply_text(status_message) | |
def setup_handlers(self): | |
"""Set up Telegram command and message handlers.""" | |
logging.info("Setting up message handlers...") | |
self.app.add_handler(CommandHandler("start", self.start_command)) | |
self.app.add_handler(CommandHandler("status", self.status_command)) | |
self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) | |
# Update Excel handler to include CSV files | |
self.app.add_handler(MessageHandler( | |
filters.Document.FileExtension("xlsx") | | |
filters.Document.FileExtension("xls") | | |
filters.Document.FileExtension("csv"), # Add CSV support | |
self.handle_excel | |
)) | |
async def run(self): | |
"""Start the bot and listen for messages.""" | |
logging.info("Starting Telegram bot...") | |
await self.app.initialize() | |
await self.app.start() | |
await self.app.updater.start_polling() | |
async def bot_stop(self): | |
"""Stop the bot.""" | |
logging.info("Stopping Telegram bot...") | |
if self.app.updater: | |
await self.app.updater.stop() | |
await self.app.stop() | |
await self.app.shutdown() | |
def init_bot(): | |
load_dotenv() | |
logging.info("Initializing bot...") | |
try: | |
if os.getenv('SPACE_ID'): # Check if running on Hugging Face | |
logging.info("Running on Hugging Face, using custom settings...") | |
application = Application.builder().token(BOT_TOKEN).base_url( | |
"https://api.telegram.org/bot" | |
).get_updates_connection_pool_size(100).connection_pool_size(100).connect_timeout(30).read_timeout(30).write_timeout(30).pool_timeout(30).build() | |
return TelegramBot(bot_token=BOT_TOKEN, base_url=BASE_URL, application=application) | |
else: | |
logging.info("Running locally, using default settings...") | |
return TelegramBot(bot_token=BOT_TOKEN, base_url=BASE_URL) | |
except Exception as e: | |
logging.error(f"Error initializing bot: {str(e)}") | |
raise |