Spaces:
Running
on
Zero
Running
on
Zero
File size: 6,195 Bytes
08fac87 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
import os
import re
import time
from collections.abc import Iterator, Sequence
from pathlib import Path
from typing import Optional, Union
from langchain_core.stores import ByteStore
from langchain.storage.exceptions import InvalidKeyException
class LocalFileStore(ByteStore):
"""BaseStore interface that works on the local file system.
Examples:
Create a LocalFileStore instance and perform operations on it:
.. code-block:: python
from langchain.storage import LocalFileStore
# Instantiate the LocalFileStore with the root path
file_store = LocalFileStore("/path/to/root")
# Set values for keys
file_store.mset([("key1", b"value1"), ("key2", b"value2")])
# Get values for keys
values = file_store.mget(["key1", "key2"]) # Returns [b"value1", b"value2"]
# Delete keys
file_store.mdelete(["key1"])
# Iterate over keys
for key in file_store.yield_keys():
print(key) # noqa: T201
"""
def __init__(
self,
root_path: Union[str, Path],
*,
chmod_file: Optional[int] = None,
chmod_dir: Optional[int] = None,
update_atime: bool = False,
) -> None:
"""Implement the BaseStore interface for the local file system.
Args:
root_path (Union[str, Path]): The root path of the file store. All keys are
interpreted as paths relative to this root.
chmod_file: (optional, defaults to `None`) If specified, sets permissions
for newly created files, overriding the current `umask` if needed.
chmod_dir: (optional, defaults to `None`) If specified, sets permissions
for newly created dirs, overriding the current `umask` if needed.
update_atime: (optional, defaults to `False`) If `True`, updates the
filesystem access time (but not the modified time) when a file is read.
This allows MRU/LRU cache policies to be implemented for filesystems
where access time updates are disabled.
"""
self.root_path = Path(root_path).absolute()
self.chmod_file = chmod_file
self.chmod_dir = chmod_dir
self.update_atime = update_atime
def _get_full_path(self, key: str) -> Path:
"""Get the full path for a given key relative to the root path.
Args:
key (str): The key relative to the root path.
Returns:
Path: The full path for the given key.
"""
if not re.match(r"^[a-zA-Z0-9_.\-/]+$", key):
raise InvalidKeyException(f"Invalid characters in key: {key}")
# Add key[0] to path to make subdirectories 20250720 jmd
full_path = os.path.abspath(self.root_path / key[0] / key)
common_path = os.path.commonpath([str(self.root_path), full_path])
if common_path != str(self.root_path):
raise InvalidKeyException(
f"Invalid key: {key}. Key should be relative to the full path."
f"{self.root_path} vs. {common_path} and full path of {full_path}"
)
return Path(full_path)
def _mkdir_for_store(self, dir: Path) -> None:
"""Makes a store directory path (including parents) with specified permissions
This is needed because `Path.mkdir()` is restricted by the current `umask`,
whereas the explicit `os.chmod()` used here is not.
Args:
dir: (Path) The store directory to make
Returns:
None
"""
if not dir.exists():
self._mkdir_for_store(dir.parent)
dir.mkdir(exist_ok=True)
if self.chmod_dir is not None:
os.chmod(dir, self.chmod_dir)
def mget(self, keys: Sequence[str]) -> list[Optional[bytes]]:
"""Get the values associated with the given keys.
Args:
keys: A sequence of keys.
Returns:
A sequence of optional values associated with the keys.
If a key is not found, the corresponding value will be None.
"""
values: list[Optional[bytes]] = []
for key in keys:
full_path = self._get_full_path(key)
if full_path.exists():
value = full_path.read_bytes()
values.append(value)
if self.update_atime:
# update access time only; preserve modified time
os.utime(full_path, (time.time(), os.stat(full_path).st_mtime))
else:
values.append(None)
return values
def mset(self, key_value_pairs: Sequence[tuple[str, bytes]]) -> None:
"""Set the values for the given keys.
Args:
key_value_pairs: A sequence of key-value pairs.
Returns:
None
"""
for key, value in key_value_pairs:
full_path = self._get_full_path(key)
self._mkdir_for_store(full_path.parent)
full_path.write_bytes(value)
if self.chmod_file is not None:
os.chmod(full_path, self.chmod_file)
def mdelete(self, keys: Sequence[str]) -> None:
"""Delete the given keys and their associated values.
Args:
keys (Sequence[str]): A sequence of keys to delete.
Returns:
None
"""
for key in keys:
full_path = self._get_full_path(key)
if full_path.exists():
full_path.unlink()
def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]:
"""Get an iterator over keys that match the given prefix.
Args:
prefix (Optional[str]): The prefix to match.
Returns:
Iterator[str]: An iterator over keys that match the given prefix.
"""
prefix_path = self._get_full_path(prefix) if prefix else self.root_path
for file in prefix_path.rglob("*"):
if file.is_file():
relative_path = file.relative_to(self.root_path)
yield str(relative_path)
|