# Stray_Dogs / health_module.py
# Web file-view header (not part of the module): uploaded by mustafa2ak,
# "Update health_module.py", commit 92a6999 (verified), 17 kB; raw / history / blame.
"""
health_module.py - Dog Health Assessment Module
Uses pose detection and heuristics for health scoring
"""
import cv2
import numpy as np
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from collections import deque
@dataclass
class HealthScore:
    """Outcome of a single health assessment.

    Attributes:
        score: Overall health score on a 0-10 scale.
        score_text: Display form of the score, e.g. "8.5/10".
        color: BGR colour triple used for visualization.
        status: Turkish status label (Sağlıklı, İyi, Dikkat or Kritik).
        alerts: Health alert messages attached to the assessment.
        confidence: Confidence in the assessment, between 0 and 1.
    """

    score: float
    score_text: str
    color: Tuple[int, int, int]
    status: str
    alerts: List[str]
    confidence: float
class DogHealthAssessment:
    """
    Health assessment using pose keypoints and visual analysis.

    Holds per-dog history containers, the heuristic thresholds and the
    keypoint-name -> index map that the assessment methods read.
    (Original note: optimized for real-time processing on T4 GPU.)
    """

    def __init__(self):
        """Create empty per-dog histories and the default heuristic settings."""
        # History containers for temporal analysis, keyed by dog_id.
        self.pose_history = {}      # dog_id -> deque of keypoints
        self.movement_history = {}  # dog_id -> deque of positions
        self.health_history = {}    # dog_id -> deque of scores

        # Heuristic thresholds used by the assess_* methods.
        self.thresholds = {
            'head_low_ratio': 0.7,          # Head below 70% of body = concern
            'leg_asymmetry_ratio': 0.1,     # >10% difference = limping
            'body_condition_thin': 0.35,    # Width/height ratio
            'body_condition_obese': 0.65,
            'movement_inactive': 50,        # Pixels moved
            'movement_hyperactive': 500,
            'red_area_threshold': 0.05,     # >5% red = possible wound
        }

        # Pose keypoint indices for dogs (17 keypoints).
        self.keypoints_map = {
            'nose': 0,
            'left_eye': 1,
            'right_eye': 2,
            'left_ear': 3,
            'right_ear': 4,
            'left_shoulder': 5,
            'right_shoulder': 6,
            'left_elbow': 7,
            'right_elbow': 8,
            'left_wrist': 9,
            'right_wrist': 10,
            'left_hip': 11,
            'right_hip': 12,
            'left_knee': 13,
            'right_knee': 14,
            'left_ankle': 15,
            'right_ankle': 16,
        }

    def _leg_asymmetry_penalty(self, left_top, left_bottom, right_top, right_bottom) -> float:
        """Return the gait penalty for one left/right leg pair (0.0 when none applies).

        A leg's length is the vertical distance between its top and bottom
        keypoints. The pair is ignored unless all four keypoints have a
        positive y coordinate and both lengths are non-zero.
        """
        if not all(kp[1] > 0 for kp in (left_top, right_top, left_bottom, right_bottom)):
            return 0.0

        left_length = abs(left_bottom[1] - left_top[1])
        right_length = abs(right_bottom[1] - right_top[1])
        if left_length <= 0 or right_length <= 0:
            return 0.0

        asymmetry = abs(left_length - right_length) / max(left_length, right_length)
        limit = self.thresholds['leg_asymmetry_ratio']
        if asymmetry <= limit:
            return 0.0
        # Penalty grows linearly with how far the asymmetry exceeds the limit.
        return 3.0 * (asymmetry / limit)

    def assess_from_pose(self, keypoints: np.ndarray, bbox: List[float]) -> Dict:
        """
        Analyze health from pose keypoints.

        Args:
            keypoints: Sequence of at least 17 keypoints indexed per
                ``self.keypoints_map``; element [0] is read as x and [1] as y.
            bbox: Box coordinates read as [x1, y1, x2, y2].

        Returns:
            dict with 'posture', 'gait_symmetry' and 'head_position' scores,
            each starting at 10.0 and never below 0. The untouched defaults are
            returned when ``keypoints`` is None or has fewer than 17 entries.
        """
        scores = {
            'posture': 10.0,
            'gait_symmetry': 10.0,
            'head_position': 10.0,
        }
        if keypoints is None or len(keypoints) < 17:
            return scores  # No pose data: keep defaults

        index = self.keypoints_map
        body_height = bbox[3] - bbox[1]
        body_width = bbox[2] - bbox[0]

        # 1. Head position. A box without positive height cannot be used as a
        #    reference, so the check is skipped rather than dividing by zero.
        nose_kp = keypoints[index['nose']]
        if body_height > 0 and nose_kp[0] > 0 and nose_kp[1] > 0:
            head_relative_y = (nose_kp[1] - bbox[1]) / body_height
            if head_relative_y > self.thresholds['head_low_ratio']:
                # Head is too low - sign of illness or exhaustion
                scores['head_position'] -= 4.0
            elif head_relative_y > 0.5:
                # Head slightly low
                scores['head_position'] -= 2.0

        # 2. Front-leg symmetry (detect limping): shoulder -> wrist.
        scores['gait_symmetry'] -= self._leg_asymmetry_penalty(
            keypoints[index['left_shoulder']], keypoints[index['left_wrist']],
            keypoints[index['right_shoulder']], keypoints[index['right_wrist']],
        )

        # 3. Back-leg symmetry: hip -> ankle.
        left_hip = keypoints[index['left_hip']]
        right_hip = keypoints[index['right_hip']]
        scores['gait_symmetry'] -= self._leg_asymmetry_penalty(
            left_hip, keypoints[index['left_ankle']],
            right_hip, keypoints[index['right_ankle']],
        )

        # 4. Posture (spine alignment): horizontal offset between the nose and
        #    the hip centre, relative to the box width. Skipped for a box
        #    without positive width to avoid dividing by zero.
        if body_width > 0 and nose_kp[0] > 0 and left_hip[0] > 0 and right_hip[0] > 0:
            hip_center_x = (left_hip[0] + right_hip[0]) / 2
            spine_alignment = abs(nose_kp[0] - hip_center_x) / body_width
            if spine_alignment > 0.3:  # Spine not straight
                scores['posture'] -= 2.0

        # Scores never go below 0.
        return {name: max(0.0, value) for name, value in scores.items()}

    def assess_body_condition(self, bbox: List[float], dog_crop: np.ndarray) -> Dict:
        """
        Assess body condition from appearance.

        Args:
            bbox: Box coordinates read as [x1, y1, x2, y2].
            dog_crop: Image crop passed to OpenCV as a BGR image. When it is
                None or empty, only the weight score is computed.

        Returns:
            dict with 'weight', 'coat_quality' and 'visible_issues' scores,
            each defaulting to 10.0.
        """
        scores = {
            'weight': 10.0,
            'coat_quality': 10.0,
            'visible_issues': 10.0,
        }

        # 1. Body condition score from the box width/height ratio.
        width = bbox[2] - bbox[0]
        height = bbox[3] - bbox[1]
        if height > 0:
            aspect_ratio = width / height
            if aspect_ratio < self.thresholds['body_condition_thin']:
                scores['weight'] = 3.0   # Too thin
            elif aspect_ratio < 0.45:
                scores['weight'] = 6.0   # Slightly thin
            elif aspect_ratio > self.thresholds['body_condition_obese']:
                scores['weight'] = 4.0   # Obese
            elif aspect_ratio > 0.55:
                scores['weight'] = 7.0   # Overweight
            # else: ideal weight, keep at 10

        # Without pixels the image checks below would fail inside OpenCV or
        # divide by a zero pixel count, so the remaining defaults are returned.
        if dog_crop is None or dog_crop.size == 0:
            return scores

        # 2. Coat quality: grey-level standard deviation as a texture measure.
        gray = cv2.cvtColor(dog_crop, cv2.COLOR_BGR2GRAY)
        texture_score = np.std(gray)
        if texture_score < 15:
            scores['coat_quality'] = 3.0   # Very poor coat quality
        elif texture_score < 25:
            scores['coat_quality'] = 6.0   # Poor coat quality
        # Higher texture values keep the default of 10.

        # Low edge density caps the coat score.
        edges = cv2.Canny(gray, 50, 150)
        edge_density = np.sum(edges > 0) / edges.size
        if edge_density < 0.02:
            # Too smooth, possible hair loss
            scores['coat_quality'] = min(scores['coat_quality'], 5.0)

        # 3. Visible issues: share of red pixels (possible wounds). Red wraps
        #    around the hue axis, so two HSV ranges are combined.
        hsv = cv2.cvtColor(dog_crop, cv2.COLOR_BGR2HSV)
        lower_red1 = np.array([0, 50, 50])
        upper_red1 = np.array([10, 255, 255])
        lower_red2 = np.array([170, 50, 50])
        upper_red2 = np.array([180, 255, 255])
        mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
        mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
        red_mask = mask1 | mask2
        red_ratio = np.sum(red_mask > 0) / red_mask.size
        if red_ratio > self.thresholds['red_area_threshold']:
            scores['visible_issues'] = 4.0   # Significant red areas detected
        elif red_ratio > 0.02:
            scores['visible_issues'] = 7.0   # Some red areas

        return scores
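# database_health_update.py
# NOTE(review): the definitions that follow this helper take `self` and look like
# DogHealthAssessment methods - confirm this helper does not split the class body.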
"""Add health assessment fields to existing database"""
def add_health_fields_to_database(db_path="dog_monitoring.db"):
    """Add health-related fields to the database.

    Adds the ``last_health_score`` and ``health_status`` columns to the
    ``dogs`` table when that table exists and the columns are missing, and
    creates the ``health_assessments`` table if it does not exist. Calling
    the function repeatedly is safe.

    Args:
        db_path: Path of the SQLite database file. Defaults to
            "dog_monitoring.db". Nothing happens when the file does not exist.
    """
    import sqlite3
    from contextlib import closing
    from pathlib import Path

    # Only proceed if database exists
    if not Path(db_path).exists():
        return

    new_columns = (
        ("last_health_score", "REAL DEFAULT 5.0"),
        ("health_status", "TEXT DEFAULT 'Unknown'"),
    )

    # closing() guarantees the connection is released even if a statement fails.
    with closing(sqlite3.connect(db_path)) as conn:
        cursor = conn.cursor()

        # Inspect the current columns instead of swallowing ALTER TABLE errors.
        # An empty result means the dogs table is absent, in which case the
        # column additions are skipped. SQLite column names are
        # case-insensitive, hence the lower-casing.
        existing = {row[1].lower() for row in cursor.execute("PRAGMA table_info(dogs)")}
        if existing:
            for column_name, column_def in new_columns:
                if column_name not in existing:
                    cursor.execute(
                        f"ALTER TABLE dogs ADD COLUMN {column_name} {column_def}"
                    )

        # Create health assessments table if not exists
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS health_assessments (
                assessment_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                health_score REAL,
                status TEXT,
                posture_score REAL,
                gait_score REAL,
                body_condition_score REAL,
                activity_score REAL,
                alerts TEXT,
                recommendations TEXT,
                confidence REAL,
                video_source TEXT,
                frame_number INTEGER,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)
        conn.commit()
def assess_movement(self, dog_id: int, current_pos: Tuple[float, float]) -> float:
    """
    Score a dog's activity level from its recent positions.

    The position is appended to the dog's bounded history (30 entries) and
    the summed distance between consecutive entries is compared with the
    movement thresholds.

    Returns: 7.0 while fewer than two positions are stored, 4.0 below the
    'movement_inactive' threshold, 10.0 above 'movement_hyperactive', and
    otherwise 7.0 plus up to 3.0 in proportion to the distance.
    """
    track = self.movement_history.setdefault(dog_id, deque(maxlen=30))
    track.append(current_pos)

    if len(track) < 2:
        return 7.0  # Default neutral score

    # Sum the Euclidean step lengths between consecutive stored positions.
    points = list(track)
    travelled = 0
    for previous, current in zip(points, points[1:]):
        step_x = current[0] - previous[0]
        step_y = current[1] - previous[1]
        travelled += np.sqrt(step_x**2 + step_y**2)

    if travelled < self.thresholds['movement_inactive']:
        # Very inactive - could be sick or injured
        return 4.0
    if travelled > self.thresholds['movement_hyperactive']:
        # Very active - healthy
        return 10.0
    # Normal activity
    return 7.0 + (travelled / self.thresholds['movement_hyperactive']) * 3.0
def calculate_overall_health(self, dog_id: int, keypoints: Optional[np.ndarray],
                             dog_crop: np.ndarray, bbox: List[float],
                             current_pos: Optional[Tuple[float, float]] = None) -> HealthScore:
    """
    Calculate comprehensive health score.

    Combines the pose, appearance and movement assessments into a weighted
    score (pose 0.35, body 0.35, movement 0.30), rounded to one decimal, and
    appends it to the dog's health history (last 50 scores).

    Args:
        dog_id: Key used for the per-dog movement and health histories.
        keypoints: Pose keypoints, or None; neutral pose scores of 7.0 are
            used when None.
        dog_crop: Image crop handed to ``assess_body_condition``.
        bbox: Box coordinates handed to the pose and body assessments.
        current_pos: Current (x, y) position, or None; a neutral movement
            score of 7.0 is used when no usable position is given.

    Returns:
        HealthScore with the score, its "<score>/10" text, a BGR colour, a
        Turkish status label, the triggered alerts and a confidence capped
        at 1.0.
    """
    # Get individual assessments
    if keypoints is not None:
        pose_scores = self.assess_from_pose(keypoints, bbox)
    else:
        pose_scores = {'posture': 7.0, 'gait_symmetry': 7.0, 'head_position': 7.0}
    body_scores = self.assess_body_condition(bbox, dog_crop)

    # Explicit None/length test: plain truthiness raises for NumPy arrays.
    has_position = current_pos is not None and len(current_pos) >= 2
    movement_score = self.assess_movement(dog_id, current_pos) if has_position else 7.0

    # Weighted average of the three aspect averages
    weights = {
        'pose': 0.35,
        'body': 0.35,
        'movement': 0.30,
    }
    avg_pose = np.mean(list(pose_scores.values()))
    avg_body = np.mean(list(body_scores.values()))
    final_score = (
        avg_pose * weights['pose'] +
        avg_body * weights['body'] +
        movement_score * weights['movement']
    )
    # Round to 1 decimal; cast so a plain float (not a NumPy scalar) is stored.
    final_score = float(round(final_score, 1))

    # Determine status and color
    if final_score >= 8.0:
        status = "Sağlıklı"
        color = (0, 255, 0)  # Green
    elif final_score >= 6.0:
        status = "İyi"
        color = (0, 255, 255)  # Yellow
    elif final_score >= 4.0:
        status = "Dikkat"
        color = (0, 165, 255)  # Orange
    else:
        status = "Kritik"
        color = (0, 0, 255)  # Red

    # Generate alerts based on specific issues
    alerts = []
    if pose_scores['head_position'] < 6.0:
        alerts.append("Baş pozisyonu düşük")
    if pose_scores['gait_symmetry'] < 6.0:
        alerts.append("Yürüyüş bozukluğu")
    if body_scores['weight'] < 4.0:
        alerts.append("Çok zayıf")
    elif body_scores['weight'] < 7.0:
        alerts.append("Kilo problemi")
    if body_scores['coat_quality'] < 6.0:
        alerts.append("Tüy kalitesi düşük")
    if body_scores['visible_issues'] < 6.0:
        alerts.append("Görünür sağlık sorunu")
    if movement_score < 5.0:
        alerts.append("Hareketsiz")

    # Confidence grows with the amount of available data
    confidence = 0.5  # Base confidence
    if keypoints is not None:
        confidence += 0.25
    if dog_id in self.movement_history and len(self.movement_history[dog_id]) > 10:
        confidence += 0.15
    if dog_crop is not None and dog_crop.size > 10000:  # Good quality image
        confidence += 0.10

    # Store in history
    self.health_history.setdefault(dog_id, deque(maxlen=50)).append(final_score)

    return HealthScore(
        score=final_score,
        score_text=f"{final_score}/10",
        color=color,
        status=status,
        alerts=alerts,
        confidence=min(1.0, confidence),
    )
def get_health_trend(self, dog_id: int) -> str:
    """
    Describe how a dog's stored health scores are developing.

    Compares the mean of the five most recent scores with the mean of the
    five before them (or of the first five stored scores when fewer than ten
    exist). A difference of more than 1 point counts as a change.

    Returns: trend description; "Yetersiz veri" when fewer than five scores
    are stored for the dog.
    """
    history = self.health_history.get(dog_id)
    if history is None or len(history) < 5:
        return "Yetersiz veri"

    values = list(history)
    latest_window = values[-5:]
    earlier_window = values[-10:-5] if len(values) >= 10 else values[:5]

    recent_avg = np.mean(latest_window)
    older_avg = np.mean(earlier_window)

    if recent_avg > older_avg + 1:
        return "İyileşiyor ↑"
    if recent_avg < older_avg - 1:
        return "Kötüleşiyor ↓"
    return "Stabil →"
def get_recommendations(self, health_score: HealthScore) -> List[str]:
    """
    Build care recommendations for a health assessment.

    The score selects one base set of recommendations (below 4, below 6,
    below 8, or otherwise); extra items are appended for specific alerts
    present in ``health_score.alerts``.
    """
    # Base recommendations, checked from the most severe tier upwards.
    score_tiers = (
        (4.0, ("🚨 Acil veteriner kontrolü",
               "🍖 Yüksek kaliteli beslenme",
               "💊 Tıbbi tedavi gerekebilir")),
        (6.0, ("🏥 Veteriner muayenesi önerilir",
               "🥫 Düzenli beslenme programı",
               "🔍 Yakın takip")),
        (8.0, ("📋 Rutin kontrol",
               "🥘 Dengeli beslenme")),
    )
    for upper_bound, advice in score_tiers:
        if health_score.score < upper_bound:
            recommendations = list(advice)
            break
    else:
        recommendations = ["✅ Mevcut bakım devam etsin", "📅 Periyodik kontroller"]

    # Alert-specific additions, appended in a fixed order.
    alert_advice = (
        ("Çok zayıf", "🍖 Protein takviyesi"),
        ("Yürüyüş bozukluğu", "🦴 Eklem kontrolü"),
        ("Tüy kalitesi düşük", "🧴 Parazit kontrolü"),
    )
    for alert, tip in alert_advice:
        if alert in health_score.alerts:
            recommendations.append(tip)

    return recommendations