geowatch.utils.util_fsspec module

fsspec wrappers intended to make working with S3 and the local file system seamless.

Todo

Someone must have already implemented this somewhere. Find that to either use directly or as a reference.

Look into:

https://github.com/fsspec/universal_pathlib
https://pypi.org/project/pathlibfs/

class geowatch.utils.util_fsspec.FSPath(path, *, fs=None)

Bases: str

Provide a pathlib.Path-like way of interacting with fsspec.

This has a few notable differences from pathlib.Path. We inherit from str because pathlib.Path semantics can break the protocol section of URIs (for example, pathlib collapses the "//" in "s3://bucket" to a single slash). This means we have to use os.path functions to implement things like FSPath.relative_to() and FSPath.joinpath(), which behave differently from their pathlib counterparts.
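A minimal sketch of the problem and the workaround, using only the standard library. It compares pathlib with posixpath on an S3-style URI, then defines a hypothetical `MiniFSPath` str subclass (not the geowatch implementation) whose joinpath and relative_to are built on posixpath.

```python
import posixpath
from pathlib import PurePosixPath


class MiniFSPath(str):
    """Hypothetical str-based path (not the geowatch FSPath)."""

    def joinpath(self, *others):
        # posixpath.join does not collapse the '//' after the protocol
        return self.__class__(posixpath.join(self, *others))

    def __truediv__(self, other):
        return self.joinpath(other)

    def relative_to(self, other):
        # os.path-style: may return '..' segments instead of raising
        return self.__class__(posixpath.relpath(self, other))


uri = 's3://bucket/data/file.txt'
# pathlib treats '//' as a redundant separator and collapses it
print(PurePosixPath(uri))  # s3:/bucket/data/file.txt

path = MiniFSPath('s3://bucket') / 'data' / 'file.txt'
print(path)  # s3://bucket/data/file.txt
print(path.relative_to('s3://bucket'))  # data/file.txt
```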

Note

Not all of the fsspec / pathlib operations are currently implemented; add them as needed.

Example

>>> cwd = FSPath.coerce('.')
>>> print(cwd)
>>> print(cwd.fs)
classmethod get_filesystem_class()
property fs: AbstractFileSystem
classmethod coerce(path)

Determine which backend to use automatically

Example

>>> path2 = FSPath.coerce('/local/path')
>>> print(f'path2={path2}')
>>> assert path2.is_local()
>>> # xdoctest: +REQUIRES(module:s3fs)
>>> path1 = FSPath.coerce('s3://demo_bucket')
>>> print(f'path1={path1}')
>>> assert path1.is_remote()
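One plausible way to pick a backend automatically is to dispatch on the URI scheme. The sketch below is hypothetical: the class names, the `_BACKENDS` table, and the treatment of GDAL-style `/vsis3/` paths are assumptions, not the actual FSPath.coerce logic.

```python
class FSPathSketch(str):
    """Hypothetical stand-in for FSPath."""


class LocalPathSketch(FSPathSketch):
    """Hypothetical stand-in for LocalPath."""


class S3PathSketch(FSPathSketch):
    """Hypothetical stand-in for S3Path."""


# Hypothetical protocol -> class table
_BACKENDS = {'file': LocalPathSketch, 's3': S3PathSketch}


def guess_protocol(path):
    """Return the protocol of a path, defaulting to the local filesystem."""
    if path.startswith('/vsis3/'):
        return 's3'  # GDAL-style virtual S3 path
    if '://' in path:
        return path.split('://', 1)[0]
    return 'file'


def coerce(path):
    # An unknown protocol raises KeyError in this sketch
    cls = _BACKENDS[guess_protocol(path)]
    return cls(path)


print(type(coerce('/local/path')).__name__)       # LocalPathSketch
print(type(coerce('s3://demo_bucket')).__name__)  # S3PathSketch
```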
property path

By default, the string representation is assumed to be the entire path. For subclasses like SSHPath, however, the core object represents the entire URI, so this property is overridden to return just the path part, which is what the fsspec.FileSystem object expects.
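As an illustration, the hypothetical `SSHPathSketch` below stores the full URI as its string value and uses urllib.parse.urlsplit to expose only the host and the path part. The ssh:// URI layout is an assumption; the real SSHPath may parse it differently.

```python
from urllib.parse import urlsplit


class SSHPathSketch(str):
    """Hypothetical: the str value is the whole URI, e.g. ssh://user@host/path."""

    @property
    def host(self):
        # hostname drops the user and port (and lowercases the host)
        return urlsplit(self).hostname

    @property
    def path(self):
        # Only this part would be handed to the filesystem object
        return urlsplit(self).path


uri = SSHPathSketch('ssh://user@myhost:2222/data/images/img.tif')
print(uri.host)  # myhost
print(uri.path)  # /data/images/img.tif
```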

relative_to(other)
is_remote()
is_local()
open(mode='rb', block_size=None, cache_options=None, compression=None)

Example

>>> from geowatch.utils.util_fsspec import *  # NOQA
>>> from geowatch.utils import util_fsspec
>>> dpath = util_fsspec.LocalPath.appdir('geowatch/fsspec/tests/open').ensuredir()
>>> fpath = dpath / 'file.txt'
>>> with fpath.open(mode='w') as file:
...     file.write('hello world')
>>> with fpath.open('r') as file:
...     assert fpath.read_text() == file.read()
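The `compression` parameter mirrors fsspec's `open`, where a codec name or 'infer' (guess from the filename suffix) can be given. A stdlib-only sketch of that behavior for gzip follows; `open_with_compression` and its one-entry codec table are hypothetical and much narrower than fsspec's registry.

```python
import gzip
import os
import tempfile

# Hypothetical suffix -> codec table (fsspec keeps a much larger registry)
_SUFFIX_TO_CODEC = {'.gz': 'gzip'}


def open_with_compression(path, mode='rb', compression=None):
    """Sketch of a compression argument accepting None, 'gzip', or 'infer'."""
    if compression == 'infer':
        compression = _SUFFIX_TO_CODEC.get(os.path.splitext(path)[1])
    if compression is None:
        return open(path, mode)
    if compression == 'gzip':
        # gzip.open is binary unless 't' is in the mode
        if 'b' not in mode and 't' not in mode:
            mode += 't'
        return gzip.open(path, mode)
    raise ValueError(f'unsupported compression: {compression!r}')


fpath = os.path.join(tempfile.mkdtemp(), 'file.txt.gz')
with open_with_compression(fpath, 'w', compression='infer') as file:
    file.write('hello world')
with open_with_compression(fpath, 'r', compression='infer') as file:
    text = file.read()
print(text)  # hello world
```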
ls(detail=False, **kwargs)

Example

>>> from geowatch.utils.util_fsspec import *  # NOQA
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('geowatch', 'tests', 'fsspec', 'ls').ensuredir()
>>> (dpath / 'file1').touch()
>>> (dpath / 'file2').touch()
>>> (dpath / 'subfolder').ensuredir()
>>> self = FSPath.coerce(dpath)
>>> results = self.ls()
>>> assert sorted(results) == sorted(map(str, dpath.ls()))
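For reference, fsspec-style `ls` returns full paths by default and, with detail=True, dictionaries that include at least 'name', 'size', and 'type' keys. The hypothetical `ls_local` below imitates that shape with os.scandir; it is a sketch, not how FSPath.ls is implemented.

```python
import os
import tempfile


def ls_local(dpath, detail=False):
    """Hypothetical stdlib stand-in for an fsspec-style ls on a local directory."""
    with os.scandir(dpath) as it:
        entries = sorted(it, key=lambda e: e.name)
    if not detail:
        return [e.path for e in entries]
    return [
        {
            'name': e.path,
            # directory sizes are reported as 0 here (a simplification)
            'size': e.stat().st_size if e.is_file() else 0,
            'type': 'directory' if e.is_dir() else 'file',
        }
        for e in entries
    ]


dpath = tempfile.mkdtemp()
for fname in ['file1', 'file2']:
    open(os.path.join(dpath, fname), 'w').close()
os.mkdir(os.path.join(dpath, 'subfolder'))
print(ls_local(dpath))
print([info['type'] for info in ls_local(dpath, detail=True)])  # ['file', 'file', 'directory']
```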
touch(truncate=False, **kwargs)

Example

>>> import ubelt as ub
>>> dpath = ub.Path.appdir('geowatch', 'tests', 'fsspec', 'touch').ensuredir()
>>> dpath_ = FSPath.coerce(dpath)
>>> self = (dpath_ / 'file')
>>> self.touch()
>>> assert self.exists()
>>> assert (dpath / 'file').exists()
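fsspec describes `truncate` on `touch` roughly as: if True, always reset the file to zero bytes; if False, only update the timestamp when the backend allows it. A local-filesystem sketch of those two behaviors, using a hypothetical `touch_local` helper:

```python
import os
import tempfile


def touch_local(fpath, truncate=False):
    """Hypothetical local-filesystem version of fsspec-style touch."""
    if truncate or not os.path.exists(fpath):
        # Create the file, or reset an existing one to zero bytes
        open(fpath, 'wb').close()
    else:
        # Leave the content alone and only update access/modification times
        os.utime(fpath, None)


fpath = os.path.join(tempfile.mkdtemp(), 'file')
touch_local(fpath)                 # creates an empty file
with open(fpath, 'w') as file:
    file.write('data')
touch_local(fpath)                 # content is kept
size_after_touch = os.path.getsize(fpath)
touch_local(fpath, truncate=True)  # content is dropped
size_after_truncate = os.path.getsize(fpath)
print(size_after_touch, size_after_truncate)  # 4 0
```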
move(path2, recursive='auto', maxdepth=None, idempotent=True, verbose=1, **kwargs)

Note: this may behave differently from ubelt.Path.move; ideally we should rectify this. The case that differs is what happens when you move:

./path/to/dir -> ./path/to/other/dir

Does ./path/to/dir merge into ./path/to/other/dir, or do you get all of the src contents in ./path/to/other/dir/dir?
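For comparison, the standard library's shutil.move gives the second answer when the destination directory already exists: the source is nested inside it. The sketch below only demonstrates shutil semantics; it does not describe what FSPath.move or ubelt.Path.move does.

```python
import os
import shutil
import tempfile

root = tempfile.mkdtemp()
src = os.path.join(root, 'path', 'to', 'dir')
dst = os.path.join(root, 'path', 'to', 'other', 'dir')
os.makedirs(src)
os.makedirs(dst)  # the destination directory already exists
open(os.path.join(src, 'file.txt'), 'w').close()

# shutil.move places src *inside* an existing destination directory
shutil.move(src, dst)
nested = os.path.exists(os.path.join(dst, 'dir', 'file.txt'))
merged = os.path.exists(os.path.join(dst, 'file.txt'))
print(nested, merged)  # True False
```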

delete(recursive='auto', maxdepth=True, verbose=1)

Deletes this file or this directory (and all of its contents)

Unlike fs.delete, this will not error if the file doesn't exist. See FSPath.rm() if you want the standard erroring behavior.
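A minimal sketch of the idempotent deletion described above, for the local filesystem only. `delete_local` is hypothetical, and resolving recursive='auto' by checking whether the path is a directory is an assumption about what 'auto' means.

```python
import os
import shutil
import tempfile


def delete_local(path, recursive='auto'):
    """Hypothetical idempotent delete: a missing path is not an error."""
    if not os.path.lexists(path):
        return  # nothing to delete; os.remove would raise here
    is_real_dir = os.path.isdir(path) and not os.path.islink(path)
    if recursive == 'auto':
        recursive = is_real_dir
    if is_real_dir:
        if recursive:
            shutil.rmtree(path)
        else:
            os.rmdir(path)  # non-recursive: only succeeds for an empty directory
    else:
        os.remove(path)  # regular files and symlinks


dpath = os.path.join(tempfile.mkdtemp(), 'to_delete')
os.makedirs(os.path.join(dpath, 'sub'))
open(os.path.join(dpath, 'sub', 'file.txt'), 'w').close()
delete_local(dpath)  # removes the directory tree
delete_local(dpath)  # second call is a no-op instead of an error
print(os.path.exists(dpath))  # False
```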

rm(recursive='auto', maxdepth=True)

Deletes this file or this directory (and all of its contents)

mkdir(create_parents=True, **kwargs)

Note

does nothing on some filesystems (e.g. S3)
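The reason is that S3 has a flat key namespace: a "directory" exists only as a shared prefix of object keys. The toy `FlatKeyStore` below is hypothetical and heavily simplified (it is not s3fs), but it shows why mkdir can be a no-op while is_dir still works.

```python
class FlatKeyStore:
    """Hypothetical toy object store with a flat key namespace (not s3fs)."""

    def __init__(self):
        self.objects = {}

    def put(self, key, data):
        self.objects[key] = data

    def mkdir(self, key):
        # There are no directory objects, so there is nothing to create
        pass

    def is_dir(self, key):
        # A "directory" exists only if some object key lives under it
        prefix = key.rstrip('/') + '/'
        return any(k.startswith(prefix) for k in self.objects)


store = FlatKeyStore()
store.mkdir('bucket/new_dir')
print(store.is_dir('bucket/new_dir'))  # False: mkdir left no trace
store.put('bucket/new_dir/file.txt', b'data')
print(store.is_dir('bucket/new_dir'))  # True: implied by the key prefix
```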

stat()
is_dir()
is_file()
exists()
write_text(value, **kwargs)
read_text(**kwargs)
walk(include_protocol='auto', **kwargs)
Yields:

Tuple[Self, List[str], List[str]] - root, dir names, file names
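fsspec's `walk` typically yields roots with the protocol stripped (for example bucket/key rather than s3://bucket/key). One plausible reading of include_protocol is re-attaching that prefix. The hypothetical `walk_with_protocol` below sketches this over os.walk; the parameter semantics are an assumption.

```python
import os
import tempfile


def walk_with_protocol(root, protocol='file', include_protocol=True):
    """Hypothetical: yield (root, dirnames, filenames) tuples, optionally
    re-attaching a protocol prefix such as 'file://' to each root."""
    for dirpath, dirnames, filenames in os.walk(root):
        if include_protocol:
            dirpath = f'{protocol}://{dirpath}'
        yield dirpath, dirnames, filenames


dpath = tempfile.mkdtemp()
os.makedirs(os.path.join(dpath, 'sub'))
open(os.path.join(dpath, 'sub', 'file.txt'), 'w').close()
for root, dnames, fnames in walk_with_protocol(dpath):
    print(root, dnames, fnames)
```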

property parent

Example:

>>> self = FSPath.coerce('foo/bar/baz.jaz.raz')
>>> assert str(ub.Path(self).parent) == self.parent
>>> assert self.parent == 'foo/bar'
property name

Example:

>>> self = FSPath.coerce('foo/bar/baz.jaz')
>>> assert ub.Path(self).name == self.name
>>> assert self.name == 'baz.jaz'
property stem

Example:

>>> self = FSPath.coerce('foo/bar/baz.jaz')
>>> assert ub.Path(self).stem == self.stem
>>> assert self.stem == 'baz'
property suffix

Example:

>>> self = FSPath.coerce('foo/bar/baz.jaz.raz')
>>> assert ub.Path(self).suffix == self.suffix
>>> assert self.suffix == '.raz'
property suffixes

Example:

>>> self = FSPath.coerce('foo/bar/baz.jaz.raz')
>>> assert ub.Path(self).suffixes == self.suffixes
>>> assert self.suffixes == ['.jaz', '.raz']
property parts

Example:

>>> self = FSPath.coerce('foo/bar/baz.jaz.raz')
>>> assert ub.Path(self).parts == self.parts
>>> assert self.parts == ('foo', 'bar', 'baz.jaz.raz')
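The properties above can be computed with posixpath, which, unlike pathlib, keeps the "//" of a URI in `parent`. The helper functions below are a hypothetical sketch; the suffix rules are copied from pathlib so that the results match the examples.

```python
import posixpath


def name(path):
    return posixpath.basename(path)


def parent(path):
    # dirname keeps the '//' of a URI such as s3://bucket/key intact
    return posixpath.dirname(path)


def suffix(path):
    # pathlib's rule: the last dot must not start or end the name
    n = name(path)
    i = n.rfind('.')
    return n[i:] if 0 < i < len(n) - 1 else ''


def stem(path):
    n = name(path)
    i = n.rfind('.')
    return n[:i] if 0 < i < len(n) - 1 else n


def suffixes(path):
    n = name(path)
    if n.endswith('.'):
        return []
    return ['.' + s for s in n.lstrip('.').split('.')[1:]]


print(parent('foo/bar/baz.jaz.raz'))    # foo/bar
print(suffixes('foo/bar/baz.jaz.raz'))  # ['.jaz', '.raz']
print(parent('s3://bucket/a/b.txt'))    # s3://bucket/a
```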
copy(dst, recursive='auto', maxdepth=None, on_error=None, callback=None, verbose=1, idempotent=True, overwrite=False, **kwargs)

Copies this file or directory to dst

Abstracts fsspec copy / put / get.

If dst ends with a “/”, it will be assumed to be a directory, and target files will go within.

Unlike fsspec, this attempts to be idempotent. See [FSSpecCopy].

Parameters:
  • dst (FSPath) – location to copy to

  • recursive (bool | str) – If ‘auto’ (the default), attempt to determine if this is a directory or a file. Set to True if it is a directory and False otherwise. If you know what this is beforehand, you can set it explicitly to be more efficient.

  • maxdepth (int | None) – only makes sense when recursive is True

  • callback (None | callable) – for put / get cases

  • on_error (str) – either “raise” or “ignore”. Only applicable in the “copy” case.

  • idempotent (bool) – if False, use standard fsspec behavior, otherwise attempt to be idempotent.

  • overwrite (bool) – if True, overwrite existing data instead of erroring. Defaults to False.
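A local-filesystem sketch of the trailing-slash rule and of idempotent directory copies, assuming that idempotent means "the contents of src always land at dst, never at dst/<name of src>". `copy_local` is hypothetical, needs Python 3.8+ for dirs_exist_ok, and ignores maxdepth, callbacks, and remote backends.

```python
import os
import shutil
import tempfile


def copy_local(src, dst, overwrite=False):
    """Hypothetical idempotent copy for the local filesystem."""
    if dst.endswith('/'):
        # Trailing slash: dst is a directory, so the copy goes inside it
        dst = os.path.join(dst, os.path.basename(src.rstrip('/')))
    if os.path.exists(dst) and not overwrite:
        raise FileExistsError(dst)
    if os.path.isdir(src):
        # The contents of src land directly in dst, never in dst/<name of src>
        shutil.copytree(src, dst, dirs_exist_ok=True)
    else:
        os.makedirs(os.path.dirname(dst) or '.', exist_ok=True)
        shutil.copy2(src, dst)
    return dst


root = tempfile.mkdtemp()
src = os.path.join(root, 'src')
os.makedirs(src)
with open(os.path.join(src, 'a.txt'), 'w') as file:
    file.write('hello')
dst = os.path.join(root, 'dst')
copy_local(src, dst)
copy_local(src, dst, overwrite=True)  # repeating the copy keeps the same layout
print(sorted(os.listdir(dst)))  # ['a.txt']
print(copy_local(os.path.join(src, 'a.txt'), os.path.join(root, 'out') + '/'))
```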

Note

Different fsspec functions are used depending on whether we are going from remote->remote (copy), local->remote (put), or remote->local (get).
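The choice between the three transfer methods reduces to a small decision on where each side lives. `choose_transfer` below is a hypothetical sketch that only names the fsspec method; treating local->local as copy is an assumption.

```python
def choose_transfer(src_is_local, dst_is_local):
    """Hypothetical: name the fsspec method used for a transfer direction."""
    if src_is_local and not dst_is_local:
        return 'put'  # local -> remote (upload)
    if not src_is_local and dst_is_local:
        return 'get'  # remote -> local (download)
    # remote -> remote (and, by assumption here, local -> local)
    return 'copy'


print(choose_transfer(src_is_local=True, dst_is_local=False))  # put
```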

References

Example

>>> from geowatch.utils import util_fsspec
>>> dpath = util_fsspec.LocalPath.appdir('geowatch/fsspec/tests/copy').ensuredir()
>>> src_dpath = (dpath / 'src').ensuredir()
>>> for i in range(100):
...     (src_dpath / f'file_{i:03d}.txt').write_text('hello world' * 100)
>>> dst_dpath = (dpath / 'dst')
>>> dst_dpath.delete()
>>> src_dpath.copy(dst_dpath, verbose=3)
>>> dst_dpath.delete()
>>> if 0:
...     from fsspec.callbacks import TqdmCallback
...     callback = TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"})
...     src_dpath.copy(dst_dpath, callback=callback)
joinpath(*others)
tree(max_files=100, dirblocklist=None, show_nfiles='auto', return_text=False, return_tree=True, pathstyle='name', max_depth=None, with_type=False, abs_root_label=True, colors=False)

Filesystem tree representation

Like the Unix tree utility, but allows writing the number of files per directory when given the -d option.

Ported from xdev.misc.tree_repr

Todo

Instead of building the networkx structure and then waiting to display everything, build and display simultaneously. This will require using a modified version of write_network_text.

Parameters:
  • max_files (int | None) – maximum number of files to print before suppressing a directory

  • pathstyle (str) – can be rel, name, or abs

  • return_tree (bool) – if True return the tree

  • return_text (bool) – if True return the text

  • max_depth (int | None) – maximum depth to descend

  • abs_root_label (bool) – if True force the root to always be absolute

  • colors (bool) – if True use rich

class geowatch.utils.util_fsspec.LocalPath(path, *, fs=None)

Bases: FSPath

The implementation for the local filesystem

CommandLine

xdoctest -m geowatch.utils.util_fsspec LocalPath
xdoctest geowatch/utils/util_fsspec.py

Example

>>> from geowatch.utils.util_fsspec import *  # NOQA
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('geowatch/tests/util_fsspec/demo')
>>> dpath.delete().ensuredir()
>>> (dpath / 'file1.txt').write_text('data')
>>> (dpath / 'dpath').ensuredir()
>>> (dpath / 'dpath/file2.txt').write_text('data')
>>> self = LocalPath(dpath).absolute()
>>> print(f'self={self}')
>>> print(self.ls())
>>> info = self.tree()
>>> fsspec_dpath = (self / 'dpath')
>>> fsspec_fpath = (self / 'file1.txt')
>>> pathlib_dpath = ub.Path(dpath / 'pathlib_dpath')
>>> pathlib_fpath = ub.Path(dpath / 'pathlib_fpath')
>>> assert not pathlib_dpath.exists()
>>> assert not pathlib_fpath.exists()
>>> fsspec_dpath.copy(pathlib_dpath)
>>> fsspec_fpath.copy(pathlib_fpath)
>>> assert pathlib_dpath.exists()
>>> assert pathlib_fpath.exists()
ensuredir(mode=511)
absolute()
classmethod appdir(*args, **kw)
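The default mode=511 is the decimal form of the octal permission 0o777 (rwxrwxrwx). A hypothetical `ensuredir` sketch with os.makedirs follows; note that the effective permissions are still masked by the process umask.

```python
import os
import tempfile


def ensuredir(dpath, mode=0o777):
    """Hypothetical: create dpath and any missing parents; no error if present."""
    # The process umask still clears bits from mode on creation
    os.makedirs(dpath, mode=mode, exist_ok=True)
    return dpath


print(511 == 0o777, oct(511))  # True 0o777
dpath = os.path.join(tempfile.mkdtemp(), 'a', 'b')
ensuredir(dpath)
ensuredir(dpath)  # calling again is harmless
```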
class geowatch.utils.util_fsspec.MemoryPath(path, *, fs=None)

Bases: FSPath

class geowatch.utils.util_fsspec.RemotePath(path, *, fs=None)

Bases: FSPath

Abstract implementation for all remote filesystems

class geowatch.utils.util_fsspec.S3Path(path, *, fs=None)

Bases: RemotePath

The specific S3 remote filesystem.

Control credentials with the environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN.

A single S3 filesystem is used by default, but you can work with several of them by passing in an explicit fs object, e.g.:

fs = S3Path._new_fs(profile='iarpa')
self = S3Path('s3://kitware-smart-watch-data/', fs=fs)
self.ls()

# Can also do
S3Path.register_bucket('s3://kitware-smart-watch-data', profile='iarpa')
self = S3Path.coerce('s3://kitware-smart-watch-data/')
self.ls()

# Demo of multiple registered buckets
S3Path.register_bucket('s3://usgs-landsat-ard', profile='iarpa', requester_pays=True)
self = S3Path.coerce('s3://usgs-landsat-ard/collection02')
self.ls()

self = S3Path.coerce('/vsis3/usgs-landsat-ard/collection02')
self.ls()
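Registered buckets can be thought of as a table from bucket name to filesystem keyword arguments. The sketch below is hypothetical (it is not the S3Path.register_bucket implementation) and shows how both s3:// URIs and GDAL /vsis3/ paths could be mapped back to a bucket's registered settings, reusing the keyword arguments from the example above.

```python
# Hypothetical registry: bucket name -> keyword arguments for a filesystem
_BUCKET_KWARGS = {}


def register_bucket(bucket, **kwargs):
    name = bucket.split('://', 1)[-1].strip('/')
    _BUCKET_KWARGS[name] = kwargs


def bucket_of(path):
    """Extract the bucket from an s3:// URI or a GDAL /vsis3/ path."""
    if path.startswith('/vsis3/'):
        rest = path[len('/vsis3/'):]
    elif path.startswith('s3://'):
        rest = path[len('s3://'):]
    else:
        raise ValueError(f'not an S3 path: {path!r}')
    return rest.split('/', 1)[0]


def fs_kwargs_for(path):
    # Unregistered buckets fall back to the default (empty) configuration
    return _BUCKET_KWARGS.get(bucket_of(path), {})


register_bucket('s3://usgs-landsat-ard', profile='iarpa', requester_pays=True)
print(fs_kwargs_for('/vsis3/usgs-landsat-ard/collection02'))  # {'profile': 'iarpa', 'requester_pays': True}
```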

SeeAlso:

geowatch.heuristics.register_known_fsspec_s3_buckets

To work with different S3 filesystems, see [S3FS_Docs].

Requirements:

s3fs>=2023.6.0

References

Example

>>> # xdoctest: +REQUIRES(module:s3fs)
>>> fs = S3Path._new_fs()
ensuredir(mode=511)
classmethod register_bucket(bucket, **kwargs)
classmethod coerce(path)
class geowatch.utils.util_fsspec.SSHPath(path, *, fs=None)

Bases: RemotePath

property host