Stray_Dogs / debug_wrapper.py
mustafa2ak's picture
Update debug_wrapper.py
a0358cb verified
raw
history blame
12.5 kB
"""
debug_wrapper.py - Debug Monitoring Module
Injects debugging capabilities without modifying core modules
"""
import os
import time
import json
import numpy as np
import cv2
from collections import deque, Counter
from pathlib import Path
from typing import Any, Dict, List, Optional
import traceback
# Disable Gradio analytics (avoids pandas OptionError bug).
# Gradio's documented opt-out is the GRADIO_ANALYTICS_ENABLED environment
# variable, so it is set *before* the import below; assigning an attribute on
# the already-imported module alone is not a documented switch.
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
try:
    import gradio as gr
    # Kept from the original code for backward compatibility.
    gr.analytics_enabled = False
except Exception:
    # Deliberately best-effort: this debug module must stay importable when
    # Gradio is missing or fails during its own import.
    pass

# Create debug directory (bad-sample crops are written here)
DEBUG_DIR = Path("debug_samples")
DEBUG_DIR.mkdir(parents=True, exist_ok=True)
class DebugMonitor:
    """Centralized debug monitoring and logging.

    Holds counters, ReID similarity scores, per-match records, saved
    bad-sample metadata and a bounded rolling log, and can render them as a
    text summary or a JSON report.
    """

    def __init__(self, keep_last_logs: int = 200):
        """Create a monitor.

        Args:
            keep_last_logs: Maximum number of log entries kept in memory.
        """
        self.keep_last_logs = keep_last_logs
        self.reset()

    def reset(self):
        """Reset all debug state and restart the elapsed-time clock."""
        self.start_time = time.time()
        self.frame_count = 0
        self.counters = Counter()
        self.similarities = deque(maxlen=2000)
        self.recent_logs = deque(maxlen=self.keep_last_logs)
        self.bad_samples = []
        self.match_records = []
        self.live_stats = {}  # Live statistics for display

    def log(self, msg: str, level: str = "INFO"):
        """Print a timestamped log entry and keep it in the rolling log.

        Args:
            msg: Message text.
            level: Free-form level tag shown in the entry.
        """
        ts = time.strftime("%H:%M:%S")
        entry = f"[{ts}] [{level}] {msg}"
        print(entry, flush=True)
        self.recent_logs.append(entry)

    def save_bad_sample(self, img_bgr: np.ndarray, similarity: float, dog_id: int, track_id: int) -> Optional[str]:
        """Write a problematic detection crop to DEBUG_DIR and record it.

        Args:
            img_bgr: Image passed straight to ``cv2.imwrite``.
            similarity: Similarity score of the match.
            dog_id: Identifier the crop was matched to.
            track_id: Identifier of the originating track.

        Returns:
            Path of the written file, or None if saving failed.
        """
        try:
            idx = len(self.bad_samples) + 1
            fname = DEBUG_DIR / f"bad_sample_{idx:04d}_sim{similarity:.2f}_dog{dog_id}.jpg"
            # cv2.imwrite reports most failures by returning False rather than
            # raising, so the result must be checked before recording success.
            if not cv2.imwrite(str(fname), img_bgr):
                self.log(f"Error saving bad sample: could not write {fname}", "ERROR")
                return None
            self.bad_samples.append({
                'file': str(fname),
                'similarity': similarity,
                'dog_id': dog_id,
                'track_id': track_id,
                'timestamp': time.time()
            })
            self.log(f"Saved bad sample #{idx}: sim={similarity:.3f}, dog_id={dog_id}, track={track_id}", "WARNING")
            return str(fname)
        except Exception as e:
            self.log(f"Error saving bad sample: {e}", "ERROR")
            return None

    def _unique_dog_count(self) -> int:
        """Count distinct positive dog ids among the match records."""
        return len({r['dog_id'] for r in self.match_records if r['dog_id'] > 0})

    def _avg_similarity(self) -> float:
        """Mean of the retained similarity scores, 0.0 when there are none."""
        return float(np.mean(self.similarities)) if self.similarities else 0.0

    def _match_rate(self) -> float:
        """Matches divided by attempts, 0.0 when nothing was attempted."""
        attempts = self.counters['reid_attempts']
        return self.counters['reid_matches'] / attempts if attempts else 0.0

    def _fps(self, elapsed: float) -> float:
        """Frames per second over *elapsed* seconds, 0.0 if no time has passed."""
        return self.frame_count / elapsed if elapsed > 0 else 0.0

    def update_live_stats(self):
        """Recompute ``live_stats`` from the current counters and records."""
        self.live_stats = {
            'frame': self.frame_count,
            'dogs_detected': self._unique_dog_count(),
            'active_tracks': self.counters['active_tracks'],
            'match_rate': self._match_rate(),
            'avg_similarity': self._avg_similarity(),
            'fps': self._fps(time.time() - self.start_time)
        }

    def get_debug_output(self, lines: int = 10) -> str:
        """Return the last *lines* log entries joined by newlines.

        A non-positive *lines* yields an empty string (a plain ``[-0:]`` slice
        would otherwise return the whole log).
        """
        if lines <= 0:
            return ""
        return "\n".join(list(self.recent_logs)[-lines:])

    def get_debug_summary(self) -> str:
        """Return a multi-line, human-readable summary of the debug state."""
        c = self.counters
        elapsed = time.time() - self.start_time
        # Show the real attempt count; the zero-division guard lives in
        # _match_rate and must not leak into the displayed number.
        summary_lines = [
            "📊 Debug Summary",
            "━━━━━━━━━━━━━━━━━━━━━━",
            f"Frames: {self.frame_count} | Time: {elapsed:.1f}s | FPS: {self._fps(elapsed):.1f}",
            f"Detections: {c['detections']} | Tracks: {c['tracks_created']} | Confirmed: {c['tracks_confirmed']}",
            f"Unique Dogs: {self._unique_dog_count()} | Active Tracks: {c['active_tracks']}",
            f"ReID Attempts: {c['reid_attempts']} | Matches: {c['reid_matches']} ({self._match_rate():.1%})",
            f"Avg Similarity: {self._avg_similarity():.3f} | Feature Fails: {c['feature_fail']}",
            f"Bad Samples Saved: {len(self.bad_samples)}",
        ]
        return "\n".join(summary_lines)

    def save_debug_report(self, filepath: str = "debug_report.json"):
        """Write a JSON debug report to *filepath*.

        The report holds counters, similarity statistics, the first 20 bad
        samples and the last 50 match records. Failures are logged, not raised.
        """
        try:
            sims = list(self.similarities)
            if sims:
                # One percentile call for all four quantiles.
                p25, p50, p75, p90 = (float(v) for v in np.percentile(sims, [25, 50, 75, 90]))
                sim_min, sim_max = float(min(sims)), float(max(sims))
            else:
                p25 = p50 = p75 = p90 = 0
                sim_min = sim_max = 0
            report = {
                'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
                'frames_processed': self.frame_count,
                'elapsed_time': time.time() - self.start_time,
                'counters': dict(self.counters),
                'avg_similarity': self._avg_similarity(),
                'similarity_distribution': {
                    'min': sim_min,
                    'max': sim_max,
                    'percentiles': {
                        '25%': p25,
                        '50%': p50,
                        '75%': p75,
                        '90%': p90
                    }
                },
                'bad_samples': self.bad_samples[:20],
                'match_records_last': self.match_records[-50:]
            }
            with open(filepath, "w", encoding="utf-8") as f:
                # default=str stringifies anything json cannot encode natively.
                json.dump(report, f, indent=2, default=str)
            self.log(f"Saved debug report to {filepath}", "INFO")
        except Exception as e:
            self.log(f"Failed to save debug report: {e}", "ERROR")
def add_debugging(app: Any) -> Any:
    """
    Inject debug monitoring into DogMonitoringApp

    Attaches a fresh DebugMonitor as ``app.debug_monitor`` and, for each of
    ``app.detector.detect``, ``app.tracker.update``,
    ``app.reid.match_or_register`` and ``app.process_video`` that exists,
    replaces it with a wrapper that records statistics around the original
    call. Failures inside the debug bookkeeping are logged and never change
    what the wrapped component returned.

    Args:
        app: DogMonitoringApp instance
    Returns:
        Modified app with debug capabilities
    """
    # Create debug monitor
    app.debug_monitor = DebugMonitor()

    # ============== Wrap detector.detect ==============
    if hasattr(app, "detector") and hasattr(app.detector, "detect"):
        original_detect = app.detector.detect

        def wrapped_detect(frame, *args, **kwargs):
            monitor = app.debug_monitor
            try:
                detections = original_detect(frame, *args, **kwargs)
            except Exception as e:
                # Best-effort behaviour kept from the original wrapper:
                # a failing detector yields "no detections".
                monitor.log(f"Detection error: {e}", "ERROR")
                return []
            try:
                # `is not None` instead of truthiness: array-like results
                # raise on bool() when they hold more than one element.
                n = len(detections) if detections is not None else 0
                monitor.counters['detections'] += n
                if n > 0:
                    monitor.log(f"Detected {n} dogs in frame {monitor.frame_count}", "DEBUG")
            except Exception as e:
                monitor.log(f"Detection debug bookkeeping failed: {e}", "WARNING")
            return detections

        app.detector.detect = wrapped_detect
        app.debug_monitor.log("Injected debug wrapper for detector.detect()")

    # ============== Wrap tracker.update ==============
    if hasattr(app, "tracker") and hasattr(app.tracker, "update"):
        original_tracker_update = app.tracker.update

        def _current_track_ids():
            """Return the ids in app.tracker.tracks, or None if unreadable."""
            try:
                return {t.track_id for t in getattr(app.tracker, "tracks", [])}
            except Exception:
                return None

        def wrapped_tracker_update(detections, *args, **kwargs):
            monitor = app.debug_monitor
            before_ids = _current_track_ids()
            try:
                tracks = original_tracker_update(detections, *args, **kwargs)
            except Exception as e:
                # Best-effort behaviour kept from the original wrapper.
                monitor.log(f"Tracker error: {e}", "ERROR")
                return []
            try:
                after_ids = _current_track_ids()
                if before_ids is not None and after_ids is not None:
                    new_ids = after_ids - before_ids
                else:
                    new_ids = set()
                n_tracks = len(tracks) if tracks is not None else 0
                monitor.counters['tracks_created'] += len(new_ids)
                # NOTE(review): this adds the per-call track count on every
                # update, so 'tracks_confirmed' is a running sum, not a count
                # of distinct tracks. Kept as-is; confirm the intent.
                monitor.counters['tracks_confirmed'] += n_tracks
                monitor.counters['active_tracks'] = n_tracks
                if new_ids:
                    monitor.log(f"Created {len(new_ids)} new tracks", "DEBUG")
            except Exception as e:
                monitor.log(f"Tracker debug bookkeeping failed: {e}", "WARNING")
            return tracks

        app.tracker.update = wrapped_tracker_update
        app.debug_monitor.log("Injected debug wrapper for tracker.update()")

    # ============== Wrap reid.match_or_register ==============
    if hasattr(app, "reid") and hasattr(app.reid, "match_or_register"):
        original_match = app.reid.match_or_register

        def wrapped_match(track, *args, **kwargs):
            monitor = app.debug_monitor
            monitor.counters['reid_attempts'] += 1
            try:
                dog_id, confidence = original_match(track, *args, **kwargs)
                dog_id = int(dog_id)
                sim = float(confidence)
            except Exception:
                # Best-effort behaviour kept from the original wrapper.
                monitor.log(f"ReID error: {traceback.format_exc()}", "ERROR")
                monitor.counters['feature_fail'] += 1
                return 0, 0.0

            try:
                # Record similarity
                monitor.similarities.append(sim)
                monitor.match_records.append({
                    'sim': sim,
                    'dog_id': dog_id,
                    'track_id': getattr(track, 'track_id', None),
                    'frame': monitor.frame_count
                })
                threshold = getattr(app.reid, 'similarity_threshold', getattr(app, 'reid_threshold', 0.7))

                # Count match/miss first so a failure while saving a sample
                # cannot skew the counters.
                if sim >= threshold:
                    monitor.counters['reid_matches'] += 1
                else:
                    monitor.counters['reid_misses'] += 1

                # Save bad samples (very low similarity)
                track_detections = getattr(track, 'detections', None)
                if sim < (threshold - 0.2) and track_detections:
                    # Most recent detection that still carries an image crop.
                    latest_crop = None
                    for det in reversed(track_detections):
                        crop = getattr(det, 'image_crop', None)
                        if crop is not None:
                            latest_crop = crop
                            break
                    if latest_crop is not None:
                        monitor.save_bad_sample(
                            latest_crop, sim, dog_id,
                            getattr(track, 'track_id', 0)
                        )
            except Exception:
                monitor.log(f"ReID debug bookkeeping failed: {traceback.format_exc()}", "WARNING")
            return dog_id, sim

        app.reid.match_or_register = wrapped_match
        app.debug_monitor.log("Injected debug wrapper for reid.match_or_register()")

    # ============== Wrap main process_video ==============
    if hasattr(app, "process_video"):
        original_process_video = app.process_video

        def wrapped_process_video(video_path: str, progress=None):
            # Reset debug state
            app.debug_monitor.reset()
            app.debug_monitor.log(f"Starting video processing: {video_path}")

            # Wrap progress callback to count frames.
            # `is not None` avoids depending on the callback's truthiness
            # (an object defining __len__/__bool__ could be falsy or raise).
            if progress is not None:
                original_progress = progress

                def wrapped_progress(val, desc=None, *args, **kwargs):
                    monitor = app.debug_monitor
                    # Accept a plain fraction or a (done, total) pair; any
                    # other value (e.g. None) leaves the statistics untouched.
                    try:
                        if isinstance(val, (tuple, list)):
                            done, total = val
                            fraction = done / total if total else None
                        elif val is not None:
                            fraction = float(val)
                        else:
                            fraction = None
                    except (TypeError, ValueError, ZeroDivisionError):
                        fraction = None
                    if fraction is not None:
                        # Approximate frame count: progress fraction scaled
                        # to 0-100, not a true frame index.
                        monitor.frame_count = int(fraction * 100)
                        monitor.update_live_stats()
                    return original_progress(val, desc, *args, **kwargs)

                progress = wrapped_progress

            try:
                result = original_process_video(video_path, progress)
                # Save debug report
                app.debug_monitor.save_debug_report()
                app.debug_monitor.log("Video processing completed successfully")
                return result
            except Exception:
                app.debug_monitor.log(f"Process video error: {traceback.format_exc()}", "ERROR")
                raise

        app.process_video = wrapped_process_video
        app.debug_monitor.log("Injected debug wrapper for process_video()")

    # ============== Add convenience methods ==============
    app.get_debug_output = lambda lines=10: app.debug_monitor.get_debug_output(lines)
    app.get_debug_summary = lambda: app.debug_monitor.get_debug_summary()
    app.get_live_stats = lambda: app.debug_monitor.live_stats
    app.debug_monitor.log("✓ Debug monitoring system initialized")
    return app