Spaces:
Running
Running
File size: 5,821 Bytes
ca5b08e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
import asyncio
import time
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Set
class MetricsKeeper:
    """Accumulate numeric metrics and report per-second rates.

    Rates are reported both over the keeper's whole lifetime and over a
    sliding time window of recent submissions.
    """

    def __init__(self, window=60 * 5):
        """
        Initializes the MetricsKeeper.

        Args:
            window (int): Time window in seconds for recent metrics. Defaults to 5 minutes.
        """
        self.window = window  # Time window in seconds
        self.start_time = time.time()  # Timestamp when MetricsKeeper was created
        self.total_metrics = defaultdict(int)  # Cumulative metrics since start
        self.window_metrics: Deque[Any] = deque()  # (timestamp, metrics_dict) entries, oldest first
        self.window_sum = defaultdict(int)  # Sum of metrics within the window

    def _evict_expired(self, now: float) -> None:
        """Drop window entries older than ``now - self.window`` and subtract them from ``window_sum``."""
        cutoff = now - self.window
        # Entries are appended in time order, so expired ones are always at the left end.
        while self.window_metrics and self.window_metrics[0][0] < cutoff:
            _, old_metrics = self.window_metrics.popleft()
            for key, value in old_metrics.items():
                self.window_sum[key] -= value
                if self.window_sum[key] <= 0:
                    del self.window_sum[key]  # Clean up to prevent negative counts

    def add_metrics(self, **kwargs):
        """
        Adds metrics to the keeper.

        Args:
            **kwargs: Arbitrary keyword arguments representing metric names and their values.
        """
        current_time = time.time()
        for key, value in kwargs.items():
            self.total_metrics[key] += value
            self.window_sum[key] += value
        self.window_metrics.append((current_time, kwargs))
        self._evict_expired(current_time)

    def __str__(self):
        """
        Returns a formatted string of metrics showing tokens/sec since start and within the window.

        Expired window entries are evicted before formatting. Without this, a
        report produced long after the last ``add_metrics`` call would still
        count stale data as recent.

        Returns:
            str: Formatted metrics string as a table.
        """
        current_time = time.time()
        self._evict_expired(current_time)
        elapsed_time = current_time - self.start_time
        window_time = min(self.window, elapsed_time) if elapsed_time > 0 else 1  # Prevent division by zero

        header = f"{'Metric Name':<30} {'Lifetime (tokens/sec)':>25} {'Recently (tokens/sec)':>25}"
        separator = "-" * len(header)
        lines = [header, separator]

        # Sort metrics alphabetically for consistency
        for key in sorted(self.total_metrics):
            total = self.total_metrics[key]
            window_total = self.window_sum.get(key, 0)
            total_rate = total / elapsed_time if elapsed_time > 0 else 0
            window_rate = window_total / window_time if window_time > 0 else 0
            # Name column width must match the header's 30-character column.
            lines.append(f"{key:<30} {total_rate:>25.2f} {window_rate:>25.2f}")
        return "\n".join(lines)
class WorkerTracker:
    """Tally, per worker, how many times each state has been reported.

    All access goes through coroutine methods that hold an ``asyncio.Lock``.
    """

    def __init__(self):
        """
        Create an empty tracker.

        ``worker_status`` maps a worker ID to a mapping of state name -> count.
        Both levels are defaultdicts, so an unseen worker or state starts empty / at 0.
        """
        self.worker_status: Dict[int, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self.lock = asyncio.Lock()

    async def clear_work(self, worker_id: int):
        """Reset every state count for ``worker_id``; the worker itself stays listed with an empty tally."""
        async with self.lock:
            counts = self.worker_status[worker_id]
            counts.clear()

    async def track_work(self, worker_id: int, work_item_id: str, state: str):
        """
        Record one more occurrence of ``state`` for a worker.

        Args:
            worker_id (int): The ID of the worker.
            work_item_id (str): Identifier of the work item; accepted but not used here.
            state (str): The state whose counter is incremented.
        """
        async with self.lock:
            counts = self.worker_status[worker_id]
            counts[state] += 1

    async def get_status_table(self) -> str:
        """
        Render the current per-worker state counts as a text table.

        There is one column per state ever seen (sorted by name) and one row
        per worker (sorted by ID). Columns are separated by " | ". Each cell
        is left-justified to the widest entry in its column.

        Returns:
            str: The header line, a dashed divider, then one line per worker.
        """
        async with self.lock:
            state_names: List[str] = sorted(
                {name for counts in self.worker_status.values() for name in counts}
            )
            columns = ["Worker ID", *state_names]
            body = [
                [str(wid), *(str(counts.get(name, 0)) for name in state_names)]
                for wid, counts in sorted(self.worker_status.items())
            ]
            widths = [
                max(len(record[pos]) for record in [columns, *body])
                for pos in range(len(columns))
            ]

            def render(cells):
                return " | ".join(cell.ljust(width) for cell, width in zip(cells, widths))

            divider = "-+-".join("-" * width for width in widths)
            return "\n".join([render(columns), divider, *(render(record) for record in body)])

    def __str__(self):
        """
        Deliberately unsupported: building the table requires awaiting the lock.

        Use 'await get_status_table()' to retrieve the status table.
        """
        raise NotImplementedError("Use 'await get_status_table()' to get the status table.")
|