File size: 4,465 Bytes
63deadc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os

import pytest

try:
    import fastparquet
except ImportError:
    fastparquet = None
try:
    import pyarrow.parquet as pq
except ImportError:
    pq = None

from fsspec.core import url_to_fs
from fsspec.parquet import _get_parquet_byte_ranges, open_parquet_file

# Define `engine` fixture
FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found")
PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found")
ANY_ENGINE_MARK = pytest.mark.skipif(
    not (fastparquet or pq),
    reason="No parquet engine (fastparquet or pyarrow) found",
)


@pytest.fixture(
    params=[
        pytest.param("fastparquet", marks=FASTPARQUET_MARK),
        pytest.param("pyarrow", marks=PYARROW_MARK),
        pytest.param("auto", marks=ANY_ENGINE_MARK),
    ]
)
def engine(request):
    return request.param


@pytest.mark.parametrize("columns", [None, ["x"], ["x", "y"], ["z"]])
@pytest.mark.parametrize("max_gap", [0, 64])
@pytest.mark.parametrize("max_block", [64, 256_000_000])
@pytest.mark.parametrize("footer_sample_size", [8, 1_000])
@pytest.mark.parametrize("range_index", [True, False])
def test_open_parquet_file(
    tmpdir, engine, columns, max_gap, max_block, footer_sample_size, range_index
):
    # Pandas required for this test
    pd = pytest.importorskip("pandas")

    # Write out a simple DataFrame
    path = os.path.join(str(tmpdir), "test.parquet")
    nrows = 40
    df = pd.DataFrame(
        {
            "x": [i * 7 % 5 for i in range(nrows)],
            "y": [[0, i] for i in range(nrows)],  # list
            "z": [{"a": i, "b": "cat"} for i in range(nrows)],  # struct
        },
        index=pd.Index([10 * i for i in range(nrows)], name="myindex"),
    )
    if range_index:
        df = df.reset_index(drop=True)
        df.index.name = "myindex"
    df.to_parquet(path)

    # "Traditional read" (without `open_parquet_file`)
    expect = pd.read_parquet(path, columns=columns)

    # Use `_get_parquet_byte_ranges` to re-write a
    # place-holder file with all bytes NOT required
    # to read `columns` set to b"0". The purpose of
    # this step is to make sure the read will fail
    # if the correct bytes have not been accurately
    # selected by `_get_parquet_byte_ranges`. If this
    # test were reading from remote storage, we would
    # not need this logic to capture errors.
    fs = url_to_fs(path)[0]
    data = _get_parquet_byte_ranges(
        [path],
        fs,
        columns=columns,
        engine=engine,
        max_gap=max_gap,
        max_block=max_block,
        footer_sample_size=footer_sample_size,
    )[path]
    file_size = fs.size(path)
    with open(path, "wb") as f:
        f.write(b"0" * file_size)

        if footer_sample_size == 8:
            # We know 8 bytes is too small to include
            # the footer metadata, so there should NOT
            # be a key for the last 8 bytes of the file
            bad_key = (file_size - 8, file_size)
            assert bad_key not in data.keys()

        for (start, stop), byte_data in data.items():
            f.seek(start)
            f.write(byte_data)

    # Read back the modified file with `open_parquet_file`
    with open_parquet_file(
        path,
        columns=columns,
        engine=engine,
        max_gap=max_gap,
        max_block=max_block,
        footer_sample_size=footer_sample_size,
    ) as f:
        result = pd.read_parquet(f, columns=columns)

    # Check that `result` matches `expect`
    pd.testing.assert_frame_equal(expect, result)

    # Try passing metadata
    if engine == "fastparquet":
        # Should work fine for "fastparquet"
        pf = fastparquet.ParquetFile(path)
        with open_parquet_file(
            path,
            metadata=pf,
            columns=columns,
            engine=engine,
            max_gap=max_gap,
            max_block=max_block,
            footer_sample_size=footer_sample_size,
        ) as f:
            result = pd.read_parquet(f, columns=columns)
        pd.testing.assert_frame_equal(expect, result)
    elif engine == "pyarrow":
        # Should raise ValueError for "pyarrow"
        with pytest.raises(ValueError):
            open_parquet_file(
                path,
                metadata=["Not-None"],
                columns=columns,
                engine=engine,
                max_gap=max_gap,
                max_block=max_block,
                footer_sample_size=footer_sample_size,
            )