telebot / app.py
pranit144's picture
Upload 10 files
2bd8626 verified
import requests
import time
import json
import traceback
import threading
from datetime import datetime, timedelta
from flask import Flask, jsonify, render_template_string, send_file, request
import google.generativeai as genai
import os
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
import pytz
import logging
import re
from typing import Optional, Dict, List, Any
# Process-wide logging: INFO level, "<timestamp> - <level> - <message>" records.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# ---------------- CONFIG ----------------
# Credentials are read from the environment so that secrets never live in
# source control. Export BOT_TOKEN (Telegram bot token) and GEMINI_API_KEY
# (Google Gemini API key) before starting the app; both default to "" when
# unset, in which case the corresponding API calls will fail and be logged.
# NOTE(review): tokens that were previously committed in this file must be
# treated as compromised and revoked with Telegram / Google.
BOT_TOKEN = os.environ.get("BOT_TOKEN", "")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
# Base URL for every Telegram Bot API method call made by this module.
TELEGRAM_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
# --- JSON persistence files (one per in-memory store) ---
CHAT_LOG_FILE = "chat_log.json"  # chat transcript
SCHEDULED_MESSAGES_FILE = "scheduled_messages.json"  # scheduled outgoing messages
SUGGESTED_TASKS_FILE = "suggested_tasks.json"  # AI-suggested tasks
GENERAL_AI_RESPONSES_FILE = "general_ai_responses.json"  # AI answers pending review
FARMERS_DATA_FILE = "farmers.json"  # farmer profiles used as RAG context

# --- Limits and time handling ---
MAX_MESSAGES = 2000  # maximum chat-log entries kept in memory / on disk
# Timezone used for all scheduling; change to the deployment's local zone
# (e.g. 'America/New_York', 'Europe/London').
TIMEZONE = pytz.timezone('Asia/Kolkata')
# Hours after a farm-update request during which a reply counts as the update.
FARM_UPDATE_RESPONSE_WINDOW_HOURS = 48
# Preset messages offered in the UI dropdown. Each entry becomes a dict with
# "value" (the message text to send) and "text" (the label shown in the UI);
# the final entry has an empty value and stands for a custom message.
HARDCODED_MESSAGES_FOR_UI = [
    {"value": message_value, "text": label}
    for label, message_value in (
        (
            "Ask for Farm Update",
            "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities.",
        ),
        ("Crops Check-in", "How are your crops doing today?"),
        ("Irrigation Reminder", "Reminder: Check irrigation system."),
        ("Pest Monitor Reminder", "Don't forget to monitor pest activity."),
        ("Fertilize Section A", "Daily task: Fertilize Section A."),
        ("Harvest Weather Alert", "Weather looks good for harvesting. Consider starting tomorrow."),
        ("Soil Moisture Check", "Time to check soil moisture levels."),
        ("Custom Message (Type Below)", ""),
    )
]
# Telegram API throttling: at most REQUEST_RATE_LIMIT calls per 60-second
# window. The tracker holds the calls counted in the current window and the
# epoch time at which that window ends.
REQUEST_RATE_LIMIT = 30  # requests per minute
rate_limit_tracker = dict(count=0, reset_time=time.time() + 60)
# ----------------------------------------
# Flask application object.
app = Flask(__name__)

# Gemini client setup. Any failure is logged and leaves `model` as None, which
# the AI helper functions check before use.
model = None
try:
    genai.configure(api_key=GEMINI_API_KEY)
    model = genai.GenerativeModel("gemini-2.0-flash-exp")
    logger.info("Gemini AI configured successfully.")
except Exception as exc:
    logger.error(f"Failed to configure Gemini AI. AI features will be unavailable: {exc}")
    model = None  # make sure a half-configured model is never used

# Background scheduler (running in TIMEZONE) that fires scheduled messages.
scheduler = BackgroundScheduler(timezone=TIMEZONE)
scheduler.start()
# In-memory stores, each paired with the lock that guards it.

# Chat transcript entries.
_messages: List[Dict[str, Any]] = []
_messages_lock = threading.Lock()

# Scheduled outgoing messages.
_scheduled_messages: List[Dict[str, Any]] = []
_scheduled_messages_lock = threading.Lock()

# AI-suggested tasks; each entry is a single task with its message and reason.
_suggested_tasks: List[Dict[str, Any]] = []
_suggested_tasks_lock = threading.Lock()

# General AI responses awaiting admin review.
_general_ai_responses: List[Dict[str, Any]] = []
_general_ai_responses_lock = threading.Lock()

# Farmer profiles loaded from the farmers data file.
_farmer_data: List[Dict[str, Any]] = []
_farmer_data_lock = threading.Lock()

# chat_id -> time a farm-update request was sent; used to recognise the
# farmer's next message as the response to that request.
_awaiting_update_response: Dict[str, datetime] = {}
def check_rate_limit() -> bool:
    """Claim one slot in the current Telegram rate-limit window.

    Returns:
        True when the call is allowed (and has been counted), False when
        REQUEST_RATE_LIMIT calls were already made in the current window.
    """
    now = time.time()
    # _messages_lock doubles as the guard for the shared rate_limit_tracker.
    with _messages_lock:
        if now > rate_limit_tracker["reset_time"]:
            # The previous window has ended: start a fresh 60-second window.
            rate_limit_tracker.update(count=0, reset_time=now + 60)
        allowed = rate_limit_tracker["count"] < REQUEST_RATE_LIMIT
        if allowed:
            rate_limit_tracker["count"] += 1
        return allowed
def sanitize_text(text: str, max_length: int = 4096) -> str:
    """Clean text and cap its length for Telegram.

    NUL characters are removed and surrounding whitespace is stripped. Text
    longer than ``max_length`` is cut and, when there is room, ends in "..."
    so the result never exceeds ``max_length`` characters.

    Args:
        text: Text to clean; any falsy value (``None``, ``""``) yields ``""``.
        max_length: Maximum length of the result. Telegram's sendMessage
            limit is 4096 characters. Values <= 0 yield ``""``.

    Returns:
        The sanitized, possibly truncated text.
    """
    if not text:
        return ""
    # Remove null characters which can cause issues with some APIs/databases
    sanitized = text.replace('\x00', '').strip()
    if max_length <= 0:
        return ""
    if len(sanitized) <= max_length:
        return sanitized
    ellipsis = "..."
    if max_length < len(ellipsis):
        # No room for an ellipsis: a negative slice bound would otherwise
        # produce output longer than the limit, so hard-truncate instead.
        return sanitized[:max_length]
    return sanitized[:max_length - len(ellipsis)] + ellipsis
def save_messages_to_file():
    """Write the in-memory chat log to CHAT_LOG_FILE as indented UTF-8 JSON.

    The write happens while holding the messages lock. Any failure is logged
    and swallowed; nothing is raised to the caller.
    """
    try:
        with _messages_lock, open(CHAT_LOG_FILE, "w", encoding="utf-8") as out:
            json.dump(_messages, out, ensure_ascii=False, indent=2)
            saved_count = len(_messages)
        logger.debug(f"Saved {saved_count} messages to {CHAT_LOG_FILE}.")
    except Exception as exc:
        logger.error(f"Failed to save messages to {CHAT_LOG_FILE}: {exc}")
def save_scheduled_messages():
    """Write all scheduled-message entries to SCHEDULED_MESSAGES_FILE.

    The write happens while holding the scheduled-messages lock. Any failure
    is logged and swallowed; nothing is raised to the caller.
    """
    try:
        with _scheduled_messages_lock, open(SCHEDULED_MESSAGES_FILE, "w", encoding="utf-8") as out:
            json.dump(_scheduled_messages, out, ensure_ascii=False, indent=2)
            saved_count = len(_scheduled_messages)
        logger.debug(f"Saved {saved_count} scheduled messages to {SCHEDULED_MESSAGES_FILE}.")
    except Exception as exc:
        logger.error(f"Failed to save scheduled messages to {SCHEDULED_MESSAGES_FILE}: {exc}")
def save_suggested_tasks():
    """Write the AI-suggested tasks to SUGGESTED_TASKS_FILE.

    The stored entries are left untouched; a JSON-friendly copy is written in
    which the known datetime fields are converted to ISO-8601 strings. Any
    failure is logged and swallowed.
    """
    datetime_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    try:
        with _suggested_tasks_lock:
            # Build shallow copies with datetimes rendered as ISO strings.
            snapshot = [
                {
                    field: (value.isoformat()
                            if field in datetime_fields and isinstance(value, datetime)
                            else value)
                    for field, value in task.items()
                }
                for task in _suggested_tasks
            ]
            with open(SUGGESTED_TASKS_FILE, "w", encoding="utf-8") as out:
                json.dump(snapshot, out, ensure_ascii=False, indent=2)
            saved_count = len(_suggested_tasks)
        logger.debug(f"Saved {saved_count} suggested tasks to {SUGGESTED_TASKS_FILE}.")
    except Exception as exc:
        logger.error(f"Failed to save suggested tasks to {SUGGESTED_TASKS_FILE}: {exc}")
def save_general_ai_responses():
    """Write the general AI responses (pending admin review) to disk.

    A JSON-friendly copy of every entry is written to
    GENERAL_AI_RESPONSES_FILE, with the known datetime fields converted to
    ISO-8601 strings. Any failure is logged and swallowed.
    """
    datetime_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")

    def _as_json_ready(entry):
        # Shallow copy with datetime values in the known fields stringified.
        ready = dict(entry)
        for field in datetime_fields:
            value = ready.get(field)
            if isinstance(value, datetime):
                ready[field] = value.isoformat()
        return ready

    try:
        with _general_ai_responses_lock:
            snapshot = [_as_json_ready(entry) for entry in _general_ai_responses]
            with open(GENERAL_AI_RESPONSES_FILE, "w", encoding="utf-8") as out:
                json.dump(snapshot, out, ensure_ascii=False, indent=2)
            saved_count = len(_general_ai_responses)
        logger.debug(f"Saved {saved_count} general AI responses to {GENERAL_AI_RESPONSES_FILE}.")
    except Exception as exc:
        logger.error(f"Failed to save general AI responses to {GENERAL_AI_RESPONSES_FILE}: {exc}")
def load_messages_from_file():
    """Replace the in-memory chat log with the contents of CHAT_LOG_FILE.

    Only the newest MAX_MESSAGES entries are kept. A missing file is not an
    error; parse and I/O failures are logged and swallowed.
    """
    if not os.path.exists(CHAT_LOG_FILE):
        logger.info(f"{CHAT_LOG_FILE} not found, starting with empty messages.")
        return
    try:
        with open(CHAT_LOG_FILE, "r", encoding="utf-8") as src:
            stored = json.load(src)
        with _messages_lock:
            _messages.clear()
            # Keep only the tail so the store never exceeds MAX_MESSAGES.
            _messages.extend(stored[-MAX_MESSAGES:])
        logger.info(f"Loaded {len(_messages)} messages from {CHAT_LOG_FILE}.")
    except json.JSONDecodeError as exc:
        logger.error(
            f"Failed to parse {CHAT_LOG_FILE} (JSON error): {exc}. Consider backing up the file and starting fresh.")
    except Exception as exc:
        logger.error(f"Failed to load messages from {CHAT_LOG_FILE}: {exc}")
def load_scheduled_messages():
    """Restore scheduled messages from disk and re-register pending jobs.

    Every entry in SCHEDULED_MESSAGES_FILE is loaded into memory. Entries that
    are still "pending" and due in the future are added to the scheduler
    again. Entries of type "farm_update_request" that were already "sent" are
    recorded in the awaiting-response tracker. A bad entry is logged and
    skipped without aborting the load.
    """
    if not os.path.exists(SCHEDULED_MESSAGES_FILE):
        logger.info(f"{SCHEDULED_MESSAGES_FILE} not found, starting with empty scheduled messages.")
        return

    def _aware(iso_text):
        # Parse an ISO timestamp; naive values are localized to TIMEZONE.
        parsed = datetime.fromisoformat(iso_text)
        return TIMEZONE.localize(parsed) if parsed.tzinfo is None else parsed

    try:
        with open(SCHEDULED_MESSAGES_FILE, "r", encoding="utf-8") as src:
            stored = json.load(src)
        with _scheduled_messages_lock:
            _scheduled_messages.clear()
            _scheduled_messages.extend(stored)
            rescheduled_count = 0
            for entry in _scheduled_messages:
                try:
                    run_at = _aware(entry["scheduled_time"])
                    # Only pending entries that are still in the future get a job.
                    if entry["status"] == "pending" and run_at > datetime.now(TIMEZONE):
                        scheduler.add_job(
                            func=send_scheduled_message,
                            trigger=DateTrigger(run_date=run_at),
                            args=[entry],
                            id=entry["id"],
                            replace_existing=True,  # avoid duplicate jobs on restart
                        )
                        rescheduled_count += 1
                    # A farm-update request that already went out restarts
                    # response-window tracking for that chat.
                    if entry["type"] == "farm_update_request" and entry["status"] == "sent":
                        # Fall back to the scheduled time if sent_time is missing.
                        sent_at = _aware(entry.get("sent_time", entry["scheduled_time"]))
                        _awaiting_update_response[str(entry["chat_id"])] = sent_at
                except KeyError as ke:
                    logger.error(f"Missing key in scheduled message entry ID '{entry.get('id', 'unknown')}': {ke}")
                except ValueError as ve:  # raised by fromisoformat on bad input
                    logger.error(
                        f"Invalid datetime format in scheduled message entry ID '{entry.get('id', 'unknown')}': {ve}")
                except Exception as exc:
                    logger.error(f"Failed to reschedule entry ID '{entry.get('id', 'unknown')}': {exc}")
        logger.info(
            f"Loaded {len(_scheduled_messages)} scheduled messages, rescheduled {rescheduled_count} pending jobs.")
    except json.JSONDecodeError as exc:
        logger.error(
            f"Failed to parse {SCHEDULED_MESSAGES_FILE} (JSON error): {exc}. Consider backing up the file and starting fresh.")
    except Exception as exc:
        logger.error(f"Failed to load scheduled messages from {SCHEDULED_MESSAGES_FILE}: {exc}")
def load_suggested_tasks():
    """Replace the in-memory suggested tasks with those in SUGGESTED_TASKS_FILE.

    ISO-8601 strings in the known datetime fields are parsed back into
    datetime objects; a value that cannot be parsed is logged and set to None.
    A missing file is not an error; parse and I/O failures are logged.
    """
    if not os.path.exists(SUGGESTED_TASKS_FILE):
        logger.info(f"{SUGGESTED_TASKS_FILE} not found, starting with empty suggested tasks.")
        return
    datetime_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    try:
        with open(SUGGESTED_TASKS_FILE, "r", encoding="utf-8") as src:
            stored = json.load(src)
        with _suggested_tasks_lock:
            _suggested_tasks.clear()
            for task in stored:
                for key in datetime_fields:
                    if key not in task or task[key] is None:
                        continue  # nothing to convert for this field
                    try:
                        task[key] = datetime.fromisoformat(task[key])
                    except (ValueError, TypeError):
                        logger.warning(
                            f"Invalid datetime format for key '{key}' in task ID '{task.get('id')}': {task[key]}. Setting to None.")
                        task[key] = None
                _suggested_tasks.append(task)
        logger.info(f"Loaded {len(_suggested_tasks)} suggested tasks from {SUGGESTED_TASKS_FILE}.")
    except json.JSONDecodeError as exc:
        logger.error(
            f"Failed to parse {SUGGESTED_TASKS_FILE} (JSON error): {exc}. Consider backing up the file and starting fresh.")
    except Exception as exc:
        logger.error(f"Failed to load suggested tasks from {SUGGESTED_TASKS_FILE}: {exc}")
def load_general_ai_responses():
    """Replace the in-memory general AI responses with those stored on disk.

    Reads GENERAL_AI_RESPONSES_FILE and parses ISO-8601 strings in the known
    datetime fields back into datetime objects; a value that cannot be parsed
    is logged and set to None. A missing file is not an error; parse and I/O
    failures are logged.
    """
    if not os.path.exists(GENERAL_AI_RESPONSES_FILE):
        logger.info(f"{GENERAL_AI_RESPONSES_FILE} not found, starting with empty general AI responses.")
        return
    datetime_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    try:
        with open(GENERAL_AI_RESPONSES_FILE, "r", encoding="utf-8") as src:
            stored = json.load(src)
        with _general_ai_responses_lock:
            _general_ai_responses.clear()
            for resp in stored:
                for key in datetime_fields:
                    if key not in resp or resp[key] is None:
                        continue  # nothing to convert for this field
                    try:
                        resp[key] = datetime.fromisoformat(resp[key])
                    except (ValueError, TypeError):
                        logger.warning(
                            f"Invalid datetime format for key '{key}' in general AI response ID '{resp.get('id')}': {resp[key]}. Setting to None.")
                        resp[key] = None
                _general_ai_responses.append(resp)
        logger.info(f"Loaded {len(_general_ai_responses)} general AI responses from {GENERAL_AI_RESPONSES_FILE}.")
    except json.JSONDecodeError as exc:
        logger.error(
            f"Failed to parse {GENERAL_AI_RESPONSES_FILE} (JSON error): {exc}. Consider backing up the file and starting fresh.")
    except Exception as exc:
        logger.error(f"Failed to load general AI responses from {GENERAL_AI_RESPONSES_FILE}: {exc}")
def load_farmer_data():
    """Load farmer profiles from FARMERS_DATA_FILE into memory for RAG.

    When the file is missing, the in-memory profiles are cleared and a warning
    is logged. Parse and I/O failures are logged and leave the current
    profiles as they were.
    """
    file_present = os.path.exists(FARMERS_DATA_FILE)
    if not file_present:
        logger.warning(
            f"{FARMERS_DATA_FILE} not found. RAG functionality for tasks will be limited or unavailable. Please create it if needed.")
        with _farmer_data_lock:
            _farmer_data.clear()
        return
    try:
        with open(FARMERS_DATA_FILE, "r", encoding="utf-8") as src:
            profiles = json.load(src)
        with _farmer_data_lock:
            _farmer_data.clear()
            _farmer_data.extend(profiles)
        logger.info(f"Loaded {len(_farmer_data)} farmer profiles from {FARMERS_DATA_FILE}.")
    except json.JSONDecodeError as exc:
        logger.error(
            f"Failed to parse {FARMERS_DATA_FILE} (JSON error): {exc}. Ensure it's valid JSON. RAG functionality will be limited.")
    except Exception as exc:
        logger.error(f"Failed to load farmer data from {FARMERS_DATA_FILE}: {exc}")
def get_farmer_profile(chat_id: str) -> Optional[Dict]:
    """Look up a loaded farmer profile by chat id.

    Ids are compared as strings, so numeric and string ids match each other.

    Returns:
        The first profile whose "chat_id" matches, or None if there is none.
    """
    wanted = str(chat_id)
    with _farmer_data_lock:
        matches = (profile for profile in _farmer_data if str(profile.get("chat_id")) == wanted)
        return next(matches, None)
def append_message(role: str, username: str, chat_id: str, text: str):
    """Record one chat message in memory and persist the chat log.

    Falsy text is skipped. Otherwise the sanitized text is stored together
    with the role, username, chat id (as a string) and a UTC timestamp, the
    log is trimmed to the newest MAX_MESSAGES entries, and the log file is
    rewritten.
    """
    if not text:
        logger.debug(f"Attempted to append empty message for role '{role}' from {username} ({chat_id}). Skipping.")
        return
    record = {
        "role": role,
        "username": username,
        "chat_id": str(chat_id),
        "text": sanitize_text(text),
        "ts": datetime.utcnow().isoformat() + "Z",
    }
    with _messages_lock:
        _messages.append(record)
        overflow = len(_messages) - MAX_MESSAGES
        if overflow > 0:
            # Drop the oldest entries so only the latest MAX_MESSAGES remain.
            _messages[:] = _messages[-MAX_MESSAGES:]
    # Saved after releasing the lock: the save function acquires it itself.
    save_messages_to_file()
def _decode_telegram_reply(response) -> Dict[str, Any]:
    """Return the JSON object carried by a Telegram HTTP response.

    Telegram reports failures as a non-2xx status whose JSON body holds
    ``ok: false`` and a ``description``; decoding the body before checking
    the status keeps that description available for logging. When the body
    is not a JSON object, ``raise_for_status()`` surfaces HTTP errors and an
    empty dict (treated as "not ok" by the caller) is returned otherwise.
    """
    try:
        body = response.json()
    except ValueError:
        body = None
    if isinstance(body, dict):
        return body
    response.raise_for_status()
    return {}


def send_message(chat_id: str, text: str) -> Optional[Dict]:
    """Send a text message to a Telegram chat, respecting the rate limit.

    The text is sanitized first. It is sent with ``parse_mode="HTML"``; if
    Telegram rejects the markup (for example AI or user text containing a
    stray ``<`` or ``&``), the message is retried once as plain text instead
    of being dropped.

    Args:
        chat_id: Target Telegram chat id.
        text: Message text.

    Returns:
        Telegram's decoded JSON reply on success, or None when the message is
        empty, the rate limit is exhausted, or sending failed. Never raises.
    """
    text = sanitize_text(text)
    if not text:
        logger.warning(f"Attempted to send empty message to chat_id {chat_id}. Skipping.")
        return None
    # Checked after the empty-text guard so skipped messages do not consume
    # a rate-limit slot.
    if not check_rate_limit():
        logger.warning(f"Telegram API rate limit exceeded for chat_id {chat_id}. Message skipped: '{text[:50]}...'.")
        return None
    url = f"{TELEGRAM_API_URL}/sendMessage"
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}  # Use HTML for basic formatting
    try:
        result = _decode_telegram_reply(requests.post(url, json=payload, timeout=10))
        description = str(result.get("description", ""))
        if not result.get("ok") and "can't parse entities" in description.lower():
            # The text is not valid Telegram HTML; deliver it verbatim.
            logger.warning(
                f"Telegram rejected HTML markup for chat_id {chat_id} ({description}). Retrying as plain text.")
            payload.pop("parse_mode", None)
            result = _decode_telegram_reply(requests.post(url, json=payload, timeout=10))
        if not result.get("ok"):
            logger.error(
                f"Telegram API reported error sending to {chat_id}: {result.get('description', 'Unknown error')}. Message: '{text[:50]}...'.")
            return None
        logger.debug(f"Successfully sent Telegram message to {chat_id}: '{text[:50]}...'.")
        return result
    except requests.exceptions.Timeout:
        logger.error(f"Timeout sending Telegram message to {chat_id}. Message: '{text[:50]}...'.")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.")
        return None
    except Exception as e:
        logger.error(f"Unexpected error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.")
        return None
def schedule_message(chat_id: str, username: str, message_text: str, scheduled_time: datetime,
                     message_type: str = "reminder") -> Optional[Dict]:
    """Schedule a message to be sent to a chat at a future time.

    The entry is appended to the in-memory list, persisted, and registered
    with the scheduler as a one-shot job.

    Args:
        chat_id: Target Telegram chat id.
        username: Recipient name, used for logging and stored on the entry.
        message_text: Text to send; sanitized before being stored.
        scheduled_time: When to send. A naive datetime is interpreted in
            TIMEZONE, matching how persisted entries are reloaded.
        message_type: Entry type, e.g. "reminder" or "farm_update_request".

    Returns:
        The stored entry dict, or None when the input is invalid, the time is
        not in the future, or the job could not be registered.
    """
    if not message_text or not chat_id:
        logger.error("Cannot schedule message: Missing message text or chat_id.")
        return None
    # Comparing a naive datetime with the aware "now" below would raise
    # TypeError, so normalize naive values first.
    if scheduled_time.tzinfo is None:
        scheduled_time = TIMEZONE.localize(scheduled_time)
    # Ensure scheduled_time is in the future
    if scheduled_time <= datetime.now(TIMEZONE):
        logger.error(
            f"Cannot schedule message for {username} ({chat_id}): Scheduled time ({scheduled_time}) is in the past. Message: '{message_text[:50]}...'.")
        return None
    # Build an ID from chat, type, send time and a short hash of the first
    # 50 characters of the text.
    timestamp = int(scheduled_time.timestamp())
    text_hash = hash(message_text[:50]) % 10000
    unique_id = f"{chat_id}_{message_type}_{timestamp}_{text_hash}"
    scheduled_entry = {
        "id": unique_id,
        "chat_id": str(chat_id),
        "username": username,
        "message": sanitize_text(message_text),
        "scheduled_time": scheduled_time.isoformat(),
        "type": message_type,
        "status": "pending",
        "created_at": datetime.now(TIMEZONE).isoformat()
    }
    try:
        with _scheduled_messages_lock:
            _scheduled_messages.append(scheduled_entry)
        # Saved outside the lock: the save function acquires it itself.
        save_scheduled_messages()
        scheduler.add_job(
            func=send_scheduled_message,
            trigger=DateTrigger(run_date=scheduled_time),
            args=[scheduled_entry],
            id=scheduled_entry["id"],
            replace_existing=True  # Important: prevents duplicate jobs for the same ID
        )
        logger.info(
            f"Scheduled '{message_type}' message for {username} ({chat_id}) (ID: {unique_id}) at {scheduled_time.strftime('%Y-%m-%d %H:%M %Z')}.")
        return scheduled_entry
    except Exception as e:
        logger.error(f"Failed to schedule message for {username} ({chat_id}) (ID: {unique_id}): {e}")
        # Roll back so a failed call does not leave a persisted "pending"
        # entry that has no scheduler job behind it.
        with _scheduled_messages_lock:
            if scheduled_entry in _scheduled_messages:
                _scheduled_messages.remove(scheduled_entry)
        save_scheduled_messages()
        return None
def send_scheduled_message(scheduled_entry: Dict):
    """Scheduler callback: deliver one scheduled message and record the result.

    On success the message is added to the chat log, the stored entry is
    marked "sent" with a "sent_time", and a "farm_update_request" starts the
    response window for that chat. On failure the stored entry is marked
    "failed" with a "failed_time". Exceptions are logged, never propagated.
    """

    def _record_outcome(status: str, stamp_key: str, stamp: str):
        # Update the stored entry that shares this job's id, then persist.
        with _scheduled_messages_lock:
            for stored in _scheduled_messages:
                if stored["id"] == scheduled_entry["id"]:
                    stored["status"] = status
                    stored[stamp_key] = stamp
                    break
        save_scheduled_messages()

    try:
        chat_id = scheduled_entry["chat_id"]
        message = scheduled_entry["message"]
        recipient_username = scheduled_entry["username"]
        logger.info(
            f"Attempting to send scheduled message '{scheduled_entry['type']}' to {recipient_username} ({chat_id}). Message: '{message[:50]}...'.")
        result = send_message(chat_id, message)
        current_time = datetime.now(TIMEZONE).isoformat()
        if not result:
            _record_outcome("failed", "failed_time", current_time)
            logger.error(
                f"Failed to send scheduled message to {recipient_username} ({chat_id}). Marking as 'failed'. Message: '{message[:50]}...'.")
            return
        append_message("bot", f"Bot (Scheduled for {recipient_username})", chat_id, message)
        _record_outcome("sent", "sent_time", current_time)
        logger.info(f"Successfully sent scheduled message to {recipient_username} ({chat_id}).")
        # A farm update request that just went out opens the response window.
        if scheduled_entry["type"] == "farm_update_request":
            _awaiting_update_response[str(chat_id)] = datetime.now(TIMEZONE)
            logger.info(
                f"Now awaiting farm update response from {recipient_username} for {FARM_UPDATE_RESPONSE_WINDOW_HOURS} hours (chat_id: {chat_id}).")
    except Exception as e:
        logger.error(f"Critical error in send_scheduled_message for ID '{scheduled_entry.get('id', 'unknown')}': {e}")
        traceback.print_exc()
# Matches one "Task: ... / Reason: ..." block. The reason ends at the next
# '---' separator, at the next line that starts a new "Task:", or at the end
# of the text, so a missing separator cannot merge several tasks into one.
_TASK_BLOCK_RE = re.compile(
    r"Task:\s*(.*?)\s*\n\s*Reason:\s*(.*?)(?=\n\s*---|\n\s*Task:|\Z)",
    re.DOTALL | re.IGNORECASE,
)


def _parse_task_blocks(ai_raw_output: str) -> List[Dict[str, str]]:
    """Split the model's raw text into {"task_message", "task_reason"} dicts."""
    return [
        {"task_message": task_text.strip(), "task_reason": reason_text.strip()}
        for task_text, reason_text in _TASK_BLOCK_RE.findall(ai_raw_output)
    ]


def generate_and_store_farm_tasks(user_message: str, chat_id: str, username: str) -> str:
    """Generate daily tasks from a farmer's update and store them for review.

    Asks the Gemini model for 3-4 short tasks, each with a reason (<50 words),
    based on the update and the farmer's profile (RAG) when one is loaded.
    Each parsed task is stored separately with status "pending_review"; if
    nothing can be parsed, a single "Manual Review Required" item is stored
    instead. Nothing is sent to the farmer.

    Args:
        user_message: The farmer's update text.
        chat_id: Telegram chat id of the farmer.
        username: Farmer's display name, used in the prompt and logs.

    Returns:
        Always "" (there is no immediate reply to the farmer), including when
        the model is unavailable or an error occurs.
    """
    if not model:
        logger.error(f"Gemini model not available for task generation for {username}.")
        return ""
    try:
        farmer_profile = get_farmer_profile(chat_id)
        profile_context = ""
        if farmer_profile and farmer_profile.get("farm_details"):
            details = farmer_profile["farm_details"]
            # Tolerate null or non-string list items in the profile data.
            main_crops = ', '.join(map(str, details.get('main_crops') or []))
            typical_issues = ', '.join(map(str, details.get('typical_issues') or []))
            profile_context = (
                f"\n\nContext about {username}'s farm (use this for tailored advice):\n"
                f"- Location: {details.get('location', 'N/A')}\n"
                f"- Main Crops: {main_crops}\n"
                f"- Soil Type: {details.get('soil_type', 'N/A')}\n"
                f"- Typical Issues: {typical_issues}\n"
                f"- Last reported weather: {details.get('last_reported_weather', 'N/A')}\n"
                f"- Irrigation System: {details.get('irrigation_system', 'N/A')}\n"
            )
        else:
            profile_context = "\n\n(No detailed farmer profile data found. Generating tasks based solely on the update.)"
        prompt = (
            f"You are a highly experienced and practical agricultural expert. Based on the farmer's update below "
            f"and their farm context (if provided), generate exactly 3 to 4 critical daily tasks. "
            f"Each task must be concise, actionable, and include a 'Reason:' why it's important. "
            f"Ensure each 'Reason:' is strictly under 50 words. Prioritize immediate needs and common farming practices."
            f"\n\nStrictly follow this output format for each task:\n"
            f"Task: [Short, descriptive task, e.g., 'Check soil moisture in cornfield']\n"
            f"Reason: [Brief explanation why (under 50 words), e.g., 'Ensure optimal water levels after dry spell.']\n"
            f"---\n"
            f"Task: [Another short, descriptive task]\n"
            f"Reason: [Brief explanation why (under 50 words)]\n"
            f"---\n"
            f"(Repeat 3 or 4 times. Ensure each task block is clearly separated by '---' on its own line.)"
            f"\n\nFarmer: {username}\n"
            f"Farmer's Update: '{user_message}'"
            f"{profile_context}"
            f"\n\nNow, generate the 3-4 tasks and reasons in the specified format:"
        )
        logger.info(
            f"Calling Gemini for RAG task generation for {username} (chat_id: {chat_id}) based on update: '{user_message[:100]}...'.")
        response = model.generate_content(prompt)
        ai_raw_output = response.text if response and response.text else "Could not generate tasks based on the update."
        logger.debug(f"Raw AI output for tasks: {ai_raw_output[:500]}...")
        # Parse the AI's output into individual tasks
        parsed_tasks = _parse_task_blocks(ai_raw_output)
        if not parsed_tasks:
            logger.warning(
                f"AI did not generate structured tasks for {username} in expected format. Providing fallback task(s). Raw output: {ai_raw_output[:200]}")
            # Fallback if parsing fails or the AI ignores the format: always
            # leave at least one item for the admin to review.
            fallback_message = "AI did not generate specific tasks in the expected format. Manual review of farmer's update is required."
            if len(ai_raw_output) > 50:  # If AI gave some output, add it as context to the fallback
                fallback_message += f" AI's raw response: {sanitize_text(ai_raw_output, max_length=150)}."
            parsed_tasks.append({
                "task_message": "Manual Review Required",
                "task_reason": sanitize_text(fallback_message, max_length=200)
            })
        generation_time = datetime.now(TIMEZONE)
        # Store each individual parsed task for separate admin scheduling
        with _suggested_tasks_lock:
            for i, task_item in enumerate(parsed_tasks):
                # Create a unique ID for each individual task item
                task_id = f"suggested_task_{chat_id}_{int(generation_time.timestamp())}_{i}_{hash(task_item['task_message']) % 10000}"
                task_entry = {
                    "id": task_id,
                    "chat_id": str(chat_id),
                    "username": username,
                    "original_update": user_message,  # Keep original update for context in UI
                    "suggested_task_message": sanitize_text(task_item["task_message"]),
                    "suggested_task_reason": sanitize_text(task_item["task_reason"]),
                    "generation_time": generation_time,
                    "status": "pending_review"
                }
                _suggested_tasks.append(task_entry)
        # Saved outside the lock: the save function acquires it itself.
        save_suggested_tasks()
        logger.info(
            f"Generated and stored {len(parsed_tasks)} individual tasks for {username} (chat_id: {chat_id}) for admin review.")
        return ""  # No immediate response to farmer
    except Exception as e:
        logger.error(f"Error during farm task generation/storage for {username} (chat_id: {chat_id}): {e}")
        traceback.print_exc()
        return ""  # No immediate response to farmer on AI error
def get_ai_response_with_context(user_message: str, chat_id: str) -> str:
    """Ask the Gemini model for a short answer to a farmer's general query.

    Args:
        user_message: The farmer's question.
        chat_id: Telegram chat id, used only for logging.

    Returns:
        The sanitized model reply, or a fixed fallback sentence when the
        model is unavailable, returns nothing, or raises.
    """
    if not model:
        logger.error("Gemini model not available for general query, returning fallback message.")
        return "AI service is currently unavailable. Please try again later."
    query_preview = user_message[:50]
    try:
        # Role and length instructions first, then the farmer's query.
        instructions = (
            "You are a helpful and knowledgeable agricultural assistant. Provide a concise and direct answer "
            "to the following query from a farmer. Keep the response factual and actionable where possible. "
            "Limit the response to a few sentences (max 3-4 sentences)."
        )
        prompt = instructions + f"\n\nFarmer's Query: {user_message}"
        logger.info(f"Generating general AI response for {chat_id} (query: '{query_preview}...')...")
        response = model.generate_content(prompt)
        if response and response.text:
            ai_reply = response.text
        else:
            ai_reply = "I couldn't process your request right now."
        return sanitize_text(ai_reply)
    except Exception as e:
        logger.error(f"AI response error for chat_id {chat_id} (query: '{query_preview}...'): {e}")
        traceback.print_exc()
        return "I'm having trouble processing your request right now. Please try again later."
def store_general_ai_response(chat_id: str, username: str, original_query: str, ai_response: str):
    """Queue an AI answer to a farmer's general query for admin review.

    An empty response is skipped. Otherwise a "pending_review" entry holding
    the sanitized query and answer is appended to the in-memory store and the
    store is written to disk.
    """
    if not ai_response:
        logger.warning(f"Attempted to store empty general AI response for {username} ({chat_id}). Skipping.")
        return
    # ID: chat id + current epoch second + short hash of the query.
    entry_id = f"general_ai_{chat_id}_{int(datetime.now().timestamp())}_{hash(original_query) % 10000}"
    response_entry = dict(
        id=entry_id,
        chat_id=str(chat_id),
        username=username,
        original_query=sanitize_text(original_query),
        ai_response=sanitize_text(ai_response),
        generation_time=datetime.now(TIMEZONE),
        status="pending_review",
    )
    with _general_ai_responses_lock:
        _general_ai_responses.append(response_entry)
    # Saved after releasing the lock: the save function acquires it itself.
    save_general_ai_responses()
    logger.info(f"Stored general AI response for {username} ({chat_id}) (ID: {entry_id}).")
# ---------------- Polling logic ----------------
def delete_webhook(drop_pending: bool = True) -> Optional[Dict]:
    """Call Telegram's deleteWebhook so updates can be fetched by polling.

    Args:
        drop_pending: Sent to Telegram as the "drop_pending_updates" flag.

    Returns:
        Telegram's decoded JSON reply (whether or not it reports "ok"), or
        None when the request fails.
    """
    drop_flag = "true" if drop_pending else "false"
    endpoint = f"{TELEGRAM_API_URL}/deleteWebhook"
    try:
        reply = requests.post(endpoint, params={"drop_pending_updates": drop_flag}, timeout=10)
        reply.raise_for_status()
        result = reply.json()
        if result.get("ok"):
            logger.info("Telegram webhook deleted successfully (ensuring long polling).")
        return result
    except requests.exceptions.RequestException as exc:
        logger.error(f"Failed to delete Telegram webhook: {exc}")
    except Exception as exc:
        logger.error(f"Unexpected error deleting Telegram webhook: {exc}")
    return None
def get_updates(offset: Optional[int] = None, timeout_seconds: int = 60) -> Optional[Dict]:
    """Fetch pending updates from Telegram with a long-polling getUpdates call.

    Args:
        offset: First update id to return; omitted from the request when falsy.
        timeout_seconds: Long-poll timeout passed to Telegram. The HTTP
            request itself is allowed 10 seconds more than this.

    Returns:
        Telegram's decoded JSON reply; an empty "ok" result when the HTTP
        request times out; None on any other error.
    """
    query = {"timeout": timeout_seconds}
    if offset:
        query["offset"] = offset
    try:
        reply = requests.get(f"{TELEGRAM_API_URL}/getUpdates", params=query, timeout=timeout_seconds + 10)
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.Timeout:
        # An idle poll is expected: hand back an empty batch so polling continues.
        logger.debug("Telegram polling timed out (normal, no new messages).")
        return {"ok": True, "result": []}
    except requests.exceptions.RequestException as exc:
        logger.error(f"Network error getting updates from Telegram: {exc}")
    except Exception as exc:
        logger.error(f"Unexpected error getting updates from Telegram: {exc}")
    return None
def polling_loop():
    """Long-poll Telegram for updates forever and dispatch each to process_update.

    The webhook is deleted first so that getUpdates can be used. Failed polls
    back off linearly with the number of consecutive errors (capped). Every
    update is acknowledged by advancing the offset whether or not processing
    it succeeds, so one bad update cannot be re-fetched and re-processed
    endlessly. Returns only on KeyboardInterrupt.
    """
    logger.info("Starting Telegram polling thread.")
    delete_webhook(drop_pending=True)  # Ensure webhook is off for polling
    offset = None
    consecutive_errors = 0
    while True:
        try:
            updates = get_updates(offset=offset, timeout_seconds=60)
            if updates is None:  # Critical network error, get_updates returned None
                consecutive_errors += 1
                delay = min(consecutive_errors * 5, 120)  # linear backoff, capped at 2 min
                logger.error(
                    f"get_updates returned None due to error. Consecutive errors: {consecutive_errors}. Sleeping for {delay}s.")
                time.sleep(delay)
                continue
            if not updates.get("ok"):
                # Count this failure before logging so the reported number is current.
                consecutive_errors += 1
                logger.warning(
                    f"Telegram API returned 'not ok' for getUpdates: {updates.get('description', 'No description')}. Consecutive errors: {consecutive_errors}.")
                time.sleep(min(consecutive_errors * 2, 60))  # linear backoff, capped at 1 min
                continue
            consecutive_errors = 0  # Reset error counter on successful API call
            results = updates.get("result", [])
            for update in results:
                # Acknowledge the update before processing it: if processing
                # raises, the next poll must still move past this update.
                offset = update.get("update_id", 0) + 1
                try:
                    process_update(update)
                except Exception as e:
                    logger.error(f"Error processing individual update ID {update.get('update_id', 'unknown')}: {e}")
                    traceback.print_exc()
            # Small delay to prevent busy-waiting if Telegram returns many empty results quickly
            if not results:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Telegram polling stopped by user (KeyboardInterrupt).")
            break
        except Exception as e:
            consecutive_errors += 1
            logger.critical(f"Unhandled critical error in polling loop: {e}")
            time.sleep(min(consecutive_errors * 10, 300))  # longer linear backoff, capped at 5 min
def process_update(update: Dict) -> None:
    """Process a single Telegram update and acknowledge the sender.

    Reads the update's "message" (or "edited_message"), records the incoming
    text with append_message, and then takes one of three paths:

    * No text/caption: reply that only text is supported and log both sides.
    * The chat has an entry in _awaiting_update_response that is at most
      FARM_UPDATE_RESPONSE_WINDOW_HOURS old: treat the text as a farm update,
      call generate_and_store_farm_tasks, and clear the tracker entry.
    * Otherwise: treat the text as a general query, ask
      get_ai_response_with_context for a reply and, if one was produced,
      store it with store_general_ai_response for admin review.

    In every text path exactly one acknowledgment is logged and sent to the
    chat. Updates without a message or without a chat id are skipped.

    Args:
        update: One update object as returned by Telegram's getUpdates.
    """
    msg = update.get("message") or update.get("edited_message")
    if not msg:
        logger.debug(f"Update ID {update.get('update_id')}: No message or edited_message found. Skipping.")
        return

    # `or {}` also covers a "chat" key that is present but null.
    chat = msg.get("chat") or {}
    raw_chat_id = chat.get("id")
    if raw_chat_id is None:
        # str(None) would yield the literal chat id "None", which would then be
        # logged and messaged as if it were a real chat.
        logger.warning(f"Update ID {update.get('update_id')}: message has no chat id. Skipping.")
        return
    chat_id = str(raw_chat_id)
    username = chat.get("first_name") or chat.get("username") or "User"
    text = msg.get("text") or msg.get("caption") or ""

    if not text:
        # If no text (e.g., photo, sticker), send a polite message and append to log
        acknowledgment_text = "I can only respond to text messages. Please send your queries as text."
        send_message(chat_id, acknowledgment_text)
        append_message("user", username, chat_id, "(Non-text message received)")
        append_message("bot", "Bot", chat_id, acknowledgment_text)
        logger.debug(f"Received non-text message from {username} ({chat_id}). Sent acknowledgment.")
        return

    append_message("user", username, chat_id, text)
    logger.info(f"Processing message from {username} ({chat_id}): '{text[:50]}...'.")

    # Check if we are awaiting a farm update response from this chat_id
    most_recent_request_time = _awaiting_update_response.get(chat_id)
    awaiting_farm_update = bool(
        most_recent_request_time
        and (datetime.now(TIMEZONE) - most_recent_request_time)
        <= timedelta(hours=FARM_UPDATE_RESPONSE_WINDOW_HOURS)
    )

    if awaiting_farm_update:
        logger.info(f"Recognized as farm update response from {username}. Generating tasks for admin review.")
        generate_and_store_farm_tasks(text, chat_id, username)  # Generates and stores tasks
        # Send a generic acknowledgment to the farmer
        bot_acknowledgment_to_farmer = "Thank you for your farm update! The admin will review it, generate daily tasks, and schedule them to be sent to you shortly."
        # Remove from tracking after processing this update
        _awaiting_update_response.pop(chat_id, None)
        logger.info(f"Finished processing farm update for {username} ({chat_id}). Awaiting response tracker cleared.")
    else:
        # This is a general query. Generate AI response but store it for admin scheduling.
        logger.info(f"Processing general query from {username}. Generating AI response for admin scheduling.")
        ai_generated_response = get_ai_response_with_context(text, chat_id)
        # Only store and acknowledge if AI generated a meaningful response
        if ai_generated_response and ai_generated_response != "AI service is currently unavailable. Please try again later.":
            store_general_ai_response(chat_id, username, text, ai_generated_response)
            bot_acknowledgment_to_farmer = f"I've received your query about '{text[:50]}...'. The admin will review the AI's suggested response and schedule it to be sent to you."
        else:  # AI service unavailable or response generation failed
            bot_acknowledgment_to_farmer = "I've received your message. The admin will get back to you shortly."
            logger.warning(
                f"AI failed to generate a response for general query from {username} ({chat_id}). Admin will manually review.")

    # Every branch above assigns a non-empty acknowledgment, so it is always
    # logged and sent; this is the only message sent directly to the farmer.
    append_message("bot", "Bot", chat_id, bot_acknowledgment_to_farmer)
    send_message(chat_id, bot_acknowledgment_to_farmer)
# ---------------- Flask routes & UI ----------------
# HTML template for the admin panel: chat log, scheduled messages, AI-generated tasks and general AI responses awaiting review, and a form for scheduling new messages.
HTML_TEMPLATE = """
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>Farm Bot Admin Panel</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<style>
/* Define a CSS variable for consistent spacing */
:root {
--spacing-xs: 4px;
--spacing-s: 8px;
--spacing-m: 12px;
--spacing-l: 16px;
--spacing-xl: 24px;
}
body {
font-family: Inter, Arial, sans-serif;
background:#f5f7fb;
margin:0;
padding:0;
display:flex;
flex-direction:column;
height:100vh;
}
header {
background:#243b55;
color:#fff;
padding:var(--spacing-m) var(--spacing-l);
display:flex;
justify-content:space-between;
align-items:center;
}
.status-indicator {
padding:var(--spacing-xs) var(--spacing-s);
border-radius:var(--spacing-xs);
font-size:12px;
}
.status-indicator.online { background:#27ae60; }
.status-indicator.offline { background:#e74c3c; }
.container {
display:flex;
gap:var(--spacing-l);
padding:var(--spacing-l);
height:calc(100vh - 64px); /* Account for header height */
box-sizing:border-box;
}
.chat {
flex:1;
background:#fff;
border-radius:10px;
padding:var(--spacing-m); /* Consistent padding */
box-shadow:0 4px 20px rgba(0,0,0,0.06);
overflow:auto;
display:flex;
flex-direction:column;
}
.meta {
width:350px;
background:#fff;
border-radius:10px;
padding:var(--spacing-m); /* Consistent padding */
box-shadow:0 4px 20px rgba(0,0,0,0.06);
display:flex;
flex-direction:column;
max-height:calc(100vh - 100px); /* Adjust based on container/header */
overflow-y:auto;
}
.msg {
margin:var(--spacing-s) 0;
padding:var(--spacing-s);
border-radius:8px;
max-width:80%;
word-wrap:break-word;
}
.msg.user { background:#e6f0ff; align-self:flex-start; }
.msg.bot { background:#f1f4f8; align-self:flex-end; text-align:left; }
.meta button {
margin-top:var(--spacing-s);
padding:var(--spacing-s) var(--spacing-m);
border:none;
border-radius:6px;
cursor:pointer;
background:#243b55;
color:#fff;
transition:background 0.2s;
font-size:14px; /* Ensure consistency */
line-height: 1; /* Normalize line-height for button text */
}
.meta button:hover { background:#1a2d42; }
.meta button.danger { background:#c0392b; }
.meta button.danger:hover { background:#a93226; }
.top-row {
display:flex;
justify-content:space-between;
align-items:center;
margin-bottom:var(--spacing-s);
}
.small { font-size:12px; color:#666; }
.timestamp { font-size:11px; color:#888; margin-top:var(--spacing-xs); }
/* Section separators and headings */
.scheduled-section,
.suggested-tasks-section,
.general-ai-responses-section,
.schedule-new-message-section { /* Added a class for consistent styling */
margin-top:var(--spacing-l);
padding-top:var(--spacing-l);
border-top:1px solid #eee;
}
.scheduled-section strong,
.suggested-tasks-section strong,
.general-ai-responses-section strong,
.schedule-new-message-section strong {
display: block; /* Make strong take full width */
margin-bottom: var(--spacing-s); /* Space below the heading text */
font-size: 16px; /* Slightly larger heading for sections */
color: #243b55; /* Match header color for importance */
}
.scheduled-msg {
background:#fff9e6;
padding:var(--spacing-s);
margin:var(--spacing-xs) 0;
border-radius:6px;
border-left:3px solid #f39c12;
}
.scheduled-msg.sent { background:#e8f5e8; border-left-color:#27ae60; }
.scheduled-msg.failed { background:#ffeaea; border-left-color:#e74c3c; }
.status {
font-size:10px;
text-transform:uppercase;
font-weight:bold;
margin-left:var(--spacing-s); /* Added margin-left for spacing */
}
.status.pending { color:#f39c12; }
.status.sent { color:#27ae60; }
.status.failed { color:#e74c3c; }
.suggested-task-entry, .general-ai-response-entry {
background:#e6f7ff;
padding:var(--spacing-s);
margin:var(--spacing-xs) 0;
border-radius:6px;
border-left:3px solid #3498db;
}
.suggested-task-entry.scheduled, .general-ai-response-entry.scheduled { background:#e8f5e8; border-left-color:#27ae60; }
.suggested-task-entry.discarded, .general-ai-response-entry.discarded { background:#ffeaea; border-left-color:#e74c3c; opacity: 0.7; }
.suggested-task-entry .header, .general-ai-response-entry .header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: var(--spacing-xs);
}
.suggested-task-entry .content, .general-ai-response-entry .content {
font-size: 12px;
margin-bottom: var(--spacing-xs);
max-height:100px;
overflow-y:auto;
}
.suggested-task-entry .actions, .general-ai-response-entry .actions {
display: flex;
gap: var(--spacing-xs);
margin-top: var(--spacing-s);
}
/* Keep specific button sizes within actions, but apply line-height */
.suggested-task-entry .actions button, .general-ai-response-entry .actions button {
padding: 6px 10px;
font-size: 12px;
margin-top:0;
line-height: 1;
}
.meta label {
font-size: 14px;
color: #333;
margin-bottom: var(--spacing-xs);
display: block;
}
/* Unified styling for form inputs and selects */
.meta input[type="text"],
.meta input[type="datetime-local"],
.meta select,
.meta textarea {
width: calc(100% - (2 * var(--spacing-s))); /* Account for padding */
padding: var(--spacing-s); /* Standardized padding */
margin-bottom: var(--spacing-s); /* Consistent bottom margin */
border: 1px solid #ddd;
border-radius: 4px;
box-sizing: border-box;
font-family: Inter, Arial, sans-serif;
font-size: 14px; /* Consistent font size */
}
.meta button.schedule-btn { background:#007bff; }
.meta button.discard-btn { background:#e74c3c; }
.error-message {
background:#ffeaea;
color:#c0392b;
padding:var(--spacing-s);
border-radius:4px;
margin:var(--spacing-s) 0;
font-size:12px;
}
.success-message {
background:#e8f5e8;
color:#27ae60;
padding:var(--spacing-s);
border-radius:4px;
margin:var(--spacing-s) 0;
font-size:12px;
}
.loading { opacity:0.7; pointer-events:none; }
/* Improve AI response pre-formatted text styling */
.general-ai-response-entry .content pre {
background-color: #f9f9f9; /* Light background */
border-left: 3px solid #b3d9ff; /* Subtle left border */
padding: var(--spacing-s);
margin: var(--spacing-s) 0;
border-radius: 4px;
white-space: pre-wrap; /* Ensure wrapping */
font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, Courier, monospace; /* Monospace font */
font-size: 12px; /* Keep it relatively small */
}
</style>
</head>
<body>
<header>
<strong>Farm Bot Admin Panel</strong>
<div class="status-indicator online" id="connection-status">Online</div>
</header>
<div class="container">
<div class="chat" id="chat">
<!-- messages will be injected here -->
</div>
<div class="meta">
<div class="top-row">
<div><strong>Controls</strong><div class="small">Auto-updates every 2s</div></div>
<div><span id="count">0</span> messages</div>
</div>
<div id="status-messages"></div>
<button onclick="downloadLog()">Download Chat JSON</button>
<button onclick="clearAllData()" class="danger">Clear All Data & Reset</button>
<div class="scheduled-section">
<strong>Scheduled Messages</strong> (<span id="scheduled-count">0</span>)
<div class="small" style="margin-bottom:var(--spacing-s);">All messages scheduled for farmers (pending or sent).</div>
<div id="scheduled-messages"></div>
<button onclick="downloadScheduled()" style="background:#27ae60;">Download Scheduled JSON</button>
</div>
<div class="suggested-tasks-section">
<strong>AI Generated Daily Tasks</strong> (<span id="suggested-count">0</span> pending)
<div class="small" style="margin-bottom:var(--spacing-s);">Individual tasks generated by AI from farmer updates, awaiting admin review and scheduling.</div>
<div id="suggested-tasks-list"></div>
<button onclick="downloadSuggested()" style="background:#f39c12;">Download Suggested Tasks JSON</button>
</div>
<div class="general-ai-responses-section">
<strong>AI Generated General Responses</strong> (<span id="general-ai-count">0</span> pending)
<div class="small" style="margin-bottom:var(--spacing-s);">AI responses to general farmer queries, awaiting admin review and scheduling.</div>
<div id="general-ai-responses-list"></div>
<button onclick="downloadGeneralAI()" style="background:#8e44ad;">Download General AI Responses JSON</button>
</div>
<div class="schedule-new-message-section">
<strong>Schedule New Message</strong>
<div>
<label for="schedule-chat-id">Send to:</label>
<select id="schedule-chat-id"></select>
</div>
<div>
<label for="message-template">Message Template:</label>
<select id="message-template" onchange="updateMessageInput()">
</select>
</div>
<div id="custom-message-div" style="display:none;">
<label for="schedule-message">Custom Message:</label>
<textarea id="schedule-message" rows="3" placeholder="Enter custom message"></textarea>
</div>
<div>
<label for="schedule-time">Time:</label>
<input type="datetime-local" id="schedule-time">
</div>
<div>
<label for="message-type">Message Type:</label>
<select id="message-type">
<option value="reminder">General Reminder</option>
<option value="farm_update_request">Farm Update Request</option>
<option value="daily_tasks">Daily Tasks</option>
<option value="general_ai_response">General AI Response</option>
</select>
</div>
<button onclick="scheduleMessageFromUI()" class="schedule-btn">Schedule Message</button>
</div>
</div>
</div>
<script>
// This line is now dynamically populated by Flask's render_template_string
const HARDCODED_MESSAGES_FOR_UI = {{ HARDCODED_MESSAGES_FOR_UI | tojson }};
let lastUpdateTime = 0;
let connectionOk = true; // Tracks if the Flask backend is currently reachable
function showMessage(text, type = 'info') {
const container = document.getElementById('status-messages');
if (!container) { console.error("Status message container not found."); return; }
// Remove oldest messages if too many to prevent UI clutter
while (container.children.length > 3) {
container.removeChild(container.firstChild);
}
const div = document.createElement('div');
div.className = type === 'error' ? 'error-message' : 'success-message';
div.textContent = text;
container.appendChild(div);
setTimeout(() => div.remove(), 7000); // Messages disappear after 7 seconds
}
function setLoading(element, loading) {
if (element) { // Check if element exists before manipulating
if (loading) {
element.classList.add('loading');
element.disabled = true; // Disable button when loading to prevent double clicks
} else {
element.classList.remove('loading');
element.disabled = false; // Re-enable button
}
}
}
function updateConnectionStatus(online) {
const indicator = document.getElementById('connection-status');
if (indicator) { // Check if indicator exists
if (online) {
indicator.textContent = 'Online';
indicator.className = 'status-indicator online';
} else {
indicator.textContent = 'Offline';
indicator.className = 'status-indicator offline';
}
}
connectionOk = online; // Update global state
}
// Standardized offset for all scheduling inputs
function getCurrentDateTimeLocal(offsetMinutes = 5) {
const now = new Date();
now.setMinutes(now.getMinutes() + offsetMinutes);
// Ensure correct format YYYY-MM-DDTHH:MM for datetime-local
return now.toISOString().slice(0, 16);
}
function populateMessageTemplates() {
const select = document.getElementById("message-template");
if (!select) return; // Guard against element not found
select.innerHTML = "";
// Add the "Custom Message" option first
const customOption = document.createElement("option");
customOption.value = ""; // Empty value for custom message
customOption.innerText = "Custom Message (Type Below)";
select.appendChild(customOption);
HARDCODED_MESSAGES_FOR_UI.forEach(msg => {
const option = document.createElement("option");
option.value = msg.value;
option.innerText = msg.text;
select.appendChild(option);
});
updateMessageInput(); // Set initial state for custom message input
}
function updateMessageInput() {
const templateSelect = document.getElementById("message-template");
const customMessageDiv = document.getElementById("custom-message-div");
const scheduleMessageTextarea = document.getElementById("schedule-message");
if (!templateSelect || !customMessageDiv || !scheduleMessageTextarea) return; // Guards
if (templateSelect.value === "") { // "Custom Message (Type Below)" option
customMessageDiv.style.display = "block";
scheduleMessageTextarea.value = ""; // Clear for new custom message
scheduleMessageTextarea.focus(); // Focus on custom message input for user convenience
} else {
customMessageDiv.style.display = "none";
scheduleMessageTextarea.value = templateSelect.value;
}
}
function renderMessages(msgs) {
const chat = document.getElementById("chat");
if (!chat) { console.error("Chat window element not found."); return; }
const wasScrolledToBottom = chat.scrollHeight - chat.clientHeight <= chat.scrollTop + 1; // Check if already at bottom
chat.innerHTML = ""; // Clear existing messages
if (msgs.length === 0) {
chat.innerHTML = '<div style="text-align:center; color:#666; margin-top:20px;">No messages yet. Start a conversation with your bot!</div>';
document.getElementById("count").innerText = 0;
return;
}
msgs.forEach(m => {
const div = document.createElement("div");
div.className = "msg " + (m.role === "user" ? "user" : "bot");
const who = document.createElement("div");
who.innerHTML = "<strong>" + (m.role === "user" ? (m.username || "User") : "Bot") + "</strong>";
const txt = document.createElement("div");
txt.innerText = m.text;
const ts = document.createElement("div");
ts.className = "timestamp";
ts.innerText = new Date(m.ts).toLocaleString();
div.appendChild(who);
div.appendChild(txt);
div.appendChild(ts);
chat.appendChild(div);
});
if (wasScrolledToBottom) { // Only scroll to bottom if user was already there
chat.scrollTop = chat.scrollHeight;
}
document.getElementById("count").innerText = msgs.length;
}
function renderScheduledMessages(scheduled) {
const container = document.getElementById("scheduled-messages");
if (!container) return;
container.innerHTML = "";
if (scheduled.length === 0) {
container.innerHTML = '<div class="small" style="text-align:center; color:#666;">No scheduled messages</div>';
document.getElementById("scheduled-count").innerText = 0;
return;
}
// Sort by scheduled time, newest first
scheduled.sort((a, b) => new Date(b.scheduled_time) - new Date(a.scheduled_time));
scheduled.forEach(s => {
const div = document.createElement("div");
const statusClass = (s.status === 'pending') ? 'pending' : s.status;
div.className = "scheduled-msg " + statusClass;
const header = document.createElement("div");
header.innerHTML = `<strong>${s.username || 'Unknown User'}</strong> <span class="status ${statusClass}">${s.status.replace('_', ' ')}</span>`;
const message = document.createElement("div");
message.style.margin = "4px 0";
message.style.maxHeight = "60px";
message.style.overflowY = "auto";
message.innerText = s.message;
const time = document.createElement("div");
time.className = "timestamp";
time.innerText = `Scheduled: ${new Date(s.scheduled_time).toLocaleString()}`;
if (s.type) time.innerText += ` (Type: ${s.type.replace('_', ' ')})`;
if (s.sent_time) {
time.innerText += ` | Sent: ${new Date(s.sent_time).toLocaleString()}`;
}
div.appendChild(header);
div.appendChild(message);
div.appendChild(time);
container.appendChild(div);
});
document.getElementById("scheduled-count").innerText = scheduled.length;
}
function renderSuggestedTasks(tasks) {
const container = document.getElementById("suggested-tasks-list");
if (!container) return;
container.innerHTML = "";
let pendingCount = 0;
if (tasks.length === 0) {
container.innerHTML = '<div class="small" style="text-align:center; color:#666;">No AI generated tasks yet</div>';
document.getElementById("suggested-count").innerText = 0;
return;
}
// Filter and sort pending tasks by generation time, newest first
const pendingTasks = tasks.filter(t => t.status === "pending_review")
.sort((a, b) => new Date(b.generation_time) - new Date(a.generation_time));
pendingTasks.forEach(task => {
pendingCount++;
const div = document.createElement("div");
div.className = "suggested-task-entry";
const header = document.createElement("div");
header.className = "header";
header.innerHTML = `<strong>${task.username} (${task.chat_id})</strong> <span class="status pending">pending review</span>`;
const originalUpdateMsg = document.createElement("div");
originalUpdateMsg.className = "small";
originalUpdateMsg.innerHTML = `<strong>From Update:</strong> ${task.original_update.substring(0, 70)}${task.original_update.length > 70 ? '...' : ''}`;
const taskContent = document.createElement("div");
taskContent.className = "content";
taskContent.innerHTML = `<strong>Task:</strong> ${task.suggested_task_message}<br><strong>Reason:</strong> ${task.suggested_task_reason}`;
const generationTime = document.createElement("div");
generationTime.className = "timestamp";
generationTime.innerText = `Generated: ${new Date(task.generation_time).toLocaleString()}`;
const actions = document.createElement("div");
actions.className = "actions";
const scheduleTimeInput = document.createElement("input");
scheduleTimeInput.type = "datetime-local";
scheduleTimeInput.value = getCurrentDateTimeLocal();
scheduleTimeInput.id = `schedule-time-task-${task.id}`; // Unique ID for the input
const scheduleBtn = document.createElement("button");
scheduleBtn.className = "schedule-btn";
scheduleBtn.innerText = "Schedule Task";
scheduleBtn.id = `schedule-btn-task-${task.id}`; // Unique ID for the button
// FIX: Pass the ID of the input field to the function
scheduleBtn.onclick = () => scheduleSuggestedTasks(task.id, task.chat_id, task.username, task.suggested_task_message, scheduleTimeInput.id, scheduleBtn);
const discardBtn = document.createElement("button");
discardBtn.className = "discard-btn";
discardBtn.innerText = "Discard";
discardBtn.id = `discard-btn-task-${task.id}`; // Unique ID for the discard button
// Pass the button element to the function
discardBtn.onclick = () => discardSuggestedTask(task.id, discardBtn);
actions.appendChild(scheduleTimeInput);
actions.appendChild(scheduleBtn);
actions.appendChild(discardBtn);
div.appendChild(header);
div.appendChild(originalUpdateMsg);
div.appendChild(taskContent);
div.appendChild(generationTime);
div.appendChild(actions);
container.appendChild(div);
});
document.getElementById("suggested-count").innerText = pendingCount;
}
function renderGeneralAIResponses(responses) {
const container = document.getElementById("general-ai-responses-list");
if (!container) return;
container.innerHTML = "";
let pendingCount = 0;
if (responses.length === 0) {
container.innerHTML = '<div class="small" style="text-align:center; color:#666;">No AI generated responses yet</div>';
document.getElementById("general-ai-count").innerText = 0;
return;
}
const pendingResponses = responses.filter(r => r.status === "pending_review")
.sort((a, b) => new Date(b.generation_time) - new Date(a.generation_time));
pendingResponses.forEach(resp => {
pendingCount++;
const div = document.createElement("div");
div.className = "general-ai-response-entry";
const header = document.createElement("div");
header.className = "header";
header.innerHTML = `<strong>${resp.username} (${resp.chat_id})</strong> <span class="status pending">pending review</span>`;
const queryMsg = document.createElement("div");
queryMsg.className = "small";
queryMsg.innerHTML = `<strong>Original Query:</strong> ${resp.original_query.substring(0, 100)}${resp.original_query.length > 100 ? '...' : ''}`;
const aiResponseContent = document.createElement("div");
aiResponseContent.className = "content";
aiResponseContent.innerHTML = `<strong>AI Response:</strong><pre>${resp.ai_response}</pre>`;
const generationTime = document.createElement("div");
generationTime.className = "timestamp";
generationTime.innerText = `Generated: ${new Date(resp.generation_time).toLocaleString()}`;
const actions = document.createElement("div");
actions.className = "actions";
const scheduleTimeInput = document.createElement("input");
scheduleTimeInput.type = "datetime-local";
scheduleTimeInput.value = getCurrentDateTimeLocal();
scheduleTimeInput.id = `schedule-time-ai-${resp.id}`; // Unique ID for the input
const scheduleBtn = document.createElement("button");
scheduleBtn.className = "schedule-btn";
scheduleBtn.innerText = "Schedule Response";
scheduleBtn.id = `schedule-btn-ai-${resp.id}`; // Unique ID for the button
// FIX: Pass the ID of the input field to the function
scheduleBtn.onclick = () => scheduleGeneralAIResponse(resp.id, resp.chat_id, resp.username, resp.ai_response, scheduleTimeInput.id, scheduleBtn);
const discardBtn = document.createElement("button");
discardBtn.className = "discard-btn";
discardBtn.innerText = "Discard";
discardBtn.id = `discard-btn-ai-${resp.id}`; // Unique ID for the discard button
// Pass the button element to the function
discardBtn.onclick = () => discardGeneralAIResponse(resp.id, discardBtn);
actions.appendChild(scheduleTimeInput);
actions.appendChild(scheduleBtn);
actions.appendChild(discardBtn);
div.appendChild(header);
div.appendChild(queryMsg);
div.appendChild(aiResponseContent);
div.appendChild(generationTime);
div.appendChild(actions);
container.appendChild(div);
});
document.getElementById("general-ai-count").innerText = pendingCount;
}
function populateChatIds(messages) {
const select = document.getElementById("schedule-chat-id");
if (!select) { console.error("Schedule chat ID select element not found."); return; }
const currentValue = select.value; // Store current selection
select.innerHTML = ""; // Clear existing options
const uniqueChats = new Map(); // chat_id -> username
messages.forEach(m => {
// Only add user messages to get distinct chat IDs and their associated usernames
if (m.role === "user" && m.chat_id && !uniqueChats.has(m.chat_id)) {
uniqueChats.set(m.chat_id, m.username || `User ${m.chat_id}`);
}
});
if (uniqueChats.size === 0) {
const option = document.createElement("option");
option.value = "";
option.innerText = "No active chats yet";
select.appendChild(option);
select.disabled = true; // Disable if no chats
return;
}
select.disabled = false; // Enable if chats exist
uniqueChats.forEach((username, chatId) => {
const option = document.createElement("option");
option.value = chatId;
option.innerText = `${username} (${chatId})`;
option.dataset.username = username; // Store username as a data attribute for robustness
select.appendChild(option);
});
// Restore previous selection if it still exists, otherwise set to first option
if (currentValue && uniqueChats.has(currentValue)) {
select.value = currentValue;
} else if (select.options.length > 0) {
select.value = select.options[0].value;
}
}
async function scheduleMessageFromUI() {
const chatId = document.getElementById("schedule-chat-id").value;
const messageInput = document.getElementById("schedule-message");
const message = messageInput.value.trim(); // Trim message
const scheduledTimeStr = document.getElementById("schedule-time").value;
const messageType = document.getElementById("message-type").value;
if (!chatId || !message || !scheduledTimeStr) {
showMessage("Please select a Chat ID, provide a Message, and select a Time.", "error");
return;
}
const scheduleBtn = document.querySelector('.schedule-new-message-section .schedule-btn'); // More specific selector
setLoading(scheduleBtn, true);
let username = "UI Admin"; // Default username for messages scheduled via UI
const chatSelect = document.getElementById("schedule-chat-id");
if (chatSelect && chatSelect.selectedOptions.length > 0) {
// Read username directly from the data attribute for robustness
username = chatSelect.selectedOptions[0].dataset.username || chatSelect.selectedOptions[0].innerText.split('(')[0].trim();
}
try {
const res = await fetch("/schedule_from_ui", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
chat_id: chatId,
username: username,
message: message,
scheduled_time: scheduledTimeStr,
message_type: messageType
})
});
const data = await res.json();
if (data.status === "success") {
showMessage("Message scheduled successfully!", "success");
// Clear message and reset time *after* successful scheduling
messageInput.value = ""; // Clear custom message input
document.getElementById("schedule-time").value = getCurrentDateTimeLocal();
// Reset to "Custom Message" option and update display
const templateSelect = document.getElementById("message-template");
if (templateSelect && templateSelect.options.length > 0) {
templateSelect.value = templateSelect.options[0].value;
}
updateMessageInput(); // Update message input visibility/content based on template
await fetchScheduledMessages(); // Refresh the scheduled messages list
} else {
showMessage("Failed to schedule message: " + data.message, "error");
}
} catch (e) {
console.error("Error scheduling message from UI:", e);
showMessage("Network error while scheduling message. Check console for details.", "error");
} finally {
setLoading(scheduleBtn, false);
}
}
// FIX: Modified function signature and body to retrieve the current input value using its ID
async function scheduleSuggestedTasks(suggestedTaskId, chatId, username, tasksMessage, scheduleTimeInputId, buttonElement) {
// Retrieve the current value from the input field using its ID
const scheduledTimeStr = document.getElementById(scheduleTimeInputId).value;
if (!scheduledTimeStr) {
showMessage("Please select a time to schedule this task.", "error");
return;
}
setLoading(buttonElement, true);
try {
const res = await fetch("/schedule_suggested_from_ui", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
suggested_task_id: suggestedTaskId,
chat_id: chatId,
username: username,
message: tasksMessage,
scheduled_time: scheduledTimeStr
})
});
const data = await res.json();
if (data.status === "success") {
showMessage("Task scheduled successfully!", "success");
await Promise.all([fetchSuggestedTasks(), fetchScheduledMessages()]);
} else {
showMessage("Failed to schedule task: " + data.message, "error");
}
} catch (e) {
console.error("Error scheduling suggested task:", e);
showMessage("Network error while scheduling task. Check console for details.", "error");
} finally {
setLoading(buttonElement, false);
}
}
// FIX: Added 'buttonElement' parameter (already mostly correct from your previous code, just confirm)
async function discardSuggestedTask(suggestedTaskId, buttonElement) {
if (!confirm("Are you sure you want to discard this suggested task? It cannot be recovered.")) return;
setLoading(buttonElement, true);
try {
const res = await fetch("/discard_suggested_task", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ suggested_task_id: suggestedTaskId })
});
const data = await res.json();
if (data.status === "success") {
showMessage("Task discarded successfully!", "success");
await fetchSuggestedTasks();
} else {
showMessage("Failed to discard task: " + data.message, "error");
}
} catch (e) {
console.error("Error discarding task:", e);
showMessage("Network error while discarding task. Check console for details.", "error");
} finally {
setLoading(buttonElement, false);
}
}
// FIX: Modified function signature and body to retrieve the current input value using its ID
async function scheduleGeneralAIResponse(responseId, chatId, username, aiResponse, scheduleTimeInputId, buttonElement) {
// Retrieve the current value from the input field using its ID
const scheduledTimeStr = document.getElementById(scheduleTimeInputId).value;
if (!scheduledTimeStr) {
showMessage("Please select a time to schedule this AI response.", "error");
return;
}
setLoading(buttonElement, true);
try {
const res = await fetch("/schedule_general_ai_response", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
response_id: responseId,
chat_id: chatId,
username: username,
message: aiResponse,
scheduled_time: scheduledTimeStr
})
});
const data = await res.json();
if (data.status === "success") {
showMessage("AI response scheduled successfully!", "success");
await Promise.all([fetchGeneralAIResponses(), fetchScheduledMessages()]);
} else {
showMessage("Failed to schedule AI response: " + data.message, "error");
}
} catch (e) {
console.error("Error scheduling general AI response:", e);
showMessage("Network error while scheduling general AI response. Check console for details.", "error");
} finally {
setLoading(buttonElement, false);
}
}
// FIX: Added 'buttonElement' parameter (already mostly correct from your previous code, just confirm)
async function discardGeneralAIResponse(responseId, buttonElement) {
if (!confirm("Are you sure you want to discard this AI response? It cannot be recovered.")) return;
setLoading(buttonElement, true);
try {
const res = await fetch("/discard_general_ai_response", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ response_id: responseId })
});
const data = await res.json();
if (data.status === "success") {
showMessage("AI response discarded successfully!", "success");
await fetchGeneralAIResponses();
} else {
showMessage("Failed to discard AI response: " + data.message, "error");
}
} catch (e) {
console.error("Error discarding general AI response:", e);
showMessage("Network error while discarding general AI response. Check console for details.", "error");
} finally {
setLoading(buttonElement, false);
}
}
async function fetchMessages() {
try {
const res = await fetch("/messages");
if (!res.ok) throw new Error(`HTTP ${res.status} from /messages`);
const data = await res.json();
renderMessages(data);
populateChatIds(data);
updateConnectionStatus(true);
return true;
} catch (e) {
console.error("Failed to fetch messages:", e);
updateConnectionStatus(false);
return false;
}
}
async function fetchScheduledMessages() {
try {
const res = await fetch("/scheduled");
if (!res.ok) throw new Error(`HTTP ${res.status} from /scheduled`);
const data = await res.json();
renderScheduledMessages(data);
return true;
} catch (e) {
console.error("Failed to fetch scheduled messages:", e);
return false;
}
}
async function fetchSuggestedTasks() {
try {
const res = await fetch("/suggested_tasks");
if (!res.ok) throw new Error(`HTTP ${res.status} from /suggested_tasks`);
const data = await res.json();
renderSuggestedTasks(data);
return true;
} catch (e) {
console.error("Failed to fetch suggested tasks:", e);
return false;
}
}
async function fetchGeneralAIResponses() {
try {
const res = await fetch("/general_ai_responses");
if (!res.ok) throw new Error(`HTTP ${res.status} from /general_ai_responses`);
const data = await res.json();
renderGeneralAIResponses(data);
return true;
} catch (e) {
console.error("Failed to fetch general AI responses:", e);
return false;
}
}
function downloadLog() {
window.location = "/download";
}
function downloadScheduled() {
window.location = "/download-scheduled";
}
function downloadSuggested() {
window.location = "/download-suggested";
}
function downloadGeneralAI() {
window.location = "/download-general-ai";
}
async function clearAllData() {
if (!confirm("Clear ALL chat logs, scheduled messages, AI generated tasks, and AI generated responses? This action cannot be undone and will delete all stored JSON files. Are you sure?")) return;
try {
const res = await fetch("/clear", {method: "POST"});
if (res.ok) {
showMessage("All data cleared successfully! Refreshing...", "success");
setTimeout(() => location.reload(), 1500);
} else {
showMessage("Failed to clear data: " + (await res.json()).error, "error");
}
} catch (e) {
console.error("Error clearing data:", e);
showMessage("Network error while clearing data. Check console for details.", "error");
}
}
// Initialize on load
document.addEventListener('DOMContentLoaded', () => {
document.getElementById('schedule-time').value = getCurrentDateTimeLocal();
populateMessageTemplates();
// Initial data fetch - handle potential failures gracefully
Promise.allSettled([
fetchMessages(),
fetchScheduledMessages(),
fetchSuggestedTasks(),
fetchGeneralAIResponses()
]).then(results => {
const allOk = results.every(result => result.status === 'fulfilled' && result.value === true);
if (!allOk) {
showMessage("Some data failed to load. Check console for details.", "error");
updateConnectionStatus(false);
} else {
updateConnectionStatus(true);
}
});
});
// Auto-refresh with better error handling
let refreshInterval;
function startAutoRefresh() {
if (refreshInterval) clearInterval(refreshInterval);
refreshInterval = setInterval(async () => {
const results = await Promise.allSettled([
fetchMessages(),
fetchScheduledMessages(),
fetchSuggestedTasks(),
fetchGeneralAIResponses()
]);
const anyFailed = results.some(result => result.status === 'rejected' || result.value === false);
if (anyFailed) {
updateConnectionStatus(false);
console.warn("Auto-refresh: Some data fetches failed. Connection status updated to offline.");
} else {
updateConnectionStatus(true);
}
}, 2000); // Refresh every 2 seconds
}
startAutoRefresh();
// Handle page visibility changes to pause/resume refresh
document.addEventListener('visibilitychange', () => {
if (document.hidden) {
clearInterval(refreshInterval);
console.log("Auto-refresh paused (tab hidden).");
} else {
console.log("Auto-refresh resumed (tab visible).");
startAutoRefresh(); // Resume refresh when tab becomes visible
}
});
</script>
</body>
</html>
"""
@app.route("/")
def index():
    """Serve the admin panel.

    Renders HTML_TEMPLATE, exposing the HARDCODED_MESSAGES_FOR_UI list to the
    template under the variable name ``HARDCODED_MESSAGES_FOR_UI``.
    """
    template_context = {"HARDCODED_MESSAGES_FOR_UI": HARDCODED_MESSAGES_FOR_UI}
    return render_template_string(HTML_TEMPLATE, **template_context)
@app.route("/messages")
def messages_api():
    """Return a JSON array with a copy of the in-memory chat log."""
    with _messages_lock:
        # Copy while the lock is held so the response reflects one consistent state.
        chat_log_snapshot = list(_messages)
        return jsonify(chat_log_snapshot)
@app.route("/scheduled")
def scheduled_messages_api():
    """Return a JSON array with a copy of the in-memory scheduled messages."""
    with _scheduled_messages_lock:
        # Copy while the lock is held so the response reflects one consistent state.
        scheduled_snapshot = list(_scheduled_messages)
        return jsonify(scheduled_snapshot)
@app.route("/suggested_tasks")
def suggested_tasks_api():
    """Return all AI-suggested tasks as a JSON array.

    Each task is copied, and any of its three admin/generation timestamps that
    is held as a ``datetime`` is replaced by its ISO-8601 string so the copy is
    JSON serializable. The stored tasks themselves are not modified.
    """
    time_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    with _suggested_tasks_lock:
        payload = []
        for stored_task in _suggested_tasks:
            task_view = stored_task.copy()
            iso_overrides = {
                field: task_view[field].isoformat()
                for field in time_fields
                if isinstance(task_view.get(field), datetime)
            }
            task_view.update(iso_overrides)
            payload.append(task_view)
        return jsonify(payload)
@app.route("/general_ai_responses")
def general_ai_responses_api():
    """Return all general AI responses as a JSON array.

    Works on a copy of every entry; ``datetime`` values found under the
    generation/scheduled/discarded timestamp keys are converted to ISO-8601
    strings so the result can be serialized.
    """
    time_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    with _general_ai_responses_lock:
        payload = []
        for stored_response in _general_ai_responses:
            response_view = stored_response.copy()
            iso_overrides = {
                field: response_view[field].isoformat()
                for field in time_fields
                if isinstance(response_view.get(field), datetime)
            }
            response_view.update(iso_overrides)
            payload.append(response_view)
        return jsonify(payload)
@app.route("/download")
def download():
    """Send the chat log to the client as a ``chat_log.json`` attachment.

    The persisted CHAT_LOG_FILE is served when it exists on disk. Otherwise
    the in-memory log is serialized and streamed straight from memory, so no
    temporary file is left behind and concurrent requests cannot overwrite
    each other's output.

    Returns:
        The attachment response on success, a 404 JSON error when there is no
        data to download, or a 500 JSON error if anything raises.
    """
    import io  # local import: only needed for the in-memory fallback

    try:
        # Prioritize the existing file. The path is made absolute because
        # Flask's send_file resolves relative paths against the application
        # root, while os.path.exists() checks the current working directory.
        if os.path.exists(CHAT_LOG_FILE):
            return send_file(
                os.path.abspath(CHAT_LOG_FILE),
                as_attachment=True,
                download_name="chat_log.json",
            )

        # Snapshot as a plain list under the lock; a list is always JSON
        # serializable as an array regardless of the container type used
        # for _messages.
        with _messages_lock:
            snapshot = list(_messages)

        if not snapshot:  # Handle empty data
            logger.warning("No chat log data in memory for download.")
            return jsonify({"error": "No chat log data to download."}), 404

        payload = json.dumps(snapshot, ensure_ascii=False, indent=2).encode("utf-8")
        return send_file(
            io.BytesIO(payload),
            mimetype="application/json",
            as_attachment=True,
            download_name="chat_log.json",
        )
    except Exception as e:
        logger.error(f"Error downloading chat log: {e}")
        return jsonify({"error": "Failed to download chat log file."}), 500
@app.route("/download-scheduled")
def download_scheduled():
    """Send the scheduled messages as a ``scheduled_messages.json`` attachment.

    The persisted SCHEDULED_MESSAGES_FILE is served when it exists on disk.
    Otherwise the in-memory entries are serialized and streamed from memory,
    avoiding a shared, never-deleted temporary file.

    Returns:
        The attachment response on success, a 404 JSON error when there is no
        data to download, or a 500 JSON error if anything raises.
    """
    import io  # local import: only needed for the in-memory fallback

    try:
        # Absolute path: Flask's send_file resolves relative paths against the
        # application root, while os.path.exists() checks the current working
        # directory.
        if os.path.exists(SCHEDULED_MESSAGES_FILE):
            return send_file(
                os.path.abspath(SCHEDULED_MESSAGES_FILE),
                as_attachment=True,
                download_name="scheduled_messages.json",
            )

        with _scheduled_messages_lock:
            snapshot = list(_scheduled_messages)

        if not snapshot:
            logger.warning("No scheduled messages data in memory for download.")
            return jsonify({"error": "No scheduled messages data to download."}), 404

        payload = json.dumps(snapshot, ensure_ascii=False, indent=2).encode("utf-8")
        return send_file(
            io.BytesIO(payload),
            mimetype="application/json",
            as_attachment=True,
            download_name="scheduled_messages.json",
        )
    except Exception as e:
        logger.error(f"Error downloading scheduled messages: {e}")
        return jsonify({"error": "Failed to download scheduled messages file."}), 500
@app.route("/download-suggested")
def download_suggested():
    """Send the AI-suggested tasks as a ``suggested_tasks.json`` attachment.

    The persisted SUGGESTED_TASKS_FILE is served when it exists on disk.
    Otherwise copies of the in-memory tasks are serialized (with ``datetime``
    timestamps converted to ISO-8601 strings) and streamed from memory,
    avoiding a shared, never-deleted temporary file.

    Returns:
        The attachment response on success, a 404 JSON error when there is no
        data to download, or a 500 JSON error if anything raises.
    """
    import io  # local import: only needed for the in-memory fallback

    try:
        # Absolute path: Flask's send_file resolves relative paths against the
        # application root, while os.path.exists() checks the current working
        # directory.
        if os.path.exists(SUGGESTED_TASKS_FILE):
            return send_file(
                os.path.abspath(SUGGESTED_TASKS_FILE),
                as_attachment=True,
                download_name="suggested_tasks.json",
            )

        time_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
        with _suggested_tasks_lock:
            serializable_tasks = []
            for task in _suggested_tasks:
                temp_task = task.copy()  # never mutate the stored task
                for key in time_fields:
                    if isinstance(temp_task.get(key), datetime):
                        temp_task[key] = temp_task[key].isoformat()
                serializable_tasks.append(temp_task)

        if not serializable_tasks:
            logger.warning("No suggested tasks data in memory for download.")
            return jsonify({"error": "No suggested tasks data to download."}), 404

        payload = json.dumps(serializable_tasks, ensure_ascii=False, indent=2).encode("utf-8")
        return send_file(
            io.BytesIO(payload),
            mimetype="application/json",
            as_attachment=True,
            download_name="suggested_tasks.json",
        )
    except Exception as e:
        logger.error(f"Error downloading suggested tasks: {e}")
        return jsonify({"error": "Failed to download suggested tasks file."}), 500
@app.route("/download-general-ai")
def download_general_ai():
    """Send the general AI responses as a ``general_ai_responses.json`` attachment.

    The persisted GENERAL_AI_RESPONSES_FILE is served when it exists on disk.
    Otherwise copies of the in-memory entries are serialized (with ``datetime``
    timestamps converted to ISO-8601 strings) and streamed from memory,
    avoiding a shared, never-deleted temporary file.

    Returns:
        The attachment response on success, a 404 JSON error when there is no
        data to download, or a 500 JSON error if anything raises.
    """
    import io  # local import: only needed for the in-memory fallback

    try:
        # Absolute path: Flask's send_file resolves relative paths against the
        # application root, while os.path.exists() checks the current working
        # directory.
        if os.path.exists(GENERAL_AI_RESPONSES_FILE):
            return send_file(
                os.path.abspath(GENERAL_AI_RESPONSES_FILE),
                as_attachment=True,
                download_name="general_ai_responses.json",
            )

        time_fields = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
        with _general_ai_responses_lock:
            serializable_responses = []
            for resp in _general_ai_responses:
                temp_resp = resp.copy()  # never mutate the stored entry
                for key in time_fields:
                    if isinstance(temp_resp.get(key), datetime):
                        temp_resp[key] = temp_resp[key].isoformat()
                serializable_responses.append(temp_resp)

        if not serializable_responses:
            logger.warning("No general AI responses data in memory for download.")
            return jsonify({"error": "No general AI responses data to download."}), 404

        payload = json.dumps(serializable_responses, ensure_ascii=False, indent=2).encode("utf-8")
        return send_file(
            io.BytesIO(payload),
            mimetype="application/json",
            as_attachment=True,
            download_name="general_ai_responses.json",
        )
    except Exception as e:
        logger.error(f"Error downloading general AI responses: {e}")
        return jsonify({"error": "Failed to download general AI responses file."}), 500
@app.route("/clear", methods=["POST"])
def clear():
    """Wipe application state: in-memory stores, persisted JSON files and scheduler jobs.

    Returns an empty 204 response on success, or a 500 JSON error if any step
    outside the per-file deletion raises. A file that cannot be deleted is
    logged and does not abort the operation.
    """
    global _awaiting_update_response
    try:
        # Empty each in-memory store while holding its own lock.
        with _messages_lock:
            _messages.clear()
        with _scheduled_messages_lock:
            _scheduled_messages.clear()
        with _suggested_tasks_lock:
            _suggested_tasks.clear()
        with _general_ai_responses_lock:
            _general_ai_responses.clear()
        with _farmer_data_lock:
            _farmer_data.clear()  # Farmer data is dropped from memory only

        # Rebind the tracker to a fresh dict.
        _awaiting_update_response = {}

        # Delete the persisted files that are present on disk.
        persisted_files = (
            CHAT_LOG_FILE,
            SCHEDULED_MESSAGES_FILE,
            SUGGESTED_TASKS_FILE,
            GENERAL_AI_RESPONSES_FILE,
        )
        for file_path in persisted_files:
            if not os.path.exists(file_path):
                continue
            try:
                os.remove(file_path)
                logger.info(f"Removed persistent file: {file_path}")
            except Exception as e:
                logger.error(f"Failed to remove file {file_path}: {e}. Please check permissions.")

        # Drop every job registered with APScheduler.
        scheduler.remove_all_jobs()
        logger.info("All APScheduler jobs cleared.")
        logger.info("All application data cleared successfully.")
        return ("", 204)  # No Content
    except Exception as e:
        logger.error(f"Error during 'clear all data' operation: {e}")
        return jsonify({"error": "Failed to clear data due to an internal server error. Check server logs."}), 500
@app.route("/schedule_from_ui", methods=["POST"])
def schedule_from_ui():
    """Endpoint to schedule a message directly from the UI.

    Expects a JSON object with ``chat_id``, ``message`` and ``scheduled_time``
    (``YYYY-MM-DDTHH:MM``, interpreted in TIMEZONE); ``username`` and
    ``message_type`` are optional.

    Returns:
        A JSON ``{"status", "message"}`` body. 400 for a missing/malformed
        body, missing fields, an over-long message, an unparseable time, or a
        time that is not in the future or is more than one year ahead; 500
        when scheduling fails or an unexpected error occurs.
    """
    try:
        # silent=True makes a missing, malformed or non-JSON body yield None
        # instead of raising an HTTP error that the handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400

        chat_id = data.get("chat_id")
        recipient_username = data.get("username", "UI Scheduled User")
        raw_message = data.get("message", "")
        # A non-string message (null, number, ...) is treated as missing
        # rather than crashing on .strip().
        message = raw_message.strip() if isinstance(raw_message, str) else ""
        scheduled_time_str = data.get("scheduled_time")
        message_type = data.get("message_type", "reminder")

        if not all([chat_id, message, scheduled_time_str]):
            return jsonify({
                "status": "error",
                "message": "Missing required fields: 'chat_id', 'message', and 'scheduled_time' are all required."
            }), 400

        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return jsonify({
                "status": "error",
                "message": f"Message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram."
            }), 400

        # Parse and validate datetime string. TypeError covers a non-string
        # scheduled_time value.
        try:
            scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M')
            scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive)
        except (ValueError, TypeError):
            return jsonify({
                "status": "error",
                "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM (e.g., 2023-10-27T14:30)."
            }), 400

        # Check if scheduled time is in the future
        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return jsonify({
                "status": "error",
                "message": "Scheduled time must be in the future. Please choose a time later than now."
            }), 400

        # Optional: Prevent scheduling too far in the future for stability/resource management
        if scheduled_datetime > now + timedelta(days=365):
            return jsonify({
                "status": "error",
                "message": "Scheduled time cannot be more than 1 year in the future for stability."
            }), 400

        result = schedule_message(chat_id, recipient_username, message, scheduled_datetime, message_type)
        if result:
            logger.info(
                f"UI: Message '{message_type}' scheduled successfully for {recipient_username} ({chat_id}) at {scheduled_datetime}.")
            return jsonify({"status": "success", "message": "Message scheduled successfully."})

        return jsonify({
            "status": "error",
            "message": "Failed to schedule message due to an internal server error (check server logs)."
        }), 500
    except Exception as e:
        logger.error(f"Error in /schedule_from_ui: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/schedule_suggested_from_ui", methods=["POST"])
def schedule_suggested_from_ui():
    """Endpoint for admin to schedule an individual AI-suggested task from the UI.

    Expects a JSON object with ``suggested_task_id``, ``chat_id``,
    ``username``, ``message`` and ``scheduled_time`` (``YYYY-MM-DDTHH:MM``,
    interpreted in TIMEZONE). The task must currently be ``pending_review``;
    once the message is scheduled the task is marked ``scheduled`` and the
    suggested tasks are saved.

    Returns:
        A JSON ``{"status", "message"}`` body. 400 for a missing/malformed
        body, missing fields, an over-long message, an unparseable time or a
        time that is not in the future; 404 when the task is unknown or
        already processed; 500 when scheduling fails or an unexpected error
        occurs.
    """
    try:
        # silent=True makes a missing, malformed or non-JSON body yield None
        # instead of raising an HTTP error that the handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400

        suggested_task_id = data.get("suggested_task_id")
        chat_id = data.get("chat_id")
        username = data.get("username")
        raw_message = data.get("message", "")
        # This is the individual task message content; a non-string value is
        # treated as missing rather than crashing on .strip().
        message = raw_message.strip() if isinstance(raw_message, str) else ""
        scheduled_time_str = data.get("scheduled_time")

        if not all([suggested_task_id, chat_id, username, message, scheduled_time_str]):
            return jsonify({
                "status": "error",
                "message": "Missing required fields for scheduling suggested task."
            }), 400

        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return jsonify({
                "status": "error",
                "message": f"Task message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram."
            }), 400

        # Parse and validate datetime. TypeError covers a non-string
        # scheduled_time value.
        try:
            scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M')
            scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive)
        except (ValueError, TypeError):
            return jsonify({
                "status": "error",
                "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM."
            }), 400

        # Check if time is in the future
        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return jsonify({
                "status": "error",
                "message": "Scheduled time must be in the future. Please choose a time later than now."
            }), 400

        # Check if the suggested task exists and is still pending review
        with _suggested_tasks_lock:
            task_found_and_pending = any(
                task["id"] == suggested_task_id and task["status"] == "pending_review"
                for task in _suggested_tasks
            )
        if not task_found_and_pending:
            return jsonify({
                "status": "error",
                "message": "Suggested task not found or already processed (scheduled/discarded)."
            }), 404

        result = schedule_message(chat_id, username, message, scheduled_datetime, "daily_tasks")
        if not result:
            return jsonify({
                "status": "error",
                "message": "Failed to schedule suggested task due to an internal server error (check server logs)."
            }), 500

        # Update the status of the suggested task to 'scheduled'
        with _suggested_tasks_lock:
            for task in _suggested_tasks:
                if task["id"] == suggested_task_id:
                    task["status"] = "scheduled"
                    task["scheduled_by_admin_time"] = datetime.now(TIMEZONE)
                    break
        save_suggested_tasks()
        logger.info(
            f"UI: Individual suggested task ID '{suggested_task_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.")
        return jsonify({"status": "success", "message": "Suggested task scheduled successfully."})
    except Exception as e:
        logger.error(f"Error in /schedule_suggested_from_ui: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/discard_suggested_task", methods=["POST"])
def discard_suggested_task():
    """Endpoint for admin to discard an individual AI-suggested task from the UI.

    Expects a JSON object with ``suggested_task_id``. A task that is still
    ``pending_review`` is marked ``discarded``, stamped with the current time
    in TIMEZONE, and the suggested tasks are saved.

    Returns:
        A JSON ``{"status", "message"}`` body. 400 for a missing/malformed
        body or missing id; 404 when the task is unknown or already
        processed; 500 on an unexpected error.
    """
    try:
        # silent=True makes a missing, malformed or non-JSON body yield None
        # instead of raising an HTTP error that the handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400

        suggested_task_id = data.get("suggested_task_id")
        if not suggested_task_id:
            return jsonify({"status": "error", "message": "Missing 'suggested_task_id' in request."}), 400

        found_and_pending = False
        with _suggested_tasks_lock:
            for task in _suggested_tasks:
                if task["id"] == suggested_task_id and task["status"] == "pending_review":
                    task["status"] = "discarded"
                    task["discarded_by_admin_time"] = datetime.now(TIMEZONE)
                    found_and_pending = True
                    break

        if not found_and_pending:
            return jsonify({
                "status": "error",
                "message": "Suggested task not found or already processed (scheduled/discarded)."
            }), 404

        save_suggested_tasks()
        logger.info(f"UI: Suggested task ID '{suggested_task_id}' discarded.")
        return jsonify({"status": "success", "message": "Suggested task discarded successfully."})
    except Exception as e:
        logger.error(f"Error in /discard_suggested_task: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/schedule_general_ai_response", methods=["POST"])
def schedule_general_ai_response():
    """Endpoint for admin to schedule a general AI response from the UI.

    Expects a JSON object with ``response_id``, ``chat_id``, ``username``,
    ``message`` and ``scheduled_time`` (``YYYY-MM-DDTHH:MM``, interpreted in
    TIMEZONE). The response must currently be ``pending_review``; once the
    message is scheduled the entry is marked ``scheduled`` and the responses
    are saved.

    Returns:
        A JSON ``{"status", "message"}`` body. 400 for a missing/malformed
        body, missing fields, an over-long message, an unparseable time or a
        time that is not in the future; 404 when the response is unknown or
        already processed; 500 when scheduling fails or an unexpected error
        occurs.
    """
    try:
        # silent=True makes a missing, malformed or non-JSON body yield None
        # instead of raising an HTTP error that the handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400

        response_id = data.get("response_id")
        chat_id = data.get("chat_id")
        username = data.get("username")
        raw_message = data.get("message", "")
        # A non-string message is treated as missing rather than crashing on .strip().
        message = raw_message.strip() if isinstance(raw_message, str) else ""
        scheduled_time_str = data.get("scheduled_time")

        if not all([response_id, chat_id, username, message, scheduled_time_str]):
            return jsonify({"status": "error", "message": "Missing required fields for scheduling AI response."}), 400

        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return jsonify({
                "status": "error",
                "message": f"AI response message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram."
            }), 400

        # Parse and validate datetime. TypeError covers a non-string
        # scheduled_time value.
        try:
            scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M')
            scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive)
        except (ValueError, TypeError):
            return jsonify({"status": "error", "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM."}), 400

        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return jsonify({"status": "error", "message": "Scheduled time must be in the future."}), 400

        # Check if the AI response entry exists and is still pending review
        with _general_ai_responses_lock:
            response_found = any(
                resp["id"] == response_id and resp["status"] == "pending_review"
                for resp in _general_ai_responses
            )
        if not response_found:
            return jsonify({"status": "error", "message": "AI response not found or already processed."}), 404

        result = schedule_message(chat_id, username, message, scheduled_datetime, "general_ai_response")
        if not result:
            return jsonify({"status": "error", "message": "Failed to schedule AI response due to internal error."}), 500

        # Mark the entry as scheduled now that the message has been queued.
        with _general_ai_responses_lock:
            for resp in _general_ai_responses:
                if resp["id"] == response_id:
                    resp["status"] = "scheduled"
                    resp["scheduled_by_admin_time"] = datetime.now(TIMEZONE)
                    break
        save_general_ai_responses()
        logger.info(
            f"UI: General AI response ID '{response_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.")
        return jsonify({"status": "success", "message": "AI response scheduled successfully."})
    except Exception as e:
        logger.error(f"Error in /schedule_general_ai_response: {e}")
        traceback.print_exc()
        return jsonify({"status": "error", "message": f"An unexpected server error occurred: {str(e)}"}), 500
@app.route("/discard_general_ai_response", methods=["POST"])
def discard_general_ai_response():
    """Endpoint for admin to discard a general AI response from the UI.

    Expects a JSON object with ``response_id``. An entry that is still
    ``pending_review`` is marked ``discarded``, stamped with the current time
    in TIMEZONE, and the responses are saved.

    Returns:
        A JSON ``{"status", "message"}`` body. 400 for a missing/malformed
        body or missing id; 404 when the response is unknown or already
        processed; 500 on an unexpected error.
    """
    try:
        # silent=True makes a missing, malformed or non-JSON body yield None
        # instead of raising an HTTP error that the handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400

        response_id = data.get("response_id")
        if not response_id:
            return jsonify({"status": "error", "message": "Missing 'response_id' in request."}), 400

        found = False
        with _general_ai_responses_lock:
            for resp in _general_ai_responses:
                if resp["id"] == response_id and resp["status"] == "pending_review":
                    resp["status"] = "discarded"
                    resp["discarded_by_admin_time"] = datetime.now(TIMEZONE)
                    found = True
                    break

        if not found:
            return jsonify({"status": "error", "message": "AI response not found or already processed."}), 404

        save_general_ai_responses()
        logger.info(f"UI: General AI response ID '{response_id}' discarded.")
        return jsonify({"status": "success", "message": "AI response discarded successfully."})
    except Exception as e:
        logger.error(f"Error in /discard_general_ai_response: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/health")
def health():
    """Report a basic health snapshot of the application as JSON.

    The payload always carries ``"status": "healthy"`` plus the current time in
    TIMEZONE, whether the scheduler is running, whether the Gemini ``model``
    object is set, and the sizes of the in-memory stores.
    """
    report = {
        "status": "healthy",
        "timestamp": datetime.now(TIMEZONE).isoformat(),
        "scheduler_running": scheduler.running,
        "gemini_available": model is not None,
    }
    # Store sizes are read without taking the corresponding locks.
    report["farmer_data_loaded_count"] = len(_farmer_data)
    report["messages_in_log"] = len(_messages)
    report["scheduled_messages_count"] = len(_scheduled_messages)
    report["suggested_tasks_count"] = len(_suggested_tasks)
    report["general_ai_responses_count"] = len(_general_ai_responses)
    return jsonify(report)
# ---------------- Startup ----------------
def start_polling_thread():
    """Launch ``polling_loop`` on a daemon thread named "TelegramPolling".

    Returns:
        The started ``threading.Thread`` object.
    """
    poller = threading.Thread(name="TelegramPolling", target=polling_loop, daemon=True)
    poller.start()
    logger.info("Telegram polling thread initiated.")
    return poller
if __name__ == "__main__":
    # Script entry point: validate configuration, load persisted state,
    # optionally schedule the initial farm update request, start Telegram
    # polling, then run the Flask admin panel until it stops.

    # Perform initial configuration validation
    if not BOT_TOKEN:
        logger.critical(
            "❌ BOT_TOKEN is missing or is the default placeholder. Please update it in config or environment variables.")
        # Exit if essential config is missing. SystemExit is raised directly
        # because the exit() helper is injected by the site module and is
        # intended for interactive use.
        raise SystemExit(1)
    if not GEMINI_API_KEY or GEMINI_API_KEY == "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho":
        logger.warning(
            "⚠️ GEMINI_API_KEY is missing or is the default placeholder. AI features may not function correctly without it.")
        # Do not exit, as other parts of the app might still be useful

    logger.info("Starting Farm Bot with admin-controlled scheduling workflow...")

    # Load all persistent data into memory
    load_messages_from_file()
    load_scheduled_messages()
    load_suggested_tasks()
    load_general_ai_responses()
    load_farmer_data()  # Load farmer-specific RAG data

    # --- HARDCODED SCHEDULED FARM UPDATE REQUEST AT STARTUP ---
    # This ensures the first "ask for updates" message is automatically scheduled for a farmer.
    # IMPORTANT: Replace these values with actual farmer information (or set via environment variables)
    # To get a farmer's chat ID: Have them send any message to your bot. Then check http://127.0.0.1:5000/messages
    TARGET_FARMER_CHAT_ID = os.environ.get("TARGET_FARMER_CHAT_ID", "YOUR_TELEGRAM_CHAT_ID")
    TARGET_FARMER_USERNAME = os.environ.get("TARGET_FARMER_USERNAME", "YOUR_TELEGRAM_USERNAME")
    FARM_UPDATE_REQUEST_MESSAGE = "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities."
    FARM_UPDATE_REQUEST_TYPE = "farm_update_request"
    # Schedule for 5 minutes from startup (for testing) - adjust for production e.g. daily at 8 AM
    scheduled_farm_update_time = datetime.now(TIMEZONE) + timedelta(minutes=5)

    if TARGET_FARMER_CHAT_ID in ["YOUR_TELEGRAM_CHAT_ID", "", None] or TARGET_FARMER_USERNAME in [
            "YOUR_TELEGRAM_USERNAME", "", None]:
        logger.warning(
            "⚠️ Initial farm update request not scheduled: TARGET_FARMER_CHAT_ID or TARGET_FARMER_USERNAME not properly configured in script or environment variables.")
        logger.info(" -> To enable this feature, please update the configuration or manually schedule via the UI.")
    else:
        # Check if a similar farm update request has already been recently scheduled/sent
        update_request_already_handled = False
        with _scheduled_messages_lock:
            for msg in _scheduled_messages:
                # .get() keeps a malformed persisted entry from crashing startup.
                is_candidate = (
                    str(msg.get("chat_id")) == TARGET_FARMER_CHAT_ID
                    and msg.get("type") == FARM_UPDATE_REQUEST_TYPE
                    and msg.get("status") in ["pending", "sent"]
                )
                if not is_candidate:
                    continue
                # Consider handled if scheduled or sent within the last 24 hours.
                # "or" (instead of a .get() default) also falls back to the
                # scheduled time when sent_time is present but empty/None.
                msg_time_str = msg.get("sent_time") or msg.get("scheduled_time")
                try:
                    msg_dt = datetime.fromisoformat(msg_time_str.replace('Z', '+00:00'))  # Handle Z timezone
                    if msg_dt.tzinfo is None:
                        msg_dt = TIMEZONE.localize(msg_dt)
                    else:
                        msg_dt = msg_dt.astimezone(TIMEZONE)
                    if msg_dt > (datetime.now(TIMEZONE) - timedelta(hours=24)):
                        update_request_already_handled = True
                        logger.info(
                            f"ℹ️ Farm update request for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) already exists or was recently sent. Not re-scheduling.")
                        break
                except Exception as e:
                    logger.warning(
                        f"Error parsing datetime for existing scheduled message ID '{msg.get('id', 'unknown')}': {e}. This might affect duplicate detection for hardcoded message.")

        if not update_request_already_handled:
            if scheduled_farm_update_time > datetime.now(TIMEZONE):
                result = schedule_message(
                    TARGET_FARMER_CHAT_ID,
                    TARGET_FARMER_USERNAME,
                    FARM_UPDATE_REQUEST_MESSAGE,
                    scheduled_farm_update_time,
                    FARM_UPDATE_REQUEST_TYPE
                )
                if result:
                    logger.info(
                        f"✅ Initial farm update request scheduled for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) at {scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}.")
                else:
                    logger.error("❌ Failed to schedule initial farm update request due to an internal error.")
            else:
                logger.warning(
                    f"⚠️ Initial farm update request not scheduled: Scheduled time ({scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}) is in the past. Please adjust `scheduled_farm_update_time` for future execution.")
    # --- END HARDCODED SCHEDULED FARM UPDATE REQUEST ---

    # Start the Telegram polling thread (daemon so it exits with main app)
    start_polling_thread()

    logger.info("\n-----------------------------------------------------")
    logger.info("Farm Bot Application Ready!")
    logger.info("📱 Instruct your farmers to start a chat with your Telegram bot.")
    logger.info("🌐 Access the Admin Panel to manage tasks and responses: http://127.0.0.1:5000")
    logger.info("📊 Check system health: http://127.0.0.1:5000/health")
    logger.info("-----------------------------------------------------\n")

    try:
        # Run the Flask app
        app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)
    except KeyboardInterrupt:
        logger.info("Application received KeyboardInterrupt. Shutting down gracefully...")
    except Exception as e:
        logger.critical(f"Unhandled exception during Flask application runtime: {e}")
        traceback.print_exc()
    finally:
        # Ensure scheduler is shut down when the app stops
        if scheduler.running:
            scheduler.shutdown(wait=False)
            logger.info("APScheduler shut down.")
        logger.info("Application process finished.")