# Replay Buffer

A replay buffer for storing and sampling experience data, with support for concurrent access from multiple processes.

## Features

- **Multi-process Support**: Safe concurrent access using shared memory and locks
- **Flexible Querying**: Powerful query builder for filtering stored data
- **Task-based Organization**: Data is organized by `task_id` and `agent_id`
- **Capacity Management**: FIFO eviction once `max_capacity` is reached
- **Custom Sampling**: Implement custom sampling logic through the `Sampler` interface
- **Data Conversion**: Customize data conversion through the `Converter` interface

## Basic Usage

### Writing Data

```python
import time

from aworld.replay_buffer import ReplayBuffer, DataRow, ExpMeta, Experience
from aworld.core.common import ActionModel, Observation

# Create a data row
data = DataRow(
    exp_meta=ExpMeta(
        task_id="task_1",
        task_name="my_task",
        agent_id="agent_1",
        step=1,
        execute_time=time.time()
    ),
    exp_data=Experience(
        state=Observation(),
        action=ActionModel()
    )
)

# Create a buffer with the default storage, then store the row
replay_buffer = ReplayBuffer()
replay_buffer.store(data)
```

### Reading Data

```python
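from aworld.replay_buffer import ReplayBuffer
from aworld.replay_buffer.query_filter import QueryBuilder
# Also import RandomTaskSample and DefaultConverter from your aworld installation.

# Basic example: sample rows whose task_name matches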
replay_buffer = ReplayBuffer()
query_condition = QueryBuilder().eq("exp_meta.task_name", "test_task").build()
data = replay_buffer.sample(sampler=RandomTaskSample(),
                            query_condition=query_condition,
                            converter=DefaultConverter(),
                            batch_size=1000)

# Sample tasks filtered by task_id
query = QueryBuilder().eq("exp_meta.task_id", "task_1").build()
data = replay_buffer.sample_task(query_condition=query, batch_size=10)

# Sample tasks filtered by agent_id
query = QueryBuilder().eq("exp_meta.agent_id", "agent_1").build()
data = replay_buffer.sample_task(query_condition=query, batch_size=5)
```
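
To make the idea of task-level sampling concrete, here is a minimal standalone sketch. It is not the aworld implementation: `sample_tasks` and the plain-dict row layout are hypothetical, and it assumes that sampling "by task" means picking whole tasks at random and returning each task's rows in step order.

```python
import random
from collections import defaultdict


def sample_tasks(rows, batch_size, rng=None):
    """Pick up to batch_size task ids at random and return their rows, grouped by task.

    Hypothetical helper for illustration only; rows are plain dicts here.
    """
    rng = rng or random.Random()
    by_task = defaultdict(list)
    for row in rows:
        by_task[row["task_id"]].append(row)
    # Never ask for more tasks than exist.
    task_ids = rng.sample(sorted(by_task), k=min(batch_size, len(by_task)))
    # Keep each task's rows in step order.
    return {tid: sorted(by_task[tid], key=lambda r: r["step"]) for tid in task_ids}


rows = [
    {"task_id": "task_1", "agent_id": "agent_1", "step": 2},
    {"task_id": "task_1", "agent_id": "agent_1", "step": 1},
    {"task_id": "task_2", "agent_id": "agent_1", "step": 1},
    {"task_id": "task_3", "agent_id": "agent_2", "step": 1},
]
batch = sample_tasks(rows, batch_size=2, rng=random.Random(0))
for task_id, task_rows in batch.items():
    print(task_id, [r["step"] for r in task_rows])
```

Passing a seeded `random.Random` keeps the draw reproducible, which is convenient when debugging a training run.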

## Multi-processing Example

```python
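import multiprocessing

from aworld.replay_buffer import ReplayBuffer
from aworld.replay_buffer.storage.multi_proc_mem import MultiProcMemoryStorage


def write_processing(replay_buffer, task_id):
    # User-defined writer: build DataRow objects for task_id
    # and call replay_buffer.store(...) for each one.
    ...


if __name__ == "__main__":
    # Manager-backed containers can be shared across processes
    manager = multiprocessing.Manager()
    replay_buffer = ReplayBuffer(
        storage=MultiProcMemoryStorage(
            data_dict=manager.dict(),
            fifo_queue=manager.list(),
            lock=manager.Lock(),
            max_capacity=10000
        )
    )

    # Start writer processes and wait for them to finish
    processes = [
        multiprocessing.Process(target=write_processing, args=(replay_buffer, f"task_{i}"))
        for i in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()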
```
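
The storage above is built from a dict, a FIFO list, a lock, and a `max_capacity`. The following sketch shows one way those four pieces could cooperate to evict the oldest rows first. `TinyFifoStorage` is a hypothetical stand-in, not the `MultiProcMemoryStorage` class, and it uses a `threading.Lock` with plain containers so it runs in a single process. It assumes `max_capacity >= 1` and unique row ids.

```python
import threading


class TinyFifoStorage:
    """Hypothetical illustration of FIFO eviction; not the aworld storage class."""

    def __init__(self, data_dict, fifo_queue, lock, max_capacity):
        self._data = data_dict        # row_id -> row
        self._order = fifo_queue      # row ids, oldest first
        self._lock = lock             # guards both containers
        self._max_capacity = max_capacity  # assumed to be >= 1

    def add(self, row_id, row):
        with self._lock:
            # Evict the oldest rows until there is room for the new one.
            while len(self._order) >= self._max_capacity:
                oldest = self._order.pop(0)
                self._data.pop(oldest, None)
            self._data[row_id] = row
            self._order.append(row_id)

    def size(self):
        with self._lock:
            return len(self._order)


storage = TinyFifoStorage({}, [], threading.Lock(), max_capacity=3)
for i in range(5):
    storage.add(f"row_{i}", {"step": i})

remaining = list(storage._order)
print(remaining)  # ['row_2', 'row_3', 'row_4']
```

Holding the lock across both the eviction and the insert keeps the dict and the queue consistent with each other when several writers call `add` at once.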

## Query Builder Examples

### Simple Equality

```python
QueryBuilder().eq("exp_meta.task_id", "123").build()
```

### Complex Conditions

```python
# Parentheses let the method chain span multiple lines
(
    QueryBuilder()
    .eq("exp_meta.task_id", "123")
    .and_()
    .eq("exp_meta.agent_id", "456")
    .build()
)
```

### Nested Conditions

```python
# task_id == "123" AND (agent_id == "111" OR agent_id == "222")
(
    QueryBuilder()
    .eq("exp_meta.task_id", "123")
    .and_()
    .nested(
        QueryBuilder()
        .eq("exp_meta.agent_id", "111")
        .or_()
        .eq("exp_meta.agent_id", "222")
    )
    .build()
)
```
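
To illustrate what a nested condition means when applied to stored rows, here is a small standalone sketch. The condition tree format (`and_`, `or_`, `field`, `value` keys) and the helpers `get_path` and `matches` are assumptions made for this example; the structure that `QueryBuilder.build()` actually returns may differ.

```python
def get_path(row, dotted_key):
    """Follow a dotted key such as "exp_meta.task_id" through nested dicts."""
    value = row
    for part in dotted_key.split("."):
        value = value[part]
    return value


def matches(row, condition):
    """Evaluate a hypothetical condition tree against one row."""
    if "and_" in condition:
        return all(matches(row, c) for c in condition["and_"])
    if "or_" in condition:
        return any(matches(row, c) for c in condition["or_"])
    # Leaf node: equality on a dotted field path.
    return get_path(row, condition["field"]) == condition["value"]


# task_id == "123" AND (agent_id == "111" OR agent_id == "222")
condition = {
    "and_": [
        {"field": "exp_meta.task_id", "value": "123"},
        {"or_": [
            {"field": "exp_meta.agent_id", "value": "111"},
            {"field": "exp_meta.agent_id", "value": "222"},
        ]},
    ]
}

rows = [
    {"exp_meta": {"task_id": "123", "agent_id": "111"}},
    {"exp_meta": {"task_id": "123", "agent_id": "333"}},
    {"exp_meta": {"task_id": "999", "agent_id": "222"}},
]
selected = [r for r in rows if matches(r, condition)]
print(len(selected))  # 1
```

Only the first row passes: the second fails the inner `or_` group and the third fails the `task_id` check.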