import requests import time import json import traceback import threading from datetime import datetime, timedelta from flask import Flask, jsonify, render_template_string, send_file, request import google.generativeai as genai import os from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.date import DateTrigger import pytz import logging import re from typing import Optional, Dict, List, Any # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ---------------- CONFIG ---------------- # !!! IMPORTANT: Replace these with your actual tokens or use environment variables !!! BOT_TOKEN = "8328709566:AAH7ZmdWuzGODlTByJ34yI5yR9e8otBokBc" # <<< REPLACE THIS with your Telegram Bot Token GEMINI_API_KEY = "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho" # <<< REPLACE THIS with your Google Gemini API Key # !!! ----------------------------------------------------------------------------- !!! TELEGRAM_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}" CHAT_LOG_FILE = "chat_log.json" SCHEDULED_MESSAGES_FILE = "scheduled_messages.json" SUGGESTED_TASKS_FILE = "suggested_tasks.json" GENERAL_AI_RESPONSES_FILE = "general_ai_responses.json" FARMERS_DATA_FILE = "farmers.json" # File for farmer-specific RAG data MAX_MESSAGES = 2000 TIMEZONE = pytz.timezone('Asia/Kolkata') # Adjust to your local timezone (e.g., 'America/New_York', 'Europe/London') FARM_UPDATE_RESPONSE_WINDOW_HOURS = 48 # Window for a farmer to respond to an update request # Hardcoded messages for the UI dropdown HARDCODED_MESSAGES_FOR_UI =[ { "value": "What's the update regarding your farm? 
Please share details about your crops, any issues you're facing, and current farming activities.", "text": "Ask for Farm Update"}, {"value": "How are your crops doing today?", "text": "Crops Check-in"}, {"value": "Reminder: Check irrigation system.", "text": "Irrigation Reminder"}, {"value": "Don't forget to monitor pest activity.", "text": "Pest Monitor Reminder"}, {"value": "Daily task: Fertilize Section A.", "text": "Fertilize Section A"}, {"value": "Weather looks good for harvesting. Consider starting tomorrow.", "text": "Harvest Weather Alert"}, {"value": "Time to check soil moisture levels.", "text": "Soil Moisture Check"}, {"value": "", "text": "Custom Message (Type Below)"} ] # Rate limiting for Telegram API calls REQUEST_RATE_LIMIT = 30 # requests per minute rate_limit_tracker = {"count": 0, "reset_time": time.time() + 60} # ---------------------------------------- # Flask app app = Flask(__name__) # Configure Gemini with error handling try: genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel("gemini-2.0-flash-exp") logger.info("Gemini AI configured successfully.") except Exception as e: logger.error(f"Failed to configure Gemini AI. 
AI features will be unavailable: {e}") model = None # Set model to None if configuration fails # Initialize scheduler scheduler = BackgroundScheduler(timezone=TIMEZONE) scheduler.start() # Thread-safe stores _messages_lock = threading.Lock() _messages: List[Dict[str, Any]] = [] _scheduled_messages_lock = threading.Lock() _scheduled_messages: List[Dict[str, Any]] = [] _suggested_tasks_lock = threading.Lock() _suggested_tasks: List[Dict[str, Any]] = [] # Each entry is now a single task with message and reason _general_ai_responses_lock = threading.Lock() _general_ai_responses: List[Dict[str, Any]] = [] _farmer_data_lock = threading.Lock() _farmer_data: List[Dict[str, Any]] = [] # State for tracking if we're awaiting a farm update response from a specific chat_id _awaiting_update_response: Dict[str, datetime] = {} def check_rate_limit() -> bool: """Checks if Telegram API request rate limit has been exceeded.""" current_time = time.time() with _messages_lock: # Use a lock for shared rate_limit_tracker access if current_time > rate_limit_tracker["reset_time"]: rate_limit_tracker["count"] = 0 rate_limit_tracker["reset_time"] = current_time + 60 if rate_limit_tracker["count"] >= REQUEST_RATE_LIMIT: return False rate_limit_tracker["count"] += 1 return True def sanitize_text(text: str, max_length: int = 4096) -> str: """Sanitizes text for Telegram message limits and removes invalid characters.""" if not text: return "" # Remove null characters which can cause issues with some APIs/databases sanitized = text.replace('\x00', '').strip() # Telegram message limit for sendMessage is 4096 characters (HTML mode can be slightly less due to tags) if len(sanitized) > max_length: sanitized = sanitized[:max_length - 3] + "..." 
return sanitized def save_messages_to_file(): """Saves chat messages to a JSON file.""" try: with _messages_lock: with open(CHAT_LOG_FILE, "w", encoding="utf-8") as f: json.dump(_messages, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_messages)} messages to {CHAT_LOG_FILE}.") except Exception as e: logger.error(f"Failed to save messages to {CHAT_LOG_FILE}: {e}") def save_scheduled_messages(): """Saves scheduled messages to a JSON file.""" try: with _scheduled_messages_lock: with open(SCHEDULED_MESSAGES_FILE, "w", encoding="utf-8") as f: json.dump(_scheduled_messages, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_scheduled_messages)} scheduled messages to {SCHEDULED_MESSAGES_FILE}.") except Exception as e: logger.error(f"Failed to save scheduled messages to {SCHEDULED_MESSAGES_FILE}: {e}") def save_suggested_tasks(): """Saves AI-suggested tasks to a JSON file.""" try: with _suggested_tasks_lock: serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() # Convert datetime objects to ISO strings for JSON serialization for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) with open(SUGGESTED_TASKS_FILE, "w", encoding="utf-8") as f: json.dump(serializable_tasks, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_suggested_tasks)} suggested tasks to {SUGGESTED_TASKS_FILE}.") except Exception as e: logger.error(f"Failed to save suggested tasks to {SUGGESTED_TASKS_FILE}: {e}") def save_general_ai_responses(): """Saves general AI responses (pending admin review) to a JSON file.""" try: with _general_ai_responses_lock: serializable_responses = [] for resp in _general_ai_responses: temp_resp = resp.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_resp.get(key), datetime): temp_resp[key] = 
temp_resp[key].isoformat() serializable_responses.append(temp_resp) with open(GENERAL_AI_RESPONSES_FILE, "w", encoding="utf-8") as f: json.dump(serializable_responses, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_general_ai_responses)} general AI responses to {GENERAL_AI_RESPONSES_FILE}.") except Exception as e: logger.error(f"Failed to save general AI responses to {GENERAL_AI_RESPONSES_FILE}: {e}") def load_messages_from_file(): """Loads chat messages from a JSON file.""" if not os.path.exists(CHAT_LOG_FILE): logger.info(f"{CHAT_LOG_FILE} not found, starting with empty messages.") return try: with open(CHAT_LOG_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _messages_lock: _messages.clear() _messages.extend(data[-MAX_MESSAGES:]) # Load only the latest MAX_MESSAGES logger.info(f"Loaded {len(_messages)} messages from {CHAT_LOG_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {CHAT_LOG_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load messages from {CHAT_LOG_FILE}: {e}") def load_scheduled_messages(): """Loads scheduled messages and re-schedules pending ones.""" if not os.path.exists(SCHEDULED_MESSAGES_FILE): logger.info(f"{SCHEDULED_MESSAGES_FILE} not found, starting with empty scheduled messages.") return try: with open(SCHEDULED_MESSAGES_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _scheduled_messages_lock: _scheduled_messages.clear() _scheduled_messages.extend(data) rescheduled_count = 0 for entry in _scheduled_messages: try: # Ensure scheduled_time is timezone-aware scheduled_time = datetime.fromisoformat(entry["scheduled_time"]) if scheduled_time.tzinfo is None: scheduled_time = TIMEZONE.localize(scheduled_time) # Localize if naive # Only re-schedule if pending and in the future if entry["status"] == "pending" and scheduled_time > datetime.now(TIMEZONE): scheduler.add_job( func=send_scheduled_message, 
trigger=DateTrigger(run_date=scheduled_time), args=[entry], id=entry["id"], replace_existing=True # Important to prevent duplicate jobs on restart ) rescheduled_count += 1 # If it was a sent farm update request, track it for response window management if entry["type"] == "farm_update_request" and entry["status"] == "sent": sent_time_str = entry.get("sent_time", entry["scheduled_time"]) # Fallback to scheduled if sent_time missing sent_or_scheduled_time = datetime.fromisoformat(sent_time_str) if sent_or_scheduled_time.tzinfo is None: sent_or_scheduled_time = TIMEZONE.localize(sent_or_scheduled_time) _awaiting_update_response[str(entry["chat_id"])] = sent_or_scheduled_time except KeyError as ke: logger.error(f"Missing key in scheduled message entry ID '{entry.get('id', 'unknown')}': {ke}") except ValueError as ve: # fromisoformat error logger.error( f"Invalid datetime format in scheduled message entry ID '{entry.get('id', 'unknown')}': {ve}") except Exception as e: logger.error(f"Failed to reschedule entry ID '{entry.get('id', 'unknown')}': {e}") logger.info( f"Loaded {len(_scheduled_messages)} scheduled messages, rescheduled {rescheduled_count} pending jobs.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {SCHEDULED_MESSAGES_FILE} (JSON error): {e}. 
Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load scheduled messages from {SCHEDULED_MESSAGES_FILE}: {e}") def load_suggested_tasks(): """Loads AI-suggested tasks from a JSON file.""" if not os.path.exists(SUGGESTED_TASKS_FILE): logger.info(f"{SUGGESTED_TASKS_FILE} not found, starting with empty suggested tasks.") return try: with open(SUGGESTED_TASKS_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _suggested_tasks_lock: _suggested_tasks.clear() for task in data: # Convert ISO strings back to datetime objects for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if key in task and task[key] is not None: try: task[key] = datetime.fromisoformat(task[key]) except (ValueError, TypeError): logger.warning( f"Invalid datetime format for key '{key}' in task ID '{task.get('id')}': {task[key]}. Setting to None.") task[key] = None _suggested_tasks.append(task) logger.info(f"Loaded {len(_suggested_tasks)} suggested tasks from {SUGGESTED_TASKS_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {SUGGESTED_TASKS_FILE} (JSON error): {e}. 
Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load suggested tasks from {SUGGESTED_TASKS_FILE}: {e}") def load_general_ai_responses(): """Loads general AI responses (pending admin review) from a JSON file.""" if not os.path.exists(GENERAL_AI_RESPONSES_FILE): logger.info(f"{GENERAL_AI_RESPONSES_FILE} not found, starting with empty general AI responses.") return try: with open(GENERAL_AI_RESPONSES_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _general_ai_responses_lock: _general_ai_responses.clear() for resp in data: for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if key in resp and resp[key] is not None: try: resp[key] = datetime.fromisoformat(resp[key]) except (ValueError, TypeError): logger.warning( f"Invalid datetime format for key '{key}' in general AI response ID '{resp.get('id')}': {resp[key]}. Setting to None.") resp[key] = None _general_ai_responses.append(resp) logger.info(f"Loaded {len(_general_ai_responses)} general AI responses from {GENERAL_AI_RESPONSES_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {GENERAL_AI_RESPONSES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load general AI responses from {GENERAL_AI_RESPONSES_FILE}: {e}") def load_farmer_data(): """Loads farmer-specific data from farmers.json for RAG.""" if not os.path.exists(FARMERS_DATA_FILE): logger.warning( f"{FARMERS_DATA_FILE} not found. RAG functionality for tasks will be limited or unavailable. 
Please create it if needed.") with _farmer_data_lock: _farmer_data.clear() return try: with open(FARMERS_DATA_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _farmer_data_lock: _farmer_data.clear() _farmer_data.extend(data) logger.info(f"Loaded {len(_farmer_data)} farmer profiles from {FARMERS_DATA_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {FARMERS_DATA_FILE} (JSON error): {e}. Ensure it's valid JSON. RAG functionality will be limited.") except Exception as e: logger.error(f"Failed to load farmer data from {FARMERS_DATA_FILE}: {e}") def get_farmer_profile(chat_id: str) -> Optional[Dict]: """Retrieves a farmer's profile by chat_id from loaded data.""" with _farmer_data_lock: for farmer in _farmer_data: if str(farmer.get("chat_id")) == str(chat_id): return farmer return None def append_message(role: str, username: str, chat_id: str, text: str): """Appends a message to the in-memory chat log and saves it to file.""" if not text: logger.debug(f"Attempted to append empty message for role '{role}' from {username} ({chat_id}). Skipping.") return text = sanitize_text(text) entry = { "role": role, "username": username, "chat_id": str(chat_id), "text": text, "ts": datetime.utcnow().isoformat() + "Z" } with _messages_lock: _messages.append(entry) if len(_messages) > MAX_MESSAGES: _messages[:] = _messages[-MAX_MESSAGES:] # Keep only the latest messages save_messages_to_file() def send_message(chat_id: str, text: str) -> Optional[Dict]: """Sends a message to Telegram, respecting rate limits.""" if not check_rate_limit(): logger.warning(f"Telegram API rate limit exceeded for chat_id {chat_id}. Message skipped: '{text[:50]}...'.") return None text = sanitize_text(text) if not text: logger.warning(f"Attempted to send empty message to chat_id {chat_id}. 
Skipping.") return None url = f"{TELEGRAM_API_URL}/sendMessage" payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"} # Use HTML for basic formatting try: r = requests.post(url, json=payload, timeout=10) r.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) result = r.json() if not result.get("ok"): logger.error( f"Telegram API reported error sending to {chat_id}: {result.get('description', 'Unknown error')}. Message: '{text[:50]}...'.") return None logger.debug(f"Successfully sent Telegram message to {chat_id}: '{text[:50]}...'.") return result except requests.exceptions.Timeout: logger.error(f"Timeout sending Telegram message to {chat_id}. Message: '{text[:50]}...'.") return None except requests.exceptions.RequestException as e: logger.error(f"Network error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.") return None except Exception as e: logger.error(f"Unexpected error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.") return None def schedule_message(chat_id: str, username: str, message_text: str, scheduled_time: datetime, message_type: str = "reminder") -> Optional[Dict]: """Schedules a message to be sent at a future time.""" if not message_text or not chat_id: logger.error("Cannot schedule message: Missing message text or chat_id.") return None # Ensure scheduled_time is in the future if scheduled_time <= datetime.now(TIMEZONE): logger.error( f"Cannot schedule message for {username} ({chat_id}): Scheduled time ({scheduled_time}) is in the past. 
Message: '{message_text[:50]}...'.") return None # Generate a unique ID for the scheduled entry timestamp = int(scheduled_time.timestamp()) # Use a hash of the first 50 chars of message_text for uniqueness in the ID text_hash = hash(message_text[:50]) % 10000 unique_id = f"{chat_id}_{message_type}_{timestamp}_{text_hash}" scheduled_entry = { "id": unique_id, "chat_id": str(chat_id), "username": username, "message": sanitize_text(message_text), "scheduled_time": scheduled_time.isoformat(), "type": message_type, "status": "pending", "created_at": datetime.now(TIMEZONE).isoformat() } try: with _scheduled_messages_lock: _scheduled_messages.append(scheduled_entry) save_scheduled_messages() scheduler.add_job( func=send_scheduled_message, trigger=DateTrigger(run_date=scheduled_time), args=[scheduled_entry], id=scheduled_entry["id"], replace_existing=True # Important: Prevents duplicate jobs if app restarts and tries to reschedule same ID ) logger.info( f"Scheduled '{message_type}' message for {username} ({chat_id}) (ID: {unique_id}) at {scheduled_time.strftime('%Y-%m-%d %H:%M %Z')}.") return scheduled_entry except Exception as e: logger.error(f"Failed to schedule message for {username} ({chat_id}) (ID: {unique_id}): {e}") return None def send_scheduled_message(scheduled_entry: Dict): """Callback function executed by APScheduler to send a scheduled message.""" try: chat_id = scheduled_entry["chat_id"] message = scheduled_entry["message"] recipient_username = scheduled_entry["username"] logger.info( f"Attempting to send scheduled message '{scheduled_entry['type']}' to {recipient_username} ({chat_id}). 
Message: '{message[:50]}...'.") result = send_message(chat_id, message) current_time = datetime.now(TIMEZONE).isoformat() if result: append_message("bot", f"Bot (Scheduled for {recipient_username})", chat_id, message) with _scheduled_messages_lock: for msg in _scheduled_messages: if msg["id"] == scheduled_entry["id"]: msg["status"] = "sent" msg["sent_time"] = current_time break save_scheduled_messages() logger.info(f"Successfully sent scheduled message to {recipient_username} ({chat_id}).") # If a farm update request was just sent, start tracking the response window if scheduled_entry["type"] == "farm_update_request": _awaiting_update_response[str(chat_id)] = datetime.now(TIMEZONE) logger.info( f"Now awaiting farm update response from {recipient_username} for {FARM_UPDATE_RESPONSE_WINDOW_HOURS} hours (chat_id: {chat_id}).") else: with _scheduled_messages_lock: for msg in _scheduled_messages: if msg["id"] == scheduled_entry["id"]: msg["status"] = "failed" msg["failed_time"] = current_time break save_scheduled_messages() logger.error( f"Failed to send scheduled message to {recipient_username} ({chat_id}). Marking as 'failed'. Message: '{message[:50]}...'.") except Exception as e: logger.error(f"Critical error in send_scheduled_message for ID '{scheduled_entry.get('id', 'unknown')}': {e}") traceback.print_exc() def generate_and_store_farm_tasks(user_message: str, chat_id: str, username: str) -> str: """ Generates 3-4 specific, short, descriptive daily tasks with a reason (<50 words) based on farmer's update AND farmer's profile (RAG), and stores each task separately for admin review. Does NOT send any message directly to the farmer. 
""" if not model: logger.error(f"Gemini model not available for task generation for {username}.") return "" try: farmer_profile = get_farmer_profile(chat_id) profile_context = "" if farmer_profile and farmer_profile.get("farm_details"): details = farmer_profile["farm_details"] profile_context = ( f"\n\nContext about {username}'s farm (use this for tailored advice):\n" f"- Location: {details.get('location', 'N/A')}\n" f"- Main Crops: {', '.join(details.get('main_crops', []))}\n" f"- Soil Type: {details.get('soil_type', 'N/A')}\n" f"- Typical Issues: {', '.join(details.get('typical_issues', []))}\n" f"- Last reported weather: {details.get('last_reported_weather', 'N/A')}\n" f"- Irrigation System: {details.get('irrigation_system', 'N/A')}\n" ) else: profile_context = "\n\n(No detailed farmer profile data found. Generating tasks based solely on the update.)" prompt = ( f"You are a highly experienced and practical agricultural expert. Based on the farmer's update below " f"and their farm context (if provided), generate exactly 3 to 4 critical daily tasks. " f"Each task must be concise, actionable, and include a 'Reason:' why it's important. " f"Ensure each 'Reason:' is strictly under 50 words. Prioritize immediate needs and common farming practices." f"\n\nStrictly follow this output format for each task:\n" f"Task: [Short, descriptive task, e.g., 'Check soil moisture in cornfield']\n" f"Reason: [Brief explanation why (under 50 words), e.g., 'Ensure optimal water levels after dry spell.']\n" f"---\n" f"Task: [Another short, descriptive task]\n" f"Reason: [Brief explanation why (under 50 words)]\n" f"---\n" f"(Repeat 3 or 4 times. 
Ensure each task block is clearly separated by '---' on its own line.)" f"\n\nFarmer: {username}\n" f"Farmer's Update: '{user_message}'" f"{profile_context}" f"\n\nNow, generate the 3-4 tasks and reasons in the specified format:" ) logger.info( f"Calling Gemini for RAG task generation for {username} (chat_id: {chat_id}) based on update: '{user_message[:100]}...'.") response = model.generate_content(prompt) ai_raw_output = response.text if response and response.text else "Could not generate tasks based on the update." logger.debug(f"Raw AI output for tasks: {ai_raw_output[:500]}...") # Parse the AI's output into individual tasks parsed_tasks = [] # Regex to find blocks starting with "Task:" and containing "Reason:", separated by "---" # Adjusted regex for better robustness to handle various line endings and trailing whitespace. task_blocks = re.findall(r"Task:\s*(.*?)\nReason:\s*(.*?)(?:\n---\s*|$)", ai_raw_output, re.DOTALL | re.IGNORECASE) for task_text, reason_text in task_blocks: parsed_tasks.append({ "task_message": task_text.strip(), "task_reason": reason_text.strip() }) if not parsed_tasks: logger.warning( f"AI did not generate structured tasks for {username} in expected format. Providing fallback task(s). Raw output: {ai_raw_output[:200]}") # Fallback if parsing fails or AI doesn't follow format. Always ensure at least one review item. fallback_message = "AI did not generate specific tasks in the expected format. Manual review of farmer's update is required." if len(ai_raw_output) > 50: # If AI gave some output, add it as context to the fallback fallback_message += f" AI's raw response: {sanitize_text(ai_raw_output, max_length=150)}." 
parsed_tasks.append({ "task_message": "Manual Review Required", "task_reason": sanitize_text(fallback_message, max_length=200) }) generation_time = datetime.now(TIMEZONE) # Store each individual parsed task for separate admin scheduling with _suggested_tasks_lock: for i, task_item in enumerate(parsed_tasks): # Create a unique ID for each individual task item task_id = f"suggested_task_{chat_id}_{int(generation_time.timestamp())}_{i}_{hash(task_item['task_message']) % 10000}" task_entry = { "id": task_id, "chat_id": str(chat_id), "username": username, "original_update": user_message, # Keep original update for context in UI "suggested_task_message": sanitize_text(task_item["task_message"]), "suggested_task_reason": sanitize_text(task_item["task_reason"]), "generation_time": generation_time, "status": "pending_review" } _suggested_tasks.append(task_entry) save_suggested_tasks() logger.info( f"Generated and stored {len(parsed_tasks)} individual tasks for {username} (chat_id: {chat_id}) for admin review.") return "" # No immediate response to farmer except Exception as e: logger.error(f"Error during farm task generation/storage for {username} (chat_id: {chat_id}): {e}") traceback.print_exc() return "" # No immediate response to farmer on AI error def get_ai_response_with_context(user_message: str, chat_id: str) -> str: """Gets a general AI response for a farmer's query, considering general farming context.""" if not model: logger.error("Gemini model not available for general query, returning fallback message.") return "AI service is currently unavailable. Please try again later." try: # Include general farming context in the prompt prompt = ( f"You are a helpful and knowledgeable agricultural assistant. Provide a concise and direct answer " f"to the following query from a farmer. Keep the response factual and actionable where possible. " f"Limit the response to a few sentences (max 3-4 sentences)." 
f"\n\nFarmer's Query: {user_message}" ) logger.info(f"Generating general AI response for {chat_id} (query: '{user_message[:50]}...')...") response = model.generate_content(prompt) ai_reply = response.text if response and response.text else "I couldn't process your request right now." return sanitize_text(ai_reply) except Exception as e: logger.error(f"AI response error for chat_id {chat_id} (query: '{user_message[:50]}...'): {e}") traceback.print_exc() return "I'm having trouble processing your request right now. Please try again later." def store_general_ai_response(chat_id: str, username: str, original_query: str, ai_response: str): """Stores a general AI response (generated from farmer's general query) for admin review.""" if not ai_response: logger.warning(f"Attempted to store empty general AI response for {username} ({chat_id}). Skipping.") return response_entry = { "id": f"general_ai_{chat_id}_{int(datetime.now().timestamp())}_{hash(original_query) % 10000}", "chat_id": str(chat_id), "username": username, "original_query": sanitize_text(original_query), "ai_response": sanitize_text(ai_response), "generation_time": datetime.now(TIMEZONE), "status": "pending_review" } with _general_ai_responses_lock: _general_ai_responses.append(response_entry) save_general_ai_responses() logger.info(f"Stored general AI response for {username} ({chat_id}) (ID: {response_entry['id']}).") # ---------------- Polling logic ---------------- def delete_webhook(drop_pending: bool = True) -> Optional[Dict]: """Deletes the Telegram webhook to switch to long polling.""" try: params = {"drop_pending_updates": "true" if drop_pending else "false"} r = requests.post(f"{TELEGRAM_API_URL}/deleteWebhook", params=params, timeout=10) r.raise_for_status() result = r.json() if result.get("ok"): logger.info("Telegram webhook deleted successfully (ensuring long polling).") return result except requests.exceptions.RequestException as e: logger.error(f"Failed to delete Telegram webhook: {e}") return 
None except Exception as e: logger.error(f"Unexpected error deleting Telegram webhook: {e}") return None def get_updates(offset: Optional[int] = None, timeout_seconds: int = 60) -> Optional[Dict]: """Fetches updates from Telegram using long polling.""" params = {"timeout": timeout_seconds} if offset: params["offset"] = offset try: r = requests.get(f"{TELEGRAM_API_URL}/getUpdates", params=params, timeout=timeout_seconds + 10) r.raise_for_status() return r.json() except requests.exceptions.Timeout: logger.debug("Telegram polling timed out (normal, no new messages).") return {"ok": True, "result": []} # Return empty result on timeout to continue polling except requests.exceptions.RequestException as e: logger.error(f"Network error getting updates from Telegram: {e}") return None except Exception as e: logger.error(f"Unexpected error getting updates from Telegram: {e}") return None def polling_loop(): """Main loop for long-polling Telegram for new messages.""" logger.info("Starting Telegram polling thread.") delete_webhook(drop_pending=True) # Ensure webhook is off for polling offset = None consecutive_errors = 0 while True: try: updates = get_updates(offset=offset, timeout_seconds=60) if updates is None: # Critical network error, get_updates returned None consecutive_errors += 1 logger.error( f"get_updates returned None due to error. Consecutive errors: {consecutive_errors}. Sleeping for {min(consecutive_errors * 5, 120)}s.") time.sleep(min(consecutive_errors * 5, 120)) # Exponential backoff up to 2 min continue if not updates.get("ok"): logger.warning( f"Telegram API returned 'not ok' for getUpdates: {updates.get('description', 'No description')}. 
Consecutive errors: {consecutive_errors}.") consecutive_errors += 1 time.sleep(min(consecutive_errors * 2, 60)) # Exponential backoff up to 1 min continue consecutive_errors = 0 # Reset error counter on successful API call results = updates.get("result", []) for update in results: try: process_update(update) offset = update.get("update_id", 0) + 1 except Exception as e: logger.error(f"Error processing individual update ID {update.get('update_id', 'unknown')}: {e}") traceback.print_exc() # Small delay to prevent busy-waiting if Telegram returns many empty results quickly if not results: time.sleep(1) except KeyboardInterrupt: logger.info("Telegram polling stopped by user (KeyboardInterrupt).") break except Exception as e: consecutive_errors += 1 logger.critical(f"Unhandled critical error in polling loop: {e}") time.sleep(min(consecutive_errors * 10, 300)) # Longer exponential backoff for unhandled errors def process_update(update: Dict): """Processes a single Telegram update message.""" msg = update.get("message") or update.get("edited_message") if not msg: logger.debug(f"Update ID {update.get('update_id')}: No message or edited_message found. Skipping.") return chat = msg.get("chat", {}) chat_id = str(chat.get("id")) username = chat.get("first_name") or chat.get("username") or "User" text = msg.get("text") or msg.get("caption") or "" if not text: # If no text (e.g., photo, sticker), send a polite message and append to log acknowledgment_text = "I can only respond to text messages. Please send your queries as text." send_message(chat_id, acknowledgment_text) append_message("user", username, chat_id, "(Non-text message received)") append_message("bot", "Bot", chat_id, acknowledgment_text) logger.debug(f"Received non-text message from {username} ({chat_id}). 
Sent acknowledgment.") return append_message("user", username, chat_id, text) logger.info(f"Processing message from {username} ({chat_id}): '{text[:50]}...'.") bot_acknowledgment_to_farmer = "" # This is the only message sent directly to the farmer # Check if we are awaiting a farm update response from this chat_id most_recent_request_time = _awaiting_update_response.get(chat_id) if (most_recent_request_time and (datetime.now(TIMEZONE) - most_recent_request_time) <= timedelta(hours=FARM_UPDATE_RESPONSE_WINDOW_HOURS)): logger.info(f"Recognized as farm update response from {username}. Generating tasks for admin review.") generate_and_store_farm_tasks(text, chat_id, username) # Generates and stores tasks # Send a generic acknowledgment to the farmer bot_acknowledgment_to_farmer = "Thank you for your farm update! The admin will review it, generate daily tasks, and schedule them to be sent to you shortly." # Remove from tracking after processing this update _awaiting_update_response.pop(chat_id, None) logger.info(f"Finished processing farm update for {username} ({chat_id}). Awaiting response tracker cleared.") else: # This is a general query. Generate AI response but store it for admin scheduling. logger.info(f"Processing general query from {username}. Generating AI response for admin scheduling.") ai_generated_response = get_ai_response_with_context(text, chat_id) # Only store and acknowledge if AI generated a meaningful response if ai_generated_response and ai_generated_response != "AI service is currently unavailable. Please try again later.": store_general_ai_response(chat_id, username, text, ai_generated_response) bot_acknowledgment_to_farmer = f"I've received your query about '{text[:50]}...'. The admin will review the AI's suggested response and schedule it to be sent to you." else: # AI service unavailable or response generation failed bot_acknowledgment_to_farmer = "I've received your message. The admin will get back to you shortly." 
logger.warning( f"AI failed to generate a response for general query from {username} ({chat_id}). Admin will manually review.") if bot_acknowledgment_to_farmer: append_message("bot", "Bot", chat_id, bot_acknowledgment_to_farmer) send_message(chat_id, bot_acknowledgment_to_farmer) else: logger.debug(f"No direct acknowledgment message generated for {username} ({chat_id}) after processing.") # ---------------- Flask routes & UI ---------------- # HTML template (with updated section titles and new general AI response section) HTML_TEMPLATE = """ Farm Bot Admin Panel
Farm Bot Admin Panel
Online
Controls
Auto-updates every 2s
0 messages
Scheduled Messages (0)
All messages scheduled for farmers (pending or sent).
AI Generated Daily Tasks (0 pending)
Individual tasks generated by AI from farmer updates, awaiting admin review and scheduling.
AI Generated General Responses (0 pending)
AI responses to general farmer queries, awaiting admin review and scheduling.
Schedule New Message
""" @app.route("/") def index(): # Make sure this line is passing the HARDCODED_MESSAGES_FOR_UI Python list return render_template_string(HTML_TEMPLATE, HARDCODED_MESSAGES_FOR_UI=HARDCODED_MESSAGES_FOR_UI) @app.route("/messages") def messages_api(): with _messages_lock: return jsonify(list(_messages)) @app.route("/scheduled") def scheduled_messages_api(): with _scheduled_messages_lock: return jsonify(list(_scheduled_messages)) @app.route("/suggested_tasks") def suggested_tasks_api(): with _suggested_tasks_lock: serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) return jsonify(serializable_tasks) @app.route("/general_ai_responses") def general_ai_responses_api(): with _general_ai_responses_lock: serializable_responses = [] for resp in _general_ai_responses: temp_resp = resp.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_resp.get(key), datetime): temp_resp[key] = temp_resp[key].isoformat() serializable_responses.append(temp_resp) return jsonify(serializable_responses) @app.route("/download") def download(): try: # Prioritize existing file if it's there if os.path.exists(CHAT_LOG_FILE): return send_file(CHAT_LOG_FILE, as_attachment=True, download_name="chat_log.json") # Otherwise, create a temporary file from in-memory data with _messages_lock: if not _messages: # Handle empty data logger.warning("No chat log data in memory for download.") return jsonify({"error": "No chat log data to download."}), 404 tmp_file = "chat_log_tmp.json" with open(tmp_file, "w", encoding="utf-8") as f: json.dump(_messages, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="chat_log.json") except Exception as e: logger.error(f"Error downloading chat 
log: {e}") return jsonify({"error": "Failed to download chat log file."}), 500 @app.route("/download-scheduled") def download_scheduled(): try: if os.path.exists(SCHEDULED_MESSAGES_FILE): return send_file(SCHEDULED_MESSAGES_FILE, as_attachment=True, download_name="scheduled_messages.json") with _scheduled_messages_lock: if not _scheduled_messages: logger.warning("No scheduled messages data in memory for download.") return jsonify({"error": "No scheduled messages data to download."}), 404 tmp_file = "scheduled_messages_tmp.json" with open(tmp_file, "w", encoding="utf-8") as f: json.dump(_scheduled_messages, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="scheduled_messages.json") except Exception as e: logger.error(f"Error downloading scheduled messages: {e}") return jsonify({"error": "Failed to download scheduled messages file."}), 500 @app.route("/download-suggested") def download_suggested(): try: if os.path.exists(SUGGESTED_TASKS_FILE): return send_file(SUGGESTED_TASKS_FILE, as_attachment=True, download_name="suggested_tasks.json") with _suggested_tasks_lock: if not _suggested_tasks: logger.warning("No suggested tasks data in memory for download.") return jsonify({"error": "No suggested tasks data to download."}), 404 tmp_file = "suggested_tasks_tmp.json" serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) with open(tmp_file, "w", encoding="utf-8") as f: json.dump(serializable_tasks, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="suggested_tasks.json") except Exception as e: logger.error(f"Error downloading suggested tasks: {e}") return jsonify({"error": "Failed to download suggested tasks file."}), 500 
def _json_error(message, status_code):
    """Build the ``{"status": "error", "message": ...}`` response used by the admin endpoints."""
    return jsonify({"status": "error", "message": message}), status_code


def _read_json_object():
    """Return the request body as a non-empty dict, or ``None``.

    ``silent=True`` makes a missing, malformed or wrongly-typed JSON body come
    back as ``None`` instead of raising, so callers can answer with a 400
    rather than falling into their generic 500 handler.
    """
    payload = request.get_json(silent=True)
    if isinstance(payload, dict) and payload:
        return payload
    return None


def _stripped_text(value):
    """Return *value* stripped of surrounding whitespace, or ``""`` if it is not a string."""
    return value.strip() if isinstance(value, str) else ""


def _parse_schedule_time(value):
    """Parse a ``YYYY-MM-DDTHH:MM`` string into a TIMEZONE-aware datetime.

    Returns ``None`` when *value* is not a string or does not match the format.
    """
    try:
        naive = datetime.strptime(value, '%Y-%m-%dT%H:%M')
    except (TypeError, ValueError):
        return None
    return TIMEZONE.localize(naive)


def _has_pending_record(lock, records, record_id):
    """Return True if *records* holds an entry with *record_id* still in ``pending_review``."""
    with lock:
        return any(
            record["id"] == record_id and record["status"] == "pending_review"
            for record in records
        )


def _mark_record(lock, records, record_id, status, timestamp_key, require_pending):
    """Set ``status`` and a timestamp on the first record matching *record_id*.

    When *require_pending* is true, only a record still in ``pending_review``
    matches. Returns True if a record was updated.
    """
    with lock:
        for record in records:
            if record["id"] != record_id:
                continue
            if require_pending and record["status"] != "pending_review":
                continue
            record["status"] = status
            record[timestamp_key] = datetime.now(TIMEZONE)
            return True
    return False


def _recent_update_request_exists(chat_id, request_type, window=timedelta(hours=24)):
    """Return True if a matching update request is pending or was sent within *window*.

    Scans the in-memory scheduled messages for an entry with the given chat id
    and type whose status is ``pending`` or ``sent`` and whose sent/scheduled
    time is newer than ``now - window``. Entries whose time cannot be parsed
    are logged and ignored.
    """
    cutoff = datetime.now(TIMEZONE) - window
    with _scheduled_messages_lock:
        for msg in _scheduled_messages:
            # .get() so one malformed persisted entry cannot abort startup.
            if str(msg.get("chat_id")) != str(chat_id):
                continue
            if msg.get("type") != request_type or msg.get("status") not in ("pending", "sent"):
                continue
            try:
                # "or" (not a .get default) so a present-but-null sent_time
                # still falls back to the scheduled time.
                msg_time_str = msg.get("sent_time") or msg["scheduled_time"]
                msg_dt = datetime.fromisoformat(msg_time_str.replace('Z', '+00:00'))  # Handle Z timezone
                if msg_dt.tzinfo is None:
                    msg_dt = TIMEZONE.localize(msg_dt)
                else:
                    msg_dt = msg_dt.astimezone(TIMEZONE)
            except Exception as e:  # best effort: a bad entry must not block startup
                logger.warning(
                    f"Error parsing datetime for existing scheduled message ID '{msg.get('id', 'unknown')}': {e}. This might affect duplicate detection for hardcoded message.")
                continue
            if msg_dt > cutoff:
                return True
    return False


@app.route("/download-general-ai")
def download_general_ai():
    """Download the general AI responses as a JSON attachment.

    The persisted file is sent if it exists; otherwise the in-memory records
    are written to a temporary file (datetimes as ISO strings) and that is
    sent. Returns 404 when there is nothing to download and 500 on failure.
    """
    try:
        # Absolute paths ensure send_file serves exactly the file that
        # os.path.exists()/open() saw, regardless of how it resolves relative paths.
        if os.path.exists(GENERAL_AI_RESPONSES_FILE):
            return send_file(os.path.abspath(GENERAL_AI_RESPONSES_FILE), as_attachment=True,
                             download_name="general_ai_responses.json")

        tmp_file = "general_ai_responses_tmp.json"
        datetime_keys = ("generation_time", "scheduled_by_admin_time", "discarded_by_admin_time")
        with _general_ai_responses_lock:
            if not _general_ai_responses:
                logger.warning("No general AI responses data in memory for download.")
                return jsonify({"error": "No general AI responses data to download."}), 404

            serializable_responses = []
            for resp in _general_ai_responses:
                entry = resp.copy()
                for key in datetime_keys:
                    if isinstance(entry.get(key), datetime):
                        entry[key] = entry[key].isoformat()
                serializable_responses.append(entry)

            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(serializable_responses, f, ensure_ascii=False, indent=2)
        return send_file(os.path.abspath(tmp_file), as_attachment=True,
                         download_name="general_ai_responses.json")
    except Exception as e:
        logger.error(f"Error downloading general AI responses: {e}")
        return jsonify({"error": "Failed to download general AI responses file."}), 500


@app.route("/clear", methods=["POST"])
def clear():
    """Clears all in-memory data, removes all persistent files, and clears scheduler jobs.

    Returns an empty 204 response on success, or a 500 JSON error if the
    operation fails as a whole. Failure to delete an individual file is logged
    and does not abort the rest of the clean-up.
    """
    try:
        # Clear in-memory stores
        with _messages_lock:
            _messages.clear()
        with _scheduled_messages_lock:
            _scheduled_messages.clear()
        with _suggested_tasks_lock:
            _suggested_tasks.clear()
        with _general_ai_responses_lock:
            _general_ai_responses.clear()
        with _farmer_data_lock:
            _farmer_data.clear()  # Clear farmer data in memory
        # Cleared in place (rather than rebound) so every holder of the dict
        # sees the reset tracker.
        _awaiting_update_response.clear()

        # Remove persistent files. FARMERS_DATA_FILE is not in this list, so
        # the farmer data file stays on disk.
        for file_path in [CHAT_LOG_FILE, SCHEDULED_MESSAGES_FILE, SUGGESTED_TASKS_FILE,
                          GENERAL_AI_RESPONSES_FILE]:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                continue  # nothing persisted for this store
            except Exception as e:
                logger.error(f"Failed to remove file {file_path}: {e}. Please check permissions.")
            else:
                logger.info(f"Removed persistent file: {file_path}")

        # Clear all APScheduler jobs
        scheduler.remove_all_jobs()
        logger.info("All APScheduler jobs cleared.")

        logger.info("All application data cleared successfully.")
        return ("", 204)  # No Content
    except Exception as e:
        logger.exception(f"Error during 'clear all data' operation: {e}")
        return jsonify({"error": "Failed to clear data due to an internal server error. Check server logs."}), 500


@app.route("/schedule_from_ui", methods=["POST"])
def schedule_from_ui():
    """Endpoint to schedule a message directly from the UI.

    Expects a JSON object with ``chat_id``, ``message`` and ``scheduled_time``
    (``YYYY-MM-DDTHH:MM``, interpreted in TIMEZONE); ``username`` and
    ``message_type`` are optional. Responds 400 on invalid input, 500 if
    scheduling fails, and a success JSON body otherwise.
    """
    try:
        data = _read_json_object()
        if data is None:
            return _json_error("No JSON data provided in request body.", 400)

        chat_id = data.get("chat_id")
        recipient_username = data.get("username", "UI Scheduled User")
        message = _stripped_text(data.get("message"))
        scheduled_time_str = data.get("scheduled_time")
        message_type = data.get("message_type", "reminder")

        if not all([chat_id, message, scheduled_time_str]):
            return _json_error(
                "Missing required fields: 'chat_id', 'message', and 'scheduled_time' are all required.", 400)

        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return _json_error(
                f"Message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram.", 400)

        scheduled_datetime = _parse_schedule_time(scheduled_time_str)
        if scheduled_datetime is None:
            return _json_error(
                "Invalid date/time format. Expected YYYY-MM-DDTHH:MM (e.g., 2023-10-27T14:30).", 400)

        now = datetime.now(TIMEZONE)
        if scheduled_datetime <= now:
            return _json_error(
                "Scheduled time must be in the future. Please choose a time later than now.", 400)

        # Optional: Prevent scheduling too far in the future for stability/resource management
        if scheduled_datetime > now + timedelta(days=365):
            return _json_error(
                "Scheduled time cannot be more than 1 year in the future for stability.", 400)

        result = schedule_message(chat_id, recipient_username, message, scheduled_datetime, message_type)
        if not result:
            return _json_error(
                "Failed to schedule message due to an internal server error (check server logs).", 500)

        logger.info(
            f"UI: Message '{message_type}' scheduled successfully for {recipient_username} ({chat_id}) at {scheduled_datetime}.")
        return jsonify({"status": "success", "message": "Message scheduled successfully."})
    except Exception as e:
        # logger.exception records the traceback through the configured logger.
        logger.exception(f"Error in /schedule_from_ui: {e}")
        return _json_error(f"An unexpected server error occurred: {str(e)}", 500)


@app.route("/schedule_suggested_from_ui", methods=["POST"])
def schedule_suggested_from_ui():
    """Endpoint for admin to schedule an individual AI-suggested task from the UI.

    Expects a JSON object with ``suggested_task_id``, ``chat_id``, ``username``,
    ``message`` and ``scheduled_time``. The task must still be in
    ``pending_review``; on success it is marked ``scheduled`` and persisted.
    """
    try:
        data = _read_json_object()
        if data is None:
            return _json_error("No JSON data provided in request body.", 400)

        suggested_task_id = data.get("suggested_task_id")
        chat_id = data.get("chat_id")
        username = data.get("username")
        message = _stripped_text(data.get("message"))  # This is the individual task message content
        scheduled_time_str = data.get("scheduled_time")

        if not all([suggested_task_id, chat_id, username, message, scheduled_time_str]):
            return _json_error("Missing required fields for scheduling suggested task.", 400)

        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return _json_error(
                f"Task message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram.", 400)

        scheduled_datetime = _parse_schedule_time(scheduled_time_str)
        if scheduled_datetime is None:
            return _json_error("Invalid date/time format. Expected YYYY-MM-DDTHH:MM.", 400)

        if scheduled_datetime <= datetime.now(TIMEZONE):
            return _json_error(
                "Scheduled time must be in the future. Please choose a time later than now.", 400)

        # Check if the suggested task exists and is still pending review
        if not _has_pending_record(_suggested_tasks_lock, _suggested_tasks, suggested_task_id):
            return _json_error(
                "Suggested task not found or already processed (scheduled/discarded).", 404)

        result = schedule_message(chat_id, username, message, scheduled_datetime, "daily_tasks")
        if not result:
            return _json_error(
                "Failed to schedule suggested task due to an internal server error (check server logs).", 500)

        # Update the status of the suggested task to 'scheduled'
        _mark_record(_suggested_tasks_lock, _suggested_tasks, suggested_task_id,
                     "scheduled", "scheduled_by_admin_time", require_pending=False)
        save_suggested_tasks()
        logger.info(
            f"UI: Individual suggested task ID '{suggested_task_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.")
        return jsonify({"status": "success", "message": "Suggested task scheduled successfully."})
    except Exception as e:
        logger.exception(f"Error in /schedule_suggested_from_ui: {e}")
        return _json_error(f"An unexpected server error occurred: {str(e)}", 500)


@app.route("/discard_suggested_task", methods=["POST"])
def discard_suggested_task():
    """Endpoint for admin to discard an individual AI-suggested task from the UI.

    Expects a JSON object with ``suggested_task_id``. Only a task still in
    ``pending_review`` can be discarded; otherwise a 404 is returned.
    """
    try:
        data = _read_json_object()
        if data is None:
            return _json_error("No JSON data provided in request body.", 400)

        suggested_task_id = data.get("suggested_task_id")
        if not suggested_task_id:
            return _json_error("Missing 'suggested_task_id' in request.", 400)

        discarded = _mark_record(_suggested_tasks_lock, _suggested_tasks, suggested_task_id,
                                 "discarded", "discarded_by_admin_time", require_pending=True)
        if not discarded:
            return _json_error(
                "Suggested task not found or already processed (scheduled/discarded).", 404)

        save_suggested_tasks()
        logger.info(f"UI: Suggested task ID '{suggested_task_id}' discarded.")
        return jsonify({"status": "success", "message": "Suggested task discarded successfully."})
    except Exception as e:
        logger.exception(f"Error in /discard_suggested_task: {e}")
        return _json_error(f"An unexpected server error occurred: {str(e)}", 500)


@app.route("/schedule_general_ai_response", methods=["POST"])
def schedule_general_ai_response():
    """Endpoint for admin to schedule a general AI response from the UI.

    Expects a JSON object with ``response_id``, ``chat_id``, ``username``,
    ``message`` and ``scheduled_time``. The response must still be in
    ``pending_review``; on success it is marked ``scheduled`` and persisted.
    """
    try:
        data = _read_json_object()
        if data is None:
            return _json_error("No JSON data provided in request body.", 400)

        response_id = data.get("response_id")
        chat_id = data.get("chat_id")
        username = data.get("username")
        message = _stripped_text(data.get("message"))
        scheduled_time_str = data.get("scheduled_time")

        if not all([response_id, chat_id, username, message, scheduled_time_str]):
            return _json_error("Missing required fields for scheduling AI response.", 400)

        # Validate message length (Telegram limit)
        if len(message) > 4096:
            return _json_error(
                f"AI response message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram.", 400)

        scheduled_datetime = _parse_schedule_time(scheduled_time_str)
        if scheduled_datetime is None:
            return _json_error("Invalid date/time format. Expected YYYY-MM-DDTHH:MM.", 400)

        if scheduled_datetime <= datetime.now(TIMEZONE):
            return _json_error("Scheduled time must be in the future.", 400)

        # Check if the AI response entry exists and is still pending review
        if not _has_pending_record(_general_ai_responses_lock, _general_ai_responses, response_id):
            return _json_error("AI response not found or already processed.", 404)

        result = schedule_message(chat_id, username, message, scheduled_datetime, "general_ai_response")
        if not result:
            return _json_error("Failed to schedule AI response due to internal error.", 500)

        _mark_record(_general_ai_responses_lock, _general_ai_responses, response_id,
                     "scheduled", "scheduled_by_admin_time", require_pending=False)
        save_general_ai_responses()
        logger.info(
            f"UI: General AI response ID '{response_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.")
        return jsonify({"status": "success", "message": "AI response scheduled successfully."})
    except Exception as e:
        logger.exception(f"Error in /schedule_general_ai_response: {e}")
        return _json_error(f"An unexpected server error occurred: {str(e)}", 500)


@app.route("/discard_general_ai_response", methods=["POST"])
def discard_general_ai_response():
    """Endpoint for admin to discard a general AI response from the UI.

    Expects a JSON object with ``response_id``. Only a response still in
    ``pending_review`` can be discarded; otherwise a 404 is returned.
    """
    try:
        data = _read_json_object()
        if data is None:
            return _json_error("No JSON data provided in request body.", 400)

        response_id = data.get("response_id")
        if not response_id:
            return _json_error("Missing 'response_id' in request.", 400)

        discarded = _mark_record(_general_ai_responses_lock, _general_ai_responses, response_id,
                                 "discarded", "discarded_by_admin_time", require_pending=True)
        if not discarded:
            return _json_error("AI response not found or already processed.", 404)

        save_general_ai_responses()
        logger.info(f"UI: General AI response ID '{response_id}' discarded.")
        return jsonify({"status": "success", "message": "AI response discarded successfully."})
    except Exception as e:
        logger.exception(f"Error in /discard_general_ai_response: {e}")
        return _json_error(f"An unexpected server error occurred: {str(e)}", 500)


@app.route("/health")
def health():
    """Provides a basic health check endpoint for the application."""
    return jsonify({
        "status": "healthy",
        "timestamp": datetime.now(TIMEZONE).isoformat(),
        "scheduler_running": scheduler.running,
        "gemini_available": model is not None,
        "farmer_data_loaded_count": len(_farmer_data),
        "messages_in_log": len(_messages),
        "scheduled_messages_count": len(_scheduled_messages),
        "suggested_tasks_count": len(_suggested_tasks),
        "general_ai_responses_count": len(_general_ai_responses)
    })


# ---------------- Startup ----------------
def start_polling_thread():
    """Starts a separate daemon thread for Telegram long polling and returns it."""
    t = threading.Thread(target=polling_loop, daemon=True, name="TelegramPolling")
    t.start()
    logger.info("Telegram polling thread initiated.")
    return t


if __name__ == "__main__":
    # Perform initial configuration validation
    if not BOT_TOKEN:
        logger.critical(
            "❌ BOT_TOKEN is missing or is the default placeholder. Please update it in config or environment variables.")
        # SystemExit instead of exit(): exit() is injected by the site module
        # and is not guaranteed to exist in every runtime.
        raise SystemExit(1)  # Exit if essential config is missing
    if not GEMINI_API_KEY or GEMINI_API_KEY == "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho":
        logger.warning(
            "⚠️ GEMINI_API_KEY is missing or is the default placeholder. AI features may not function correctly without it.")
        # Do not exit, as other parts of the app might still be useful

    logger.info("Starting Farm Bot with admin-controlled scheduling workflow...")

    # Load all persistent data into memory
    load_messages_from_file()
    load_scheduled_messages()
    load_suggested_tasks()
    load_general_ai_responses()
    load_farmer_data()  # Load farmer-specific RAG data

    # --- HARDCODED SCHEDULED FARM UPDATE REQUEST AT STARTUP ---
    # This ensures the first "ask for updates" message is automatically scheduled for a farmer.
    # IMPORTANT: Replace these values with actual farmer information (or set via environment variables)
    # To get a farmer's chat ID: Have them send any message to your bot. Then check http://127.0.0.1:5000/messages
    TARGET_FARMER_CHAT_ID = os.environ.get("TARGET_FARMER_CHAT_ID", "YOUR_TELEGRAM_CHAT_ID")
    TARGET_FARMER_USERNAME = os.environ.get("TARGET_FARMER_USERNAME", "YOUR_TELEGRAM_USERNAME")
    FARM_UPDATE_REQUEST_MESSAGE = "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities."
    FARM_UPDATE_REQUEST_TYPE = "farm_update_request"

    # Schedule for 5 minutes from startup (for testing) - adjust for production e.g. daily at 8 AM
    scheduled_farm_update_time = datetime.now(TIMEZONE) + timedelta(minutes=5)

    if TARGET_FARMER_CHAT_ID in ["YOUR_TELEGRAM_CHAT_ID", "", None] or TARGET_FARMER_USERNAME in [
            "YOUR_TELEGRAM_USERNAME", "", None]:
        logger.warning(
            "⚠️ Initial farm update request not scheduled: TARGET_FARMER_CHAT_ID or TARGET_FARMER_USERNAME not properly configured in script or environment variables.")
        logger.info(" -> To enable this feature, please update the configuration or manually schedule via the UI.")
    elif _recent_update_request_exists(TARGET_FARMER_CHAT_ID, FARM_UPDATE_REQUEST_TYPE):
        # A similar request is pending or was sent within the last 24 hours.
        logger.info(
            f"ℹ️ Farm update request for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) already exists or was recently sent. Not re-scheduling.")
    elif scheduled_farm_update_time > datetime.now(TIMEZONE):
        result = schedule_message(
            TARGET_FARMER_CHAT_ID,
            TARGET_FARMER_USERNAME,
            FARM_UPDATE_REQUEST_MESSAGE,
            scheduled_farm_update_time,
            FARM_UPDATE_REQUEST_TYPE
        )
        if result:
            logger.info(
                f"✅ Initial farm update request scheduled for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) at {scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}.")
        else:
            logger.error("❌ Failed to schedule initial farm update request due to an internal error.")
    else:
        logger.warning(
            f"⚠️ Initial farm update request not scheduled: Scheduled time ({scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}) is in the past. Please adjust `scheduled_farm_update_time` for future execution.")
    # --- END HARDCODED SCHEDULED FARM UPDATE REQUEST ---

    # Start the Telegram polling thread (daemon so it exits with main app)
    start_polling_thread()

    logger.info("\n-----------------------------------------------------")
    logger.info("Farm Bot Application Ready!")
    logger.info("📱 Instruct your farmers to start a chat with your Telegram bot.")
    logger.info("🌐 Access the Admin Panel to manage tasks and responses: http://127.0.0.1:5000")
    logger.info("📊 Check system health: http://127.0.0.1:5000/health")
    logger.info("-----------------------------------------------------\n")

    try:
        # Run the Flask app
        app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)
    except KeyboardInterrupt:
        logger.info("Application received KeyboardInterrupt. Shutting down gracefully...")
    except Exception as e:
        logger.critical(f"Unhandled exception during Flask application runtime: {e}")
        traceback.print_exc()
    finally:
        # Ensure scheduler is shut down when the app stops
        if scheduler.running:
            scheduler.shutdown(wait=False)
            logger.info("APScheduler shut down.")
        logger.info("Application process finished.")