import threading
import time

import huggingface_hub
from gradio_client import Client

from trackio.sqlite_storage import SQLiteStorage
from trackio.typehints import LogEntry
from trackio.utils import RESERVED_KEYS, fibo, generate_readable_name


BATCH_SEND_INTERVAL = 0.5


class Run:
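    """A tracked run that queues logged metrics and sends them to a Trackio
    Space in batches from a background thread."""
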
    def __init__(
        self,
        url: str,
        project: str,
        client: Client | None,
        name: str | None = None,
        config: dict | None = None,
    ):
        self.url = url
        self.project = project
        self._client_lock = threading.Lock()
        self._client_thread = None
        self._client = client
        self.name = name or generate_readable_name(SQLiteStorage.get_runs(project))
        self.config = config or {}
        self._queued_logs: list[LogEntry] = []
        self._stop_flag = threading.Event()

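        # Create the client and run the batch sender in a daemon thread so
        # that constructing a Run does not block on connecting to the Space.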
        self._client_thread = threading.Thread(target=self._init_client_background)
        self._client_thread.daemon = True
        self._client_thread.start()

    def _batch_sender(self):
        """Send batched logs every BATCH_SEND_INTERVAL."""
        while not self._stop_flag.is_set() or len(self._queued_logs) > 0:
            # Sleep between batches; once the stop flag has been set, skip
            # the sleep so any remaining logs are flushed immediately and
            # the loop can exit.
            if not self._stop_flag.is_set():
                time.sleep(BATCH_SEND_INTERVAL)

            with self._client_lock:
                if self._queued_logs and self._client is not None:
                    logs_to_send = self._queued_logs.copy()
                    self._queued_logs.clear()
                    self._client.predict(
                        api_name="/bulk_log",
                        logs=logs_to_send,
                        hf_token=huggingface_hub.utils.get_token(),
                    )

    def _init_client_background(self):
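        """Connect to the Trackio Space in the background, retrying with
        increasing (Fibonacci-scaled) delays, then run the batch sender loop."""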
        if self._client is None:
            fib = fibo()
            for sleep_coefficient in fib:
                try:
                    client = Client(self.url, verbose=False)
                    
                    with self._client_lock:
                        self._client = client
                    break
                except Exception:
                    pass
                if sleep_coefficient is not None:
                    time.sleep(0.1 * sleep_coefficient)

        self._batch_sender()

    def log(self, metrics: dict, step: int | None = None):
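        """Queue metrics for this run to be sent by the background batch sender.

        Raises a ValueError if a metric name is in RESERVED_KEYS or starts
        with "__".
        """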
        for k in metrics.keys():
            if k in RESERVED_KEYS or k.startswith("__"):
                raise ValueError(
                    f"Please do not use this reserved key as a metric: {k}"
                )

        log_entry: LogEntry = {
            "project": self.project,
            "run": self.name,
            "metrics": metrics,
            "step": step,
        }

        with self._client_lock:
            self._queued_logs.append(log_entry)

    def finish(self):
        """Cleanup when run is finished."""
        self._stop_flag.set()
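        # Give the batch sender a final interval to pick up any logs queued
        # just before finish() was called.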
        time.sleep(2 * BATCH_SEND_INTERVAL)
        
        if self._client_thread is not None:
            print(f"* Uploading logs to Trackio Space: {self.url} (please wait...)")
            self._client_thread.join()