File size: 1,467 Bytes
63deadc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import shlex
import subprocess
import time

import pytest

import fsspec

# Skip every test in this module when the optional dependencies are missing:
# "notebook" supplies the server launched by the ``jupyter`` fixture below,
# and "requests" is used by that fixture to poll the server until it is up.
pytest.importorskip("notebook")
requests = pytest.importorskip("requests")


@pytest.fixture()
def jupyter(tmpdir):
    """Run a temporary Jupyter notebook server rooted at ``tmpdir``.

    The server is started as a subprocess on port 5566 with the access token
    ``blah`` and polled until it answers a request successfully.

    Yields
    ------
    tuple of (str, str)
        The server URL (token included) and the notebook directory path.

    Notes
    -----
    The test is skipped when the ``jupyter`` executable cannot be found and
    xfailed when the server is not reachable within 15 seconds.  On teardown
    the subprocess is terminated and reaped.
    """
    tmpdir = str(tmpdir)
    url = "http://localhost:5566/?token=blah"
    # Give the token to the server process only, rather than mutating
    # os.environ, so the variable does not leak into the rest of the session.
    env = {**os.environ, "JUPYTER_TOKEN": "blah"}
    # An argument list avoids shell-style re-parsing of the directory name,
    # which would mangle paths containing quotes or backslashes.
    cmd = [
        "jupyter",
        "notebook",
        f"--notebook-dir={tmpdir}",
        "--no-browser",
        "--port=5566",
    ]
    try:
        proc = subprocess.Popen(cmd, env=env)
    except FileNotFoundError:
        pytest.skip("notebook not installed correctly")
    try:
        # A monotonic deadline also accounts for time spent inside requests.
        deadline = time.monotonic() + 15
        while True:
            try:
                # Bound each attempt so a stalled connection cannot hang forever.
                r = requests.get(url, timeout=5)
                r.raise_for_status()
                break
            except (requests.exceptions.BaseHTTPError, OSError):
                if time.monotonic() > deadline:
                    pytest.xfail("Timed out for jupyter")
                time.sleep(0.1)
        yield url, tmpdir
    finally:
        proc.terminate()
        # Reap the child so the port is released before the next test starts;
        # escalate to kill() if it ignores the termination request.
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()


def test_simple(jupyter):
    """Check listing, write, read, info and removal against a live server."""
    server_url, notebook_dir = jupyter
    jfs = fsspec.filesystem("jupyter", url=server_url)

    # The served directory is expected to start out empty.
    listing = jfs.ls("")
    assert listing == []

    # Bytes written through the filesystem must show up on local disk.
    jfs.pipe("afile", b"data")
    assert jfs.cat("afile") == b"data"
    assert "afile" in os.listdir(notebook_dir)

    # Round-trip a second file through the file-like interface.
    with jfs.open("bfile", "wb") as sink:
        sink.write(b"more")
    with jfs.open("bfile", "rb") as source:
        assert source.read() == b"more"

    assert jfs.info("bfile")["size"] == 4

    # Removal through the filesystem must also remove the local file.
    jfs.rm("afile")
    assert "afile" not in os.listdir(notebook_dir)