Spaces:
Sleeping
Sleeping
File size: 14,853 Bytes
429a26d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
import json
import logging
import zlib
from datetime import datetime, date, timedelta
from typing import Dict, List, Optional, Tuple

import requests

from models import MarketPrice, db
# Configure logging: runs at import time. basicConfig() sets up the root logger
# at INFO level; `logger` is the module-level logger that MarketPriceService
# uses for its info/error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MarketPriceService:
    """Service for fetching and managing crop market prices.

    Price rows are stored through the ``MarketPrice`` model and ``db`` session
    imported from ``models``. The "government API" fetch is currently a
    placeholder that generates synthetic prices from the lookup tables below.
    """

    # Crops refreshed when the caller does not name any.
    _DEFAULT_CROPS = (
        'Rice', 'Wheat', 'Maize', 'Sugarcane', 'Cotton', 'Soybean',
        'Groundnut', 'Sunflower', 'Mustard', 'Gram', 'Arhar', 'Moong',
        'Masoor', 'Onion', 'Potato', 'Tomato', 'Chilli', 'Turmeric',
    )

    # States refreshed when the caller does not name any.
    _DEFAULT_STATES = (
        'Maharashtra', 'Uttar Pradesh', 'Karnataka', 'Gujarat',
        'Rajasthan', 'Madhya Pradesh', 'Tamil Nadu', 'Andhra Pradesh',
    )

    # Base prices per quintal (in INR) - these are approximate market rates.
    _BASE_PRICES = {
        'Rice': {'min': 1800, 'max': 2500, 'avg': 2100},
        'Wheat': {'min': 1900, 'max': 2300, 'avg': 2100},
        'Maize': {'min': 1400, 'max': 1800, 'avg': 1600},
        'Sugarcane': {'min': 250, 'max': 350, 'avg': 300},  # per ton
        'Cotton': {'min': 5500, 'max': 7500, 'avg': 6500},
        'Soybean': {'min': 3500, 'max': 4500, 'avg': 4000},
        'Groundnut': {'min': 4500, 'max': 6000, 'avg': 5250},
        'Sunflower': {'min': 4000, 'max': 5500, 'avg': 4750},
        'Mustard': {'min': 4200, 'max': 5200, 'avg': 4700},
        'Gram': {'min': 4500, 'max': 6000, 'avg': 5250},
        'Arhar': {'min': 5500, 'max': 7000, 'avg': 6250},
        'Moong': {'min': 6000, 'max': 8000, 'avg': 7000},
        'Masoor': {'min': 4500, 'max': 6500, 'avg': 5500},
        'Onion': {'min': 800, 'max': 2500, 'avg': 1650},
        'Potato': {'min': 600, 'max': 1800, 'avg': 1200},
        'Tomato': {'min': 800, 'max': 3000, 'avg': 1900},
        'Chilli': {'min': 8000, 'max': 15000, 'avg': 11500},
        'Turmeric': {'min': 7000, 'max': 12000, 'avg': 9500},
    }

    # Markets per state; only the first three of each are used for generation.
    _MARKETS_BY_STATE = {
        'Maharashtra': ['Mumbai', 'Pune', 'Nashik', 'Nagpur', 'Aurangabad'],
        'Uttar Pradesh': ['Lucknow', 'Kanpur', 'Agra', 'Varanasi', 'Meerut'],
        'Karnataka': ['Bangalore', 'Mysore', 'Hubli', 'Belgaum', 'Mangalore'],
        'Gujarat': ['Ahmedabad', 'Surat', 'Vadodara', 'Rajkot', 'Bhavnagar'],
        'Rajasthan': ['Jaipur', 'Jodhpur', 'Kota', 'Bikaner', 'Udaipur'],
        'Madhya Pradesh': ['Bhopal', 'Indore', 'Gwalior', 'Jabalpur', 'Ujjain'],
        'Tamil Nadu': ['Chennai', 'Coimbatore', 'Madurai', 'Salem', 'Trichy'],
        'Andhra Pradesh': ['Hyderabad', 'Vijayawada', 'Visakhapatnam', 'Guntur', 'Tirupati'],
    }

    # Neighbouring states consulted when comparing selling prices.
    _STATE_NEIGHBORS = {
        'Maharashtra': ['Gujarat', 'Karnataka', 'Madhya Pradesh'],
        'Gujarat': ['Maharashtra', 'Rajasthan', 'Madhya Pradesh'],
        'Karnataka': ['Maharashtra', 'Tamil Nadu', 'Andhra Pradesh'],
        'Tamil Nadu': ['Karnataka', 'Andhra Pradesh'],
        'Uttar Pradesh': ['Madhya Pradesh', 'Rajasthan'],
        'Rajasthan': ['Gujarat', 'Madhya Pradesh', 'Uttar Pradesh'],
        'Madhya Pradesh': ['Maharashtra', 'Gujarat', 'Uttar Pradesh', 'Rajasthan'],
        'Andhra Pradesh': ['Karnataka', 'Tamil Nadu'],
    }

    def __init__(self):
        """Record the (example) upstream endpoints; performs no I/O."""
        # Government API endpoints (these are examples - replace with actual APIs)
        self.government_api = "https://api.data.gov.in/resource/9ef84268-d588-465a-a308-a864a43d0070"
        self.backup_sources = [
            "https://agmarknet.gov.in/",  # Agricultural Marketing Division
            "https://enam.gov.in/",  # National Agriculture Market
        ]

    # ------------------------------------------------------------------
    # Pure helpers (no database access)
    # ------------------------------------------------------------------

    @staticmethod
    def _variation_percent(crop: str, state: str, market: str) -> int:
        """Return a stable per-market price multiplier as a percentage (80-120).

        CRC32 is used rather than the built-in ``hash()`` because string
        hashes are salted per interpreter process, which would make the same
        market produce a different variation after every restart.
        """
        key = f"{crop}{state}{market}".encode("utf-8")
        return 80 + zlib.crc32(key) % 41

    @staticmethod
    def _window_averages(values: List[float], recent_size: int,
                         older_size: Optional[int] = None) -> Tuple[float, float]:
        """Average the newest ``recent_size`` values and the window after them.

        Args:
            values: Non-empty list of prices, newest first.
            recent_size: Number of leading values forming the recent window.
            older_size: Size of the following window; ``None`` means "all
                remaining values".

        Returns:
            ``(recent_avg, older_avg)``. When there are no values left for the
            older window, ``older_avg`` equals ``recent_avg`` so that the
            comparison reads as "no change" instead of comparing against 0.
        """
        recent = values[:recent_size]
        if older_size is None:
            older = values[recent_size:]
        else:
            older = values[recent_size:recent_size + older_size]
        recent_avg = sum(recent) / len(recent)
        older_avg = sum(older) / len(older) if older else recent_avg
        return recent_avg, older_avg

    @staticmethod
    def _trend_direction(recent_avg: float, older_avg: float) -> int:
        """Return 1, -1 or 0 for a >5% rise, >5% fall, or neither."""
        if recent_avg > older_avg * 1.05:
            return 1
        if recent_avg < older_avg * 0.95:
            return -1
        return 0

    # ------------------------------------------------------------------
    # Fetching / generating prices
    # ------------------------------------------------------------------

    def fetch_and_update_prices(self, crops: Optional[List[str]] = None,
                                states: Optional[List[str]] = None) -> bool:
        """Fetch latest market prices and update database.

        Args:
            crops: Crop names to refresh; a built-in default list is used when
                omitted or empty.
            states: State names to refresh; a built-in default list is used
                when omitted or empty.

        Returns:
            True if at least one new record was stored. False if nothing new
            was stored (including the case where today's records already
            exist) or if an error occurred; errors are logged, never raised.
        """
        try:
            # Default crops / states if none specified
            if not crops:
                crops = list(self._DEFAULT_CROPS)
            if not states:
                states = list(self._DEFAULT_STATES)

            # Try government API first
            updated_count = self._fetch_from_government_api(crops, states)

            # If government API fails, use fallback data
            if updated_count == 0:
                updated_count += self._generate_fallback_prices(crops, states)

            logger.info("Updated %s market price records", updated_count)
            return updated_count > 0
        except Exception as e:
            logger.error("Error updating market prices: %s", e)
            return False

    def _fetch_from_government_api(self, crops: List[str], states: List[str]) -> int:
        """Fetch prices from government API.

        Returns the number of records stored, or 0 if the fetch failed.
        """
        try:
            # This is a placeholder - replace with actual government API implementation
            # The actual API would require proper authentication and parameters
            # For demonstration, we'll generate realistic data
            return self._generate_realistic_prices(crops, states)
        except Exception as e:
            logger.error("Government API fetch failed: %s", e)
            return 0

    def _generate_realistic_prices(self, crops: List[str], states: List[str]) -> int:
        """Generate realistic market prices based on typical market rates.

        For every known crop/state pair, the first three markets of the state
        get one record for today unless one already exists. Unknown crops and
        states are skipped silently.

        Returns:
            Number of records added, or 0 if the final commit failed (the
            session is rolled back in that case).
        """
        updated_count = 0
        today = date.today()

        for crop in crops:
            base = self._BASE_PRICES.get(crop)
            if base is None:
                continue
            price_unit = 'per quintal' if crop != 'Sugarcane' else 'per ton'

            for state in states:
                markets = self._MARKETS_BY_STATE.get(state)
                if markets is None:
                    continue

                for market in markets[:3]:  # Top 3 markets per state
                    try:
                        # Add some market variation (±20%), computed in
                        # integer percent so results are exact and repeatable.
                        pct = self._variation_percent(crop, state, market)

                        # Check if price already exists for today
                        existing = MarketPrice.query.filter_by(
                            crop_name=crop,
                            market_name=market,
                            state=state,
                            price_date=today
                        ).first()
                        if existing:
                            continue

                        db.session.add(MarketPrice(
                            crop_name=crop,
                            market_name=market,
                            state=state,
                            district=market,  # Simplified
                            min_price=base['min'] * pct // 100,
                            max_price=base['max'] * pct // 100,
                            avg_price=base['avg'] * pct // 100,
                            price_unit=price_unit,
                            price_date=today,
                            source='market_api'
                        ))
                        updated_count += 1
                    except Exception as e:
                        # Best effort: one bad record must not stop the batch.
                        logger.error("Error creating price record for %s in %s: %s",
                                     crop, market, e)

        try:
            db.session.commit()
        except Exception as e:
            logger.error("Error committing price records: %s", e)
            db.session.rollback()
            return 0
        return updated_count

    def _generate_fallback_prices(self, crops: List[str], states: List[str]) -> int:
        """Generate fallback prices when APIs are unavailable."""
        return self._generate_realistic_prices(crops, states)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_latest_prices(self, crop_name: str, state: Optional[str] = None,
                          limit: int = 10) -> "List[MarketPrice]":
        """Get latest prices for a crop.

        Args:
            crop_name: Crop to look up.
            state: Optional state filter; all states when omitted.
            limit: Maximum number of records returned.

        Returns:
            Up to ``limit`` records ordered by ``price_date`` descending.
        """
        query = MarketPrice.query.filter_by(crop_name=crop_name)
        if state:
            query = query.filter_by(state=state)
        return query.order_by(MarketPrice.price_date.desc()).limit(limit).all()

    def get_price_trends(self, crop_name: str, days: int = 30) -> Dict:
        """Get price trends for a crop over specified days.

        Returns:
            ``{'trend': 'no_data', 'prices': []}`` when no records exist in
            the period; otherwise a dict with ``trend`` ('rising', 'falling'
            or 'stable'), ``recent_avg``, ``older_avg`` and ``prices`` (each
            record's ``as_dict()``).
        """
        from_date = date.today() - timedelta(days=days)
        prices = MarketPrice.query.filter(
            MarketPrice.crop_name == crop_name,
            MarketPrice.price_date >= from_date
        ).order_by(MarketPrice.price_date.desc()).all()

        if not prices:
            return {'trend': 'no_data', 'prices': []}

        # Compare the 7 newest records with the 7 before them. With fewer than
        # 8 records there is nothing to compare against, so the trend is
        # 'stable' rather than a spurious 'rising' versus an average of 0.
        # NOTE(review): the windows count records, not calendar days - several
        # markets reporting on one date all land in the same window.
        recent_avg, older_avg = self._window_averages(
            [p.avg_price for p in prices], 7, 7
        )
        direction = self._trend_direction(recent_avg, older_avg)
        trend = {1: 'rising', -1: 'falling'}.get(direction, 'stable')

        return {
            'trend': trend,
            'recent_avg': recent_avg,
            'older_avg': older_avg,
            'prices': [p.as_dict() for p in prices]
        }

    def get_best_selling_recommendation(self, crop_name: str, farmer_state: str) -> Dict:
        """Get recommendation for best time and place to sell.

        Compares the average of the latest prices in the farmer's state and
        its neighbouring states, and combines the best one with the crop's
        overall trend.

        Returns:
            ``{'recommendation': 'no_data'}`` when no state has prices;
            otherwise a dict with ``best_market``, ``trend`` and a textual
            ``recommendation``.
        """
        # Get prices from the farmer's own state and nearby states
        candidate_states = [farmer_state] + self._get_nearby_states(farmer_state)

        best_prices = []
        for state in candidate_states:
            latest_prices = self.get_latest_prices(crop_name, state, 5)
            if not latest_prices:
                continue
            best_prices.append({
                'state': state,
                'avg_price': sum(p.avg_price for p in latest_prices) / len(latest_prices),
                'markets': [p.market_name for p in latest_prices[:3]]
            })

        if not best_prices:
            return {'recommendation': 'no_data'}

        # Highest average price first
        best_prices.sort(key=lambda entry: entry['avg_price'], reverse=True)

        trend = self.get_price_trends(crop_name)['trend']
        return {
            'best_market': best_prices[0],
            'trend': trend,
            'recommendation': self._generate_selling_advice(trend, best_prices)
        }

    def _get_nearby_states(self, state: str) -> List[str]:
        """Get nearby states for price comparison.

        Returns a fresh list (empty for unknown states) so callers may modify
        it without affecting the shared lookup table.
        """
        return list(self._STATE_NEIGHBORS.get(state, []))

    def _generate_selling_advice(self, trend: str, best_prices: List[Dict]) -> str:
        """Generate selling advice based on trends and prices.

        Args:
            trend: 'rising', 'falling', or anything else (treated as stable).
            best_prices: Per-state entries sorted by ``avg_price`` descending.
        """
        if trend == 'rising':
            return "Prices are rising. Consider waiting a few more days for better rates."
        if trend == 'falling':
            return "Prices are falling. Sell immediately to avoid further losses."

        # Stable market: point at the top state only when it beats the
        # runner-up by more than 10%.
        if len(best_prices) > 1 and best_prices[0]['avg_price'] > best_prices[1]['avg_price'] * 1.1:
            return f"Current prices are good. Consider selling in {best_prices[0]['state']} for best rates."
        return "Prices are stable. Sell when convenient."

    def get_crop_price(self, crop_name: str, state: Optional[str] = None) -> Dict:
        """Get current price for a crop.

        Averages the five latest records and derives a short-term trend
        ('up', 'down' or 'stable') from the two newest records versus the
        rest.

        Returns:
            A dict with ``crop_type``, ``market_name``, ``price_per_unit``,
            ``unit``, ``trend`` and ``date``. When no data exists or an error
            occurs, placeholder values are returned with an extra ``error``
            key; this method does not raise.
        """
        try:
            # Get latest prices
            latest_prices = self.get_latest_prices(crop_name, state, limit=5)

            if not latest_prices:
                # No data found, return fallback
                return {
                    'crop_type': crop_name,
                    'market_name': 'Market data unavailable',
                    'price_per_unit': 'N/A',
                    'unit': 'per quintal',
                    'trend': 'stable',
                    'date': date.today().isoformat(),
                    'error': 'No price data available'
                }

            # Calculate average from latest prices
            values = [p.avg_price for p in latest_prices]
            avg_price = sum(values) / len(values)
            latest = latest_prices[0]

            # Determine trend based on recent prices: two newest records
            # versus whatever remains (a single record reads as 'stable').
            recent_avg, older_avg = self._window_averages(values, 2)
            direction = self._trend_direction(recent_avg, older_avg)
            trend = {1: 'up', -1: 'down'}.get(direction, 'stable')

            return {
                'crop_type': crop_name,
                'market_name': latest.market_name,
                'price_per_unit': f"₹{int(avg_price)}",
                'unit': latest.price_unit,
                'trend': trend,
                'date': latest.price_date.isoformat()
            }
        except Exception as e:
            logger.error("Error getting crop price for %s: %s", crop_name, e)
            return {
                'crop_type': crop_name,
                'market_name': 'Error',
                'price_per_unit': 'N/A',
                'unit': 'per quintal',
                'trend': 'stable',
                'date': date.today().isoformat(),
                'error': f'Failed to get price: {e}'
            }

    def refresh_crop_price(self, crop_name: str) -> bool:
        """Force refresh price data for a specific crop.

        Returns:
            True unless the refresh raised. NOTE(review):
            ``fetch_and_update_prices`` logs and swallows its own errors and
            its result is ignored here, so this effectively always returns
            True - confirm whether callers should see the real outcome.
        """
        try:
            # Trigger fresh fetch for this crop
            self.fetch_and_update_prices([crop_name])
            return True
        except Exception as e:
            logger.error("Error refreshing price for %s: %s", crop_name, e)
            return False
|