Spaces:
Sleeping
Sleeping
""" | |
Daily Task Management Service | |
This service generates daily farming tasks based on: | |
- Seasonal farming calendar | |
- Weather conditions | |
- Crop growth stages | |
- Soil conditions | |
- Previous task completion history | |
""" | |
import logging | |
from datetime import datetime, date, timedelta | |
from typing import List, Dict, Optional | |
import json | |
logger = logging.getLogger(__name__) | |
class DailyTaskService: | |
"""Service for generating and managing daily farming tasks""" | |
def __init__(self, gemini_service=None, weather_service=None): | |
self.gemini_service = gemini_service | |
self.weather_service = weather_service | |
# Predefined task templates based on seasons and farming activities | |
self.task_templates = self._initialize_task_templates() | |
def _initialize_task_templates(self) -> Dict: | |
"""Initialize predefined task templates for different farming activities""" | |
return { | |
# ==================== CROP FARMING TASKS ==================== | |
'irrigation': { | |
'morning': { | |
'title': 'Morning Irrigation Check', | |
'description': 'Check soil moisture levels and irrigate crops if needed. Focus on areas that dried out overnight.', | |
'duration': 30, | |
'priority': 'high', | |
'weather_dependent': True, | |
'farm_types': ['crop', 'mixed'] | |
}, | |
'evening': { | |
'title': 'Evening Irrigation', | |
'description': 'Water crops in the evening to minimize water loss due to evaporation.', | |
'duration': 45, | |
'priority': 'medium', | |
'weather_dependent': True, | |
'farm_types': ['crop', 'mixed'] | |
} | |
}, | |
'fertilizing': { | |
'organic': { | |
'title': 'Apply Organic Fertilizer', | |
'description': 'Apply compost or organic manure to crops. Focus on base application around plant roots.', | |
'duration': 60, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['crop', 'mixed'] | |
}, | |
'chemical': { | |
'title': 'Chemical Fertilizer Application', | |
'description': 'Apply chemical fertilizers as per soil test recommendations. Ensure proper timing and quantities.', | |
'duration': 45, | |
'priority': 'high', | |
'weather_dependent': True, | |
'farm_types': ['crop', 'mixed'] | |
} | |
}, | |
'pest_control': { | |
'inspection': { | |
'title': 'Pest and Disease Inspection', | |
'description': 'Inspect crops for signs of pests, diseases, or nutrient deficiencies. Take photos if issues found.', | |
'duration': 30, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['crop', 'mixed'] | |
}, | |
'spraying': { | |
'title': 'Pest Control Spraying', | |
'description': 'Apply pesticides or bio-pesticides as needed. Avoid spraying during windy or rainy conditions.', | |
'duration': 60, | |
'priority': 'medium', | |
'weather_dependent': True, | |
'farm_types': ['crop', 'mixed'] | |
} | |
}, | |
'weeding': { | |
'manual': { | |
'title': 'Manual Weeding', | |
'description': 'Remove weeds manually from crop rows. Focus on areas around young plants.', | |
'duration': 90, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['crop', 'mixed'] | |
}, | |
'mechanical': { | |
'title': 'Mechanical Weeding', | |
'description': 'Use hoe or cultivator to remove weeds between crop rows.', | |
'duration': 60, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['crop', 'mixed'] | |
} | |
}, | |
# ==================== DAIRY FARMING TASKS ==================== | |
'milking': { | |
'morning': { | |
'title': 'Morning Milking', | |
'description': 'Milk cows/buffaloes in the morning. Ensure clean milking equipment and proper hygiene.', | |
'duration': 90, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'mixed'] | |
}, | |
'evening': { | |
'title': 'Evening Milking', | |
'description': 'Evening milking session. Clean udders and equipment before milking.', | |
'duration': 90, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'mixed'] | |
} | |
}, | |
'dairy_feeding': { | |
'concentrate': { | |
'title': 'Provide Concentrate Feed', | |
'description': 'Give concentrate feed to dairy animals. Adjust quantity based on milk production.', | |
'duration': 45, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'livestock', 'mixed'] | |
}, | |
'fodder': { | |
'title': 'Fodder Distribution', | |
'description': 'Distribute fresh green fodder or dry fodder to dairy animals.', | |
'duration': 60, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'livestock', 'mixed'] | |
} | |
}, | |
'dairy_health': { | |
'health_check': { | |
'title': 'Daily Health Monitoring', | |
'description': 'Check animals for signs of illness, lameness, or distress. Monitor body temperature if needed.', | |
'duration': 30, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'livestock', 'poultry', 'mixed'] | |
}, | |
'udder_care': { | |
'title': 'Udder Health Check', | |
'description': 'Inspect udders for mastitis signs. Apply udder care products if necessary.', | |
'duration': 20, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'mixed'] | |
} | |
}, | |
# ==================== POULTRY FARMING TASKS ==================== | |
'poultry_feeding': { | |
'morning_feed': { | |
'title': 'Morning Poultry Feeding', | |
'description': 'Provide morning feed to chickens/birds. Check feed quality and quantity.', | |
'duration': 30, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['poultry', 'mixed'] | |
}, | |
'evening_feed': { | |
'title': 'Evening Poultry Feeding', | |
'description': 'Give evening feed to poultry. Ensure clean water supply.', | |
'duration': 30, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['poultry', 'mixed'] | |
} | |
}, | |
'egg_collection': { | |
'daily': { | |
'title': 'Egg Collection', | |
'description': 'Collect eggs from nesting boxes. Handle carefully and store properly.', | |
'duration': 20, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['poultry', 'mixed'] | |
} | |
}, | |
'poultry_health': { | |
'flock_monitoring': { | |
'title': 'Flock Health Monitoring', | |
'description': 'Observe bird behavior, appetite, and signs of disease. Remove sick birds if necessary.', | |
'duration': 25, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['poultry', 'mixed'] | |
}, | |
'coop_cleaning': { | |
'title': 'Coop Cleaning', | |
'description': 'Clean poultry house, remove droppings, and ensure proper ventilation.', | |
'duration': 45, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['poultry', 'mixed'] | |
} | |
}, | |
# ==================== LIVESTOCK FARMING TASKS ==================== | |
'livestock_feeding': { | |
'grazing': { | |
'title': 'Livestock Grazing Management', | |
'description': 'Move livestock to fresh grazing areas. Monitor grass quality and quantity.', | |
'duration': 60, | |
'priority': 'medium', | |
'weather_dependent': True, | |
'farm_types': ['livestock', 'mixed'] | |
}, | |
'supplemental': { | |
'title': 'Supplemental Feeding', | |
'description': 'Provide additional feed supplements to livestock. Check mineral salt availability.', | |
'duration': 45, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['livestock', 'mixed'] | |
} | |
}, | |
'livestock_care': { | |
'water_supply': { | |
'title': 'Water Supply Check', | |
'description': 'Ensure clean drinking water is available for all animals. Clean water troughs.', | |
'duration': 20, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'livestock', 'poultry', 'mixed'] | |
}, | |
'shelter_maintenance': { | |
'title': 'Shelter Maintenance', | |
'description': 'Check and maintain animal shelters. Repair any damage and ensure proper ventilation.', | |
'duration': 60, | |
'priority': 'low', | |
'weather_dependent': False, | |
'farm_types': ['dairy', 'livestock', 'poultry', 'mixed'] | |
} | |
}, | |
# ==================== FISHERY TASKS ==================== | |
'fishery': { | |
'water_quality': { | |
'title': 'Water Quality Monitoring', | |
'description': 'Check pond water quality, pH levels, and oxygen content. Monitor fish behavior.', | |
'duration': 30, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['fishery', 'mixed'] | |
}, | |
'fish_feeding': { | |
'title': 'Fish Feeding', | |
'description': 'Provide appropriate feed to fish. Adjust quantity based on fish size and water temperature.', | |
'duration': 20, | |
'priority': 'high', | |
'weather_dependent': False, | |
'farm_types': ['fishery', 'mixed'] | |
}, | |
'pond_maintenance': { | |
'title': 'Pond Maintenance', | |
'description': 'Clean pond surroundings, check water level, and maintain aeration systems.', | |
'duration': 45, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['fishery', 'mixed'] | |
} | |
}, | |
# ==================== COMMON MAINTENANCE TASKS ==================== | |
'maintenance': { | |
'equipment': { | |
'title': 'Equipment Maintenance', | |
'description': 'Check and maintain farming equipment. Clean, oil, and repair as needed.', | |
'duration': 60, | |
'priority': 'low', | |
'weather_dependent': False, | |
'farm_types': ['crop', 'dairy', 'livestock', 'poultry', 'fishery', 'mixed'] | |
}, | |
'infrastructure': { | |
'title': 'Farm Infrastructure Check', | |
'description': 'Inspect farm infrastructure including fencing, storage, and water systems.', | |
'duration': 45, | |
'priority': 'medium', | |
'weather_dependent': False, | |
'farm_types': ['crop', 'dairy', 'livestock', 'poultry', 'fishery', 'mixed'] | |
} | |
} | |
} | |
def generate_daily_tasks(self, farmer, target_date: date = None) -> List[Dict]: | |
""" | |
Generate daily tasks for a farmer based on various factors | |
Args: | |
farmer: Farmer object | |
target_date: Date to generate tasks for (default: today) | |
Returns: | |
List of task dictionaries | |
""" | |
if not target_date: | |
target_date = date.today() | |
tasks = [] | |
try: | |
# Get farmer's farms | |
farms = getattr(farmer, 'farms', []) | |
for farm in farms: | |
# Generate tasks for each farm | |
farm_tasks = self._generate_farm_tasks(farmer, farm, target_date) | |
tasks.extend(farm_tasks) | |
# Generate general farmer tasks (not farm-specific) | |
general_tasks = self._generate_general_tasks(farmer, target_date) | |
tasks.extend(general_tasks) | |
# Sort tasks by priority and estimated duration | |
tasks = self._prioritize_tasks(tasks) | |
logger.info(f"Generated {len(tasks)} daily tasks for farmer {farmer.name} for {target_date}") | |
return tasks | |
except Exception as e: | |
logger.error(f"Error generating daily tasks for farmer {farmer.id}: {str(e)}") | |
return self._get_fallback_tasks(farmer, target_date) | |
def _generate_farm_tasks(self, farmer, farm, target_date: date) -> List[Dict]: | |
"""Generate tasks specific to a farm based on its type""" | |
tasks = [] | |
try: | |
# Get farm type (default to crop if not specified) | |
farm_type = getattr(farm, 'farm_type', 'crop') | |
# Get current season and month | |
month = target_date.month | |
season = self._get_season(month) | |
# Get weather conditions if available | |
weather_info = self._get_weather_info(farm, target_date) | |
# Generate tasks based on farm type | |
if farm_type == 'crop': | |
tasks.extend(self._get_crop_farming_tasks(farm, target_date, season, weather_info)) | |
elif farm_type == 'dairy': | |
tasks.extend(self._get_dairy_farming_tasks(farm, target_date, weather_info)) | |
elif farm_type == 'poultry': | |
tasks.extend(self._get_poultry_farming_tasks(farm, target_date, weather_info)) | |
elif farm_type == 'livestock': | |
tasks.extend(self._get_livestock_farming_tasks(farm, target_date, weather_info)) | |
elif farm_type == 'fishery': | |
tasks.extend(self._get_fishery_tasks(farm, target_date, weather_info)) | |
elif farm_type == 'mixed': | |
# For mixed farming, generate tasks from all applicable types | |
tasks.extend(self._get_mixed_farming_tasks(farm, target_date, season, weather_info)) | |
# Add routine maintenance tasks for all farm types | |
tasks.extend(self._get_routine_maintenance_tasks(farm, target_date)) | |
except Exception as e: | |
logger.error(f"Error generating farm tasks for farm {farm.id}: {str(e)}") | |
return tasks | |
def _get_crop_farming_tasks(self, farm, target_date: date, season: str, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks specific to crop farming""" | |
tasks = [] | |
month = target_date.month | |
# Get crops grown on this farm | |
crops = getattr(farm, 'get_crop_types', lambda: ['rice', 'wheat'])() | |
# Season-specific crop tasks | |
if season == 'kharif': # June-October | |
tasks.extend(self._get_kharif_tasks(farm, target_date, weather_info)) | |
elif season == 'rabi': # November-April | |
tasks.extend(self._get_rabi_tasks(farm, target_date, weather_info)) | |
else: # Summer - May | |
tasks.extend(self._get_summer_tasks(farm, target_date, weather_info)) | |
# Daily crop management tasks | |
if not weather_info.get('rain_expected', False): | |
tasks.append(self._create_task_from_template('irrigation', 'morning', farm)) | |
# Weekly pest inspection | |
if target_date.weekday() == 0: # Monday | |
tasks.append(self._create_task_from_template('pest_control', 'inspection', farm)) | |
return tasks | |
def _get_dairy_farming_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks specific to dairy farming""" | |
tasks = [] | |
# Essential daily tasks for dairy farms | |
tasks.extend([ | |
self._create_task_from_template('milking', 'morning', farm), | |
self._create_task_from_template('milking', 'evening', farm), | |
self._create_task_from_template('dairy_feeding', 'concentrate', farm), | |
self._create_task_from_template('dairy_feeding', 'fodder', farm), | |
self._create_task_from_template('dairy_health', 'health_check', farm), | |
self._create_task_from_template('livestock_care', 'water_supply', farm) | |
]) | |
# Weekly udder care | |
if target_date.weekday() in [1, 4]: # Tuesday and Friday | |
tasks.append(self._create_task_from_template('dairy_health', 'udder_care', farm)) | |
return tasks | |
def _get_poultry_farming_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks specific to poultry farming""" | |
tasks = [] | |
# Daily poultry tasks | |
tasks.extend([ | |
self._create_task_from_template('poultry_feeding', 'morning_feed', farm), | |
self._create_task_from_template('poultry_feeding', 'evening_feed', farm), | |
self._create_task_from_template('egg_collection', 'daily', farm), | |
self._create_task_from_template('poultry_health', 'flock_monitoring', farm), | |
self._create_task_from_template('livestock_care', 'water_supply', farm) | |
]) | |
# Weekly coop cleaning | |
if target_date.weekday() == 6: # Sunday | |
tasks.append(self._create_task_from_template('poultry_health', 'coop_cleaning', farm)) | |
return tasks | |
def _get_livestock_farming_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks specific to general livestock farming""" | |
tasks = [] | |
# Daily livestock care | |
tasks.extend([ | |
self._create_task_from_template('livestock_feeding', 'supplemental', farm), | |
self._create_task_from_template('dairy_health', 'health_check', farm), | |
self._create_task_from_template('livestock_care', 'water_supply', farm) | |
]) | |
# Weather-dependent grazing | |
if not weather_info.get('rain_expected', False) and weather_info.get('temperature', 25) < 35: | |
tasks.append(self._create_task_from_template('livestock_feeding', 'grazing', farm)) | |
return tasks | |
def _get_fishery_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks specific to fish farming""" | |
tasks = [] | |
# Daily fish farming tasks | |
tasks.extend([ | |
self._create_task_from_template('fishery', 'water_quality', farm), | |
self._create_task_from_template('fishery', 'fish_feeding', farm) | |
]) | |
# Regular pond maintenance | |
if target_date.weekday() in [2, 5]: # Wednesday and Saturday | |
tasks.append(self._create_task_from_template('fishery', 'pond_maintenance', farm)) | |
return tasks | |
def _get_mixed_farming_tasks(self, farm, target_date: date, season: str, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks for mixed farming operations""" | |
tasks = [] | |
# Get farm's livestock and crop information | |
livestock_types = getattr(farm, 'get_livestock_types', lambda: [])() | |
crop_types = getattr(farm, 'get_crop_types', lambda: [])() | |
# Add crop tasks if crops are grown | |
if crop_types: | |
tasks.extend(self._get_crop_farming_tasks(farm, target_date, season, weather_info)) | |
# Add livestock tasks based on what animals are present | |
has_dairy = any('cow' in str(animal).lower() or 'buffalo' in str(animal).lower() | |
for animal in livestock_types) | |
has_poultry = any('chicken' in str(animal).lower() or 'hen' in str(animal).lower() | |
for animal in livestock_types) | |
if has_dairy: | |
# Add essential dairy tasks | |
tasks.extend([ | |
self._create_task_from_template('milking', 'morning', farm), | |
self._create_task_from_template('milking', 'evening', farm), | |
self._create_task_from_template('dairy_feeding', 'concentrate', farm) | |
]) | |
if has_poultry: | |
# Add essential poultry tasks | |
tasks.extend([ | |
self._create_task_from_template('poultry_feeding', 'morning_feed', farm), | |
self._create_task_from_template('egg_collection', 'daily', farm) | |
]) | |
if livestock_types: | |
# Add general livestock care | |
tasks.extend([ | |
self._create_task_from_template('dairy_health', 'health_check', farm), | |
self._create_task_from_template('livestock_care', 'water_supply', farm) | |
]) | |
return tasks | |
def _get_routine_maintenance_tasks(self, farm, target_date: date) -> List[Dict]: | |
"""Generate routine maintenance tasks applicable to all farm types""" | |
tasks = [] | |
day_of_week = target_date.weekday() | |
# Weekly equipment maintenance | |
if day_of_week == 0: # Monday | |
tasks.append(self._create_task_from_template('maintenance', 'equipment', farm)) | |
# Monthly infrastructure check | |
if target_date.day <= 7 and day_of_week == 0: # First Monday of month | |
tasks.append(self._create_task_from_template('maintenance', 'infrastructure', farm)) | |
return tasks | |
def _create_task_from_template(self, category: str, task_type: str, farm) -> Dict: | |
"""Create a task from predefined templates""" | |
template = self.task_templates.get(category, {}).get(task_type, {}) | |
return { | |
'task_type': category, | |
'task_title': template.get('title', 'Farm Task'), | |
'task_description': template.get('description', 'Complete this farming task.'), | |
'priority': template.get('priority', 'medium'), | |
'estimated_duration': template.get('duration', 30), | |
'weather_dependent': template.get('weather_dependent', False), | |
'farm_id': farm.id, | |
'crop_specific': None | |
} | |
def _generate_general_tasks(self, farmer, target_date: date) -> List[Dict]: | |
"""Generate general tasks not specific to any farm""" | |
tasks = [] | |
# Market price checking | |
if target_date.weekday() in [0, 3]: # Monday and Thursday | |
tasks.append({ | |
'task_type': 'market_research', | |
'task_title': 'Check Market Prices', | |
'task_description': 'Check current market prices for your crops and plan selling strategy accordingly.', | |
'priority': 'medium', | |
'estimated_duration': 15, | |
'weather_dependent': False, | |
'farm_id': None, | |
'crop_specific': None | |
}) | |
# Weather planning | |
tasks.append({ | |
'task_type': 'planning', | |
'task_title': 'Review Weather Forecast', | |
'task_description': 'Check 7-day weather forecast and plan farming activities accordingly.', | |
'priority': 'high', | |
'estimated_duration': 10, | |
'weather_dependent': False, | |
'farm_id': None, | |
'crop_specific': None | |
}) | |
return tasks | |
def _get_season(self, month: int) -> str: | |
"""Determine agricultural season based on month""" | |
if month in [6, 7, 8, 9, 10]: # June to October | |
return 'kharif' | |
elif month in [11, 12, 1, 2, 3, 4]: # November to April | |
return 'rabi' | |
else: # May | |
return 'summer' | |
def _get_kharif_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks for Kharif season""" | |
tasks = [] | |
month = target_date.month | |
# Get crops grown on this farm | |
crops = farm.get_crop_types() if hasattr(farm, 'get_crop_types') else ['rice', 'cotton'] | |
if month == 6: # June - Sowing season | |
tasks.append({ | |
'task_type': 'sowing', | |
'task_title': 'Kharif Crop Sowing', | |
'task_description': f'Sow {", ".join(crops)} seeds according to recommended spacing and depth.', | |
'priority': 'high', | |
'estimated_duration': 120, | |
'weather_dependent': True, | |
'farm_id': farm.id, | |
'crop_specific': crops[0] if crops else None | |
}) | |
elif month in [7, 8]: # July-August - Growth period | |
tasks.extend([ | |
{ | |
'task_type': 'weeding', | |
'task_title': 'Weed Control', | |
'task_description': 'Remove weeds from crop fields to prevent competition for nutrients.', | |
'priority': 'high', | |
'estimated_duration': 90, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': None | |
}, | |
{ | |
'task_type': 'fertilizing', | |
'task_title': 'Top Dressing Fertilizer', | |
'task_description': 'Apply nitrogen fertilizer for healthy crop growth.', | |
'priority': 'high', | |
'estimated_duration': 60, | |
'weather_dependent': True, | |
'farm_id': farm.id, | |
'crop_specific': None | |
} | |
]) | |
elif month in [9, 10]: # September-October - Maturity and harvesting | |
tasks.append({ | |
'task_type': 'harvesting', | |
'task_title': 'Kharif Crop Harvesting', | |
'task_description': f'Harvest mature {", ".join(crops)} crops at optimal moisture content.', | |
'priority': 'high', | |
'estimated_duration': 180, | |
'weather_dependent': True, | |
'farm_id': farm.id, | |
'crop_specific': crops[0] if crops else None | |
}) | |
return tasks | |
def _get_rabi_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks for Rabi season""" | |
tasks = [] | |
month = target_date.month | |
crops = farm.get_crop_types() if hasattr(farm, 'get_crop_types') else ['wheat', 'mustard'] | |
if month in [11, 12]: # November-December - Sowing | |
tasks.append({ | |
'task_type': 'sowing', | |
'task_title': 'Rabi Crop Sowing', | |
'task_description': f'Sow {", ".join(crops)} with proper seed treatment and spacing.', | |
'priority': 'high', | |
'estimated_duration': 120, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': crops[0] if crops else None | |
}) | |
elif month in [1, 2]: # January-February - Growth | |
tasks.extend([ | |
{ | |
'task_type': 'irrigation', | |
'task_title': 'Winter Irrigation', | |
'task_description': 'Provide adequate irrigation to rabi crops during dry winter period.', | |
'priority': 'high', | |
'estimated_duration': 45, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': None | |
}, | |
{ | |
'task_type': 'pest_control', | |
'task_title': 'Pest Monitoring', | |
'task_description': 'Monitor for aphids and other winter pests. Apply control measures if needed.', | |
'priority': 'medium', | |
'estimated_duration': 30, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': None | |
} | |
]) | |
elif month in [3, 4]: # March-April - Maturity and harvesting | |
tasks.append({ | |
'task_type': 'harvesting', | |
'task_title': 'Rabi Crop Harvesting', | |
'task_description': f'Harvest {", ".join(crops)} when crops reach physiological maturity.', | |
'priority': 'high', | |
'estimated_duration': 180, | |
'weather_dependent': True, | |
'farm_id': farm.id, | |
'crop_specific': crops[0] if crops else None | |
}) | |
return tasks | |
def _get_summer_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate tasks for Summer season""" | |
tasks = [] | |
# Summer preparation tasks | |
tasks.extend([ | |
{ | |
'task_type': 'soil_care', | |
'task_title': 'Summer Plowing', | |
'task_description': 'Deep plowing to break soil crust and improve water infiltration.', | |
'priority': 'medium', | |
'estimated_duration': 180, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': None | |
}, | |
{ | |
'task_type': 'maintenance', | |
'task_title': 'Equipment Preparation', | |
'task_description': 'Service and repair farming equipment for upcoming Kharif season.', | |
'priority': 'medium', | |
'estimated_duration': 120, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': None | |
} | |
]) | |
return tasks | |
def _get_routine_tasks(self, farm, target_date: date, weather_info: Dict) -> List[Dict]: | |
"""Generate routine daily tasks""" | |
tasks = [] | |
day_of_week = target_date.weekday() | |
# Daily irrigation check | |
if not weather_info.get('rain_expected', False): | |
tasks.append({ | |
'task_type': 'irrigation', | |
'task_title': 'Daily Irrigation Check', | |
'task_description': 'Check soil moisture and irrigate crops if needed.', | |
'priority': 'high', | |
'estimated_duration': 30, | |
'weather_dependent': True, | |
'farm_id': farm.id, | |
'crop_specific': None | |
}) | |
# Weekly pest inspection | |
if day_of_week == 0: # Monday | |
tasks.append({ | |
'task_type': 'pest_control', | |
'task_title': 'Weekly Pest Inspection', | |
'task_description': 'Thoroughly inspect all crops for pests, diseases, and nutrient deficiencies.', | |
'priority': 'high', | |
'estimated_duration': 45, | |
'weather_dependent': False, | |
'farm_id': farm.id, | |
'crop_specific': None | |
}) | |
return tasks | |
def _get_weather_info(self, farm, target_date: date) -> Dict: | |
"""Get weather information for task planning""" | |
try: | |
if self.weather_service and hasattr(farm, 'latitude') and hasattr(farm, 'longitude'): | |
if farm.latitude and farm.longitude: | |
weather = self.weather_service.get_weather_data(farm.latitude, farm.longitude) | |
return { | |
'temperature': weather.get('temperature', 25), | |
'humidity': weather.get('humidity', 60), | |
'rain_expected': weather.get('rainfall', 0) > 0, | |
'wind_speed': weather.get('wind_speed', 5) | |
} | |
except Exception as e: | |
logger.error(f"Error getting weather info: {str(e)}") | |
return {'temperature': 25, 'humidity': 60, 'rain_expected': False, 'wind_speed': 5} | |
def _prioritize_tasks(self, tasks: List[Dict]) -> List[Dict]: | |
"""Sort tasks by priority and other factors""" | |
priority_order = {'high': 3, 'medium': 2, 'low': 1} | |
return sorted(tasks, key=lambda x: ( | |
priority_order.get(x.get('priority', 'medium'), 2), | |
-x.get('estimated_duration', 60) # Shorter tasks first within same priority | |
), reverse=True) | |
def _get_fallback_tasks(self, farmer, target_date: date) -> List[Dict]: | |
"""Provide basic fallback tasks when AI generation fails""" | |
return [ | |
{ | |
'task_type': 'inspection', | |
'task_title': 'Daily Farm Inspection', | |
'task_description': 'Walk through your farm and inspect crops for any issues.', | |
'priority': 'high', | |
'estimated_duration': 30, | |
'weather_dependent': False, | |
'farm_id': None, | |
'crop_specific': None | |
}, | |
{ | |
'task_type': 'planning', | |
'task_title': 'Review Daily Plan', | |
'task_description': 'Review your farming activities and plan for tomorrow.', | |
'priority': 'medium', | |
'estimated_duration': 15, | |
'weather_dependent': False, | |
'farm_id': None, | |
'crop_specific': None | |
} | |
] | |
def format_task_for_telegram(self, task: Dict, farmer_name: str) -> str: | |
"""Format a task for Telegram notification""" | |
priority_emoji = {'high': 'π΄', 'medium': 'π‘', 'low': 'π’'} | |
type_emoji = { | |
'irrigation': 'π§', | |
'fertilizing': 'π±', | |
'pest_control': 'π‘οΈ', | |
'weeding': 'πΎ', | |
'sowing': 'π±', | |
'harvesting': 'π½', | |
'soil_care': 'π', | |
'maintenance': 'π§', | |
'inspection': 'π', | |
'planning': 'π', | |
'market_research': 'π°' | |
} | |
emoji = type_emoji.get(task.get('task_type', ''), 'π') | |
priority = priority_emoji.get(task.get('priority', 'medium'), 'π‘') | |
message = f"{emoji} <b>{task.get('task_title', 'Farm Task')}</b>\n" | |
message += f"{priority} Priority: {task.get('priority', 'medium').title()}\n" | |
message += f"β±οΈ Duration: {task.get('estimated_duration', 30)} minutes\n\n" | |
message += f"π <b>Description:</b>\n{task.get('task_description', 'Complete this farming task.')}\n\n" | |
if task.get('crop_specific'): | |
message += f"πΎ <b>Crop:</b> {task.get('crop_specific')}\n" | |
if task.get('weather_dependent'): | |
message += f"π€οΈ <i>Weather dependent task</i>\n" | |
return message | |