import requests import time import json import traceback import threading from datetime import datetime, timedelta from flask import Flask, jsonify, render_template_string, send_file, request import google.generativeai as genai import os from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.date import DateTrigger import pytz import logging import re from typing import Optional, Dict, List, Any # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ---------------- CONFIG ---------------- # !!! IMPORTANT: Replace these with your actual tokens or use environment variables !!! BOT_TOKEN = "8328709566:AAH7ZmdWuzGODlTByJ34yI5yR9e8otBokBc" # <<< REPLACE THIS with your Telegram Bot Token GEMINI_API_KEY = "AIzaSyAfF-13nsrMdAAe3SFOPSxFya4EtfLBjho" # <<< REPLACE THIS with your Google Gemini API Key # !!! ----------------------------------------------------------------------------- !!! TELEGRAM_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}" CHAT_LOG_FILE = "chat_log.json" SCHEDULED_MESSAGES_FILE = "scheduled_messages.json" SUGGESTED_TASKS_FILE = "suggested_tasks.json" GENERAL_AI_RESPONSES_FILE = "general_ai_responses.json" FARMERS_DATA_FILE = "farmers.json" # File for farmer-specific RAG data MAX_MESSAGES = 2000 TIMEZONE = pytz.timezone('Asia/Kolkata') # Adjust to your local timezone (e.g., 'America/New_York', 'Europe/London') FARM_UPDATE_RESPONSE_WINDOW_HOURS = 48 # Window for a farmer to respond to an update request # Hardcoded messages for the UI dropdown HARDCODED_MESSAGES_FOR_UI =[ { "value": "What's the update regarding your farm? Please share details about your crops, any issues you're facing, and current farming activities.", "text": "Ask for Farm Update"}, {"value": "How are your crops doing today?", "text": "Crops Check-in"}, {"value": "Reminder: Check irrigation system.", "text": "Irrigation Reminder"}, {"value": "Don't forget to monitor pest activity.", "text": "Pest Monitor Reminder"}, {"value": "Daily task: Fertilize Section A.", "text": "Fertilize Section A"}, {"value": "Weather looks good for harvesting. Consider starting tomorrow.", "text": "Harvest Weather Alert"}, {"value": "Time to check soil moisture levels.", "text": "Soil Moisture Check"}, {"value": "", "text": "Custom Message (Type Below)"} ] # Rate limiting for Telegram API calls REQUEST_RATE_LIMIT = 30 # requests per minute rate_limit_tracker = {"count": 0, "reset_time": time.time() + 60} # ---------------------------------------- # Flask app app = Flask(__name__) # Configure Gemini with error handling try: genai.configure(api_key=GEMINI_API_KEY) model = genai.GenerativeModel("gemini-2.0-flash-exp") logger.info("Gemini AI configured successfully.") except Exception as e: logger.error(f"Failed to configure Gemini AI. AI features will be unavailable: {e}") model = None # Set model to None if configuration fails # Initialize scheduler scheduler = BackgroundScheduler(timezone=TIMEZONE) scheduler.start() # Thread-safe stores _messages_lock = threading.Lock() _messages: List[Dict[str, Any]] = [] _scheduled_messages_lock = threading.Lock() _scheduled_messages: List[Dict[str, Any]] = [] _suggested_tasks_lock = threading.Lock() _suggested_tasks: List[Dict[str, Any]] = [] # Each entry is now a single task with message and reason _general_ai_responses_lock = threading.Lock() _general_ai_responses: List[Dict[str, Any]] = [] _farmer_data_lock = threading.Lock() _farmer_data: List[Dict[str, Any]] = [] # State for tracking if we're awaiting a farm update response from a specific chat_id _awaiting_update_response: Dict[str, datetime] = {} def check_rate_limit() -> bool: """Checks if Telegram API request rate limit has been exceeded.""" current_time = time.time() with _messages_lock: # Use a lock for shared rate_limit_tracker access if current_time > rate_limit_tracker["reset_time"]: rate_limit_tracker["count"] = 0 rate_limit_tracker["reset_time"] = current_time + 60 if rate_limit_tracker["count"] >= REQUEST_RATE_LIMIT: return False rate_limit_tracker["count"] += 1 return True def sanitize_text(text: str, max_length: int = 4096) -> str: """Sanitizes text for Telegram message limits and removes invalid characters.""" if not text: return "" # Remove null characters which can cause issues with some APIs/databases sanitized = text.replace('\x00', '').strip() # Telegram message limit for sendMessage is 4096 characters (HTML mode can be slightly less due to tags) if len(sanitized) > max_length: sanitized = sanitized[:max_length - 3] + "..." return sanitized def save_messages_to_file(): """Saves chat messages to a JSON file.""" try: with _messages_lock: with open(CHAT_LOG_FILE, "w", encoding="utf-8") as f: json.dump(_messages, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_messages)} messages to {CHAT_LOG_FILE}.") except Exception as e: logger.error(f"Failed to save messages to {CHAT_LOG_FILE}: {e}") def save_scheduled_messages(): """Saves scheduled messages to a JSON file.""" try: with _scheduled_messages_lock: with open(SCHEDULED_MESSAGES_FILE, "w", encoding="utf-8") as f: json.dump(_scheduled_messages, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_scheduled_messages)} scheduled messages to {SCHEDULED_MESSAGES_FILE}.") except Exception as e: logger.error(f"Failed to save scheduled messages to {SCHEDULED_MESSAGES_FILE}: {e}") def save_suggested_tasks(): """Saves AI-suggested tasks to a JSON file.""" try: with _suggested_tasks_lock: serializable_tasks = [] for task in _suggested_tasks: temp_task = task.copy() # Convert datetime objects to ISO strings for JSON serialization for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_task.get(key), datetime): temp_task[key] = temp_task[key].isoformat() serializable_tasks.append(temp_task) with open(SUGGESTED_TASKS_FILE, "w", encoding="utf-8") as f: json.dump(serializable_tasks, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_suggested_tasks)} suggested tasks to {SUGGESTED_TASKS_FILE}.") except Exception as e: logger.error(f"Failed to save suggested tasks to {SUGGESTED_TASKS_FILE}: {e}") def save_general_ai_responses(): """Saves general AI responses (pending admin review) to a JSON file.""" try: with _general_ai_responses_lock: serializable_responses = [] for resp in _general_ai_responses: temp_resp = resp.copy() for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if isinstance(temp_resp.get(key), datetime): temp_resp[key] = temp_resp[key].isoformat() serializable_responses.append(temp_resp) with open(GENERAL_AI_RESPONSES_FILE, "w", encoding="utf-8") as f: json.dump(serializable_responses, f, ensure_ascii=False, indent=2) logger.debug(f"Saved {len(_general_ai_responses)} general AI responses to {GENERAL_AI_RESPONSES_FILE}.") except Exception as e: logger.error(f"Failed to save general AI responses to {GENERAL_AI_RESPONSES_FILE}: {e}") def load_messages_from_file(): """Loads chat messages from a JSON file.""" if not os.path.exists(CHAT_LOG_FILE): logger.info(f"{CHAT_LOG_FILE} not found, starting with empty messages.") return try: with open(CHAT_LOG_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _messages_lock: _messages.clear() _messages.extend(data[-MAX_MESSAGES:]) # Load only the latest MAX_MESSAGES logger.info(f"Loaded {len(_messages)} messages from {CHAT_LOG_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {CHAT_LOG_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load messages from {CHAT_LOG_FILE}: {e}") def load_scheduled_messages(): """Loads scheduled messages and re-schedules pending ones.""" if not os.path.exists(SCHEDULED_MESSAGES_FILE): logger.info(f"{SCHEDULED_MESSAGES_FILE} not found, starting with empty scheduled messages.") return try: with open(SCHEDULED_MESSAGES_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _scheduled_messages_lock: _scheduled_messages.clear() _scheduled_messages.extend(data) rescheduled_count = 0 for entry in _scheduled_messages: try: # Ensure scheduled_time is timezone-aware scheduled_time = datetime.fromisoformat(entry["scheduled_time"]) if scheduled_time.tzinfo is None: scheduled_time = TIMEZONE.localize(scheduled_time) # Localize if naive # Only re-schedule if pending and in the future if entry["status"] == "pending" and scheduled_time > datetime.now(TIMEZONE): scheduler.add_job( func=send_scheduled_message, trigger=DateTrigger(run_date=scheduled_time), args=[entry], id=entry["id"], replace_existing=True # Important to prevent duplicate jobs on restart ) rescheduled_count += 1 # If it was a sent farm update request, track it for response window management if entry["type"] == "farm_update_request" and entry["status"] == "sent": sent_time_str = entry.get("sent_time", entry["scheduled_time"]) # Fallback to scheduled if sent_time missing sent_or_scheduled_time = datetime.fromisoformat(sent_time_str) if sent_or_scheduled_time.tzinfo is None: sent_or_scheduled_time = TIMEZONE.localize(sent_or_scheduled_time) _awaiting_update_response[str(entry["chat_id"])] = sent_or_scheduled_time except KeyError as ke: logger.error(f"Missing key in scheduled message entry ID '{entry.get('id', 'unknown')}': {ke}") except ValueError as ve: # fromisoformat error logger.error( f"Invalid datetime format in scheduled message entry ID '{entry.get('id', 'unknown')}': {ve}") except Exception as e: logger.error(f"Failed to reschedule entry ID '{entry.get('id', 'unknown')}': {e}") logger.info( f"Loaded {len(_scheduled_messages)} scheduled messages, rescheduled {rescheduled_count} pending jobs.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {SCHEDULED_MESSAGES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load scheduled messages from {SCHEDULED_MESSAGES_FILE}: {e}") def load_suggested_tasks(): """Loads AI-suggested tasks from a JSON file.""" if not os.path.exists(SUGGESTED_TASKS_FILE): logger.info(f"{SUGGESTED_TASKS_FILE} not found, starting with empty suggested tasks.") return try: with open(SUGGESTED_TASKS_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _suggested_tasks_lock: _suggested_tasks.clear() for task in data: # Convert ISO strings back to datetime objects for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if key in task and task[key] is not None: try: task[key] = datetime.fromisoformat(task[key]) except (ValueError, TypeError): logger.warning( f"Invalid datetime format for key '{key}' in task ID '{task.get('id')}': {task[key]}. Setting to None.") task[key] = None _suggested_tasks.append(task) logger.info(f"Loaded {len(_suggested_tasks)} suggested tasks from {SUGGESTED_TASKS_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {SUGGESTED_TASKS_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load suggested tasks from {SUGGESTED_TASKS_FILE}: {e}") def load_general_ai_responses(): """Loads general AI responses (pending admin review) from a JSON file.""" if not os.path.exists(GENERAL_AI_RESPONSES_FILE): logger.info(f"{GENERAL_AI_RESPONSES_FILE} not found, starting with empty general AI responses.") return try: with open(GENERAL_AI_RESPONSES_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _general_ai_responses_lock: _general_ai_responses.clear() for resp in data: for key in ["generation_time", "scheduled_by_admin_time", "discarded_by_admin_time"]: if key in resp and resp[key] is not None: try: resp[key] = datetime.fromisoformat(resp[key]) except (ValueError, TypeError): logger.warning( f"Invalid datetime format for key '{key}' in general AI response ID '{resp.get('id')}': {resp[key]}. Setting to None.") resp[key] = None _general_ai_responses.append(resp) logger.info(f"Loaded {len(_general_ai_responses)} general AI responses from {GENERAL_AI_RESPONSES_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {GENERAL_AI_RESPONSES_FILE} (JSON error): {e}. Consider backing up the file and starting fresh.") except Exception as e: logger.error(f"Failed to load general AI responses from {GENERAL_AI_RESPONSES_FILE}: {e}") def load_farmer_data(): """Loads farmer-specific data from farmers.json for RAG.""" if not os.path.exists(FARMERS_DATA_FILE): logger.warning( f"{FARMERS_DATA_FILE} not found. RAG functionality for tasks will be limited or unavailable. Please create it if needed.") with _farmer_data_lock: _farmer_data.clear() return try: with open(FARMERS_DATA_FILE, "r", encoding="utf-8") as f: data = json.load(f) with _farmer_data_lock: _farmer_data.clear() _farmer_data.extend(data) logger.info(f"Loaded {len(_farmer_data)} farmer profiles from {FARMERS_DATA_FILE}.") except json.JSONDecodeError as e: logger.error( f"Failed to parse {FARMERS_DATA_FILE} (JSON error): {e}. Ensure it's valid JSON. RAG functionality will be limited.") except Exception as e: logger.error(f"Failed to load farmer data from {FARMERS_DATA_FILE}: {e}") def get_farmer_profile(chat_id: str) -> Optional[Dict]: """Retrieves a farmer's profile by chat_id from loaded data.""" with _farmer_data_lock: for farmer in _farmer_data: if str(farmer.get("chat_id")) == str(chat_id): return farmer return None def append_message(role: str, username: str, chat_id: str, text: str): """Appends a message to the in-memory chat log and saves it to file.""" if not text: logger.debug(f"Attempted to append empty message for role '{role}' from {username} ({chat_id}). Skipping.") return text = sanitize_text(text) entry = { "role": role, "username": username, "chat_id": str(chat_id), "text": text, "ts": datetime.utcnow().isoformat() + "Z" } with _messages_lock: _messages.append(entry) if len(_messages) > MAX_MESSAGES: _messages[:] = _messages[-MAX_MESSAGES:] # Keep only the latest messages save_messages_to_file() def send_message(chat_id: str, text: str) -> Optional[Dict]: """Sends a message to Telegram, respecting rate limits.""" if not check_rate_limit(): logger.warning(f"Telegram API rate limit exceeded for chat_id {chat_id}. Message skipped: '{text[:50]}...'.") return None text = sanitize_text(text) if not text: logger.warning(f"Attempted to send empty message to chat_id {chat_id}. Skipping.") return None url = f"{TELEGRAM_API_URL}/sendMessage" payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"} # Use HTML for basic formatting try: r = requests.post(url, json=payload, timeout=10) r.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) result = r.json() if not result.get("ok"): logger.error( f"Telegram API reported error sending to {chat_id}: {result.get('description', 'Unknown error')}. Message: '{text[:50]}...'.") return None logger.debug(f"Successfully sent Telegram message to {chat_id}: '{text[:50]}...'.") return result except requests.exceptions.Timeout: logger.error(f"Timeout sending Telegram message to {chat_id}. Message: '{text[:50]}...'.") return None except requests.exceptions.RequestException as e: logger.error(f"Network error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.") return None except Exception as e: logger.error(f"Unexpected error sending Telegram message to {chat_id}: {e}. Message: '{text[:50]}...'.") return None def schedule_message(chat_id: str, username: str, message_text: str, scheduled_time: datetime, message_type: str = "reminder") -> Optional[Dict]: """Schedules a message to be sent at a future time.""" if not message_text or not chat_id: logger.error("Cannot schedule message: Missing message text or chat_id.") return None # Ensure scheduled_time is in the future if scheduled_time <= datetime.now(TIMEZONE): logger.error( f"Cannot schedule message for {username} ({chat_id}): Scheduled time ({scheduled_time}) is in the past. Message: '{message_text[:50]}...'.") return None # Generate a unique ID for the scheduled entry timestamp = int(scheduled_time.timestamp()) # Use a hash of the first 50 chars of message_text for uniqueness in the ID text_hash = hash(message_text[:50]) % 10000 unique_id = f"{chat_id}_{message_type}_{timestamp}_{text_hash}" scheduled_entry = { "id": unique_id, "chat_id": str(chat_id), "username": username, "message": sanitize_text(message_text), "scheduled_time": scheduled_time.isoformat(), "type": message_type, "status": "pending", "created_at": datetime.now(TIMEZONE).isoformat() } try: with _scheduled_messages_lock: _scheduled_messages.append(scheduled_entry) save_scheduled_messages() scheduler.add_job( func=send_scheduled_message, trigger=DateTrigger(run_date=scheduled_time), args=[scheduled_entry], id=scheduled_entry["id"], replace_existing=True # Important: Prevents duplicate jobs if app restarts and tries to reschedule same ID ) logger.info( f"Scheduled '{message_type}' message for {username} ({chat_id}) (ID: {unique_id}) at {scheduled_time.strftime('%Y-%m-%d %H:%M %Z')}.") return scheduled_entry except Exception as e: logger.error(f"Failed to schedule message for {username} ({chat_id}) (ID: {unique_id}): {e}") return None def send_scheduled_message(scheduled_entry: Dict): """Callback function executed by APScheduler to send a scheduled message.""" try: chat_id = scheduled_entry["chat_id"] message = scheduled_entry["message"] recipient_username = scheduled_entry["username"] logger.info( f"Attempting to send scheduled message '{scheduled_entry['type']}' to {recipient_username} ({chat_id}). Message: '{message[:50]}...'.") result = send_message(chat_id, message) current_time = datetime.now(TIMEZONE).isoformat() if result: append_message("bot", f"Bot (Scheduled for {recipient_username})", chat_id, message) with _scheduled_messages_lock: for msg in _scheduled_messages: if msg["id"] == scheduled_entry["id"]: msg["status"] = "sent" msg["sent_time"] = current_time break save_scheduled_messages() logger.info(f"Successfully sent scheduled message to {recipient_username} ({chat_id}).") # If a farm update request was just sent, start tracking the response window if scheduled_entry["type"] == "farm_update_request": _awaiting_update_response[str(chat_id)] = datetime.now(TIMEZONE) logger.info( f"Now awaiting farm update response from {recipient_username} for {FARM_UPDATE_RESPONSE_WINDOW_HOURS} hours (chat_id: {chat_id}).") else: with _scheduled_messages_lock: for msg in _scheduled_messages: if msg["id"] == scheduled_entry["id"]: msg["status"] = "failed" msg["failed_time"] = current_time break save_scheduled_messages() logger.error( f"Failed to send scheduled message to {recipient_username} ({chat_id}). Marking as 'failed'. Message: '{message[:50]}...'.") except Exception as e: logger.error(f"Critical error in send_scheduled_message for ID '{scheduled_entry.get('id', 'unknown')}': {e}") traceback.print_exc() def generate_and_store_farm_tasks(user_message: str, chat_id: str, username: str) -> str: """ Generates 3-4 specific, short, descriptive daily tasks with a reason (<50 words) based on farmer's update AND farmer's profile (RAG), and stores each task separately for admin review. Does NOT send any message directly to the farmer. """ if not model: logger.error(f"Gemini model not available for task generation for {username}.") return "" try: farmer_profile = get_farmer_profile(chat_id) profile_context = "" if farmer_profile and farmer_profile.get("farm_details"): details = farmer_profile["farm_details"] profile_context = ( f"\n\nContext about {username}'s farm (use this for tailored advice):\n" f"- Location: {details.get('location', 'N/A')}\n" f"- Main Crops: {', '.join(details.get('main_crops', []))}\n" f"- Soil Type: {details.get('soil_type', 'N/A')}\n" f"- Typical Issues: {', '.join(details.get('typical_issues', []))}\n" f"- Last reported weather: {details.get('last_reported_weather', 'N/A')}\n" f"- Irrigation System: {details.get('irrigation_system', 'N/A')}\n" ) else: profile_context = "\n\n(No detailed farmer profile data found. Generating tasks based solely on the update.)" prompt = ( f"You are a highly experienced and practical agricultural expert. Based on the farmer's update below " f"and their farm context (if provided), generate exactly 3 to 4 critical daily tasks. " f"Each task must be concise, actionable, and include a 'Reason:' why it's important. " f"Ensure each 'Reason:' is strictly under 50 words. Prioritize immediate needs and common farming practices." f"\n\nStrictly follow this output format for each task:\n" f"Task: [Short, descriptive task, e.g., 'Check soil moisture in cornfield']\n" f"Reason: [Brief explanation why (under 50 words), e.g., 'Ensure optimal water levels after dry spell.']\n" f"---\n" f"Task: [Another short, descriptive task]\n" f"Reason: [Brief explanation why (under 50 words)]\n" f"---\n" f"(Repeat 3 or 4 times. Ensure each task block is clearly separated by '---' on its own line.)" f"\n\nFarmer: {username}\n" f"Farmer's Update: '{user_message}'" f"{profile_context}" f"\n\nNow, generate the 3-4 tasks and reasons in the specified format:" ) logger.info( f"Calling Gemini for RAG task generation for {username} (chat_id: {chat_id}) based on update: '{user_message[:100]}...'.") response = model.generate_content(prompt) ai_raw_output = response.text if response and response.text else "Could not generate tasks based on the update." logger.debug(f"Raw AI output for tasks: {ai_raw_output[:500]}...") # Parse the AI's output into individual tasks parsed_tasks = [] # Regex to find blocks starting with "Task:" and containing "Reason:", separated by "---" # Adjusted regex for better robustness to handle various line endings and trailing whitespace. task_blocks = re.findall(r"Task:\s*(.*?)\nReason:\s*(.*?)(?:\n---\s*|$)", ai_raw_output, re.DOTALL | re.IGNORECASE) for task_text, reason_text in task_blocks: parsed_tasks.append({ "task_message": task_text.strip(), "task_reason": reason_text.strip() }) if not parsed_tasks: logger.warning( f"AI did not generate structured tasks for {username} in expected format. Providing fallback task(s). Raw output: {ai_raw_output[:200]}") # Fallback if parsing fails or AI doesn't follow format. Always ensure at least one review item. fallback_message = "AI did not generate specific tasks in the expected format. Manual review of farmer's update is required." if len(ai_raw_output) > 50: # If AI gave some output, add it as context to the fallback fallback_message += f" AI's raw response: {sanitize_text(ai_raw_output, max_length=150)}." parsed_tasks.append({ "task_message": "Manual Review Required", "task_reason": sanitize_text(fallback_message, max_length=200) }) generation_time = datetime.now(TIMEZONE) # Store each individual parsed task for separate admin scheduling with _suggested_tasks_lock: for i, task_item in enumerate(parsed_tasks): # Create a unique ID for each individual task item task_id = f"suggested_task_{chat_id}_{int(generation_time.timestamp())}_{i}_{hash(task_item['task_message']) % 10000}" task_entry = { "id": task_id, "chat_id": str(chat_id), "username": username, "original_update": user_message, # Keep original update for context in UI "suggested_task_message": sanitize_text(task_item["task_message"]), "suggested_task_reason": sanitize_text(task_item["task_reason"]), "generation_time": generation_time, "status": "pending_review" } _suggested_tasks.append(task_entry) save_suggested_tasks() logger.info( f"Generated and stored {len(parsed_tasks)} individual tasks for {username} (chat_id: {chat_id}) for admin review.") return "" # No immediate response to farmer except Exception as e: logger.error(f"Error during farm task generation/storage for {username} (chat_id: {chat_id}): {e}") traceback.print_exc() return "" # No immediate response to farmer on AI error def get_ai_response_with_context(user_message: str, chat_id: str) -> str: """Gets a general AI response for a farmer's query, considering general farming context.""" if not model: logger.error("Gemini model not available for general query, returning fallback message.") return "AI service is currently unavailable. Please try again later." try: # Include general farming context in the prompt prompt = ( f"You are a helpful and knowledgeable agricultural assistant. Provide a concise and direct answer " f"to the following query from a farmer. Keep the response factual and actionable where possible. " f"Limit the response to a few sentences (max 3-4 sentences)." f"\n\nFarmer's Query: {user_message}" ) logger.info(f"Generating general AI response for {chat_id} (query: '{user_message[:50]}...')...") response = model.generate_content(prompt) ai_reply = response.text if response and response.text else "I couldn't process your request right now." return sanitize_text(ai_reply) except Exception as e: logger.error(f"AI response error for chat_id {chat_id} (query: '{user_message[:50]}...'): {e}") traceback.print_exc() return "I'm having trouble processing your request right now. Please try again later." def store_general_ai_response(chat_id: str, username: str, original_query: str, ai_response: str): """Stores a general AI response (generated from farmer's general query) for admin review.""" if not ai_response: logger.warning(f"Attempted to store empty general AI response for {username} ({chat_id}). Skipping.") return response_entry = { "id": f"general_ai_{chat_id}_{int(datetime.now().timestamp())}_{hash(original_query) % 10000}", "chat_id": str(chat_id), "username": username, "original_query": sanitize_text(original_query), "ai_response": sanitize_text(ai_response), "generation_time": datetime.now(TIMEZONE), "status": "pending_review" } with _general_ai_responses_lock: _general_ai_responses.append(response_entry) save_general_ai_responses() logger.info(f"Stored general AI response for {username} ({chat_id}) (ID: {response_entry['id']}).") # ---------------- Polling logic ---------------- def delete_webhook(drop_pending: bool = True) -> Optional[Dict]: """Deletes the Telegram webhook to switch to long polling.""" try: params = {"drop_pending_updates": "true" if drop_pending else "false"} r = requests.post(f"{TELEGRAM_API_URL}/deleteWebhook", params=params, timeout=10) r.raise_for_status() result = r.json() if result.get("ok"): logger.info("Telegram webhook deleted successfully (ensuring long polling).") return result except requests.exceptions.RequestException as e: logger.error(f"Failed to delete Telegram webhook: {e}") return None except Exception as e: logger.error(f"Unexpected error deleting Telegram webhook: {e}") return None def get_updates(offset: Optional[int] = None, timeout_seconds: int = 60) -> Optional[Dict]: """Fetches updates from Telegram using long polling.""" params = {"timeout": timeout_seconds} if offset: params["offset"] = offset try: r = requests.get(f"{TELEGRAM_API_URL}/getUpdates", params=params, timeout=timeout_seconds + 10) r.raise_for_status() return r.json() except requests.exceptions.Timeout: logger.debug("Telegram polling timed out (normal, no new messages).") return {"ok": True, "result": []} # Return empty result on timeout to continue polling except requests.exceptions.RequestException as e: logger.error(f"Network error getting updates from Telegram: {e}") return None except Exception as e: logger.error(f"Unexpected error getting updates from Telegram: {e}") return None def polling_loop(): """Main loop for long-polling Telegram for new messages.""" logger.info("Starting Telegram polling thread.") delete_webhook(drop_pending=True) # Ensure webhook is off for polling offset = None consecutive_errors = 0 while True: try: updates = get_updates(offset=offset, timeout_seconds=60) if updates is None: # Critical network error, get_updates returned None consecutive_errors += 1 logger.error( f"get_updates returned None due to error. Consecutive errors: {consecutive_errors}. Sleeping for {min(consecutive_errors * 5, 120)}s.") time.sleep(min(consecutive_errors * 5, 120)) # Exponential backoff up to 2 min continue if not updates.get("ok"): logger.warning( f"Telegram API returned 'not ok' for getUpdates: {updates.get('description', 'No description')}. Consecutive errors: {consecutive_errors}.") consecutive_errors += 1 time.sleep(min(consecutive_errors * 2, 60)) # Exponential backoff up to 1 min continue consecutive_errors = 0 # Reset error counter on successful API call results = updates.get("result", []) for update in results: try: process_update(update) offset = update.get("update_id", 0) + 1 except Exception as e: logger.error(f"Error processing individual update ID {update.get('update_id', 'unknown')}: {e}") traceback.print_exc() # Small delay to prevent busy-waiting if Telegram returns many empty results quickly if not results: time.sleep(1) except KeyboardInterrupt: logger.info("Telegram polling stopped by user (KeyboardInterrupt).") break except Exception as e: consecutive_errors += 1 logger.critical(f"Unhandled critical error in polling loop: {e}") time.sleep(min(consecutive_errors * 10, 300)) # Longer exponential backoff for unhandled errors def process_update(update: Dict): """Processes a single Telegram update message.""" msg = update.get("message") or update.get("edited_message") if not msg: logger.debug(f"Update ID {update.get('update_id')}: No message or edited_message found. Skipping.") return chat = msg.get("chat", {}) chat_id = str(chat.get("id")) username = chat.get("first_name") or chat.get("username") or "User" text = msg.get("text") or msg.get("caption") or "" if not text: # If no text (e.g., photo, sticker), send a polite message and append to log acknowledgment_text = "I can only respond to text messages. Please send your queries as text." send_message(chat_id, acknowledgment_text) append_message("user", username, chat_id, "(Non-text message received)") append_message("bot", "Bot", chat_id, acknowledgment_text) logger.debug(f"Received non-text message from {username} ({chat_id}). Sent acknowledgment.") return append_message("user", username, chat_id, text) logger.info(f"Processing message from {username} ({chat_id}): '{text[:50]}...'.") bot_acknowledgment_to_farmer = "" # This is the only message sent directly to the farmer # Check if we are awaiting a farm update response from this chat_id most_recent_request_time = _awaiting_update_response.get(chat_id) if (most_recent_request_time and (datetime.now(TIMEZONE) - most_recent_request_time) <= timedelta(hours=FARM_UPDATE_RESPONSE_WINDOW_HOURS)): logger.info(f"Recognized as farm update response from {username}. Generating tasks for admin review.") generate_and_store_farm_tasks(text, chat_id, username) # Generates and stores tasks # Send a generic acknowledgment to the farmer bot_acknowledgment_to_farmer = "Thank you for your farm update! The admin will review it, generate daily tasks, and schedule them to be sent to you shortly." # Remove from tracking after processing this update _awaiting_update_response.pop(chat_id, None) logger.info(f"Finished processing farm update for {username} ({chat_id}). Awaiting response tracker cleared.") else: # This is a general query. Generate AI response but store it for admin scheduling. logger.info(f"Processing general query from {username}. Generating AI response for admin scheduling.") ai_generated_response = get_ai_response_with_context(text, chat_id) # Only store and acknowledge if AI generated a meaningful response if ai_generated_response and ai_generated_response != "AI service is currently unavailable. Please try again later.": store_general_ai_response(chat_id, username, text, ai_generated_response) bot_acknowledgment_to_farmer = f"I've received your query about '{text[:50]}...'. The admin will review the AI's suggested response and schedule it to be sent to you." else: # AI service unavailable or response generation failed bot_acknowledgment_to_farmer = "I've received your message. The admin will get back to you shortly." logger.warning( f"AI failed to generate a response for general query from {username} ({chat_id}). Admin will manually review.") if bot_acknowledgment_to_farmer: append_message("bot", "Bot", chat_id, bot_acknowledgment_to_farmer) send_message(chat_id, bot_acknowledgment_to_farmer) else: logger.debug(f"No direct acknowledgment message generated for {username} ({chat_id}) after processing.") # ---------------- Flask routes & UI ---------------- # HTML template (with updated section titles and new general AI response section) HTML_TEMPLATE = """