File size: 7,884 Bytes
e0f8ec7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
from typing import Any, Dict, Optional, List
import copy
import uuid
from datetime import datetime, timezone
from abc import ABC, abstractmethod
import asyncio
from pydantic import BaseModel, Field

class CheckpointMetadata(BaseModel):
    """
    Identifiers that tie a checkpoint to the session and task it belongs to.

    Attributes:
        session_id (str): Session identifier; must always be supplied.
        task_id (Optional[str]): Task identifier; None when not supplied.
    """
    session_id: str = Field(
        ...,
        description="The session identifier.",
    )
    task_id: Optional[str] = Field(
        default=None,
        description="The task identifier.",
    )

class Checkpoint(BaseModel):
    """
    A single stored snapshot of state together with its bookkeeping fields.

    Attributes:
        id (str): Unique identifier of this checkpoint (required).
        ts (str): Timestamp of the checkpoint, kept as a string (required).
        metadata (CheckpointMetadata): Session/task identifiers for the
            checkpoint (required).
        values (Dict[str, Any]): The state values captured by the checkpoint
            (required).
        version (int): Integer version number of the checkpoint (required).
        parent_id (Optional[str]): Identifier of the parent checkpoint;
            None when there is no parent.
        namespace (str): Namespace the checkpoint belongs to; defaults to
            'aworld'.
    """
    id: str = Field(
        ...,
        description="Unique identifier for the checkpoint.",
    )
    ts: str = Field(
        ...,
        description="Timestamp of the checkpoint.",
    )
    metadata: CheckpointMetadata = Field(
        ...,
        description="Metadata associated with the checkpoint.",
    )
    values: Dict[str, Any] = Field(
        ...,
        description="State values stored in the checkpoint.",
    )
    version: int = Field(
        ...,
        description="Version of the checkpoint format.",
    )
    parent_id: Optional[str] = Field(
        default=None,
        description="Parent checkpoint identifier, if any.",
    )
    namespace: str = Field(
        default="aworld",
        description="Namespace for the checkpoint, default is 'aworld'.",
    )

def empty_checkpoint() -> Checkpoint:
    """
    Build a placeholder checkpoint that carries no state.

    The result has a freshly generated UUID4 string as its id, the current
    UTC time in ISO 8601 form as its timestamp, metadata with an empty
    session id and no task id, an empty ``values`` dict, version 1, no
    parent and the 'aworld' namespace.

    Returns:
        Checkpoint: The newly constructed empty checkpoint.
    """
    new_id = str(uuid.uuid4())
    created_at = datetime.now(timezone.utc).isoformat()
    blank_metadata = CheckpointMetadata(session_id="", task_id=None)
    placeholder = Checkpoint(
        id=new_id,
        ts=created_at,
        metadata=blank_metadata,
        values={},
        version=1,
        parent_id=None,
        namespace="aworld",
    )
    return placeholder

def copy_checkpoint(checkpoint: Checkpoint) -> Checkpoint:
    """
    Return an independent duplicate of ``checkpoint``.

    The duplicate is produced with ``copy.deepcopy``, so nested objects are
    copied as well rather than shared with the original.

    Args:
        checkpoint (Checkpoint): The checkpoint to duplicate.
    Returns:
        Checkpoint: The deep-copied checkpoint.
    """
    duplicate = copy.deepcopy(checkpoint)
    return duplicate

def create_checkpoint(
    values: Dict[str, Any],
    metadata: CheckpointMetadata,
    parent_id: Optional[str] = None,
    version: int = 1,
    namespace: str = 'aworld',
) -> Checkpoint:
    """
    Assemble a new checkpoint around the given state values and metadata.

    A fresh UUID4 string is used as the id and the current UTC time, in
    ISO 8601 form, as the timestamp.

    Args:
        values (Dict[str, Any]): State values to store in the checkpoint.
        metadata (CheckpointMetadata): Metadata for the checkpoint.
        parent_id (Optional[str]): Parent checkpoint identifier, if any.
        version (int): Base version number. The checkpoint is stored with the
            value returned by ``VersionUtils.get_next_version(version)``, not
            with ``version`` itself.
        namespace (str): Namespace for the checkpoint.
    Returns:
        Checkpoint: The newly created checkpoint.
    """
    new_id = str(uuid.uuid4())
    created_at = datetime.now(timezone.utc).isoformat()
    # The stored version is derived from the supplied one, not copied verbatim.
    stored_version = VersionUtils.get_next_version(version)
    checkpoint = Checkpoint(
        id=new_id,
        ts=created_at,
        metadata=metadata,
        values=values,
        version=stored_version,
        parent_id=parent_id,
        namespace=namespace,
    )
    return checkpoint

class BaseCheckpointRepository(ABC):
    """
    Interface that every checkpoint storage backend must implement.

    Subclasses supply the synchronous operations; the asynchronous variants
    defined here delegate to them through ``asyncio.to_thread``.
    """

    @abstractmethod
    def get(self, checkpoint_id: str) -> Optional[Checkpoint]:
        """
        Look up a single checkpoint by its identifier.

        Args:
            checkpoint_id (str): The unique identifier of the checkpoint.
        Returns:
            Optional[Checkpoint]: The checkpoint if found, otherwise None.
        """

    @abstractmethod
    def list(self, params: Dict[str, Any]) -> List[Checkpoint]:
        """
        Return the checkpoints that match the given parameters.

        Args:
            params (dict): Parameters to filter checkpoints.
        Returns:
            List[Checkpoint]: List of matching checkpoints.
        """

    @abstractmethod
    def put(self, checkpoint: Checkpoint) -> None:
        """
        Persist a checkpoint.

        Args:
            checkpoint (Checkpoint): The checkpoint to store.
        """

    @abstractmethod
    def get_by_session(self, session_id: str) -> Optional[Checkpoint]:
        """
        Return the latest checkpoint recorded for a session.

        Args:
            session_id (str): The session identifier.
        Returns:
            Optional[Checkpoint]: The latest checkpoint if found, otherwise None.
        """

    @abstractmethod
    def delete_by_session(self, session_id: str) -> None:
        """
        Remove every checkpoint related to a session.

        Args:
            session_id (str): The session identifier.
        """

    # Asynchronous counterparts: each hands the matching synchronous method
    # to asyncio.to_thread, which runs it in a separate thread.
    async def aget(self, checkpoint_id: str) -> Optional[Checkpoint]:
        """
        Asynchronous counterpart of ``get``.

        Args:
            checkpoint_id (str): The unique identifier of the checkpoint.
        Returns:
            Optional[Checkpoint]: The checkpoint if found, otherwise None.
        """
        found = await asyncio.to_thread(self.get, checkpoint_id)
        return found

    async def alist(self, params: Dict[str, Any]) -> List[Checkpoint]:
        """
        Asynchronous counterpart of ``list``.

        Args:
            params (dict): Parameters to filter checkpoints.
        Returns:
            List[Checkpoint]: List of matching checkpoints.
        """
        matches = await asyncio.to_thread(self.list, params)
        return matches

    async def aput(self, checkpoint: Checkpoint) -> None:
        """
        Asynchronous counterpart of ``put``.

        Args:
            checkpoint (Checkpoint): The checkpoint to store.
        """
        store = self.put
        await asyncio.to_thread(store, checkpoint)

    async def aget_by_session(self, session_id: str) -> Optional[Checkpoint]:
        """
        Asynchronous counterpart of ``get_by_session``.

        Args:
            session_id (str): The session identifier.
        Returns:
            Optional[Checkpoint]: The latest checkpoint if found, otherwise None.
        """
        latest = await asyncio.to_thread(self.get_by_session, session_id)
        return latest

    async def adelete_by_session(self, session_id: str) -> None:
        """
        Asynchronous counterpart of ``delete_by_session``.

        Args:
            session_id (str): The session identifier.
        """
        remove = self.delete_by_session
        await asyncio.to_thread(remove, session_id)

class VersionUtils:
    """
    Stateless helpers for stepping and comparing integer checkpoint versions.
    """

    @staticmethod
    def get_next_version(version: int) -> int:
        """
        Return the version that follows ``version`` (one higher).
        """
        successor = version + 1
        return successor

    @staticmethod
    def get_previous_version(version: int)  -> int:
        """
        Return the version that precedes ``version`` (one lower).
        """
        predecessor = version - 1
        return predecessor

    @staticmethod
    def is_version_greater(checkpoint: Checkpoint, version: int) -> bool:
        """
        Tell whether ``checkpoint.version`` is strictly greater than ``version``.
        """
        current = checkpoint.version
        return current > version

    @staticmethod
    def is_version_less(checkpoint: Checkpoint, version: int) -> bool:
        """
        Tell whether ``checkpoint.version`` is strictly less than ``version``.
        """
        current = checkpoint.version
        return current < version