File size: 5,821 Bytes
ca5b08e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import asyncio
import time
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Set


class MetricsKeeper:
    """
    Accumulates named numeric metrics and reports per-second rates for each one,
    both over the keeper's whole lifetime and over a sliding time window.
    """

    def __init__(self, window=60 * 5):
        """
        Initializes the MetricsKeeper.

        Args:
            window (int): Time window in seconds for recent metrics. Defaults to 5 minutes.
        """
        self.window = window  # Time window in seconds
        self.start_time = time.time()  # Timestamp when MetricsKeeper was created
        self.total_metrics = defaultdict(int)  # Cumulative metrics since start
        # (timestamp, metrics_dict) pairs, appended in time order so the oldest is at the left.
        self.window_metrics: Deque[Any] = deque()
        self.window_sum = defaultdict(int)  # Sum of metrics within the window

    def _evict_expired(self, current_time: float) -> None:
        """
        Remove entries older than ``current_time - window`` and subtract them from ``window_sum``.

        Args:
            current_time (float): Reference timestamp (as returned by ``time.time()``).
        """
        cutoff = current_time - self.window
        while self.window_metrics and self.window_metrics[0][0] < cutoff:
            _, old_metrics = self.window_metrics.popleft()
            for key, value in old_metrics.items():
                self.window_sum[key] -= value
                if self.window_sum[key] <= 0:
                    del self.window_sum[key]  # Clean up to prevent negative counts

    def add_metrics(self, **kwargs):
        """
        Adds metrics to the keeper.

        Each value is added both to the lifetime totals and to the sliding-window sums.
        Entries that have fallen out of the window are evicted afterwards.

        Args:
            **kwargs: Arbitrary keyword arguments representing metric names and their values.
        """
        current_time = time.time()
        for key, value in kwargs.items():
            self.total_metrics[key] += value
            self.window_sum[key] += value

        # Remember this batch so its contribution can be subtracted once it expires.
        self.window_metrics.append((current_time, kwargs))
        self._evict_expired(current_time)

    def __str__(self):
        """
        Returns a formatted string of metrics showing tokens/sec since start and within the window.

        Expired window entries are evicted first. Without this, a keeper that has received
        no metrics for longer than ``window`` would keep reporting old traffic as recent.

        Returns:
            str: Formatted metrics string as a table.
        """
        current_time = time.time()
        self._evict_expired(current_time)

        elapsed_time = current_time - self.start_time
        # Early on, less than `window` seconds have passed, so divide by the real elapsed time.
        window_time = min(self.window, elapsed_time) if elapsed_time > 0 else 1  # Prevent division by zero

        # Header; the row format below must use the same column widths (30 / 25 / 25).
        header = f"{'Metric Name':<30} {'Lifetime (tokens/sec)':>25} {'Recently (tokens/sec)':>25}"
        separator = "-" * len(header)
        lines = [header, separator]

        # Sort metrics alphabetically for consistency
        for key in sorted(self.total_metrics.keys()):
            total = self.total_metrics[key]
            window_total = self.window_sum.get(key, 0)
            total_rate = total / elapsed_time if elapsed_time > 0 else 0
            window_rate = window_total / window_time if window_time > 0 else 0
            lines.append(f"{key:<30} {total_rate:>25.2f} {window_rate:>25.2f}")

        return "\n".join(lines)


class WorkerTracker:
    """
    Counts, per worker, how many times each state has been reported.

    Every read and write of the counts happens while holding ``self.lock``,
    which is why the public methods are coroutines.
    """

    def __init__(self):
        """
        Creates an empty tracker.

        ``worker_status`` maps a worker ID to a ``state -> count`` mapping. Both levels are
        defaultdicts, so unseen workers and states start from zero.
        """
        self.worker_status: Dict[int, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self.lock = asyncio.Lock()

    async def clear_work(self, worker_id: int):
        """
        Resets all state counts for one worker.

        Because ``worker_status`` is a defaultdict, clearing an unknown worker registers it
        with no counts. It then appears in the status table with all zeros.

        Args:
            worker_id (int): The ID of the worker to reset.
        """
        async with self.lock:
            counts = self.worker_status[worker_id]
            counts.clear()

    async def track_work(self, worker_id: int, work_item_id: str, state: str):
        """
        Records one more occurrence of ``state`` for a worker.

        Args:
            worker_id (int): The ID of the worker.
            work_item_id (str): Identifier of the work item; not used by this implementation.
            state (str): The state whose count is incremented.
        """
        async with self.lock:
            per_worker = self.worker_status[worker_id]
            per_worker[state] = per_worker[state] + 1

    async def get_status_table(self) -> str:
        """
        Renders the current counts as a plain-text table.

        There is one row per worker, sorted by worker ID. There is one column per state seen
        on any worker, sorted by name. Missing counts are shown as 0. Cells are left-justified
        to the widest entry in their column.

        Returns:
            str: Header line, ``-+-`` separator line, then one line per worker.
        """
        async with self.lock:
            state_names: List[str] = sorted(
                {name for counts in self.worker_status.values() for name in counts}
            )
            header_cells: List[str] = ["Worker ID", *state_names]

            # .get() is used deliberately: indexing a defaultdict would insert zero entries.
            body: List[List[str]] = [
                [str(wid), *(str(counts.get(name, 0)) for name in state_names)]
                for wid, counts in sorted(self.worker_status.items())
            ]

            widths = [
                max([len(header_cells[col])] + [len(cells[col]) for cells in body])
                for col in range(len(header_cells))
            ]

            def render(cells: List[str]) -> str:
                return " | ".join(text.ljust(width) for text, width in zip(cells, widths))

            rule = "-+-".join("-" * width for width in widths)
            return "\n".join([render(header_cells), rule, *(render(cells) for cells in body)])

    def __str__(self):
        """
        Not supported, because building the table requires acquiring the async lock.

        Raises:
            NotImplementedError: Always. Use ``await get_status_table()`` instead.
        """
        raise NotImplementedError("Use 'await get_status_table()' to get the status table.")