import requests
import time
import json
import traceback
import threading
from datetime import datetime, timedelta
from flask import Flask, jsonify, render_template_string, send_file, request
import google.generativeai as genai
import os
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
import pytz
import logging
import re # For parsing AI output
from typing import Optional, Dict, List, Any
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ---------------- CONFIG ----------------
# Tokens are read from the environment when available; the literals below are a
# local-development fallback only (backward compatible with the old hardcoded values).
# SECURITY NOTE: real credentials committed to source control are compromised —
# rotate any token that has ever appeared in a commit.
BOT_TOKEN = os.environ.get("BOT_TOKEN", "8328709566:AAH7ZmdWuzGODlTByJ34yI5yR9e8otBokBc")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho")
# !!! ----------------------------------------------------------------------------- !!!
TELEGRAM_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
CHAT_LOG_FILE = "chat_log.json"
SCHEDULED_MESSAGES_FILE = "scheduled_messages.json"
SUGGESTED_TASKS_FILE = "suggested_tasks.json"
GENERAL_AI_RESPONSES_FILE = "general_ai_responses.json"
# BUGFIX: the original literal "data\farmers.json" contained the escape '\f'
# (form feed), yielding the path 'data\x0carmers.json' which never exists.
# Build the path portably instead.
FARMERS_DATA_FILE = os.path.join("data", "farmers.json")  # File for farmer-specific RAG data
MAX_MESSAGES = 2000
TIMEZONE = pytz.timezone('Asia/Kolkata')  # Adjust to your local timezone (e.g., 'America/New_York', 'Europe/London')
FARM_UPDATE_RESPONSE_WINDOW_HOURS = 48  # Window for a farmer to respond to an update request
# Hardcoded messages for the UI dropdown
HARDCODED_MESSAGES_FOR_UI = [
    {
        "value": "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities.",
        "text": "🌱 Request Farm Update"},
    {"value": "How are your crops doing today?", "text": "🌿 Crops Check-in"},
    {"value": "Reminder: Check irrigation system.", "text": "💧 Irrigation Reminder"},
    {"value": "Don't forget to monitor pest activity.", "text": "🐛 Pest Monitor Reminder"},
    {"value": "Daily task: Fertilize Section A.", "text": "🌾 Fertilize Section A"},
    {"value": "Weather looks good for harvesting. Consider starting tomorrow.", "text": "🌤️ Harvest Weather Alert"},
    {"value": "Time to check soil moisture levels.", "text": "🌡️ Soil Moisture Check"},
    {"value": "", "text": "✍️ Custom Message (Type Below)"}  # Option for custom input
]
# Rate limiting for Telegram API calls
REQUEST_RATE_LIMIT = 30  # requests per minute
rate_limit_tracker = {"count": 0, "reset_time": time.time() + 60}
# ----------------------------------------
# Flask app
app = Flask(__name__)
# Configure Gemini with error handling.
# On any failure `model` is set to None; every AI entry point below checks for
# that and degrades gracefully instead of crashing at import time.
try:
    genai.configure(api_key=GEMINI_API_KEY)
    model = genai.GenerativeModel("gemini-2.0-flash-exp")
    logger.info("Gemini AI configured successfully.")
except Exception as e:
    logger.error(f"Failed to configure Gemini AI. AI features will be unavailable: {e}")
    model = None  # Set model to None if configuration fails
# Initialize scheduler (started immediately so jobs registered during load run on time).
scheduler = BackgroundScheduler(timezone=TIMEZONE)
scheduler.start()
# Thread-safe stores.
# Each in-memory list is paired with its own lock; hold the matching lock while
# reading or mutating the list. Persistence helpers (save_*) acquire the lock
# themselves, so they must NOT be called while it is already held (non-reentrant).
_messages_lock = threading.Lock()
_messages: List[Dict[str, Any]] = []  # rolling chat log, capped at MAX_MESSAGES
_scheduled_messages_lock = threading.Lock()
_scheduled_messages: List[Dict[str, Any]] = []  # messages queued for future delivery
_suggested_tasks_lock = threading.Lock()
_suggested_tasks: List[Dict[str, Any]] = []  # Each entry is now a single task with message and reason
_general_ai_responses_lock = threading.Lock()
_general_ai_responses: List[Dict[str, Any]] = []  # AI replies awaiting admin review
_farmer_data_lock = threading.Lock()
_farmer_data: List[Dict[str, Any]] = []  # farmer profiles loaded from FARMERS_DATA_FILE (RAG context)
# State for tracking if we're awaiting a farm update response from a specific chat_id.
# Maps chat_id -> time the farm-update request was sent.
# NOTE(review): mutated from both the scheduler thread and the polling thread
# without a dedicated lock; individual dict operations are atomic under the
# GIL, but confirm this is acceptable for the intended access pattern.
_awaiting_update_response: Dict[str, datetime] = {}
# Module-level lock guarding rate_limit_tracker.
# BUGFIX: the original did `with threading.Lock():`, which constructs a brand-new
# lock on every call — it can never be contended and provided no synchronization.
_rate_limit_lock = threading.Lock()


def check_rate_limit() -> bool:
    """Return True if another Telegram API request may be made right now.

    Implements a fixed-window counter: at most REQUEST_RATE_LIMIT requests per
    60-second window, tracked in the shared `rate_limit_tracker` dict.

    Returns:
        True and consumes one slot from the current window, or False when the
        window's budget is exhausted (caller should skip the request).
    """
    current_time = time.time()
    with _rate_limit_lock:  # one shared lock so concurrent callers serialize
        if current_time > rate_limit_tracker["reset_time"]:
            # Window elapsed: open a fresh 60-second window.
            rate_limit_tracker["count"] = 0
            rate_limit_tracker["reset_time"] = current_time + 60
        if rate_limit_tracker["count"] >= REQUEST_RATE_LIMIT:
            return False
        rate_limit_tracker["count"] += 1
        return True
def sanitize_text(text: str, max_length: int = 4096) -> str:
    """Sanitizes text for Telegram message limits and removes invalid characters.

    Strips NUL bytes (which upset some APIs/databases) and surrounding
    whitespace, then truncates to `max_length` with a trailing ellipsis.
    Falsy input (None, "") yields "".
    """
    if not text:
        return ""
    # Drop NUL characters and edge whitespace before length enforcement.
    cleaned = text.replace('\x00', '').strip()
    # Telegram's sendMessage cap is 4096 characters; leave room for "...".
    if len(cleaned) <= max_length:
        return cleaned
    return cleaned[:max_length - 3] + "..."
def save_messages_to_file():
    """Persist the in-memory chat log to CHAT_LOG_FILE as pretty-printed JSON.

    Failures are logged, never raised — chat handling must not die on disk errors.
    """
    try:
        # Hold the lock while dumping so a concurrent append can't corrupt the snapshot.
        with _messages_lock, open(CHAT_LOG_FILE, "w", encoding="utf-8") as fp:
            json.dump(_messages, fp, ensure_ascii=False, indent=2)
        logger.debug(f"Saved {len(_messages)} messages to {CHAT_LOG_FILE}.")
    except Exception as e:
        logger.error(f"Failed to save messages to {CHAT_LOG_FILE}: {e}")
def save_scheduled_messages():
    """Persist the scheduled-message queue to SCHEDULED_MESSAGES_FILE as JSON.

    Failures are logged, never raised.
    """
    try:
        # Snapshot under the lock so concurrent scheduling can't interleave.
        with _scheduled_messages_lock, open(SCHEDULED_MESSAGES_FILE, "w", encoding="utf-8") as fp:
            json.dump(_scheduled_messages, fp, ensure_ascii=False, indent=2)
        logger.debug(f"Saved {len(_scheduled_messages)} scheduled messages to {SCHEDULED_MESSAGES_FILE}.")
    except Exception as e:
        logger.error(f"Failed to save scheduled messages to {SCHEDULED_MESSAGES_FILE}: {e}")
def save_suggested_tasks():
    """Saves AI-suggested tasks to a JSON file.

    Datetime-valued fields are converted to ISO-8601 strings on a shallow copy
    of each task so the in-memory entries keep their datetime objects.
    """
    datetime_keys = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    try:
        with _suggested_tasks_lock:
            # Copy each task, serializing only the known datetime fields.
            serializable_tasks = [
                {
                    key: (value.isoformat() if key in datetime_keys and isinstance(value, datetime) else value)
                    for key, value in task.items()
                }
                for task in _suggested_tasks
            ]
            with open(SUGGESTED_TASKS_FILE, "w", encoding="utf-8") as fp:
                json.dump(serializable_tasks, fp, ensure_ascii=False, indent=2)
            logger.debug(f"Saved {len(_suggested_tasks)} suggested tasks to {SUGGESTED_TASKS_FILE}.")
    except Exception as e:
        logger.error(f"Failed to save suggested tasks to {SUGGESTED_TASKS_FILE}: {e}")
def save_general_ai_responses():
    """Saves general AI responses (pending admin review) to a JSON file.

    Datetime-valued fields are converted to ISO-8601 strings on a shallow copy
    of each entry so the in-memory entries keep their datetime objects.
    """
    datetime_keys = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    try:
        with _general_ai_responses_lock:
            # Copy each response, serializing only the known datetime fields.
            serializable_responses = [
                {
                    key: (value.isoformat() if key in datetime_keys and isinstance(value, datetime) else value)
                    for key, value in resp.items()
                }
                for resp in _general_ai_responses
            ]
            with open(GENERAL_AI_RESPONSES_FILE, "w", encoding="utf-8") as fp:
                json.dump(serializable_responses, fp, ensure_ascii=False, indent=2)
            logger.debug(f"Saved {len(_general_ai_responses)} general AI responses to {GENERAL_AI_RESPONSES_FILE}.")
    except Exception as e:
        logger.error(f"Failed to save general AI responses to {GENERAL_AI_RESPONSES_FILE}: {e}")
def load_messages_from_file():
    """Loads chat messages from a JSON file.

    Missing file is a normal first-run condition (logged at INFO); only the
    newest MAX_MESSAGES entries are retained. Parse failures leave the current
    in-memory log untouched.
    """
    if not os.path.exists(CHAT_LOG_FILE):
        logger.info(f"{CHAT_LOG_FILE} not found, starting with empty messages.")
        return
    try:
        with open(CHAT_LOG_FILE, "r", encoding="utf-8") as fp:
            stored = json.load(fp)
        with _messages_lock:
            # Replace contents in place, keeping only the tail of the history.
            _messages[:] = stored[-MAX_MESSAGES:]
        logger.info(f"Loaded {len(_messages)} messages from {CHAT_LOG_FILE}.")
    except json.JSONDecodeError as e:
        logger.error(
            f"Failed to parse {CHAT_LOG_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.")
    except Exception as e:
        logger.error(f"Failed to load messages from {CHAT_LOG_FILE}: {e}")
def load_scheduled_messages():
    """Loads scheduled messages and re-schedules pending ones.

    Reads SCHEDULED_MESSAGES_FILE, repopulates the in-memory list, then:
      * re-registers an APScheduler job for each entry still "pending" whose
        run time is in the future, and
      * for "farm_update_request" entries already "sent", restores the
        response-window tracking in _awaiting_update_response.
    Malformed entries are logged and skipped individually so one bad record
    does not abort the whole load.
    """
    if not os.path.exists(SCHEDULED_MESSAGES_FILE):
        logger.info(f"{SCHEDULED_MESSAGES_FILE} not found, starting with empty scheduled messages.")
        return
    try:
        with open(SCHEDULED_MESSAGES_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        with _scheduled_messages_lock:
            _scheduled_messages.clear()
            _scheduled_messages.extend(data)
            rescheduled_count = 0
            for entry in _scheduled_messages:
                try:
                    # Ensure scheduled_time is timezone-aware
                    scheduled_time = datetime.fromisoformat(entry["scheduled_time"])
                    if scheduled_time.tzinfo is None:
                        scheduled_time = TIMEZONE.localize(scheduled_time)  # Localize if naive
                    # Only re-schedule if pending and in the future
                    if entry["status"] == "pending" and scheduled_time > datetime.now(TIMEZONE):
                        scheduler.add_job(
                            func=send_scheduled_message,
                            trigger=DateTrigger(run_date=scheduled_time),
                            args=[entry],
                            id=entry["id"],
                            replace_existing=True  # Important to prevent duplicate jobs on restart
                        )
                        rescheduled_count += 1
                    # If it was a sent farm update request, track it for response window management
                    if entry["type"] == "farm_update_request" and entry["status"] == "sent":
                        sent_time_str = entry.get("sent_time",
                                                  entry["scheduled_time"])  # Fallback to scheduled if sent_time missing
                        sent_or_scheduled_time = datetime.fromisoformat(sent_time_str)
                        if sent_or_scheduled_time.tzinfo is None:
                            sent_or_scheduled_time = TIMEZONE.localize(sent_or_scheduled_time)
                        # NOTE(review): presumably runs before the polling thread
                        # starts, so the unlocked dict write is safe — confirm
                        # startup ordering.
                        _awaiting_update_response[str(entry["chat_id"])] = sent_or_scheduled_time
                except KeyError as ke:
                    logger.error(f"Missing key in scheduled message entry ID '{entry.get('id', 'unknown')}': {ke}")
                except ValueError as ve:  # fromisoformat error
                    logger.error(
                        f"Invalid datetime format in scheduled message entry ID '{entry.get('id', 'unknown')}': {ve}")
                except Exception as e:
                    logger.error(f"Failed to reschedule entry ID '{entry.get('id', 'unknown')}': {e}")
        logger.info(
            f"Loaded {len(_scheduled_messages)} scheduled messages, rescheduled {rescheduled_count} pending jobs.")
    except json.JSONDecodeError as e:
        logger.error(
            f"Failed to parse {SCHEDULED_MESSAGES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.")
    except Exception as e:
        logger.error(f"Failed to load scheduled messages from {SCHEDULED_MESSAGES_FILE}: {e}")
def load_suggested_tasks():
    """Loads AI-suggested tasks from a JSON file.

    ISO-8601 strings in the known datetime fields are parsed back into
    datetime objects; unparseable values are logged and nulled out.
    """
    if not os.path.exists(SUGGESTED_TASKS_FILE):
        logger.info(f"{SUGGESTED_TASKS_FILE} not found, starting with empty suggested tasks.")
        return
    try:
        with open(SUGGESTED_TASKS_FILE, "r", encoding="utf-8") as fp:
            stored = json.load(fp)
        with _suggested_tasks_lock:
            _suggested_tasks.clear()
            for task in stored:
                # Rehydrate datetime fields that save_suggested_tasks() serialized.
                for key in ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"):
                    raw_value = task.get(key)
                    if raw_value is None:
                        continue
                    try:
                        task[key] = datetime.fromisoformat(raw_value)
                    except (ValueError, TypeError):
                        logger.warning(
                            f"Invalid datetime format for key '{key}' in task ID '{task.get('id')}': {task[key]}. Setting to None.")
                        task[key] = None
                _suggested_tasks.append(task)
        logger.info(f"Loaded {len(_suggested_tasks)} suggested tasks from {SUGGESTED_TASKS_FILE}.")
    except json.JSONDecodeError as e:
        logger.error(
            f"Failed to parse {SUGGESTED_TASKS_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.")
    except Exception as e:
        logger.error(f"Failed to load suggested tasks from {SUGGESTED_TASKS_FILE}: {e}")
def load_general_ai_responses():
    """Loads general AI responses (pending admin review) from a JSON file.

    ISO-8601 strings in the known datetime fields are parsed back into
    datetime objects; unparseable values are logged and nulled out.
    """
    if not os.path.exists(GENERAL_AI_RESPONSES_FILE):
        logger.info(f"{GENERAL_AI_RESPONSES_FILE} not found, starting with empty general AI responses.")
        return
    try:
        with open(GENERAL_AI_RESPONSES_FILE, "r", encoding="utf-8") as fp:
            stored = json.load(fp)
        with _general_ai_responses_lock:
            _general_ai_responses.clear()
            for resp in stored:
                # Rehydrate datetime fields that save_general_ai_responses() serialized.
                for key in ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"):
                    raw_value = resp.get(key)
                    if raw_value is None:
                        continue
                    try:
                        resp[key] = datetime.fromisoformat(raw_value)
                    except (ValueError, TypeError):
                        logger.warning(
                            f"Invalid datetime format for key '{key}' in general AI response ID '{resp.get('id')}': {resp[key]}. Setting to None.")
                        resp[key] = None
                _general_ai_responses.append(resp)
        logger.info(f"Loaded {len(_general_ai_responses)} general AI responses from {GENERAL_AI_RESPONSES_FILE}.")
    except json.JSONDecodeError as e:
        logger.error(
            f"Failed to parse {GENERAL_AI_RESPONSES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.")
    except Exception as e:
        logger.error(f"Failed to load general AI responses from {GENERAL_AI_RESPONSES_FILE}: {e}")
def load_farmer_data():
    """Loads farmer-specific data from farmers.json for RAG.

    A missing file clears any previously loaded profiles and logs a warning
    (RAG degrades gracefully); parse failures leave current data untouched.
    """
    if not os.path.exists(FARMERS_DATA_FILE):
        logger.warning(
            f"{FARMERS_DATA_FILE} not found. RAG functionality for tasks will be limited or unavailable. Please create it if needed.")
        with _farmer_data_lock:
            _farmer_data.clear()
        return
    try:
        with open(FARMERS_DATA_FILE, "r", encoding="utf-8") as fp:
            profiles = json.load(fp)
        with _farmer_data_lock:
            # Replace contents in place so existing references stay valid.
            _farmer_data[:] = profiles
        logger.info(f"Loaded {len(_farmer_data)} farmer profiles from {FARMERS_DATA_FILE}.")
    except json.JSONDecodeError as e:
        logger.error(
            f"Failed to parse {FARMERS_DATA_FILE} (JSON error): {e}. Ensure it's valid JSON. RAG functionality will be limited.")
    except Exception as e:
        logger.error(f"Failed to load farmer data from {FARMERS_DATA_FILE}: {e}")
def get_farmer_profile(chat_id: str) -> Optional[Dict]:
    """Retrieves a farmer's profile by chat_id from loaded data.

    Comparison is done on the string form of both sides so int/str ids match.
    Returns None when no profile is found.
    """
    target = str(chat_id)
    with _farmer_data_lock:
        return next(
            (profile for profile in _farmer_data if str(profile.get("chat_id")) == target),
            None,
        )
def append_message(role: str, username: str, chat_id: str, text: str):
    """Appends a message to the in-memory chat log and saves it to file.

    Args:
        role: Who produced the message (e.g. "user" or "bot").
        username: Display name stored with the entry.
        chat_id: Telegram chat id (stored as a string).
        text: Message body; empty/falsy text is silently skipped.
    """
    if not text:
        logger.debug(f"Attempted to append empty message for role '{role}' from {username} ({chat_id}). Skipping.")
        return
    text = sanitize_text(text)
    entry = {
        "role": role,
        "username": username,
        "chat_id": str(chat_id),
        "text": text,
        # FIX: datetime.utcnow() is deprecated since Python 3.12. Build the
        # same naive-UTC ISO string + "Z" from an aware UTC timestamp instead.
        "ts": datetime.now(pytz.utc).replace(tzinfo=None).isoformat() + "Z"
    }
    with _messages_lock:
        _messages.append(entry)
        if len(_messages) > MAX_MESSAGES:
            _messages[:] = _messages[-MAX_MESSAGES:]  # Keep only the latest messages
    # Persist outside the lock: save_messages_to_file() acquires it itself.
    save_messages_to_file()
def send_message(chat_id: str, text: str) -> Optional[Dict]:
    """Sends a message to Telegram, respecting rate limits.

    Returns the decoded Telegram API response on success, or None when the
    message was skipped (rate limit, empty text) or delivery failed.
    """
    if not check_rate_limit():
        logger.warning(f"Telegram API rate limit exceeded for chat_id {chat_id}. Message skipped: '{text[:50]}...'.")
        return None
    text = sanitize_text(text)
    if not text:
        logger.warning(f"Attempted to send empty message to chat_id {chat_id}. Skipping.")
        return None
    payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}  # Use HTML for basic formatting
    try:
        response = requests.post(f"{TELEGRAM_API_URL}/sendMessage", json=payload, timeout=10)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        result = response.json()
        if result.get("ok"):
            logger.debug(f"Successfully sent Telegram message to {chat_id}: '{text[:50]}...'.")
            return result
        logger.error(
            f"Telegram API reported error sending to {chat_id}: {result.get('description', 'Unknown error')}. Message: '{text[:50]}...'.")
        return None
    except requests.exceptions.Timeout:
        logger.error(f"Timeout sending Telegram message to {chat_id}. Message: '{text[:50]}...'.")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.")
        return None
    except Exception as e:
        logger.error(f"Unexpected error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.")
        return None
def schedule_message(chat_id: str, username: str, message_text: str, scheduled_time: datetime,
                     message_type: str = "reminder") -> Optional[Dict]:
    """Schedules a message to be sent at a future time.

    Args:
        chat_id: Telegram chat to deliver to.
        username: Recipient display name (for logs / the stored entry).
        message_text: Body to send (sanitized before storage).
        scheduled_time: Timezone-aware delivery time; must be in the future.
        message_type: Category tag, e.g. "reminder" or "farm_update_request".

    Returns:
        The stored entry dict on success, None on validation or scheduling failure.
    """
    if not message_text or not chat_id:
        logger.error("Cannot schedule message: Missing message text or chat_id.")
        return None
    # Ensure scheduled_time is in the future
    if scheduled_time <= datetime.now(TIMEZONE):
        logger.error(
            f"Cannot schedule message for {username} ({chat_id}): Scheduled time ({scheduled_time}) is in the past. Message: '{message_text[:50]}...'.")
        return None
    # Generate a unique ID for the scheduled entry
    timestamp = int(scheduled_time.timestamp())
    # Use a hash of the first 50 chars of message_text for uniqueness in the ID
    text_hash = hash(message_text[:50]) % 10000
    unique_id = f"{chat_id}_{message_type}_{timestamp}_{text_hash}"
    scheduled_entry = {
        "id": unique_id,
        "chat_id": str(chat_id),
        "username": username,
        "message": sanitize_text(message_text),
        "scheduled_time": scheduled_time.isoformat(),
        "type": message_type,
        "status": "pending",
        "created_at": datetime.now(TIMEZONE).isoformat()
    }
    try:
        with _scheduled_messages_lock:
            _scheduled_messages.append(scheduled_entry)
        save_scheduled_messages()
        scheduler.add_job(
            func=send_scheduled_message,
            trigger=DateTrigger(run_date=scheduled_time),
            args=[scheduled_entry],
            id=scheduled_entry["id"],
            replace_existing=True  # Important: Prevents duplicate jobs if app restarts and tries to reschedule same ID
        )
        logger.info(
            f"Scheduled '{message_type}' message for {username} ({chat_id}) (ID: {unique_id}) at {scheduled_time.strftime('%Y-%m-%d %H:%M %Z')}.")
        return scheduled_entry
    except Exception as e:
        logger.error(f"Failed to schedule message for {username} ({chat_id}) (ID: {unique_id}): {e}")
        # ROBUSTNESS FIX: the entry may already have been appended and persisted
        # before add_job failed. Remove the orphaned "pending" entry so it is
        # not falsely "rescheduled" on the next restart with no real job behind it.
        with _scheduled_messages_lock:
            _scheduled_messages[:] = [m for m in _scheduled_messages if m.get("id") != unique_id]
        save_scheduled_messages()
        return None
def send_scheduled_message(scheduled_entry: Dict):
    """Callback function executed by APScheduler to send a scheduled message.

    Sends the message via the Telegram API, then updates the matching entry in
    _scheduled_messages to "sent" (with sent_time) or "failed" (with
    failed_time) and persists the queue. For delivered "farm_update_request"
    messages it also opens the response window by recording the send time in
    _awaiting_update_response.
    """
    try:
        chat_id = scheduled_entry["chat_id"]
        message = scheduled_entry["message"]
        recipient_username = scheduled_entry["username"]
        logger.info(
            f"Attempting to send scheduled message '{scheduled_entry['type']}' to {recipient_username} ({chat_id}). Message: '{message[:50]}...'.")
        result = send_message(chat_id, message)
        current_time = datetime.now(TIMEZONE).isoformat()
        if result:
            # Mirror the outgoing message into the chat log so it shows in the UI history.
            append_message("bot", f"Bot (Scheduled for {recipient_username})", chat_id, message)
            with _scheduled_messages_lock:
                for msg in _scheduled_messages:
                    if msg["id"] == scheduled_entry["id"]:
                        msg["status"] = "sent"
                        msg["sent_time"] = current_time
                        break
            # Persist outside the lock: save_scheduled_messages() re-acquires it.
            save_scheduled_messages()
            logger.info(f"Successfully sent scheduled message to {recipient_username} ({chat_id}).")
            # If a farm update request was just sent, start tracking the response window
            if scheduled_entry["type"] == "farm_update_request":
                _awaiting_update_response[str(chat_id)] = datetime.now(TIMEZONE)
                logger.info(
                    f"Now awaiting farm update response from {recipient_username} for {FARM_UPDATE_RESPONSE_WINDOW_HOURS} hours (chat_id: {chat_id}).")
        else:
            # Delivery failed (rate limit, network, or API error) — mark and persist.
            with _scheduled_messages_lock:
                for msg in _scheduled_messages:
                    if msg["id"] == scheduled_entry["id"]:
                        msg["status"] = "failed"
                        msg["failed_time"] = current_time
                        break
            save_scheduled_messages()
            logger.error(
                f"Failed to send scheduled message to {recipient_username} ({chat_id}). Marking as 'failed'. Message: '{message[:50]}...'.")
    except Exception as e:
        # Never let an exception escape into APScheduler's executor thread.
        logger.error(f"Critical error in send_scheduled_message for ID '{scheduled_entry.get('id', 'unknown')}': {e}")
        traceback.print_exc()
def generate_and_store_farm_tasks(user_message: str, chat_id: str, username: str) -> str:
    """
    Generates 3-4 specific, short, descriptive daily tasks with a reason (<50 words)
    based on farmer's update AND farmer's profile (RAG), and stores each task separately for admin review.
    Does NOT send any message directly to the farmer.

    Args:
        user_message: The farm-update text the farmer sent.
        chat_id: Telegram chat id; used for profile lookup and task IDs.
        username: Farmer display name used in the prompt and logs.

    Returns:
        Always "" — tasks go only to the admin review queue, never to the farmer.
    """
    # Gemini may be unavailable (module-level configuration failed) — bail out quietly.
    if not model:
        logger.error(f"Gemini model not available for task generation for {username}.")
        return ""
    try:
        # --- RAG step: enrich the prompt with this farmer's stored profile, if any ---
        farmer_profile = get_farmer_profile(chat_id)
        profile_context = ""
        if farmer_profile and farmer_profile.get("farm_details"):
            details = farmer_profile["farm_details"]
            profile_context = (
                f"\n\nContext about {username}'s farm (use this for tailored advice):\n"
                f"- Location: {details.get('location', 'N/A')}\n"
                f"- Main Crops: {', '.join(details.get('main_crops', []))}\n"
                f"- Soil Type: {details.get('soil_type', 'N/A')}\n"
                f"- Typical Issues: {', '.join(details.get('typical_issues', []))}\n"
                f"- Last reported weather: {details.get('last_reported_weather', 'N/A')}\n"
                f"- Irrigation System: {details.get('irrigation_system', 'N/A')}\n"
            )
        else:
            profile_context = "\n\n(No detailed farmer profile data found. Generating tasks based solely on the update.)"
        # The prompt pins an exact "Task:/Reason:/---" layout so the regex below can parse it.
        prompt = (
            f"You are a highly experienced and practical agricultural expert. Based on the farmer's update below "
            f"and their farm context (if provided), generate exactly 3 to 4 critical daily tasks. "
            f"Each task must be concise, actionable, and include a 'Reason:' why it's important. "
            f"Ensure each 'Reason:' is strictly under 50 words. Prioritize immediate needs and common farming practices."
            f"\n\nStrictly follow this output format for each task:\n"
            f"Task: [Short, descriptive task, e.g., 'Check soil moisture in cornfield']\n"
            f"Reason: [Brief explanation why (under 50 words), e.g., 'Ensure optimal water levels after dry spell.']\n"
            f"---\n"
            f"Task: [Another short, descriptive task]\n"
            f"Reason: [Brief explanation why (under 50 words)]\n"
            f"---\n"
            f"(Repeat 3 or 4 times. Ensure each task block is clearly separated by '---' on its own line.)"
            f"\n\nFarmer: {username}\n"
            f"Farmer's Update: '{user_message}'"
            f"{profile_context}"
            f"\n\nNow, generate the 3-4 tasks and reasons in the specified format:"
        )
        logger.info(
            f"Calling Gemini for RAG task generation for {username} (chat_id: {chat_id}) based on update: '{user_message[:100]}...'.")
        response = model.generate_content(prompt)
        ai_raw_output = response.text if response and response.text else "Could not generate tasks based on the update."
        logger.debug(f"Raw AI output for tasks: {ai_raw_output[:500]}...")
        # Parse the AI's output into individual tasks
        parsed_tasks = []
        # Regex to find blocks starting with "Task:" and containing "Reason:", separated by "---"
        # Adjusted regex for better robustness to handle various line endings and trailing whitespace.
        task_blocks = re.findall(r"Task:\s*(.*?)\nReason:\s*(.*?)(?:\n---\s*|$)", ai_raw_output,
                                 re.DOTALL | re.IGNORECASE)
        for task_text, reason_text in task_blocks:
            parsed_tasks.append({
                "task_message": task_text.strip(),
                "task_reason": reason_text.strip()
            })
        if not parsed_tasks:
            logger.warning(
                f"AI did not generate structured tasks for {username} in expected format. Providing fallback task(s). Raw output: '{ai_raw_output[:200]}'.")
            # Fallback if parsing fails or AI doesn't follow format. Always ensure at least one review item.
            fallback_message = "AI did not generate specific tasks in the expected format. Manual review of farmer's update is required."
            if len(ai_raw_output) > 50:  # If AI gave some output, add it as context to the fallback
                fallback_message += f" AI's raw response: {sanitize_text(ai_raw_output, max_length=150)}."
            parsed_tasks.append({
                "task_message": "Manual Review Required",
                "task_reason": sanitize_text(fallback_message, max_length=200)
            })
            # Add a separate entry for the full raw response if it's long and tasks couldn't be parsed
            if len(ai_raw_output) > 200 and len(
                    parsed_tasks) == 1:  # Only add if first fallback was due to parsing failure
                parsed_tasks.append({
                    "task_message": "Full Raw AI Response (for admin review)",
                    "task_reason": sanitize_text(ai_raw_output, max_length=400)
                })
        generation_time = datetime.now(TIMEZONE)
        # Store each individual parsed task for separate admin scheduling
        with _suggested_tasks_lock:
            for i, task_item in enumerate(parsed_tasks):
                # Create a unique ID for each individual task item
                task_id = f"suggested_task_{chat_id}_{int(generation_time.timestamp())}_{i}_{hash(task_item['task_message']) % 10000}"
                task_entry = {
                    "id": task_id,
                    "chat_id": str(chat_id),
                    "username": username,
                    "original_update": user_message,  # Keep original update for context in UI
                    "suggested_task_message": sanitize_text(task_item["task_message"]),
                    "suggested_task_reason": sanitize_text(task_item["task_reason"]),
                    "generation_time": generation_time,
                    "status": "pending_review"
                }
                _suggested_tasks.append(task_entry)
        # Persist outside the lock: save_suggested_tasks() re-acquires it.
        save_suggested_tasks()
        logger.info(
            f"Generated and stored {len(parsed_tasks)} individual tasks for {username} (chat_id: {chat_id}) for admin review.")
        return ""  # No immediate response to farmer
    except Exception as e:
        logger.error(f"Error during farm task generation/storage for {username} (chat_id: {chat_id}): {e}")
        traceback.print_exc()
        return ""  # No immediate response to farmer on AI error
def get_ai_response_with_context(user_message: str, chat_id: str) -> str:
    """Gets a general AI response for a farmer's query, considering general farming context.

    Returns a sanitized short answer, or a static fallback sentence when the
    model is unavailable or the call fails.
    """
    if not model:
        logger.error("Gemini model not available for general query, returning fallback message.")
        return "AI service is currently unavailable. Please try again later."
    # Frame the query with a concise-assistant persona before calling Gemini.
    prompt = (
        f"You are a helpful and knowledgeable agricultural assistant. Provide a concise and direct answer "
        f"to the following query from a farmer. Keep the response factual and actionable where possible. "
        f"Limit the response to a few sentences (max 3-4 sentences)."
        f"\n\nFarmer's Query: {user_message}"
    )
    try:
        logger.info(f"Generating general AI response for {chat_id} (query: '{user_message[:50]}...')...")
        response = model.generate_content(prompt)
        ai_reply = response.text if response and response.text else "I couldn't process your request right now."
        return sanitize_text(ai_reply)
    except Exception as e:
        logger.error(f"AI response error for chat_id {chat_id} (query: '{user_message[:50]}...'): {e}")
        traceback.print_exc()
        return "I'm having trouble processing your request right now. Please try again later."
def store_general_ai_response(chat_id: str, username: str, original_query: str, ai_response: str):
    """Stores a general AI response (generated from farmer's general query) for admin review.

    Empty responses are skipped with a warning; stored entries start in the
    "pending_review" state and are persisted immediately.
    """
    if not ai_response:
        logger.warning(f"Attempted to store empty general AI response for {username} ({chat_id}). Skipping.")
        return
    entry_id = f"general_ai_{chat_id}_{int(datetime.now().timestamp())}_{hash(original_query) % 10000}"
    response_entry = {
        "id": entry_id,
        "chat_id": str(chat_id),
        "username": username,
        "original_query": sanitize_text(original_query),
        "ai_response": sanitize_text(ai_response),
        "generation_time": datetime.now(TIMEZONE),
        "status": "pending_review",
    }
    with _general_ai_responses_lock:
        _general_ai_responses.append(response_entry)
    # Persist outside the lock: save_general_ai_responses() re-acquires it.
    save_general_ai_responses()
    logger.info(f"Stored general AI response for {username} ({chat_id}) (ID: {response_entry['id']}).")
# ---------------- Polling logic ----------------
def delete_webhook(drop_pending: bool = True) -> Optional[Dict]:
    """Deletes the Telegram webhook to switch to long polling.

    Returns the API payload when Telegram reports ok, otherwise None
    (including when the API answers but reports not-ok).
    """
    try:
        flag = "true" if drop_pending else "false"
        resp = requests.post(
            f"{TELEGRAM_API_URL}/deleteWebhook",
            params={"drop_pending_updates": flag},
            timeout=10,
        )
        resp.raise_for_status()
        payload = resp.json()
        if not payload.get("ok"):
            return None  # API reachable but reported failure
        logger.info("Telegram webhook deleted successfully (ensuring long polling).")
        return payload
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to delete Telegram webhook: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error deleting Telegram webhook: {e}")
        return None
def get_updates(offset: Optional[int] = None, timeout_seconds: int = 60) -> Optional[Dict]:
    """Fetches updates from Telegram using long polling.

    A read timeout is treated as "no new messages" and yields an empty ok
    result so the polling loop keeps running; other network failures return
    None so the caller can back off.
    """
    query = {"timeout": timeout_seconds}
    if offset:
        query["offset"] = offset
    try:
        # Allow a little slack beyond the server-side long-poll timeout.
        resp = requests.get(f"{TELEGRAM_API_URL}/getUpdates", params=query, timeout=timeout_seconds + 10)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        logger.debug("Telegram polling timed out (normal, no new messages).")
        return {"ok": True, "result": []}  # Return empty result on timeout to continue polling
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error getting updates from Telegram: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error getting updates from Telegram: {e}")
        return None
def polling_loop():
    """Main loop for long-polling Telegram for new messages.

    Intended to run forever on a background thread: removes any webhook so
    getUpdates works, then repeatedly polls and dispatches each update to
    process_update. Network/API failures back off exponentially.
    """
    logger.info("Starting Telegram polling thread.")
    delete_webhook(drop_pending=True)  # Ensure webhook is off for polling
    offset = None
    consecutive_errors = 0
    while True:
        try:
            updates = get_updates(offset=offset, timeout_seconds=60)
            if updates is None:  # Critical network error, get_updates returned None
                consecutive_errors += 1
                logger.error(
                    f"get_updates returned None due to error. Consecutive errors: {consecutive_errors}. Sleeping for {min(consecutive_errors * 5, 120)}s.")
                time.sleep(min(consecutive_errors * 5, 120))  # Exponential backoff up to 2 min
                continue
            if not updates.get("ok"):
                logger.warning(
                    f"Telegram API returned 'not ok' for getUpdates: {updates.get('description', 'No description')}. Consecutive errors: {consecutive_errors}.")
                consecutive_errors += 1
                time.sleep(min(consecutive_errors * 2, 60))  # Exponential backoff up to 1 min
                continue
            consecutive_errors = 0  # Reset error counter on successful API call
            results = updates.get("result", [])
            for update in results:
                # BUGFIX: acknowledge the update *before* processing it. The
                # original advanced `offset` only after process_update()
                # succeeded, so a consistently-failing ("poison") update at the
                # end of a batch was re-fetched and retried forever.
                offset = update.get("update_id", 0) + 1
                try:
                    process_update(update)
                except Exception as e:
                    logger.error(f"Error processing individual update ID {update.get('update_id', 'unknown')}: {e}")
                    traceback.print_exc()
            # Small delay to prevent busy-waiting if Telegram returns many empty results quickly
            if not results:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Telegram polling stopped by user (KeyboardInterrupt).")
            break
        except Exception as e:
            consecutive_errors += 1
            logger.critical(f"Unhandled critical error in polling loop: {e}")
            time.sleep(min(consecutive_errors * 10, 300))  # Longer exponential backoff for unhandled errors
def process_update(update: Dict):
    """Processes a single Telegram update message.

    Non-text content gets a polite canned reply. Text messages are either
    treated as a farm-update response (if this chat is inside the awaiting
    window) or as a general query whose AI answer is stored for admin
    review; either way the farmer only receives an acknowledgment.
    """
    incoming = update.get("message") or update.get("edited_message")
    if not incoming:
        logger.debug(f"Update ID {update.get('update_id')}: No message or edited_message found. Skipping.")
        return
    chat_info = incoming.get("chat", {})
    chat_id = str(chat_info.get("id"))
    username = chat_info.get("first_name") or chat_info.get("username") or "User"
    text = incoming.get("text") or incoming.get("caption") or ""

    if not text:
        # Photos, stickers, voice notes etc. — acknowledge and log, nothing more.
        acknowledgment_text = "I can only respond to text messages. Please send your queries as text."
        send_message(chat_id, acknowledgment_text)
        append_message("user", username, chat_id, "(Non-text message received)")
        append_message("bot", "Bot", chat_id, acknowledgment_text)
        logger.debug(f"Received non-text message from {username} ({chat_id}). Sent acknowledgment.")
        return

    append_message("user", username, chat_id, text)
    logger.info(f"Processing message from {username} ({chat_id}): '{text[:50]}...'.")

    reply_to_farmer = ""  # The only message sent directly back to the farmer.

    # Is this chat inside the farm-update response window?
    pending_since = _awaiting_update_response.get(chat_id)
    response_window = timedelta(hours=FARM_UPDATE_RESPONSE_WINDOW_HOURS)
    if pending_since and (datetime.now(TIMEZONE) - pending_since) <= response_window:
        logger.info(f"Recognized as farm update response from {username}. Generating tasks for admin review.")
        generate_and_store_farm_tasks(text, chat_id, username)  # Generates and stores tasks
        reply_to_farmer = "Thank you for your farm update! The admin will review it, generate daily tasks, and schedule them to be sent to you shortly."
        # This chat has answered; stop tracking it.
        _awaiting_update_response.pop(chat_id, None)
        logger.info(f"Finished processing farm update for {username} ({chat_id}). Awaiting response tracker cleared.")
    else:
        # General query: generate an AI answer, but park it for admin scheduling.
        logger.info(f"Processing general query from {username}. Generating AI response for admin scheduling.")
        ai_answer = get_ai_response_with_context(text, chat_id)
        if ai_answer and ai_answer != "AI service is currently unavailable. Please try again later.":
            store_general_ai_response(chat_id, username, text, ai_answer)
            reply_to_farmer = f"I've received your query about '{text[:50]}...'. The admin will review the AI's suggested response and schedule it to be sent to you."
        else:
            # AI unavailable or generation failed — fall back to a generic note.
            reply_to_farmer = "I've received your message. The admin will get back to you shortly."
            logger.warning(
                f"AI failed to generate a response for general query from {username} ({chat_id}). Admin will manually review.")

    if reply_to_farmer:
        append_message("bot", "Bot", chat_id, reply_to_farmer)
        send_message(chat_id, reply_to_farmer)
    else:
        logger.debug(f"No direct acknowledgment message generated for {username} ({chat_id}) after processing.")
# ---------------- Flask routes & UI ----------------
# New Dashboard HTML template with 50/50 split and tabs
# NOTE(review): this template contains only plain text labels — no HTML tags,
# CSS or JS are visible. It looks like the real markup was stripped at some
# point; as-is, render_template_string() will return raw text, not a working
# dashboard. Confirm against the original template source.
HTML_TEMPLATE = """
SmartFarm Dashboard
đą
SmartFarm Dashboard
Connecting...
đ0 Messages
â°
đŦ Farm Communications
0 Messages
đŦ
No messages yet
Start a conversation with your farm bot!
đ Farm Management
Real-time Admin
đ Analytics
đ Tasks
â° Schedule
âī¸ Settings
0
Active Farmers
0
Pending Tasks
0
Scheduled Messages
0
AI Responses
đž Farm Bot Status
Operational
Monitor your farm bot operations. Track farmer communications, manage AI-generated tasks, and review general AI responses.
đ¤ AI Generated Daily Tasks
0 Pending
đ
No AI generated tasks yet
đĄ AI General Responses
0 Pending
đ¤
No AI responses pending review
đ Scheduled Messages
0 Scheduled
đ
No messages currently scheduled
â Schedule New Message
âī¸ Dashboard Settings
Settings are saved automatically.
đ Export Data
Download your farm data for backup or analysis:
"""
@app.route("/")
def index():
    """Serve the admin dashboard page rendered from HTML_TEMPLATE."""
    dashboard_page = HTML_TEMPLATE
    return render_template_string(dashboard_page)
@app.route("/messages")
def messages_api():
    """Return a JSON snapshot of the in-memory chat log."""
    with _messages_lock:
        snapshot = list(_messages)
    return jsonify(snapshot)
@app.route("/scheduled")
def scheduled_messages_api():
    """Return a JSON snapshot of the in-memory scheduled-message list."""
    with _scheduled_messages_lock:
        snapshot = list(_scheduled_messages)
    return jsonify(snapshot)
@app.route("/suggested_tasks")
def suggested_tasks_api():
    """Return AI-suggested tasks as JSON, datetime fields ISO-formatted."""
    datetime_keys = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    with _suggested_tasks_lock:
        payload = []
        for task in _suggested_tasks:
            item = dict(task)
            for key in datetime_keys:
                value = item.get(key)
                if isinstance(value, datetime):
                    item[key] = value.isoformat()
            payload.append(item)
    return jsonify(payload)
@app.route("/general_ai_responses")
def general_ai_responses_api():
    """Return stored general AI responses as JSON, datetimes ISO-formatted."""
    datetime_keys = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
    with _general_ai_responses_lock:
        payload = []
        for entry in _general_ai_responses:
            item = dict(entry)
            for key in datetime_keys:
                value = item.get(key)
                if isinstance(value, datetime):
                    item[key] = value.isoformat()
            payload.append(item)
    return jsonify(payload)
@app.route("/download")
def download():
    """Download the chat log as a JSON attachment.

    Serves the on-disk file when present; otherwise serializes the
    in-memory message list. Returns 404 when there is nothing to download
    and 500 on unexpected errors.
    """
    try:
        # Prioritize existing file if it's there
        if os.path.exists(CHAT_LOG_FILE):
            return send_file(CHAT_LOG_FILE, as_attachment=True, download_name="chat_log.json")
        with _messages_lock:
            if not _messages:  # Handle empty data
                logger.warning("No chat log data in memory for download.")
                return jsonify({"error": "No chat log data to download."}), 404
            payload = json.dumps(_messages, ensure_ascii=False, indent=2)
        # BUGFIX: serve from memory instead of writing chat_log_tmp.json,
        # which was created on every request and never removed (file leak).
        import io
        return send_file(
            io.BytesIO(payload.encode("utf-8")),
            as_attachment=True,
            download_name="chat_log.json",
            mimetype="application/json",
        )
    except Exception as e:
        logger.error(f"Error downloading chat log: {e}")
        return jsonify({"error": "Failed to download chat log file."}), 500
@app.route("/download-scheduled")
def download_scheduled():
    """Download the scheduled-messages list as a JSON attachment.

    Serves the persisted file when present; otherwise serializes the
    in-memory list. Returns 404 when empty and 500 on unexpected errors.
    """
    try:
        if os.path.exists(SCHEDULED_MESSAGES_FILE):
            return send_file(SCHEDULED_MESSAGES_FILE, as_attachment=True, download_name="scheduled_messages.json")
        with _scheduled_messages_lock:
            if not _scheduled_messages:
                logger.warning("No scheduled messages data in memory for download.")
                return jsonify({"error": "No scheduled messages data to download."}), 404
            payload = json.dumps(_scheduled_messages, ensure_ascii=False, indent=2)
        # BUGFIX: serve from memory instead of writing scheduled_messages_tmp.json,
        # which was never removed after the response (file leak).
        import io
        return send_file(
            io.BytesIO(payload.encode("utf-8")),
            as_attachment=True,
            download_name="scheduled_messages.json",
            mimetype="application/json",
        )
    except Exception as e:
        logger.error(f"Error downloading scheduled messages: {e}")
        return jsonify({"error": "Failed to download scheduled messages file."}), 500
@app.route("/download-suggested")
def download_suggested():
    """Download AI-suggested tasks as a JSON attachment.

    Serves the persisted file when present; otherwise serializes the
    in-memory list (datetimes converted to ISO strings first). Returns 404
    when empty and 500 on unexpected errors.
    """
    try:
        if os.path.exists(SUGGESTED_TASKS_FILE):
            return send_file(SUGGESTED_TASKS_FILE, as_attachment=True, download_name="suggested_tasks.json")
        with _suggested_tasks_lock:
            if not _suggested_tasks:
                logger.warning("No suggested tasks data in memory for download.")
                return jsonify({"error": "No suggested tasks data to download."}), 404
            serializable_tasks = []
            for task in _suggested_tasks:
                temp_task = task.copy()
                # datetime objects are not JSON-serializable — convert to ISO.
                for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]:
                    if isinstance(temp_task.get(key), datetime):
                        temp_task[key] = temp_task[key].isoformat()
                serializable_tasks.append(temp_task)
        # BUGFIX: serve from memory instead of writing suggested_tasks_tmp.json,
        # which was never removed after the response (file leak).
        import io
        payload = json.dumps(serializable_tasks, ensure_ascii=False, indent=2)
        return send_file(
            io.BytesIO(payload.encode("utf-8")),
            as_attachment=True,
            download_name="suggested_tasks.json",
            mimetype="application/json",
        )
    except Exception as e:
        logger.error(f"Error downloading suggested tasks: {e}")
        return jsonify({"error": "Failed to download suggested tasks file."}), 500
@app.route("/download-general-ai")
def download_general_ai():
    """Download stored general AI responses as a JSON attachment.

    Serves the persisted file when present; otherwise serializes the
    in-memory list (datetimes converted to ISO strings first). Returns 404
    when empty and 500 on unexpected errors.
    """
    try:
        if os.path.exists(GENERAL_AI_RESPONSES_FILE):
            return send_file(GENERAL_AI_RESPONSES_FILE, as_attachment=True, download_name="general_ai_responses.json")
        with _general_ai_responses_lock:
            if not _general_ai_responses:
                logger.warning("No general AI responses data in memory for download.")
                return jsonify({"error": "No general AI responses data to download."}), 404
            serializable_responses = []
            for resp in _general_ai_responses:
                temp_resp = resp.copy()
                # datetime objects are not JSON-serializable — convert to ISO.
                for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]:
                    if isinstance(temp_resp.get(key), datetime):
                        temp_resp[key] = temp_resp[key].isoformat()
                serializable_responses.append(temp_resp)
        # BUGFIX: serve from memory instead of writing general_ai_responses_tmp.json,
        # which was never removed after the response (file leak).
        import io
        payload = json.dumps(serializable_responses, ensure_ascii=False, indent=2)
        return send_file(
            io.BytesIO(payload.encode("utf-8")),
            as_attachment=True,
            download_name="general_ai_responses.json",
            mimetype="application/json",
        )
    except Exception as e:
        logger.error(f"Error downloading general AI responses: {e}")
        return jsonify({"error": "Failed to download general AI responses file."}), 500
@app.route("/clear", methods=["POST"])
def clear():
    """Clears all in-memory data, removes all persistent files, and clears scheduler jobs."""
    try:
        # Empty every in-memory store under its own lock.
        for store_lock, store in (
            (_messages_lock, _messages),
            (_scheduled_messages_lock, _scheduled_messages),
            (_suggested_tasks_lock, _suggested_tasks),
            (_general_ai_responses_lock, _general_ai_responses),
        ):
            with store_lock:
                store.clear()
        with _farmer_data_lock:
            _farmer_data.clear()  # Clear farmer data in memory
        global _awaiting_update_response
        _awaiting_update_response = {}  # Reset the tracker
        # Remove persistent files
        for file_path in [CHAT_LOG_FILE, SCHEDULED_MESSAGES_FILE, SUGGESTED_TASKS_FILE, GENERAL_AI_RESPONSES_FILE]:
            if not os.path.exists(file_path):
                continue
            try:
                os.remove(file_path)
                logger.info(f"Removed persistent file: {file_path}")
            except Exception as e:
                logger.error(f"Failed to remove file {file_path}: {e}. Please check file permissions.")
        # Clear all APScheduler jobs
        scheduler.remove_all_jobs()
        logger.info("All APScheduler jobs cleared.")
        logger.info("All application data cleared successfully.")
        return ("", 204)  # No Content
    except Exception as e:
        logger.error(f"Error during 'clear all data' operation: {e}")
        return jsonify({"error": "Failed to clear data due to an internal server error. Check server logs."}), 500
@app.route("/schedule_from_ui", methods=["POST"])
def schedule_from_ui():
    """Endpoint to schedule a message directly from the UI.

    Expects JSON: chat_id, message, scheduled_time (YYYY-MM-DDTHH:MM,
    interpreted in the configured TIMEZONE); optional username and
    message_type. Validates length and that the time is in the future.
    """
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400
        chat_id = payload.get("chat_id")
        recipient_username = payload.get("username", "UI Scheduled User")
        message = payload.get("message", "").strip()
        scheduled_time_str = payload.get("scheduled_time")
        message_type = payload.get("message_type", "reminder")

        if not (chat_id and message and scheduled_time_str):
            return jsonify({
                "status": "error",
                "message": "Missing required fields: 'chat_id', 'message', and 'scheduled_time' are all required."
            }), 400

        # Telegram rejects messages above 4096 characters.
        if len(message) > 4096:
            return jsonify({
                "status": "error",
                "message": f"Message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram."
            }), 400

        # Parse the naive datetime string and attach the local timezone.
        try:
            naive_dt = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M')
            scheduled_datetime = TIMEZONE.localize(naive_dt)
        except ValueError:
            return jsonify({
                "status": "error",
                "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM (e.g., 2023-10-27T14:30)."
            }), 400

        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return jsonify({
                "status": "error",
                "message": "Scheduled time must be in the future. Please choose a time later than now."
            }), 400
        # Cap how far ahead messages may be scheduled.
        if scheduled_datetime > now + timedelta(days=365):
            return jsonify({
                "status": "error",
                "message": "Scheduled time cannot be more than 1 year in the future for stability."
            }), 400

        result = schedule_message(chat_id, recipient_username, message, scheduled_datetime, message_type)
        if not result:
            return jsonify({
                "status": "error",
                "message": "Failed to schedule message due to an internal server error (check server logs)."
            }), 500
        logger.info(
            f"UI: Message '{message_type}' scheduled successfully for {recipient_username} ({chat_id}) at {scheduled_datetime}.")
        return jsonify({"status": "success", "message": "Message scheduled successfully."})
    except Exception as e:
        logger.error(f"Error in /schedule_from_ui: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/schedule_suggested_from_ui", methods=["POST"])
def schedule_suggested_from_ui():
    """Endpoint for admin to schedule an individual AI-suggested task from the UI.

    Expects JSON: suggested_task_id, chat_id, username, message,
    scheduled_time (YYYY-MM-DDTHH:MM in the configured TIMEZONE).
    Atomically claims the pending task before scheduling it.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400
        suggested_task_id = data.get("suggested_task_id")
        chat_id = data.get("chat_id")
        username = data.get("username")
        message = data.get("message", "").strip()  # This is the individual task message content
        scheduled_time_str = data.get("scheduled_time")
        if not all([suggested_task_id, chat_id, username, message, scheduled_time_str]):
            return jsonify({
                "status": "error",
                "message": "Missing required fields for scheduling suggested task."
            }), 400
        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return jsonify({
                "status": "error",
                "message": f"Task message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram."
            }), 400
        # Parse and validate datetime
        try:
            scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M')
            scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive)
        except ValueError:
            return jsonify({
                "status": "error",
                "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM."
            }), 400
        # Check if time is in the future
        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return jsonify({
                "status": "error",
                "message": "Scheduled time must be in the future. Please choose a time later than now."
            }), 400
        # BUGFIX (check-then-act race): the "pending_review" check and the later
        # status update used to run under two separate lock acquisitions, so two
        # concurrent requests could both pass the check and double-schedule the
        # same task. Atomically *claim* the task under one lock hold instead,
        # rolling the claim back if scheduling fails below.
        claimed_task = None
        with _suggested_tasks_lock:
            for task in _suggested_tasks:
                if task["id"] == suggested_task_id and task["status"] == "pending_review":
                    task["status"] = "scheduled"
                    task["scheduled_by_admin_time"] = datetime.now(TIMEZONE)
                    claimed_task = task
                    break
        if claimed_task is None:
            return jsonify({
                "status": "error",
                "message": "Suggested task not found or already processed (scheduled/discarded)."
            }), 404
        result = schedule_message(chat_id, username, message, scheduled_datetime, "daily_tasks")
        if result:
            save_suggested_tasks()
            logger.info(
                f"UI: Individual suggested task ID '{suggested_task_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.")
            return jsonify({"status": "success", "message": "Suggested task scheduled successfully."})
        else:
            # Scheduling failed — release the claim so the admin can retry.
            with _suggested_tasks_lock:
                claimed_task["status"] = "pending_review"
                claimed_task.pop("scheduled_by_admin_time", None)
            return jsonify({
                "status": "error",
                "message": "Failed to schedule suggested task due to an internal server error (check server logs)."
            }), 500
    except Exception as e:
        logger.error(f"Error in /schedule_suggested_from_ui: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/discard_suggested_task", methods=["POST"])
def discard_suggested_task():
    """Endpoint for admin to discard an individual AI-suggested task from the UI."""
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400
        task_id = payload.get("suggested_task_id")
        if not task_id:
            return jsonify({"status": "error", "message": "Missing 'suggested_task_id' in request."}), 400
        # Locate a still-pending task with this id and mark it discarded,
        # all under one lock hold.
        with _suggested_tasks_lock:
            target = next(
                (t for t in _suggested_tasks
                 if t["id"] == task_id and t["status"] == "pending_review"),
                None,
            )
            if target is not None:
                target["status"] = "discarded"
                target["discarded_by_admin_time"] = datetime.now(TIMEZONE)
        if target is not None:
            save_suggested_tasks()
            logger.info(f"UI: Suggested task ID '{task_id}' discarded.")
            return jsonify({"status": "success", "message": "Suggested task discarded successfully."})
        return jsonify({
            "status": "error",
            "message": "Suggested task not found or already processed (scheduled/discarded)."
        }), 404
    except Exception as e:
        logger.error(f"Error in /discard_suggested_task: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/schedule_general_ai_response", methods=["POST"])
def schedule_general_ai_response():
    """Endpoint for admin to schedule a general AI response from the UI.

    Expects JSON: response_id, chat_id, username, message, scheduled_time
    (YYYY-MM-DDTHH:MM in the configured TIMEZONE). Atomically claims the
    pending response before scheduling it.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400
        response_id = data.get("response_id")
        chat_id = data.get("chat_id")
        username = data.get("username")
        message = data.get("message", "").strip()
        scheduled_time_str = data.get("scheduled_time")
        if not all([response_id, chat_id, username, message, scheduled_time_str]):
            return jsonify({"status": "error", "message": "Missing required fields for scheduling AI response."}), 400
        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return jsonify({
                "status": "error",
                "message": f"AI response message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram."
            }), 400
        # Parse and validate datetime
        try:
            scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M')
            scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive)
        except ValueError:
            return jsonify({"status": "error", "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM."}), 400
        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return jsonify({"status": "error", "message": "Scheduled time must be in the future."}), 400
        # BUGFIX (check-then-act race): the "pending_review" check and the later
        # status update used to run under two separate lock acquisitions, so two
        # concurrent requests could both pass the check and double-schedule the
        # same response. Atomically *claim* it under one lock hold instead,
        # rolling the claim back if scheduling fails below.
        claimed_resp = None
        with _general_ai_responses_lock:
            for resp in _general_ai_responses:
                if resp["id"] == response_id and resp["status"] == "pending_review":
                    resp["status"] = "scheduled"
                    resp["scheduled_by_admin_time"] = datetime.now(TIMEZONE)
                    claimed_resp = resp
                    break
        if claimed_resp is None:
            return jsonify({"status": "error", "message": "AI response not found or already processed."}), 404
        result = schedule_message(chat_id, username, message, scheduled_datetime, "general_ai_response")
        if result:
            save_general_ai_responses()
            logger.info(
                f"UI: General AI response ID '{response_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.")
            return jsonify({"status": "success", "message": "AI response scheduled successfully."})
        else:
            # Scheduling failed — release the claim so the admin can retry.
            with _general_ai_responses_lock:
                claimed_resp["status"] = "pending_review"
                claimed_resp.pop("scheduled_by_admin_time", None)
            return jsonify({"status": "error", "message": "Failed to schedule AI response due to internal error."}), 500
    except Exception as e:
        logger.error(f"Error in /schedule_general_ai_response: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/discard_general_ai_response", methods=["POST"])
def discard_general_ai_response():
    """Endpoint for admin to discard a general AI response from the UI."""
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400
        response_id = payload.get("response_id")
        if not response_id:
            return jsonify({"status": "error", "message": "Missing 'response_id' in request."}), 400
        # Find the still-pending response with this id and mark it discarded,
        # all under one lock hold.
        with _general_ai_responses_lock:
            target = next(
                (r for r in _general_ai_responses
                 if r["id"] == response_id and r["status"] == "pending_review"),
                None,
            )
            if target is not None:
                target["status"] = "discarded"
                target["discarded_by_admin_time"] = datetime.now(TIMEZONE)
        if target is not None:
            save_general_ai_responses()
            logger.info(f"UI: General AI response ID '{response_id}' discarded.")
            return jsonify({"status": "success", "message": "AI response discarded successfully."})
        return jsonify({"status": "error", "message": "AI response not found or already processed."}), 404
    except Exception as e:
        logger.error(f"Error in /discard_general_ai_response: {e}")
        traceback.print_exc()
        return jsonify({
            "status": "error",
            "message": f"An unexpected server error occurred: {str(e)}"
        }), 500
@app.route("/health")
def health():
    """Provides a basic health check endpoint for the application."""
    report = {
        "status": "healthy",
        "timestamp": datetime.now(TIMEZONE).isoformat(),
        "scheduler_running": scheduler.running,
        "gemini_available": model is not None,
        "farmer_data_loaded_count": len(_farmer_data),
        "messages_in_log": len(_messages),
        "scheduled_messages_count": len(_scheduled_messages),
        "suggested_tasks_count": len(_suggested_tasks),
        "general_ai_responses_count": len(_general_ai_responses),
    }
    return jsonify(report)
# ---------------- Startup ----------------
def start_polling_thread():
    """Starts a separate thread for Telegram long polling."""
    poller = threading.Thread(
        target=polling_loop,
        daemon=True,  # dies with the main process
        name="TelegramPolling",
    )
    poller.start()
    logger.info("Telegram polling thread initiated.")
    return poller
if __name__ == "__main__":
    # Startup sequence: validate config -> load persisted state -> schedule
    # the initial farm-update request -> start polling thread -> run Flask.

    # Perform initial configuration validation
    # NOTE(review): the log message claims a placeholder check, but this only
    # tests for an empty/missing token — it never compares BOT_TOKEN against
    # the hardcoded default (unlike the GEMINI_API_KEY check just below).
    if not BOT_TOKEN :
        logger.critical(
            "â BOT_TOKEN is missing or is the default placeholder. Please update it in config or environment variables.")
        exit(1) # Exit if essential config is missing
    if not GEMINI_API_KEY or GEMINI_API_KEY == "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho":
        logger.warning(
            "â ī¸ GEMINI_API_KEY is missing or is the default placeholder. AI features may not function correctly without it.")
        # Do not exit, as other parts of the app might still be useful
    logger.info("Starting Farm Bot with admin-controlled scheduling workflow...")
    # Load all persistent data into memory
    load_messages_from_file()
    load_scheduled_messages()
    load_suggested_tasks()
    load_general_ai_responses()
    load_farmer_data() # Load farmer-specific RAG data
    # --- HARDCODED SCHEDULED FARM UPDATE REQUEST AT STARTUP ---
    # This ensures the first "ask for updates" message is automatically scheduled for a farmer.
    # IMPORTANT: Replace these values with actual farmer information (or set via environment variables)
    # To get a farmer's chat ID: Have them send any message to your bot. Then check http://127.0.0.1:5000/messages
    TARGET_FARMER_CHAT_ID = os.environ.get("TARGET_FARMER_CHAT_ID", "YOUR_TELEGRAM_CHAT_ID")
    TARGET_FARMER_USERNAME = os.environ.get("TARGET_FARMER_USERNAME", "YOUR_TELEGRAM_USERNAME")
    FARM_UPDATE_REQUEST_MESSAGE = "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities."
    FARM_UPDATE_REQUEST_TYPE = "farm_update_request"
    # Schedule for 5 minutes from startup (for testing) - adjust for production e.g. daily at 8 AM
    scheduled_farm_update_time = datetime.now(TIMEZONE) + timedelta(minutes=5)
    # NOTE(review): os.environ.get() with a string default can never return
    # None, so the `None` members of these lists are unreachable.
    if TARGET_FARMER_CHAT_ID in ["YOUR_TELEGRAM_CHAT_ID", "", None] or TARGET_FARMER_USERNAME in [
            "YOUR_TELEGRAM_USERNAME", "", None]:
        logger.warning(
            "â ī¸ Initial farm update request not scheduled: TARGET_FARMER_CHAT_ID or TARGET_FARMER_USERNAME not properly configured in script or environment variables.")
        logger.info(" -> To enable this feature, please update the configuration or manually schedule via the UI.")
    else:
        # Check if a similar farm update request has already been recently scheduled/sent
        update_request_already_handled = False
        with _scheduled_messages_lock:
            for msg in _scheduled_messages:
                if (str(msg["chat_id"]) == TARGET_FARMER_CHAT_ID and
                        msg["type"] == FARM_UPDATE_REQUEST_TYPE and
                        msg["status"] in ["pending", "sent"]):
                    # Consider handled if scheduled or sent within the last 24 hours
                    msg_time_str = msg.get("sent_time", msg["scheduled_time"])
                    try:
                        msg_dt = datetime.fromisoformat(msg_time_str.replace('Z', '+00:00')) # Handle Z timezone
                        # Normalize to the local timezone before comparing.
                        if msg_dt.tzinfo is None:
                            msg_dt = TIMEZONE.localize(msg_dt)
                        else:
                            msg_dt = msg_dt.astimezone(TIMEZONE)
                        if msg_dt > (datetime.now(TIMEZONE) - timedelta(hours=24)):
                            update_request_already_handled = True
                            logger.info(
                                f"âšī¸ Farm update request for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) already exists or was recently sent. Not re-scheduling.")
                            break
                    except Exception as e:
                        logger.warning(
                            f"Error parsing datetime for existing scheduled message ID '{msg.get('id', 'unknown')}': {e}. This might affect duplicate detection for hardcoded message.")
        if not update_request_already_handled:
            if scheduled_farm_update_time > datetime.now(TIMEZONE):
                result = schedule_message(
                    TARGET_FARMER_CHAT_ID,
                    TARGET_FARMER_USERNAME,
                    FARM_UPDATE_REQUEST_MESSAGE,
                    scheduled_farm_update_time,
                    FARM_UPDATE_REQUEST_TYPE
                )
                if result:
                    logger.info(
                        f"â Initial farm update request scheduled for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) at {scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}.")
                else:
                    logger.error("â Failed to schedule initial farm update request due to an internal error.")
            else:
                logger.warning(
                    f"â ī¸ Initial farm update request not scheduled: Scheduled time ({scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}) is in the past. Please adjust `scheduled_farm_update_time` for future execution.")
    # --- END HARDCODED SCHEDULED FARM UPDATE REQUEST ---
    # Start the Telegram polling thread (daemon so it exits with main app)
    start_polling_thread()
    logger.info("\n-----------------------------------------------------")
    logger.info("Farm Bot Application Ready!")
    logger.info("đą Instruct your farmers to start a chat with your Telegram bot.")
    logger.info("đ Access the Admin Panel to manage tasks and responses: http://127.0.0.1:5000")
    logger.info("đ Check system health: http://127.0.0.1:5000/health")
    logger.info("-----------------------------------------------------\n")
    try:
        # Run the Flask app
        app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)
    except KeyboardInterrupt:
        logger.info("Application received KeyboardInterrupt. Shutting down gracefully...")
    except Exception as e:
        logger.critical(f"Unhandled exception during Flask application runtime: {e}")
        traceback.print_exc()
    finally:
        # Ensure scheduler is shut down when the app stops
        if scheduler.running:
            scheduler.shutdown(wait=False)
            logger.info("APScheduler shut down.")
        logger.info("Application process finished.")