# Replay Buffer
A multi-process capable replay buffer system for storing and sampling experience data.
## Features
- **Multi-process Support**: Safe concurrent access using shared memory and locks
- **Flexible Querying**: Powerful query builder for filtering stored data
- **Task-based Organization**: Data organized by task_id and agent_id
- **Capacity Management**: FIFO eviction when reaching max capacity
- **Custom Sampling**: Implement custom sampling logic through the Sampler interface
- **Data Conversion**: Custom data conversion through the Converter interface (a sketch of both extension points follows this list)
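
The Sampler and Converter extension points are plugged into sampling calls to control how rows are selected and transformed. The exact base-class method names are not documented here, so the sketch below is purely illustrative: the `sample()` and `convert()` hooks are assumptions used to convey the idea, not the library's confirmed API.

```python
# Purely illustrative sketch: method names are assumed, check
# aworld.replay_buffer for the real Sampler/Converter interfaces.
class NewestFirstSampler:
    """Pick the most recently written rows first (assumed sample() hook)."""
    def sample(self, rows, batch_size):
        return sorted(rows, key=lambda r: r.exp_meta.execute_time, reverse=True)[:batch_size]

class StateActionConverter:
    """Flatten each DataRow into a (state, action) tuple (assumed convert() hook)."""
    def convert(self, rows):
        return [(row.exp_data.state, row.exp_data.action) for row in rows]
```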
## Basic Usage
### Writing Data
```python
import time

from aworld.replay_buffer import ReplayBuffer, DataRow, ExpMeta, Experience
from aworld.core.common import ActionModel, Observation

replay_buffer = ReplayBuffer()

# Create a data row: ExpMeta identifies the task, agent and step;
# Experience holds the state-action pair
data = DataRow(
    exp_meta=ExpMeta(
        task_id="task_1",
        task_name="my_task",
        agent_id="agent_1",
        step=1,
        execute_time=time.time()
    ),
    exp_data=Experience(
        state=Observation(),
        action=ActionModel()
    )
)

# Store the row
replay_buffer.store(data)
```
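
Rows sharing a `task_id` are grouped together, so a full episode is written by storing one row per step. As an illustration (the loop below simply reuses the API above):

```python
# Illustrative only: write several steps of one task so they can be queried together later
for step in range(1, 6):
    replay_buffer.store(DataRow(
        exp_meta=ExpMeta(
            task_id="task_1",
            task_name="my_task",
            agent_id="agent_1",
            step=step,
            execute_time=time.time()
        ),
        exp_data=Experience(state=Observation(), action=ActionModel())
    ))
```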
### Reading Data
```python
# Import path for the sampler/converter helpers is assumed; adjust if they live in a submodule
from aworld.replay_buffer import ReplayBuffer, RandomTaskSample, DefaultConverter
from aworld.replay_buffer.query_filter import QueryBuilder

# Basic example: sample up to 1000 rows from tasks named "test_task"
replay_buffer = ReplayBuffer()
query_condition = QueryBuilder().eq("exp_meta.task_name", "test_task").build()
data = replay_buffer.sample(
    sampler=RandomTaskSample(),
    query_condition=query_condition,
    converter=DefaultConverter(),
    batch_size=1000
)

# Query tasks by task_id
query = QueryBuilder().eq("exp_meta.task_id", "task_1").build()
data = replay_buffer.sample_task(query_condition=query, batch_size=10)

# Query tasks by agent_id
query = QueryBuilder().eq("exp_meta.agent_id", "agent_1").build()
data = replay_buffer.sample_task(query_condition=query, batch_size=5)
```
## Multi-processing Example
```python
import multiprocessing

from aworld.replay_buffer import ReplayBuffer
from aworld.replay_buffer.storage.multi_proc_mem import MultiProcMemoryStorage

manager = multiprocessing.Manager()
replay_buffer = ReplayBuffer(
    storage=MultiProcMemoryStorage(
        data_dict=manager.dict(),
        fifo_queue=manager.list(),
        lock=manager.Lock(),
        max_capacity=10000
    )
)

# Start writer processes; write_processing is a user-defined worker (see the sketch below)
processes = [
    multiprocessing.Process(target=write_processing, args=(replay_buffer, f"task_{i}"))
    for i in range(4)
]
for p in processes:
    p.start()
for p in processes:
    p.join()
```
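
The worker function itself is not part of the library. A minimal, illustrative `write_processing` (assuming it only needs to store rows for its task) might look like this:

```python
import time

from aworld.replay_buffer import ReplayBuffer, DataRow, ExpMeta, Experience
from aworld.core.common import ActionModel, Observation

def write_processing(replay_buffer: ReplayBuffer, task_id: str):
    """Illustrative worker: write a few steps for one task into the shared buffer."""
    for step in range(1, 11):
        replay_buffer.store(DataRow(
            exp_meta=ExpMeta(
                task_id=task_id,
                task_name=f"{task_id}_name",
                agent_id="agent_1",
                step=step,
                execute_time=time.time()
            ),
            exp_data=Experience(state=Observation(), action=ActionModel())
        ))
```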
## Query Builder Examples
### Simple Equality
```python
QueryBuilder().eq("exp_meta.task_id", "123").build()
```
### Complex Conditions
```python
# Match rows where both conditions hold
query = (
    QueryBuilder()
    .eq("exp_meta.task_id", "123")
    .and_()
    .eq("exp_meta.agent_id", "456")
    .build()
)
```
### Nested Conditions
```python
# task_id == "123" AND (agent_id == "111" OR agent_id == "222")
query = (
    QueryBuilder()
    .eq("exp_meta.task_id", "123")
    .and_()
    .nested(
        QueryBuilder()
        .eq("exp_meta.agent_id", "111")
        .or_()
        .eq("exp_meta.agent_id", "222")
    )
    .build()
)
```
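
A built condition is passed to the sampling APIs in the same way as the earlier examples:

```python
data = replay_buffer.sample_task(query_condition=query, batch_size=10)
```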