|
import requests |
|
|
|
from ..spec import AbstractFileSystem |
|
from ..utils import infer_storage_options |
|
from .memory import MemoryFile |
|
|
|
|
|
class GistFileSystem(AbstractFileSystem):
    """
    Interface to files in a single GitHub Gist.

    Provides read-only access to a gist's files. Gists do not contain
    subdirectories, so file listing is straightforward.

    Parameters
    ----------
    gist_id : str
        The ID of the gist you want to access (the long hex value from the URL).
    filenames : list[str] (optional)
        If provided, only make a file system representing these files. The
        gist metadata is still fetched once at construction time, and
        ``FileNotFoundError`` is raised if any requested name is missing
        from it.
    sha : str (optional)
        If provided, fetch a particular revision of the gist. If omitted,
        the latest revision is used.
    username : str (optional)
        GitHub username for authentication (required if token is given).
    token : str (optional)
        GitHub personal access token (required if username is given).
    timeout : (float, float) or float, optional
        Connect and read timeouts for requests (default 60s each).
    kwargs : dict
        Stored on `self.request_kw` and passed to `requests.get` when fetching Gist
        metadata or reading ("opening") a file.
    """

    protocol = "gist"
    gist_url = "https://api.github.com/gists/{gist_id}"
    gist_rev_url = "https://api.github.com/gists/{gist_id}/{sha}"

    def __init__(
        self,
        gist_id,
        filenames=None,
        sha=None,
        username=None,
        token=None,
        timeout=None,
        **kwargs,
    ):
        """
        Store the connection settings and eagerly fetch the gist's file list.

        Raises
        ------
        ValueError
            If exactly one of ``username`` / ``token`` is given (and non-empty).
        FileNotFoundError
            If the gist (or revision) does not exist, or a requested filename
            is not part of it.
        """
        super().__init__()
        self.gist_id = gist_id
        self.filenames = filenames
        self.sha = sha

        # Credentials only make sense as a pair: reject the case where exactly
        # one of them is supplied with a non-empty value.
        exactly_one_given = (username is None) ^ (token is None)
        if exactly_one_given and (username or token):
            raise ValueError("Auth requires both username and token, or neither.")
        self.username = username
        self.token = token
        self.request_kw = kwargs

        # Fall back to 60s connect / 60s read when no timeout is supplied.
        self.timeout = timeout if timeout is not None else (60, 60)

        # A gist is flat, so the whole listing lives under the "" (root) key.
        self.dircache[""] = self._fetch_file_list()

    @property
    def kw(self):
        """
        Keyword arguments forwarded to ``requests.get``.

        Returns ``self.request_kw``, with an ``auth`` entry prepended when
        both username and token are set (an explicit ``auth`` key in
        ``request_kw`` takes precedence).
        """
        if self.username is not None and self.token is not None:
            return {"auth": (self.username, self.token), **self.request_kw}
        return self.request_kw

    def _fetch_gist_metadata(self):
        """
        Fetch the JSON metadata for this gist (possibly for a specific revision).

        Returns
        -------
        The decoded JSON body of the response.

        Raises
        ------
        FileNotFoundError
            If the server answers with HTTP 404.
        requests.HTTPError
            For any other error status (via ``raise_for_status``).
        """
        if self.sha:
            url = self.gist_rev_url.format(gist_id=self.gist_id, sha=self.sha)
        else:
            url = self.gist_url.format(gist_id=self.gist_id)

        r = requests.get(url, timeout=self.timeout, **self.kw)
        if r.status_code == 404:
            raise FileNotFoundError(
                f"Gist not found: {self.gist_id}@{self.sha or 'latest'}"
            )
        r.raise_for_status()
        return r.json()

    def _fetch_file_list(self):
        """
        Build the directory listing that is stored in ``self.dircache[""]``.

        Returns
        -------
        list[dict]
            One entry per file with the keys ``name``, ``type`` (always
            ``"file"``), ``size`` and ``raw_url``.

        Raises
        ------
        FileNotFoundError
            If ``self.filenames`` is set and one of the names is not present
            in the gist metadata.
        """
        available = self._fetch_gist_metadata().get("files", {})

        if self.filenames:
            # Restrict the listing to the requested names, failing fast on
            # the first one the gist does not contain.
            files = {}
            for fn in self.filenames:
                if fn not in available:
                    raise FileNotFoundError(fn)
                files[fn] = available[fn]
        else:
            files = available

        # Entries whose metadata is null carry no usable information and are
        # left out of the listing.
        return [
            {
                "name": fname,
                "type": "file",
                "size": finfo.get("size", 0),
                "raw_url": finfo.get("raw_url"),
            }
            for fname, finfo in files.items()
            if finfo is not None
        ]

    @classmethod
    def _strip_protocol(cls, path):
        """
        Reduce a 'gist://' style URL to the path it addresses.

        The "path" component reported by ``infer_storage_options`` is used
        (falling back to the input unchanged when there is none), and any
        leading "/" is removed.

        NOTE(review): for a URL that embeds a revision the result still
        contains it ("<sha>/<file>"), which will not match a cached file
        name - confirm whether callers ever pass such URLs here.
        """
        path = infer_storage_options(path).get("path", path)
        return path.lstrip("/")

    @staticmethod
    def _get_kwargs_from_urls(path):
        """
        Parse 'gist://' style URLs into GistFileSystem constructor kwargs.
        For example:
          gist://:TOKEN@<gist_id>/file.txt
          gist://username:TOKEN@<gist_id>/file.txt

        The URL host becomes ``gist_id``; the last path component becomes the
        single entry of ``filenames`` and the component before it (if any)
        becomes ``sha``. Empty components are ignored.

        NOTE(review): the token-only form presumably yields a token without
        a username, which ``__init__`` rejects - verify before relying on it.
        """
        so = infer_storage_options(path)
        out = {}
        if so.get("username"):
            out["username"] = so["username"]
        if so.get("password"):
            out["token"] = so["password"]
        if so.get("host"):
            # The URL host position carries the gist ID.
            out["gist_id"] = so["host"]

        if so.get("path"):
            # Keep at most the last two components: "<sha>/<filename>".
            path_parts = so["path"].rsplit("/", 2)[-2:]
            if len(path_parts) == 2:
                sha, filename = path_parts
                if sha:
                    out["sha"] = sha
                if filename:
                    out["filenames"] = [filename]

        return out

    def ls(self, path="", detail=False, **kwargs):
        """
        List files in the gist. Gists are single-level, so any 'path' is basically
        the filename, or empty for all files.

        Parameters
        ----------
        path : str, optional
            The filename to list. If empty, returns all files in the gist.
        detail : bool, default False
            If True, return a list of dicts; if False, return a list of filenames.

        Raises
        ------
        FileNotFoundError
            If ``path`` is non-empty and matches no file in the gist.
        """
        path = self._strip_protocol(path or "")
        all_files = self.dircache[""]

        if path == "":
            # Hand out a copy so callers cannot mutate the cached listing.
            results = list(all_files)
        else:
            results = [f for f in all_files if f["name"] == path]
            if not results:
                raise FileNotFoundError(path)

        if detail:
            return results
        return sorted(f["name"] for f in results)

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """
        Read a single file from the gist.

        The whole file is downloaded from its ``raw_url`` and returned as an
        in-memory file object.

        Raises
        ------
        NotImplementedError
            For any mode other than ``"rb"``.
        FileNotFoundError
            If the file is not in the listing, has no ``raw_url``, or the
            download answers with HTTP 404.
        requests.HTTPError
            For any other error status (via ``raise_for_status``).
        """
        if mode != "rb":
            raise NotImplementedError("GitHub Gist FS is read-only (no write).")

        path = self._strip_protocol(path)

        finfo = next((f for f in self.dircache[""] if f["name"] == path), None)
        if finfo is None:
            raise FileNotFoundError(path)

        raw_url = finfo.get("raw_url")
        if not raw_url:
            raise FileNotFoundError(f"No raw_url for file: {path}")

        r = requests.get(raw_url, timeout=self.timeout, **self.kw)
        if r.status_code == 404:
            raise FileNotFoundError(path)
        r.raise_for_status()
        # Keywords make sure the name lands in `path`; passed positionally it
        # would occupy MemoryFile's leading `fs` slot instead.
        return MemoryFile(fs=None, path=path, data=r.content)

    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """
        Return {path: contents} for the given file or files. If 'recursive' is True,
        and path is empty, returns all files in the gist.

        Parameters
        ----------
        path : str or list[str]
            Passed to ``self.expand_path`` to obtain the files to read.
        recursive : bool
            Forwarded to ``self.expand_path``.
        on_error : str
            What to do when a file raises ``FileNotFoundError``: ``"raise"``
            re-raises it, ``"omit"`` leaves the file out of the result, and
            any other value stores the exception object as that file's value.

        Returns
        -------
        dict
            Mapping of each expanded path to its bytes (or to an exception,
            see ``on_error``).
        """
        paths = self.expand_path(path, recursive=recursive)
        out = {}
        for p in paths:
            try:
                with self.open(p, "rb") as f:
                    out[p] = f.read()
            except FileNotFoundError as e:
                if on_error == "raise":
                    raise
                if on_error != "omit":
                    out[p] = e
        return out
|
|