#!/usr/bin/env python
"""PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print("Should be empty:", db.items())
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print(db.keys())
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: pip install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.
"""
from __future__ import print_function

__version__ = "0.7.5"

try:
    from pathlib import Path
except ImportError:
    # Python 2 backport
    from pathlib2 import Path

import os, stat, time

try:
    import collections.abc as collections_abc
except ImportError:
    # Python 2: the ABCs live directly in collections
    import collections as collections_abc

try:
    import cPickle as pickle
except ImportError:
    # Python 3: cPickle was merged into pickle
    import pickle

import errno
import sys

if sys.version_info[0] >= 3:
    string_types = (str,)
else:
    string_types = (str, unicode)  # 'unicode' only exists on Python 2
def gethashfile(key):
    """Map a key to one of 256 two-hex-digit bucket file names."""
    return ("%02x" % abs(hash(key) % 256))[-2:]
_sentinel = object()


class PickleShareDB(collections_abc.MutableMapping):
    """The main 'connection' object for PickleShare database"""

    def __init__(self, root):
        """Return a db object that will manage the specified directory"""
        if not isinstance(root, string_types):
            root = str(root)
        root = os.path.abspath(os.path.expanduser(root))
        self.root = Path(root)
        if not self.root.is_dir():
            # Catching the exception is necessary if multiple processes are
            # concurrently trying to create the folder; the exist_ok keyword
            # argument of mkdir does the same, but only from Python 3.5.
            try:
                self.root.mkdir(parents=True)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}
    def __getitem__(self, key):
        """db['key'] reading"""
        fil = self.root / key
        try:
            mtime = fil.stat()[stat.ST_MTIME]
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]

        try:
            # The cached item has expired, need to read
            with fil.open("rb") as f:
                obj = pickle.loads(f.read())
        except Exception:
            raise KeyError(key)

        self.cache[fil] = (obj, mtime)
        return obj
    def __setitem__(self, key, value):
        """db['key'] = 5"""
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.is_dir():
            parent.mkdir(parents=True)
        # We specify protocol 2, so that we can mostly go between Python 2
        # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
        with fil.open("wb") as f:
            pickle.dump(value, f, protocol=2)
        try:
            self.cache[fil] = (value, fil.stat().st_mtime)
        except OSError as e:
            # Another process may have deleted the file between the dump
            # and the stat; losing the cache entry is harmless.
            if e.errno != errno.ENOENT:
                raise
    def hset(self, hashroot, key, value):
        """hashed set"""
        hroot = self.root / hashroot
        if not hroot.is_dir():
            hroot.mkdir()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update({key: value})
        self[hfile] = d
    def hget(self, hashroot, key, default=_sentinel, fast_only=True):
        """hashed get"""
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel)
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)
                return default
            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)
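
    # Illustrative sketch of the hashed-bucket API (category and key names
    # are hypothetical):
    #
    #     db.hset('scores', 'alice', 30)    # stored in a scores/<bucket> dict
    #     db.hget('scores', 'alice')        # -> 30
    #     db.hget('scores', 'bob', None)    # -> None instead of KeyError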
    def hdict(self, hashroot):
        """Get all data contained in hashed category 'hashroot' as dict"""
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = hfiles[-1] if hfiles else ""
        if last.endswith("xx"):
            # put the compressed 'xx' file first, so later bucket files
            # override its (possibly stale) entries
            hfiles = [last] + hfiles[:-1]

        merged = {}
        for f in hfiles:
            try:
                merged.update(self[f])
            except KeyError:
                print("Corrupt", f, "deleted - hset is not threadsafe!")
                del self[f]

            self.uncache(f)

        return merged
    def hcompress(self, hashroot):
        """Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).
        """
        hfiles = self.keys(hashroot + "/*")
        merged = {}
        for f in hfiles:
            merged.update(self[f])
            self.uncache(f)

        self[hashroot + "/xx"] = merged
        for f in hfiles:
            p = self.root / f
            if p.name == "xx":
                continue
            p.unlink()
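
    # After hcompress('scores') every entry lives in the single file
    # scores/xx, so keys written before the compression must be looked up
    # with hget(..., fast_only=False), which falls back to scanning via
    # hdict().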
    def __delitem__(self, key):
        """del db["key"]"""
        fil = self.root / key
        self.cache.pop(fil, None)
        try:
            fil.unlink()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """Make a key suitable for user's eyes"""
        return str(p.relative_to(self.root)).replace("\\", "/")
    def keys(self, globpat=None):
        """All keys in DB, or all keys matching a glob"""
        if globpat is None:
            files = self.root.rglob("*")
        else:
            files = self.root.glob(globpat)
        return [self._normalized(p) for p in files if p.is_file()]
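
    # keys() with no pattern walks the whole tree recursively; with a
    # pattern it is a plain glob, e.g. keys('scores/*') lists the bucket
    # files that hdict() and hcompress() operate on.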
    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def uncache(self, *items):
        """Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.
        """
        if not items:
            self.cache = {}
        for it in items:
            # the cache is keyed by Path objects (see __getitem__), so
            # normalize string keys before popping
            self.cache.pop(self.root / it, None)
    def waitget(self, key, maxwaittime=60):
        """Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """
        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while True:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited += wtimes[tries]
            if tries < len(wtimes) - 1:
                tries += 1
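
    # Cross-process handshake sketch ('job/result' is a hypothetical key):
    #
    #     # consumer process                 # producer process
    #     val = db.waitget('job/result')     db['job/result'] = compute()
    #
    # waitget polls with increasing sleep intervals (0.2s up to 1s), so the
    # consumer blocks for at most roughly maxwaittime seconds.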
    def getlink(self, folder):
        """Get a convenient link for accessing items"""
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root
class PickleShareLink:
    """A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5
    """

    def __init__(self, db, keydir):
        # write straight into __dict__ to bypass __setattr__, which would
        # otherwise try to store these attributes in the database
        self.__dict__.update(db=db, keydir=keydir)

    def __getattr__(self, key):
        return self.__dict__["db"][self.__dict__["keydir"] + "/" + key]

    def __setattr__(self, key, val):
        self.db[self.keydir + "/" + key] = val

    def __repr__(self):
        db = self.__dict__["db"]
        keys = db.keys(self.__dict__["keydir"] + "/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__["keydir"],
            ";".join([Path(k).name for k in keys]),
        )
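

# Sketch of attribute-style access through PickleShareLink (illustrative
# helper and key names, assuming a writable temp directory):
def _example_link():
    import tempfile

    db = PickleShareDB(tempfile.mkdtemp())
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2                       # stored as db['myobjects/test/foo']
    lnk.bar = lnk.foo + 5             # reads go through __getattr__
    print(db['myobjects/test/bar'])   # -> 7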
def main():
    import textwrap

    usage = textwrap.dedent(
        """\
        pickleshare - manage PickleShare databases

        Usage:

            pickleshare dump /path/to/db > dump.txt
            pickleshare load /path/to/db < dump.txt
            pickleshare test /path/to/db
        """
    )
    DB = PickleShareDB
    import sys

    if len(sys.argv) < 2:
        print(usage)
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == "dump":
        if not args:
            args = ["."]
        db = DB(args[0])
        import pprint

        pprint.pprint(dict(db))  # a plain dict pretty-prints as an eval()-able literal
    elif cmd == "load":
        # Note: the dump is eval()ed, so only load dumps you trust.
        cont = sys.stdin.read()
        db = DB(args[0])
        data = eval(cont)
        db.clear()
        for k, v in data.items():
            db[k] = v
    elif cmd == "testwait":
        db = DB(args[0])
        db.clear()
        print(db.waitget("250"))
    elif cmd == "test":
        test()
        stress()


if __name__ == "__main__":
    main()