Spaces:
Runtime error
Runtime error
File size: 5,218 Bytes
63deadc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import pathlib
import pytest
import fsspec.core
from fsspec.compression import compr, register_compression
from fsspec.utils import compressions, infer_compression
def test_infer_custom_compression():
    """Inferred compression gets values from fsspec.compression.compr.

    Registers a temporary ``test_custom`` codec for the ``tst`` extension,
    checks that a duplicate name or extension raises ``ValueError`` unless
    ``force=True`` is given, and removes the temporary entries afterwards.
    """
    # Baseline: built-in extensions resolve, everything else is unknown.
    assert infer_compression("fn.zip") == "zip"
    assert infer_compression("fn.gz") == "gzip"
    assert infer_compression("fn.unknown") is None
    assert infer_compression("fn.test_custom") is None
    assert infer_compression("fn.tst") is None

    register_compression("test_custom", lambda f, **kwargs: f, "tst")

    try:
        assert infer_compression("fn.zip") == "zip"
        assert infer_compression("fn.gz") == "gzip"
        assert infer_compression("fn.unknown") is None
        # Inference goes by extension ("tst"), not by the codec name.
        assert infer_compression("fn.test_custom") is None
        assert infer_compression("fn.tst") == "test_custom"

        # Duplicate registration in name or extension raises a value error.
        with pytest.raises(ValueError):
            register_compression("test_custom", lambda f, **kwargs: f, "tst")

        with pytest.raises(ValueError):
            register_compression("test_conflicting", lambda f, **kwargs: f, "tst")
        assert "test_conflicting" not in compr

        # ...but can be forced.
        register_compression(
            "test_conflicting", lambda f, **kwargs: f, "tst", force=True
        )
        assert infer_compression("fn.zip") == "zip"
        assert infer_compression("fn.gz") == "gzip"
        assert infer_compression("fn.unknown") is None
        assert infer_compression("fn.test_custom") is None
        assert infer_compression("fn.tst") == "test_conflicting"

    finally:
        # pop() with a default instead of ``del``: if an assertion above fails
        # before "test_conflicting" is registered, ``del`` would raise
        # KeyError, hide the real failure and leave "tst" registered for
        # every test that runs afterwards.
        compr.pop("test_custom", None)
        compr.pop("test_conflicting", None)
        compressions.pop("tst", None)
def test_infer_uppercase_compression():
    """Upper-case extensions are inferred like their lower-case forms."""
    recognised = (("fn.ZIP", "zip"), ("fn.GZ", "gzip"))
    for filename, codec in recognised:
        assert infer_compression(filename) == codec

    # Extensions with no registered codec stay unresolved in upper case too.
    for filename in ("fn.UNKNOWN", "fn.TEST_UPPERCASE", "fn.TEST"):
        assert infer_compression(filename) is None
def test_lzma_compression_name():
    """``.xz`` and ``.lzma`` files are inferred as ``xz`` and ``lzma``."""
    # Skip when the interpreter was built without the lzma module.
    pytest.importorskip("lzma")

    expected_by_filename = (("fn.xz", "xz"), ("fn.lzma", "lzma"))
    for filename, codec in expected_by_filename:
        assert infer_compression(filename) == codec
def test_lz4_compression(tmpdir):
    """Infer lz4 compression for .lz4 files if lz4 is available.

    Writes text through ``fsspec.core.open`` with ``compression="infer"``,
    checks the bytes on disk decode as an lz4 frame, then reads the file back
    with both inferred and explicitly named compression.
    """
    tmp_path = pathlib.Path(str(tmpdir))

    # Import the submodule itself: importing the top-level ``lz4`` package
    # does not by itself guarantee that ``lz4.frame`` is loaded.
    lz4_frame = pytest.importorskip("lz4.frame")

    tmp_path.mkdir(exist_ok=True)

    tdat = "foobar" * 100
    target = tmp_path / "out.lz4"

    with fsspec.core.open(str(target), mode="wt", compression="infer") as outfile:
        outfile.write(tdat)

    # read_bytes() closes the file; a bare ``open().read()`` leaks the handle.
    compressed = target.read_bytes()
    assert lz4_frame.decompress(compressed).decode() == tdat

    with fsspec.core.open(str(target), mode="rt", compression="infer") as infile:
        assert infile.read() == tdat

    with fsspec.core.open(str(target), mode="rt", compression="lz4") as infile:
        assert infile.read() == tdat
def test_zstd_compression(tmpdir):
    """Infer zstd compression for .zst files if zstandard is available.

    Writes text through ``fsspec.core.open`` with ``compression="infer"``,
    checks the bytes on disk decompress with ``zstandard``, then reads the
    file back with both inferred and explicitly named compression.
    """
    tmp_path = pathlib.Path(str(tmpdir))

    zstd = pytest.importorskip("zstandard")

    tmp_path.mkdir(exist_ok=True)

    tdat = "foobar" * 100
    target = tmp_path / "out.zst"

    with fsspec.core.open(str(target), mode="wt", compression="infer") as outfile:
        outfile.write(tdat)

    # read_bytes() closes the file; a bare ``open().read()`` leaks the handle.
    compressed = target.read_bytes()
    assert zstd.ZstdDecompressor().decompress(compressed, len(tdat)).decode() == tdat

    with fsspec.core.open(str(target), mode="rt", compression="infer") as infile:
        assert infile.read() == tdat

    with fsspec.core.open(str(target), mode="rt", compression="zstd") as infile:
        assert infile.read() == tdat

    # fails in https://github.com/fsspec/filesystem_spec/issues/725
    # Only opening and closing is exercised here; nothing is read.
    infile = fsspec.core.open(str(target), mode="rb", compression="infer").open()
    infile.close()
def test_snappy_compression(tmpdir):
    """No registered compression for snappy, but can be specified.

    With ``compression="infer"`` a ``.snappy`` file is written and read as
    plain bytes; with ``compression="snappy"`` the data round-trips through
    the snappy stream codec.
    """
    tmp_path = pathlib.Path(str(tmpdir))

    snappy = pytest.importorskip("snappy")

    tmp_path.mkdir(exist_ok=True)

    tdat = "foobar" * 100
    target = tmp_path / "out.snappy"

    # Snappy isn't inferred.
    with fsspec.core.open(str(target), mode="wt", compression="infer") as outfile:
        outfile.write(tdat)
    # read_bytes() closes the file; a bare ``open().read()`` leaks the handle.
    assert target.read_bytes().decode() == tdat

    # but can be specified.
    with fsspec.core.open(str(target), mode="wt", compression="snappy") as outfile:
        outfile.write(tdat)
    compressed = target.read_bytes()
    assert snappy.StreamDecompressor().decompress(compressed).decode() == tdat

    # Reading with "infer" returns the compressed bytes untouched.
    with fsspec.core.open(str(target), mode="rb", compression="infer") as infile:
        assert infile.read() == compressed

    with fsspec.core.open(str(target), mode="rt", compression="snappy") as infile:
        assert infile.read() == tdat
|