Spaces:
Running
Running
""" | |
System monitoring service for Video Model Studio. | |
Tracks system resources like CPU, memory, and other metrics. | |
""" | |
import os | |
import time | |
import logging | |
import platform | |
import threading | |
import pandas as pd | |
from datetime import datetime, timedelta | |
from collections import deque | |
from typing import Dict, List, Optional, Tuple, Any | |
import psutil | |
from vms.ui.monitoring.services.gpu import GPUMonitoringService | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.INFO) | |
class MonitoringService: | |
"""Service for monitoring system resources and performance""" | |
def __init__(self, history_minutes: int = 10, sample_interval: int = 5): | |
"""Initialize the monitoring service | |
Args: | |
history_minutes: How many minutes of history to keep | |
sample_interval: How many seconds between samples | |
""" | |
self.history_minutes = history_minutes | |
self.sample_interval = sample_interval | |
self.max_samples = (history_minutes * 60) // sample_interval | |
# Initialize data structures for metrics | |
self.timestamps = deque(maxlen=self.max_samples) | |
self.cpu_percent = deque(maxlen=self.max_samples) | |
self.memory_percent = deque(maxlen=self.max_samples) | |
self.memory_used = deque(maxlen=self.max_samples) | |
self.memory_available = deque(maxlen=self.max_samples) | |
# CPU temperature history (might not be available on all systems) | |
self.cpu_temp = deque(maxlen=self.max_samples) | |
# Per-core CPU history | |
self.cpu_cores_percent = {} | |
# Initialize GPU monitoring service | |
self.gpu = GPUMonitoringService(history_minutes=history_minutes, sample_interval=sample_interval) | |
# Track if the monitoring thread is running | |
self.is_running = False | |
self.thread = None | |
# Initialize with current values | |
self.collect_metrics() | |
def collect_metrics(self) -> Dict[str, Any]: | |
"""Collect current system metrics | |
Returns: | |
Dictionary of current metrics | |
""" | |
metrics = { | |
'timestamp': datetime.now(), | |
'cpu_percent': psutil.cpu_percent(interval=0.1), | |
'memory_percent': psutil.virtual_memory().percent, | |
'memory_used': psutil.virtual_memory().used / (1024**3), # GB | |
'memory_available': psutil.virtual_memory().available / (1024**3), # GB | |
'cpu_temp': None, | |
'per_cpu_percent': psutil.cpu_percent(interval=0.1, percpu=True) | |
} | |
# Try to get CPU temperature (platform specific) | |
try: | |
if platform.system() == 'Linux': | |
# Try to get temperature from psutil | |
temps = psutil.sensors_temperatures() | |
for name, entries in temps.items(): | |
if name.startswith(('coretemp', 'k10temp', 'cpu_thermal')): | |
metrics['cpu_temp'] = entries[0].current | |
break | |
elif platform.system() == 'Darwin': # macOS | |
# On macOS, we could use SMC reader but it requires additional dependencies | |
# Leaving as None for now | |
pass | |
elif platform.system() == 'Windows': | |
# Windows might require WMI, leaving as None for simplicity | |
pass | |
except (AttributeError, KeyError, IndexError, NotImplementedError): | |
# Sensors not available | |
pass | |
return metrics | |
def update_history(self, metrics: Dict[str, Any]) -> None: | |
"""Update metric history with new values | |
Args: | |
metrics: New metrics to add to history | |
""" | |
self.timestamps.append(metrics['timestamp']) | |
self.cpu_percent.append(metrics['cpu_percent']) | |
self.memory_percent.append(metrics['memory_percent']) | |
self.memory_used.append(metrics['memory_used']) | |
self.memory_available.append(metrics['memory_available']) | |
if metrics['cpu_temp'] is not None: | |
self.cpu_temp.append(metrics['cpu_temp']) | |
# Update per-core CPU metrics | |
for i, percent in enumerate(metrics['per_cpu_percent']): | |
if i not in self.cpu_cores_percent: | |
self.cpu_cores_percent[i] = deque(maxlen=self.max_samples) | |
self.cpu_cores_percent[i].append(percent) | |
def start_monitoring(self) -> None: | |
"""Start background thread for collecting metrics""" | |
if self.is_running: | |
logger.warning("Monitoring thread already running") | |
return | |
self.is_running = True | |
# Start GPU monitoring if available | |
self.gpu.start_monitoring() | |
def _monitor_loop(): | |
while self.is_running: | |
try: | |
metrics = self.collect_metrics() | |
self.update_history(metrics) | |
time.sleep(self.sample_interval) | |
except Exception as e: | |
logger.error(f"Error in monitoring thread: {str(e)}", exc_info=True) | |
time.sleep(self.sample_interval) | |
self.thread = threading.Thread(target=_monitor_loop, daemon=True) | |
self.thread.start() | |
logger.info("System monitoring thread started") | |
def stop_monitoring(self) -> None: | |
"""Stop the monitoring thread""" | |
if not self.is_running: | |
return | |
self.is_running = False | |
# Stop GPU monitoring | |
self.gpu.stop_monitoring() | |
if self.thread: | |
self.thread.join(timeout=1.0) | |
logger.info("System monitoring thread stopped") | |
def get_current_metrics(self) -> Dict[str, Any]: | |
"""Get current system metrics | |
Returns: | |
Dictionary with current system metrics | |
""" | |
return self.collect_metrics() | |
def get_system_info(self) -> Dict[str, Any]: | |
"""Get general system information | |
Returns: | |
Dictionary with system details | |
""" | |
cpu_info = { | |
'cores_physical': psutil.cpu_count(logical=False), | |
'cores_logical': psutil.cpu_count(logical=True), | |
'current_frequency': None, | |
'architecture': platform.machine(), | |
} | |
# Try to get CPU frequency | |
try: | |
cpu_freq = psutil.cpu_freq() | |
if cpu_freq: | |
cpu_info['current_frequency'] = cpu_freq.current | |
except Exception: | |
pass | |
memory_info = { | |
'total': psutil.virtual_memory().total / (1024**3), # GB | |
'available': psutil.virtual_memory().available / (1024**3), # GB | |
'used': psutil.virtual_memory().used / (1024**3), # GB | |
'percent': psutil.virtual_memory().percent | |
} | |
disk_info = {} | |
for part in psutil.disk_partitions(all=False): | |
if os.name == 'nt' and ('cdrom' in part.opts or part.fstype == ''): | |
# Skip CD-ROM drives on Windows | |
continue | |
try: | |
usage = psutil.disk_usage(part.mountpoint) | |
disk_info[part.mountpoint] = { | |
'total': usage.total / (1024**3), # GB | |
'used': usage.used / (1024**3), # GB | |
'free': usage.free / (1024**3), # GB | |
'percent': usage.percent | |
} | |
except PermissionError: | |
continue | |
sys_info = { | |
'system': platform.system(), | |
'version': platform.version(), | |
'platform': platform.platform(), | |
'processor': platform.processor(), | |
'hostname': platform.node(), | |
'python_version': platform.python_version(), | |
'uptime': time.time() - psutil.boot_time() | |
} | |
return { | |
'cpu': cpu_info, | |
'memory': memory_info, | |
'disk': disk_info, | |
'system': sys_info, | |
} | |
def get_cpu_data(self) -> pd.DataFrame: | |
"""Get CPU usage data as a DataFrame | |
Returns: | |
DataFrame with CPU usage data | |
""" | |
if not self.timestamps: | |
return pd.DataFrame({ | |
'time': list(), | |
'CPU Usage (%)': list() | |
}) | |
data = { | |
'time': list(self.timestamps), | |
'CPU Usage (%)': list(self.cpu_percent) | |
} | |
# Add temperature if available | |
if self.cpu_temp and len(self.cpu_temp) > 0: | |
# Ensure temperature data aligns with timestamps | |
# If fewer temperature readings than timestamps, pad with None | |
temp_data = list(self.cpu_temp) | |
if len(temp_data) < len(self.timestamps): | |
padding = [None] * (len(self.timestamps) - len(temp_data)) | |
temp_data = padding + temp_data | |
data['CPU Temperature (°C)'] = temp_data | |
return pd.DataFrame(data) | |
def get_memory_data(self) -> pd.DataFrame: | |
"""Get memory usage data as a DataFrame | |
Returns: | |
DataFrame with memory usage data | |
""" | |
if not self.timestamps: | |
return pd.DataFrame({ | |
'time': list(), | |
'Memory Usage (%)': list(), | |
'Memory Used (GB)': list(), | |
'Memory Available (GB)': list() | |
}) | |
return pd.DataFrame({ | |
'time': list(self.timestamps), | |
'Memory Usage (%)': list(self.memory_percent), | |
'Memory Used (GB)': list(self.memory_used), | |
'Memory Available (GB)': list(self.memory_available) | |
}) | |
def get_per_core_data(self) -> Dict[int, pd.DataFrame]: | |
"""Get per-core CPU usage data as DataFrames | |
Returns: | |
Dictionary of DataFrames with per-core CPU usage data | |
""" | |
if not self.timestamps or not self.cpu_cores_percent: | |
return {} | |
core_data = {} | |
for core_id, percentages in self.cpu_cores_percent.items(): | |
# Ensure we don't have more data points than timestamps | |
data_length = min(len(percentages), len(self.timestamps)) | |
core_data[core_id] = pd.DataFrame({ | |
'time': list(self.timestamps)[-data_length:], | |
f'Core {core_id} Usage (%)': list(percentages)[-data_length:] | |
}) | |
return core_data | |
# Replace matplotlib methods with DataFrame methods | |
# This method is kept for backward compatibility but returns a DataFrame | |
def generate_cpu_plot(self) -> pd.DataFrame: | |
"""Get CPU usage data for plotting | |
Returns: | |
DataFrame with CPU usage data | |
""" | |
return self.get_cpu_data() | |
# This method is kept for backward compatibility but returns a DataFrame | |
def generate_memory_plot(self) -> pd.DataFrame: | |
"""Get memory usage data for plotting | |
Returns: | |
DataFrame with memory usage data | |
""" | |
return self.get_memory_data() | |
# This method is kept for backward compatibility but returns a DataFrame of all cores | |
def generate_per_core_plot(self) -> pd.DataFrame: | |
"""Get per-core CPU usage data for plotting | |
Returns: | |
Combined DataFrame with all cores' usage data | |
""" | |
core_data = self.get_per_core_data() | |
if not core_data: | |
return pd.DataFrame() | |
# Combine all core data into a single DataFrame using the first core's timestamps | |
first_core_id = list(core_data.keys())[0] | |
combined_df = core_data[first_core_id][['time']].copy() | |
for core_id, df in core_data.items(): | |
combined_df[f'Core {core_id} Usage (%)'] = df[f'Core {core_id} Usage (%)'] | |
return combined_df |