import requests import time import json import traceback import threading from datetime import datetime, timedelta from flask import Flask, jsonify, render_template_string, send_file, request import google.generativeai as genai import os from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.date import DateTrigger import pytz import logging import re # For parsing AI output from typing import Optional, Dict, List, Any # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ---------------- CONFIG ---------------- # !!! IMPORTANT: Replace these with your actual tokens or use environment variables !!! BOT_TOKEN = "8328709566:AAH7ZmdWuzGODlTByJ34yI5yR9e8otBokBc" # <<< REPLACE THIS with your Telegram Bot Token GEMINI_API_KEY = "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho" # <<< REPLACE THIS with your Google Gemini API Key # !!! ----------------------------------------------------------------------------- !!! TELEGRAM_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}" CHAT_LOG_FILE = "chat_log.json" SCHEDULED_MESSAGES_FILE = "scheduled_messages.json" SUGGESTED_TASKS_FILE = "suggested_tasks.json" GENERAL_AI_RESPONSES_FILE = "general_ai_responses.json" FARMERS_DATA_FILE = "data\farmers.json" # File for farmer-specific RAG data MAX_MESSAGES = 2000 TIMEZONE = pytz.timezone('Asia/Kolkata') # Adjust to your local timezone (e.g., 'America/New_York', 'Europe/London') FARM_UPDATE_RESPONSE_WINDOW_HOURS = 48 # Window for a farmer to respond to an update request # Hardcoded messages for the UI dropdown HARDCODED_MESSAGES_FOR_UI = [ { "value": "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities.", "text": "🌱 Request Farm Update"}, {"value": "How are your crops doing today?", "text": "đŸŒŋ Crops Check-in"}, {"value": "Reminder: Check irrigation system.", "text": "💧 Irrigation Reminder"}, {"value": "Don't forget to monitor pest activity.", "text": "🐛 Pest Monitor Reminder"}, {"value": "Daily task: Fertilize Section A.", "text": "🚜 Fertilize Section A"}, {"value": "Weather looks good for harvesting. Consider starting tomorrow.", "text": "đŸŒ¤ī¸ Harvest Weather Alert"}, {"value": "Time to check soil moisture levels.", "text": "đŸŒĄī¸ Soil Moisture Check"}, {"value": "", "text": "âœī¸ Custom Message (Type Below)"} # Option for custom input ] # Rate limiting for Telegram API calls REQUEST_RATE_LIMIT = 30 # requests per minute rate_limit_tracker = {"count": 0, "reset_time": time.time() + 60} # ---------------------------------------- # Flask app app = Flask(__name__) # Configure Gemini with error handling try: genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel("gemini-2.0-flash-exp") logger.info("Gemini AI configured successfully.") except Exception as e: logger.error(f"Failed to configure Gemini AI. AI features will be unavailable: {e}") model = None # Set model to None if configuration fails # Initialize scheduler scheduler = BackgroundScheduler(timezone=TIMEZONE) scheduler.start() # Thread-safe stores _messages_lock = threading.Lock() _messages: List[Dict[str, Any]] = [] _scheduled_messages_lock = threading.Lock() _scheduled_messages: List[Dict[str, Any]] = [] _suggested_tasks_lock = threading.Lock() _suggested_tasks: List[Dict[str, Any]] = [] # Each entry is now a single task with message and reason _general_ai_responses_lock = threading.Lock() _general_ai_responses: List[Dict[str, Any]] = [] _farmer_data_lock = threading.Lock() _farmer_data: List[Dict[str, Any]] = [] # State for tracking if we're awaiting a farm update response from a specific chat_id _awaiting_update_response: Dict[str, datetime] = {} def check_rate_limit() -> bool: """Checks if Telegram API request rate limit has been exceeded.""" current_time = time.time() with threading.Lock(): # Use a lock for shared rate_limit_tracker access if current_time > rate_limit_tracker["reset_time"]: rate_limit_tracker["count"] = 0 rate_limit_tracker["reset_time"] = current_time + 60 if rate_limit_tracker["count"] >= REQUEST_RATE_LIMIT: return False rate_limit_tracker["count"] += 1 return True def sanitize_text(text: str, max_length: int = 4096) -> str: """Sanitizes text for Telegram message limits and removes invalid characters.""" if not text: return "" # Remove null characters which can cause issues with some APIs/databases sanitized = text.replace('\x00', '').strip() # Telegram message limit for sendMessage is 4096 characters (HTML mode can be slightly less due to tags) if len(sanitized) > max_length: sanitized = sanitized[:max_length - 3] + "..." return sanitized def save_messages_to_file(): """Saves chat messages to a JSON file.""" try: with _messages_lock: with open(CHAT_LOG_FILE, "w", encoding="utf-8") as f: json.dump(_messages, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_messages)} messages to {CHAT_LOG_FILE}.") except Exception as e: logger.error(f"Failed to save messages to {CHAT_LOG_FILE}: {e}") def save_scheduled_messages(): """Saves scheduled messages to a JSON file.""" try: with _scheduled_messages_lock: with open(SCHEDULED_MESSAGES_FILE, "w", encoding="utf-8") as f: json.dump(_scheduled_messages, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_scheduled_messages)} scheduled messages to {SCHEDULED_MESSAGES_FILE}.") except Exception as e: logger.error(f"Failed to save scheduled messages to {SCHEDULED_MESSAGES_FILE}: {e}") def save_suggested_tasks(): """Saves AI-suggested tasks to a JSON file.""" try: with _suggested_tasks_lock: serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() # Convert datetime objects to ISO strings for JSON serialization for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) with open(SUGGESTED_TASKS_FILE, "w", encoding="utf-8") as f: json.dump(serializable_tasks, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_suggested_tasks)} suggested tasks to {SUGGESTED_TASKS_FILE}.") except Exception as e: logger.error(f"Failed to save suggested tasks to {SUGGESTED_TASKS_FILE}: {e}") def save_general_ai_responses(): """Saves general AI responses (pending admin review) to a JSON file.""" try: with _general_ai_responses_lock: serializable_responses = [] for resp in _general_ai_responses: temp_resp = resp.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_resp.get(key), datetime): temp_resp[key] = temp_resp[key].isoformat() serializable_responses.append(temp_resp) with open(GENERAL_AI_RESPONSES_FILE, "w", encoding="utf-8") as f: json.dump(serializable_responses, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_general_ai_responses)} general AI responses to {GENERAL_AI_RESPONSES_FILE}.") except Exception as e: logger.error(f"Failed to save general AI responses to {GENERAL_AI_RESPONSES_FILE}: {e}") def load_messages_from_file(): """Loads chat messages from a JSON file.""" if not os.path.exists(CHAT_LOG_FILE): logger.info(f"{CHAT_LOG_FILE} not found, starting with empty messages.") return try: with open(CHAT_LOG_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _messages_lock: _messages.clear() _messages.extend(data[-MAX_MESSAGES:]) # Load only the latest MAX_MESSAGES logger.info(f"Loaded {len(_messages)} messages from {CHAT_LOG_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {CHAT_LOG_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load messages from {CHAT_LOG_FILE}: {e}") def load_scheduled_messages(): """Loads scheduled messages and re-schedules pending ones.""" if not os.path.exists(SCHEDULED_MESSAGES_FILE): logger.info(f"{SCHEDULED_MESSAGES_FILE} not found, starting with empty scheduled messages.") return try: with open(SCHEDULED_MESSAGES_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _scheduled_messages_lock: _scheduled_messages.clear() _scheduled_messages.extend(data) rescheduled_count = 0 for entry in _scheduled_messages: try: # Ensure scheduled_time is timezone-aware scheduled_time = datetime.fromisoformat(entry["scheduled_time"]) if scheduled_time.tzinfo is None: scheduled_time = TIMEZONE.localize(scheduled_time) # Localize if naive # Only re-schedule if pending and in the future if entry["status"] == "pending" and scheduled_time > datetime.now(TIMEZONE): scheduler.add_job( func=send_scheduled_message, trigger=DateTrigger(run_date=scheduled_time), args=[entry], id=entry["id"], replace_existing=True # Important to prevent duplicate jobs on restart ) rescheduled_count += 1 # If it was a sent farm update request, track it for response window management if entry["type"] == "farm_update_request" and entry["status"] == "sent": sent_time_str = entry.get("sent_time", entry["scheduled_time"]) # Fallback to scheduled if sent_time missing sent_or_scheduled_time = datetime.fromisoformat(sent_time_str) if sent_or_scheduled_time.tzinfo is None: sent_or_scheduled_time = TIMEZONE.localize(sent_or_scheduled_time) _awaiting_update_response[str(entry["chat_id"])] = sent_or_scheduled_time except KeyError as ke: logger.error(f"Missing key in scheduled message entry ID '{entry.get('id', 'unknown')}': {ke}") except ValueError as ve: # fromisoformat error logger.error( f"Invalid datetime format in scheduled message entry ID '{entry.get('id', 'unknown')}': {ve}") except Exception as e: logger.error(f"Failed to reschedule entry ID '{entry.get('id', 'unknown')}': {e}") logger.info( f"Loaded {len(_scheduled_messages)} scheduled messages, rescheduled {rescheduled_count} pending jobs.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {SCHEDULED_MESSAGES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load scheduled messages from {SCHEDULED_MESSAGES_FILE}: {e}") def load_suggested_tasks(): """Loads AI-suggested tasks from a JSON file.""" if not os.path.exists(SUGGESTED_TASKS_FILE): logger.info(f"{SUGGESTED_TASKS_FILE} not found, starting with empty suggested tasks.") return try: with open(SUGGESTED_TASKS_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _suggested_tasks_lock: _suggested_tasks.clear() for task in data: # Convert ISO strings back to datetime objects for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if key in task and task[key] is not None: try: task[key] = datetime.fromisoformat(task[key]) except (ValueError, TypeError): logger.warning( f"Invalid datetime format for key '{key}' in task ID '{task.get('id')}': {task[key]}. Setting to None.") task[key] = None _suggested_tasks.append(task) logger.info(f"Loaded {len(_suggested_tasks)} suggested tasks from {SUGGESTED_TASKS_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {SUGGESTED_TASKS_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load suggested tasks from {SUGGESTED_TASKS_FILE}: {e}") def load_general_ai_responses(): """Loads general AI responses (pending admin review) from a JSON file.""" if not os.path.exists(GENERAL_AI_RESPONSES_FILE): logger.info(f"{GENERAL_AI_RESPONSES_FILE} not found, starting with empty general AI responses.") return try: with open(GENERAL_AI_RESPONSES_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _general_ai_responses_lock: _general_ai_responses.clear() for resp in data: for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if key in resp and resp[key] is not None: try: resp[key] = datetime.fromisoformat(resp[key]) except (ValueError, TypeError): logger.warning( f"Invalid datetime format for key '{key}' in general AI response ID '{resp.get('id')}': {resp[key]}. Setting to None.") resp[key] = None _general_ai_responses.append(resp) logger.info(f"Loaded {len(_general_ai_responses)} general AI responses from {GENERAL_AI_RESPONSES_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {GENERAL_AI_RESPONSES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load general AI responses from {GENERAL_AI_RESPONSES_FILE}: {e}") def load_farmer_data(): """Loads farmer-specific data from farmers.json for RAG.""" if not os.path.exists(FARMERS_DATA_FILE): logger.warning( f"{FARMERS_DATA_FILE} not found. RAG functionality for tasks will be limited or unavailable. Please create it if needed.") with _farmer_data_lock: _farmer_data.clear() return try: with open(FARMERS_DATA_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _farmer_data_lock: _farmer_data.clear() _farmer_data.extend(data) logger.info(f"Loaded {len(_farmer_data)} farmer profiles from {FARMERS_DATA_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {FARMERS_DATA_FILE} (JSON error): {e}. Ensure it's valid JSON. RAG functionality will be limited.") except Exception as e: logger.error(f"Failed to load farmer data from {FARMERS_DATA_FILE}: {e}") def get_farmer_profile(chat_id: str) -> Optional[Dict]: """Retrieves a farmer's profile by chat_id from loaded data.""" with _farmer_data_lock: for farmer in _farmer_data: if str(farmer.get("chat_id")) == str(chat_id): return farmer return None def append_message(role: str, username: str, chat_id: str, text: str): """Appends a message to the in-memory chat log and saves it to file.""" if not text: logger.debug(f"Attempted to append empty message for role '{role}' from {username} ({chat_id}). Skipping.") return text = sanitize_text(text) entry = { "role": role, "username": username, "chat_id": str(chat_id), "text": text, "ts": datetime.utcnow().isoformat() + "Z" } with _messages_lock: _messages.append(entry) if len(_messages) > MAX_MESSAGES: _messages[:] = _messages[-MAX_MESSAGES:] # Keep only the latest messages save_messages_to_file() def send_message(chat_id: str, text: str) -> Optional[Dict]: """Sends a message to Telegram, respecting rate limits.""" if not check_rate_limit(): logger.warning(f"Telegram API rate limit exceeded for chat_id {chat_id}. Message skipped: '{text[:50]}...'.") return None text = sanitize_text(text) if not text: logger.warning(f"Attempted to send empty message to chat_id {chat_id}. Skipping.") return None url = f"{TELEGRAM_API_URL}/sendMessage" payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"} # Use HTML for basic formatting try: r = requests.post(url, json=payload, timeout=10) r.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) result = r.json() if not result.get("ok"): logger.error( f"Telegram API reported error sending to {chat_id}: {result.get('description', 'Unknown error')}. Message: '{text[:50]}...'.") return None logger.debug(f"Successfully sent Telegram message to {chat_id}: '{text[:50]}...'.") return result except requests.exceptions.Timeout: logger.error(f"Timeout sending Telegram message to {chat_id}. Message: '{text[:50]}...'.") return None except requests.exceptions.RequestException as e: logger.error(f"Network error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.") return None except Exception as e: logger.error(f"Unexpected error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.") return None def schedule_message(chat_id: str, username: str, message_text: str, scheduled_time: datetime, message_type: str = "reminder") -> Optional[Dict]: """Schedules a message to be sent at a future time.""" if not message_text or not chat_id: logger.error("Cannot schedule message: Missing message text or chat_id.") return None # Ensure scheduled_time is in the future if scheduled_time <= datetime.now(TIMEZONE): logger.error( f"Cannot schedule message for {username} ({chat_id}): Scheduled time ({scheduled_time}) is in the past. Message: '{message_text[:50]}...'.") return None # Generate a unique ID for the scheduled entry timestamp = int(scheduled_time.timestamp()) # Use a hash of the first 50 chars of message_text for uniqueness in the ID text_hash = hash(message_text[:50]) % 10000 unique_id = f"{chat_id}_{message_type}_{timestamp}_{text_hash}" scheduled_entry = { "id": unique_id, "chat_id": str(chat_id), "username": username, "message": sanitize_text(message_text), "scheduled_time": scheduled_time.isoformat(), "type": message_type, "status": "pending", "created_at": datetime.now(TIMEZONE).isoformat() } try: with _scheduled_messages_lock: _scheduled_messages.append(scheduled_entry) save_scheduled_messages() scheduler.add_job( func=send_scheduled_message, trigger=DateTrigger(run_date=scheduled_time), args=[scheduled_entry], id=scheduled_entry["id"], replace_existing=True # Important: Prevents duplicate jobs if app restarts and tries to reschedule same ID ) logger.info( f"Scheduled '{message_type}' message for {username} ({chat_id}) (ID: {unique_id}) at {scheduled_time.strftime('%Y-%m-%d %H:%M %Z')}.") return scheduled_entry except Exception as e: logger.error(f"Failed to schedule message for {username} ({chat_id}) (ID: {unique_id}): {e}") return None def send_scheduled_message(scheduled_entry: Dict): """Callback function executed by APScheduler to send a scheduled message.""" try: chat_id = scheduled_entry["chat_id"] message = scheduled_entry["message"] recipient_username = scheduled_entry["username"] logger.info( f"Attempting to send scheduled message '{scheduled_entry['type']}' to {recipient_username} ({chat_id}). Message: '{message[:50]}...'.") result = send_message(chat_id, message) current_time = datetime.now(TIMEZONE).isoformat() if result: append_message("bot", f"Bot (Scheduled for {recipient_username})", chat_id, message) with _scheduled_messages_lock: for msg in _scheduled_messages: if msg["id"] == scheduled_entry["id"]: msg["status"] = "sent" msg["sent_time"] = current_time break save_scheduled_messages() logger.info(f"Successfully sent scheduled message to {recipient_username} ({chat_id}).") # If a farm update request was just sent, start tracking the response window if scheduled_entry["type"] == "farm_update_request": _awaiting_update_response[str(chat_id)] = datetime.now(TIMEZONE) logger.info( f"Now awaiting farm update response from {recipient_username} for {FARM_UPDATE_RESPONSE_WINDOW_HOURS} hours (chat_id: {chat_id}).") else: with _scheduled_messages_lock: for msg in _scheduled_messages: if msg["id"] == scheduled_entry["id"]: msg["status"] = "failed" msg["failed_time"] = current_time break save_scheduled_messages() logger.error( f"Failed to send scheduled message to {recipient_username} ({chat_id}). Marking as 'failed'. Message: '{message[:50]}...'.") except Exception as e: logger.error(f"Critical error in send_scheduled_message for ID '{scheduled_entry.get('id', 'unknown')}': {e}") traceback.print_exc() def generate_and_store_farm_tasks(user_message: str, chat_id: str, username: str) -> str: """ Generates 3-4 specific, short, descriptive daily tasks with a reason (<50 words) based on farmer's update AND farmer's profile (RAG), and stores each task separately for admin review. Does NOT send any message directly to the farmer. """ if not model: logger.error(f"Gemini model not available for task generation for {username}.") return "" try: farmer_profile = get_farmer_profile(chat_id) profile_context = "" if farmer_profile and farmer_profile.get("farm_details"): details = farmer_profile["farm_details"] profile_context = ( f"\n\nContext about {username}'s farm (use this for tailored advice):\n" f"- Location: {details.get('location', 'N/A')}\n" f"- Main Crops: {', '.join(details.get('main_crops', []))}\n" f"- Soil Type: {details.get('soil_type', 'N/A')}\n" f"- Typical Issues: {', '.join(details.get('typical_issues', []))}\n" f"- Last reported weather: {details.get('last_reported_weather', 'N/A')}\n" f"- Irrigation System: {details.get('irrigation_system', 'N/A')}\n" ) else: profile_context = "\n\n(No detailed farmer profile data found. Generating tasks based solely on the update.)" prompt = ( f"You are a highly experienced and practical agricultural expert. Based on the farmer's update below " f"and their farm context (if provided), generate exactly 3 to 4 critical daily tasks. " f"Each task must be concise, actionable, and include a 'Reason:' why it's important. " f"Ensure each 'Reason:' is strictly under 50 words. Prioritize immediate needs and common farming practices." f"\n\nStrictly follow this output format for each task:\n" f"Task: [Short, descriptive task, e.g., 'Check soil moisture in cornfield']\n" f"Reason: [Brief explanation why (under 50 words), e.g., 'Ensure optimal water levels after dry spell.']\n" f"---\n" f"Task: [Another short, descriptive task]\n" f"Reason: [Brief explanation why (under 50 words)]\n" f"---\n" f"(Repeat 3 or 4 times. Ensure each task block is clearly separated by '---' on its own line.)" f"\n\nFarmer: {username}\n" f"Farmer's Update: '{user_message}'" f"{profile_context}" f"\n\nNow, generate the 3-4 tasks and reasons in the specified format:" ) logger.info( f"Calling Gemini for RAG task generation for {username} (chat_id: {chat_id}) based on update: '{user_message[:100]}...'.") response = model.generate_content(prompt) ai_raw_output = response.text if response and response.text else "Could not generate tasks based on the update." logger.debug(f"Raw AI output for tasks: {ai_raw_output[:500]}...") # Parse the AI's output into individual tasks parsed_tasks = [] # Regex to find blocks starting with "Task:" and containing "Reason:", separated by "---" # Adjusted regex for better robustness to handle various line endings and trailing whitespace. task_blocks = re.findall(r"Task:\s*(.*?)\nReason:\s*(.*?)(?:\n---\s*|$)", ai_raw_output, re.DOTALL | re.IGNORECASE) for task_text, reason_text in task_blocks: parsed_tasks.append({ "task_message": task_text.strip(), "task_reason": reason_text.strip() }) if not parsed_tasks: logger.warning( f"AI did not generate structured tasks for {username} in expected format. Providing fallback task(s). Raw output: '{ai_raw_output[:200]}'.") # Fallback if parsing fails or AI doesn't follow format. Always ensure at least one review item. fallback_message = "AI did not generate specific tasks in the expected format. Manual review of farmer's update is required." if len(ai_raw_output) > 50: # If AI gave some output, add it as context to the fallback fallback_message += f" AI's raw response: {sanitize_text(ai_raw_output, max_length=150)}." parsed_tasks.append({ "task_message": "Manual Review Required", "task_reason": sanitize_text(fallback_message, max_length=200) }) # Add a separate entry for the full raw response if it's long and tasks couldn't be parsed if len(ai_raw_output) > 200 and len( parsed_tasks) == 1: # Only add if first fallback was due to parsing failure parsed_tasks.append({ "task_message": "Full Raw AI Response (for admin review)", "task_reason": sanitize_text(ai_raw_output, max_length=400) }) generation_time = datetime.now(TIMEZONE) # Store each individual parsed task for separate admin scheduling with _suggested_tasks_lock: for i, task_item in enumerate(parsed_tasks): # Create a unique ID for each individual task item task_id = f"suggested_task_{chat_id}_{int(generation_time.timestamp())}_{i}_{hash(task_item['task_message']) % 10000}" task_entry = { "id": task_id, "chat_id": str(chat_id), "username": username, "original_update": user_message, # Keep original update for context in UI "suggested_task_message": sanitize_text(task_item["task_message"]), "suggested_task_reason": sanitize_text(task_item["task_reason"]), "generation_time": generation_time, "status": "pending_review" } _suggested_tasks.append(task_entry) save_suggested_tasks() logger.info( f"Generated and stored {len(parsed_tasks)} individual tasks for {username} (chat_id: {chat_id}) for admin review.") return "" # No immediate response to farmer except Exception as e: logger.error(f"Error during farm task generation/storage for {username} (chat_id: {chat_id}): {e}") traceback.print_exc() return "" # No immediate response to farmer on AI error def get_ai_response_with_context(user_message: str, chat_id: str) -> str: """Gets a general AI response for a farmer's query, considering general farming context.""" if not model: logger.error("Gemini model not available for general query, returning fallback message.") return "AI service is currently unavailable. Please try again later." try: # Include general farming context in the prompt prompt = ( f"You are a helpful and knowledgeable agricultural assistant. Provide a concise and direct answer " f"to the following query from a farmer. Keep the response factual and actionable where possible. " f"Limit the response to a few sentences (max 3-4 sentences)." f"\n\nFarmer's Query: {user_message}" ) logger.info(f"Generating general AI response for {chat_id} (query: '{user_message[:50]}...')...") response = model.generate_content(prompt) ai_reply = response.text if response and response.text else "I couldn't process your request right now." return sanitize_text(ai_reply) except Exception as e: logger.error(f"AI response error for chat_id {chat_id} (query: '{user_message[:50]}...'): {e}") traceback.print_exc() return "I'm having trouble processing your request right now. Please try again later." def store_general_ai_response(chat_id: str, username: str, original_query: str, ai_response: str): """Stores a general AI response (generated from farmer's general query) for admin review.""" if not ai_response: logger.warning(f"Attempted to store empty general AI response for {username} ({chat_id}). Skipping.") return response_entry = { "id": f"general_ai_{chat_id}_{int(datetime.now().timestamp())}_{hash(original_query) % 10000}", "chat_id": str(chat_id), "username": username, "original_query": sanitize_text(original_query), "ai_response": sanitize_text(ai_response), "generation_time": datetime.now(TIMEZONE), "status": "pending_review" } with _general_ai_responses_lock: _general_ai_responses.append(response_entry) save_general_ai_responses() logger.info(f"Stored general AI response for {username} ({chat_id}) (ID: {response_entry['id']}).") # ---------------- Polling logic ---------------- def delete_webhook(drop_pending: bool = True) -> Optional[Dict]: """Deletes the Telegram webhook to switch to long polling.""" try: params = {"drop_pending_updates": "true" if drop_pending else "false"} r = requests.post(f"{TELEGRAM_API_URL}/deleteWebhook", params=params, timeout=10) r.raise_for_status() result = r.json() if result.get("ok"): logger.info("Telegram webhook deleted successfully (ensuring long polling).") return result except requests.exceptions.RequestException as e: logger.error(f"Failed to delete Telegram webhook: {e}") return None except Exception as e: logger.error(f"Unexpected error deleting Telegram webhook: {e}") return None def get_updates(offset: Optional[int] = None, timeout_seconds: int = 60) -> Optional[Dict]: """Fetches updates from Telegram using long polling.""" params = {"timeout": timeout_seconds} if offset: params["offset"] = offset try: r = requests.get(f"{TELEGRAM_API_URL}/getUpdates", params=params, timeout=timeout_seconds + 10) r.raise_for_status() return r.json() except requests.exceptions.Timeout: logger.debug("Telegram polling timed out (normal, no new messages).") return {"ok": True, "result": []} # Return empty result on timeout to continue polling except requests.exceptions.RequestException as e: logger.error(f"Network error getting updates from Telegram: {e}") return None except Exception as e: logger.error(f"Unexpected error getting updates from Telegram: {e}") return None def polling_loop(): """Main loop for long-polling Telegram for new messages.""" logger.info("Starting Telegram polling thread.") delete_webhook(drop_pending=True) # Ensure webhook is off for polling offset = None consecutive_errors = 0 while True: try: updates = get_updates(offset=offset, timeout_seconds=60) if updates is None: # Critical network error, get_updates returned None consecutive_errors += 1 logger.error( f"get_updates returned None due to error. Consecutive errors: {consecutive_errors}. Sleeping for {min(consecutive_errors * 5, 120)}s.") time.sleep(min(consecutive_errors * 5, 120)) # Exponential backoff up to 2 min continue if not updates.get("ok"): logger.warning( f"Telegram API returned 'not ok' for getUpdates: {updates.get('description', 'No description')}. Consecutive errors: {consecutive_errors}.") consecutive_errors += 1 time.sleep(min(consecutive_errors * 2, 60)) # Exponential backoff up to 1 min continue consecutive_errors = 0 # Reset error counter on successful API call results = updates.get("result", []) for update in results: try: process_update(update) offset = update.get("update_id", 0) + 1 except Exception as e: logger.error(f"Error processing individual update ID {update.get('update_id', 'unknown')}: {e}") traceback.print_exc() # Small delay to prevent busy-waiting if Telegram returns many empty results quickly if not results: time.sleep(1) except KeyboardInterrupt: logger.info("Telegram polling stopped by user (KeyboardInterrupt).") break except Exception as e: consecutive_errors += 1 logger.critical(f"Unhandled critical error in polling loop: {e}") time.sleep(min(consecutive_errors * 10, 300)) # Longer exponential backoff for unhandled errors def process_update(update: Dict): """Processes a single Telegram update message.""" msg = update.get("message") or update.get("edited_message") if not msg: logger.debug(f"Update ID {update.get('update_id')}: No message or edited_message found. Skipping.") return chat = msg.get("chat", {}) chat_id = str(chat.get("id")) username = chat.get("first_name") or chat.get("username") or "User" text = msg.get("text") or msg.get("caption") or "" if not text: # If no text (e.g., photo, sticker), send a polite message and append to log acknowledgment_text = "I can only respond to text messages. Please send your queries as text." send_message(chat_id, acknowledgment_text) append_message("user", username, chat_id, "(Non-text message received)") append_message("bot", "Bot", chat_id, acknowledgment_text) logger.debug(f"Received non-text message from {username} ({chat_id}). Sent acknowledgment.") return append_message("user", username, chat_id, text) logger.info(f"Processing message from {username} ({chat_id}): '{text[:50]}...'.") bot_acknowledgment_to_farmer = "" # This is the only message sent directly to the farmer # Check if we are awaiting a farm update response from this chat_id most_recent_request_time = _awaiting_update_response.get(chat_id) if (most_recent_request_time and (datetime.now(TIMEZONE) - most_recent_request_time) <= timedelta(hours=FARM_UPDATE_RESPONSE_WINDOW_HOURS)): logger.info(f"Recognized as farm update response from {username}. Generating tasks for admin review.") generate_and_store_farm_tasks(text, chat_id, username) # Generates and stores tasks # Send a generic acknowledgment to the farmer bot_acknowledgment_to_farmer = "Thank you for your farm update! The admin will review it, generate daily tasks, and schedule them to be sent to you shortly." # Remove from tracking after processing this update _awaiting_update_response.pop(chat_id, None) logger.info(f"Finished processing farm update for {username} ({chat_id}). Awaiting response tracker cleared.") else: # This is a general query. Generate AI response but store it for admin scheduling. logger.info(f"Processing general query from {username}. Generating AI response for admin scheduling.") ai_generated_response = get_ai_response_with_context(text, chat_id) # Only store and acknowledge if AI generated a meaningful response if ai_generated_response and ai_generated_response != "AI service is currently unavailable. Please try again later.": store_general_ai_response(chat_id, username, text, ai_generated_response) bot_acknowledgment_to_farmer = f"I've received your query about '{text[:50]}...'. The admin will review the AI's suggested response and schedule it to be sent to you." else: # AI service unavailable or response generation failed bot_acknowledgment_to_farmer = "I've received your message. The admin will get back to you shortly." logger.warning( f"AI failed to generate a response for general query from {username} ({chat_id}). Admin will manually review.") if bot_acknowledgment_to_farmer: append_message("bot", "Bot", chat_id, bot_acknowledgment_to_farmer) send_message(chat_id, bot_acknowledgment_to_farmer) else: logger.debug(f"No direct acknowledgment message generated for {username} ({chat_id}) after processing.") # ---------------- Flask routes & UI ---------------- # New Dashboard HTML template with 50/50 split and tabs HTML_TEMPLATE = """ SmartFarm Dashboard

🌱
SmartFarm Dashboard

Connecting...
📊 0 Messages
⏰
đŸ’Ŧ Farm Communications
0 Messages
đŸ’Ŧ

No messages yet

Start a conversation with your farm bot!

🚜 Farm Management
Real-time Admin
📊 Analytics
📋 Tasks
⏰ Schedule
âš™ī¸ Settings
0
Active Farmers
0
Pending Tasks
0
Scheduled Messages
0
AI Responses
🌾 Farm Bot Status
Operational

Monitor your farm bot operations. Track farmer communications, manage AI-generated tasks, and review general AI responses.

🤖 AI Generated Daily Tasks
0 Pending
📋

No AI generated tasks yet

💡 AI General Responses
0 Pending
🤖

No AI responses pending review

📅 Scheduled Messages
0 Scheduled
📅

No messages currently scheduled

➕ Schedule New Message
âš™ī¸ Dashboard Settings

Settings are saved automatically.

📊 Export Data

Download your farm data for backup or analysis:

""" @app.route("/") def index(): return render_template_string(HTML_TEMPLATE) @app.route("/messages") def messages_api(): with _messages_lock: return jsonify(list(_messages)) @app.route("/scheduled") def scheduled_messages_api(): with _scheduled_messages_lock: return jsonify(list(_scheduled_messages)) @app.route("/suggested_tasks") def suggested_tasks_api(): with _suggested_tasks_lock: serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) return jsonify(serializable_tasks) @app.route("/general_ai_responses") def general_ai_responses_api(): with _general_ai_responses_lock: serializable_responses = [] for resp in _general_ai_responses: temp_resp = resp.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_resp.get(key), datetime): temp_resp[key] = temp_resp[key].isoformat() serializable_responses.append(temp_resp) return jsonify(serializable_responses) @app.route("/download") def download(): try: # Prioritize existing file if it's there if os.path.exists(CHAT_LOG_FILE): return send_file(CHAT_LOG_FILE, as_attachment=True, download_name="chat_log.json") # Otherwise, create a temporary file from in-memory data with _messages_lock: if not _messages: # Handle empty data logger.warning("No chat log data in memory for download.") return jsonify({"error": "No chat log data to download."}), 404 tmp_file = "chat_log_tmp.json" with open(tmp_file, "w", encoding="utf-8") as f: json.dump(_messages, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="chat_log.json") except Exception as e: logger.error(f"Error downloading chat log: {e}") return jsonify({"error": "Failed to download chat log file."}), 500 @app.route("/download-scheduled") def download_scheduled(): try: if os.path.exists(SCHEDULED_MESSAGES_FILE): return send_file(SCHEDULED_MESSAGES_FILE, as_attachment=True, download_name="scheduled_messages.json") with _scheduled_messages_lock: if not _scheduled_messages: logger.warning("No scheduled messages data in memory for download.") return jsonify({"error": "No scheduled messages data to download."}), 404 tmp_file = "scheduled_messages_tmp.json" with open(tmp_file, "w", encoding="utf-8") as f: json.dump(_scheduled_messages, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="scheduled_messages.json") except Exception as e: logger.error(f"Error downloading scheduled messages: {e}") return jsonify({"error": "Failed to download scheduled messages file."}), 500 @app.route("/download-suggested") def download_suggested(): try: if os.path.exists(SUGGESTED_TASKS_FILE): return send_file(SUGGESTED_TASKS_FILE, as_attachment=True, download_name="suggested_tasks.json") with _suggested_tasks_lock: if not _suggested_tasks: logger.warning("No suggested tasks data in memory for download.") return jsonify({"error": "No suggested tasks data to download."}), 404 tmp_file = "suggested_tasks_tmp.json" serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) with open(tmp_file, "w", encoding="utf-8") as f: json.dump(serializable_tasks, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="suggested_tasks.json") except Exception as e: logger.error(f"Error downloading suggested tasks: {e}") return jsonify({"error": "Failed to download suggested tasks file."}), 500 @app.route("/download-general-ai") def download_general_ai(): try: if os.path.exists(GENERAL_AI_RESPONSES_FILE): return send_file(GENERAL_AI_RESPONSES_FILE, as_attachment=True, download_name="general_ai_responses.json") with _general_ai_responses_lock: if not _general_ai_responses: logger.warning("No general AI responses data in memory for download.") return jsonify({"error": "No general AI responses data to download."}), 404 tmp_file = "general_ai_responses_tmp.json" serializable_responses = [] for resp in _general_ai_responses: temp_resp = resp.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_resp.get(key), datetime): temp_resp[key] = temp_resp[key].isoformat() serializable_responses.append(temp_resp) with open(tmp_file, "w", encoding="utf-8") as f: json.dump(serializable_responses, f, ensure_ascii=False, indent=2) return send_file(tmp_file, as_attachment=True, download_name="general_ai_responses.json") except Exception as e: logger.error(f"Error downloading general AI responses: {e}") return jsonify({"error": "Failed to download general AI responses file."}), 500 @app.route("/clear", methods=["POST"]) def clear(): """Clears all in-memory data, removes all persistent files, and clears scheduler jobs.""" try: # Clear in-memory stores with _messages_lock: _messages.clear() with _scheduled_messages_lock: _scheduled_messages.clear() with _suggested_tasks_lock: _suggested_tasks.clear() with _general_ai_responses_lock: _general_ai_responses.clear() with _farmer_data_lock: _farmer_data.clear() # Clear farmer data in memory global _awaiting_update_response _awaiting_update_response = {} # Reset the tracker # Remove persistent files for file_path in [CHAT_LOG_FILE, SCHEDULED_MESSAGES_FILE, SUGGESTED_TASKS_FILE, GENERAL_AI_RESPONSES_FILE]: if os.path.exists(file_path): try: os.remove(file_path) logger.info(f"Removed persistent file: {file_path}") except Exception as e: logger.error(f"Failed to remove file {file_path}: {e}. Please check file permissions.") # Clear all APScheduler jobs scheduler.remove_all_jobs() logger.info("All APScheduler jobs cleared.") logger.info("All application data cleared successfully.") return ("", 204) # No Content except Exception as e: logger.error(f"Error during 'clear all data' operation: {e}") return jsonify({"error": "Failed to clear data due to an internal server error. Check server logs."}), 500 @app.route("/schedule_from_ui", methods=["POST"]) def schedule_from_ui(): """Endpoint to schedule a message directly from the UI.""" try: data = request.get_json() if not data: return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400 chat_id = data.get("chat_id") recipient_username = data.get("username", "UI Scheduled User") message = data.get("message", "").strip() scheduled_time_str = data.get("scheduled_time") message_type = data.get("message_type", "reminder") if not all([chat_id, message, scheduled_time_str]): return jsonify({ "status": "error", "message": "Missing required fields: 'chat_id', 'message', and 'scheduled_time' are all required." }), 400 # Validate message length (Telegram limit) if len(message) > 4096: return jsonify({ "status": "error", "message": f"Message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram." }), 400 # Parse and validate datetime string try: scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M') scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive) except ValueError: return jsonify({ "status": "error", "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM (e.g., 2023-10-27T14:30)." }), 400 # Check if scheduled time is in the future now = datetime.now(TIMEZONE) if scheduled_datetime <= now: return jsonify({ "status": "error", "message": "Scheduled time must be in the future. Please choose a time later than now." }), 400 # Optional: Prevent scheduling too far in the future for stability/resource management if scheduled_datetime > now + timedelta(days=365): return jsonify({ "status": "error", "message": "Scheduled time cannot be more than 1 year in the future for stability." }), 400 result = schedule_message(chat_id, recipient_username, message, scheduled_datetime, message_type) if result: logger.info( f"UI: Message '{message_type}' scheduled successfully for {recipient_username} ({chat_id}) at {scheduled_datetime}.") return jsonify({"status": "success", "message": "Message scheduled successfully."}) else: return jsonify({ "status": "error", "message": "Failed to schedule message due to an internal server error (check server logs)." }), 500 except Exception as e: logger.error(f"Error in /schedule_from_ui: {e}") traceback.print_exc() return jsonify({ "status": "error", "message": f"An unexpected server error occurred: {str(e)}" }), 500 @app.route("/schedule_suggested_from_ui", methods=["POST"]) def schedule_suggested_from_ui(): """Endpoint for admin to schedule an individual AI-suggested task from the UI.""" try: data = request.get_json() if not data: return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400 suggested_task_id = data.get("suggested_task_id") chat_id = data.get("chat_id") username = data.get("username") message = data.get("message", "").strip() # This is the individual task message content scheduled_time_str = data.get("scheduled_time") if not all([suggested_task_id, chat_id, username, message, scheduled_time_str]): return jsonify({ "status": "error", "message": "Missing required fields for scheduling suggested task." }), 400 # Validate message length (Telegram limit) if len(message) > 4096: return jsonify({ "status": "error", "message": f"Task message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram." }), 400 # Parse and validate datetime try: scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M') scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive) except ValueError: return jsonify({ "status": "error", "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM." }), 400 # Check if time is in the future now = datetime.now(TIMEZONE) if scheduled_datetime <= now: return jsonify({ "status": "error", "message": "Scheduled time must be in the future. Please choose a time later than now." }), 400 # Check if the suggested task exists and is still pending review task_found_and_pending = False with _suggested_tasks_lock: for task in _suggested_tasks: if task["id"] == suggested_task_id and task["status"] == "pending_review": task_found_and_pending = True break if not task_found_and_pending: return jsonify({ "status": "error", "message": "Suggested task not found or already processed (scheduled/discarded)." }), 404 result = schedule_message(chat_id, username, message, scheduled_datetime, "daily_tasks") if result: # Update the status of the suggested task to 'scheduled' with _suggested_tasks_lock: for task in _suggested_tasks: if task["id"] == suggested_task_id: task["status"] = "scheduled" task["scheduled_by_admin_time"] = datetime.now(TIMEZONE) break save_suggested_tasks() logger.info( f"UI: Individual suggested task ID '{suggested_task_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.") return jsonify({"status": "success", "message": "Suggested task scheduled successfully."}) else: return jsonify({ "status": "error", "message": "Failed to schedule suggested task due to an internal server error (check server logs)." }), 500 except Exception as e: logger.error(f"Error in /schedule_suggested_from_ui: {e}") traceback.print_exc() return jsonify({ "status": "error", "message": f"An unexpected server error occurred: {str(e)}" }), 500 @app.route("/discard_suggested_task", methods=["POST"]) def discard_suggested_task(): """Endpoint for admin to discard an individual AI-suggested task from the UI.""" try: data = request.get_json() if not data: return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400 suggested_task_id = data.get("suggested_task_id") if not suggested_task_id: return jsonify({"status": "error", "message": "Missing 'suggested_task_id' in request."}), 400 found_and_pending = False with _suggested_tasks_lock: for task in _suggested_tasks: if task["id"] == suggested_task_id and task["status"] == "pending_review": task["status"] = "discarded" task["discarded_by_admin_time"] = datetime.now(TIMEZONE) found_and_pending = True break if found_and_pending: save_suggested_tasks() logger.info(f"UI: Suggested task ID '{suggested_task_id}' discarded.") return jsonify({"status": "success", "message": "Suggested task discarded successfully."}) else: return jsonify({ "status": "error", "message": "Suggested task not found or already processed (scheduled/discarded)." }), 404 except Exception as e: logger.error(f"Error in /discard_suggested_task: {e}") traceback.print_exc() return jsonify({ "status": "error", "message": f"An unexpected server error occurred: {str(e)}" }), 500 @app.route("/schedule_general_ai_response", methods=["POST"]) def schedule_general_ai_response(): """Endpoint for admin to schedule a general AI response from the UI.""" try: data = request.get_json() if not data: return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400 response_id = data.get("response_id") chat_id = data.get("chat_id") username = data.get("username") message = data.get("message", "").strip() scheduled_time_str = data.get("scheduled_time") if not all([response_id, chat_id, username, message, scheduled_time_str]): return jsonify({"status": "error", "message": "Missing required fields for scheduling AI response."}), 400 # Validate message length (Telegram limit) if len(message) > 4096: return jsonify({ "status": "error", "message": f"AI response message too long ({len(message)} chars). Maximum 4096 characters allowed for Telegram." }), 400 # Parse and validate datetime try: scheduled_datetime_naive = datetime.strptime(scheduled_time_str, '%Y-%m-%dT%H:%M') scheduled_datetime = TIMEZONE.localize(scheduled_datetime_naive) except ValueError: return jsonify({"status": "error", "message": "Invalid date/time format. Expected YYYY-MM-DDTHH:MM."}), 400 now = datetime.now(TIMEZONE) if scheduled_datetime <= now: return jsonify({"status": "error", "message": "Scheduled time must be in the future."}), 400 # Check if the AI response entry exists and is still pending review response_found = False with _general_ai_responses_lock: for resp in _general_ai_responses: if resp["id"] == response_id and resp["status"] == "pending_review": response_found = True break if not response_found: return jsonify({"status": "error", "message": "AI response not found or already processed."}), 404 result = schedule_message(chat_id, username, message, scheduled_datetime, "general_ai_response") if result: with _general_ai_responses_lock: for resp in _general_ai_responses: if resp["id"] == response_id: resp["status"] = "scheduled" resp["scheduled_by_admin_time"] = datetime.now(TIMEZONE) break save_general_ai_responses() logger.info( f"UI: General AI response ID '{response_id}' scheduled for {username} ({chat_id}) at {scheduled_datetime}.") return jsonify({"status": "success", "message": "AI response scheduled successfully."}) else: return jsonify({"status": "error", "message": "Failed to schedule AI response due to internal error."}), 500 except Exception as e: logger.error(f"Error in /schedule_general_ai_response: {e}") traceback.print_exc() return jsonify({ "status": "error", "message": f"An unexpected server error occurred: {str(e)}" }), 500 @app.route("/discard_general_ai_response", methods=["POST"]) def discard_general_ai_response(): """Endpoint for admin to discard a general AI response from the UI.""" try: data = request.get_json() if not data: return jsonify({"status": "error", "message": "No JSON data provided in request body."}), 400 response_id = data.get("response_id") if not response_id: return jsonify({"status": "error", "message": "Missing 'response_id' in request."}), 400 found = False with _general_ai_responses_lock: for resp in _general_ai_responses: if resp["id"] == response_id and resp["status"] == "pending_review": resp["status"] = "discarded" resp["discarded_by_admin_time"] = datetime.now(TIMEZONE) found = True break if found: save_general_ai_responses() logger.info(f"UI: General AI response ID '{response_id}' discarded.") return jsonify({"status": "success", "message": "AI response discarded successfully."}) else: return jsonify({"status": "error", "message": "AI response not found or already processed."}), 404 except Exception as e: logger.error(f"Error in /discard_general_ai_response: {e}") traceback.print_exc() return jsonify({ "status": "error", "message": f"An unexpected server error occurred: {str(e)}" }), 500 @app.route("/health") def health(): """Provides a basic health check endpoint for the application.""" return jsonify({ "status": "healthy", "timestamp": datetime.now(TIMEZONE).isoformat(), "scheduler_running": scheduler.running, "gemini_available": model is not None, "farmer_data_loaded_count": len(_farmer_data), "messages_in_log": len(_messages), "scheduled_messages_count": len(_scheduled_messages), "suggested_tasks_count": len(_suggested_tasks), "general_ai_responses_count": len(_general_ai_responses) }) # ---------------- Startup ---------------- def start_polling_thread(): """Starts a separate thread for Telegram long polling.""" t = threading.Thread(target=polling_loop, daemon=True, name="TelegramPolling") t.start() logger.info("Telegram polling thread initiated.") return t if __name__ == "__main__": # Perform initial configuration validation if not BOT_TOKEN : logger.critical( "❌ BOT_TOKEN is missing or is the default placeholder. Please update it in config or environment variables.") exit(1) # Exit if essential config is missing if not GEMINI_API_KEY or GEMINI_API_KEY == "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho": logger.warning( "âš ī¸ GEMINI_API_KEY is missing or is the default placeholder. AI features may not function correctly without it.") # Do not exit, as other parts of the app might still be useful logger.info("Starting Farm Bot with admin-controlled scheduling workflow...") # Load all persistent data into memory load_messages_from_file() load_scheduled_messages() load_suggested_tasks() load_general_ai_responses() load_farmer_data() # Load farmer-specific RAG data # --- HARDCODED SCHEDULED FARM UPDATE REQUEST AT STARTUP --- # This ensures the first "ask for updates" message is automatically scheduled for a farmer. # IMPORTANT: Replace these values with actual farmer information (or set via environment variables) # To get a farmer's chat ID: Have them send any message to your bot. Then check http://127.0.0.1:5000/messages TARGET_FARMER_CHAT_ID = os.environ.get("TARGET_FARMER_CHAT_ID", "YOUR_TELEGRAM_CHAT_ID") TARGET_FARMER_USERNAME = os.environ.get("TARGET_FARMER_USERNAME", "YOUR_TELEGRAM_USERNAME") FARM_UPDATE_REQUEST_MESSAGE = "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities." FARM_UPDATE_REQUEST_TYPE = "farm_update_request" # Schedule for 5 minutes from startup (for testing) - adjust for production e.g. daily at 8 AM scheduled_farm_update_time = datetime.now(TIMEZONE) + timedelta(minutes=5) if TARGET_FARMER_CHAT_ID in ["YOUR_TELEGRAM_CHAT_ID", "", None] or TARGET_FARMER_USERNAME in [ "YOUR_TELEGRAM_USERNAME", "", None]: logger.warning( "âš ī¸ Initial farm update request not scheduled: TARGET_FARMER_CHAT_ID or TARGET_FARMER_USERNAME not properly configured in script or environment variables.") logger.info(" -> To enable this feature, please update the configuration or manually schedule via the UI.") else: # Check if a similar farm update request has already been recently scheduled/sent update_request_already_handled = False with _scheduled_messages_lock: for msg in _scheduled_messages: if (str(msg["chat_id"]) == TARGET_FARMER_CHAT_ID and msg["type"] == FARM_UPDATE_REQUEST_TYPE and msg["status"] in ["pending", "sent"]): # Consider handled if scheduled or sent within the last 24 hours msg_time_str = msg.get("sent_time", msg["scheduled_time"]) try: msg_dt = datetime.fromisoformat(msg_time_str.replace('Z', '+00:00')) # Handle Z timezone if msg_dt.tzinfo is None: msg_dt = TIMEZONE.localize(msg_dt) else: msg_dt = msg_dt.astimezone(TIMEZONE) if msg_dt > (datetime.now(TIMEZONE) - timedelta(hours=24)): update_request_already_handled = True logger.info( f"â„šī¸ Farm update request for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) already exists or was recently sent. Not re-scheduling.") break except Exception as e: logger.warning( f"Error parsing datetime for existing scheduled message ID '{msg.get('id', 'unknown')}': {e}. This might affect duplicate detection for hardcoded message.") if not update_request_already_handled: if scheduled_farm_update_time > datetime.now(TIMEZONE): result = schedule_message( TARGET_FARMER_CHAT_ID, TARGET_FARMER_USERNAME, FARM_UPDATE_REQUEST_MESSAGE, scheduled_farm_update_time, FARM_UPDATE_REQUEST_TYPE ) if result: logger.info( f"✅ Initial farm update request scheduled for {TARGET_FARMER_USERNAME} ({TARGET_FARMER_CHAT_ID}) at {scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}.") else: logger.error("❌ Failed to schedule initial farm update request due to an internal error.") else: logger.warning( f"âš ī¸ Initial farm update request not scheduled: Scheduled time ({scheduled_farm_update_time.strftime('%Y-%m-%d %I:%M %p %Z')}) is in the past. Please adjust `scheduled_farm_update_time` for future execution.") # --- END HARDCODED SCHEDULED FARM UPDATE REQUEST --- # Start the Telegram polling thread (daemon so it exits with main app) start_polling_thread() logger.info("\n-----------------------------------------------------") logger.info("Farm Bot Application Ready!") logger.info("📱 Instruct your farmers to start a chat with your Telegram bot.") logger.info("🌐 Access the Admin Panel to manage tasks and responses: http://127.0.0.1:5000") logger.info("📊 Check system health: http://127.0.0.1:5000/health") logger.info("-----------------------------------------------------\n") try: # Run the Flask app app.run(host="0.0.0.0", port=5000, debug=False, threaded=True) except KeyboardInterrupt: logger.info("Application received KeyboardInterrupt. Shutting down gracefully...") except Exception as e: logger.critical(f"Unhandled exception during Flask application runtime: {e}") traceback.print_exc() finally: # Ensure scheduler is shut down when the app stops if scheduler.running: scheduler.shutdown(wait=False) logger.info("APScheduler shut down.") logger.info("Application process finished.")