
fs: introduce tree.find() #5878

Closed
wants to merge 1 commit
5 changes: 4 additions & 1 deletion dvc/fs/base.py
@@ -175,9 +175,12 @@ def walk_files(self, path_info, **kwargs):
"""
raise NotImplementedError

def ls(self, path_info, detail=False, **kwargs):
def ls(self, path_info, detail=False):
raise RemoteActionNotImplemented("ls", self.scheme)

def find(self, path_info, detail=False):
raise RemoteActionNotImplemented("find", self.scheme)

def is_empty(self, path_info):
return False

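To make the new base-class contract concrete, here is a rough sketch (not part of the diff) of how a subclass might satisfy it: ls() stays a shallow, single-level listing, find() recursively yields every file under a prefix, and walk_files() can then be built on top of find(). The class name and the _list_dir/_walk helpers below are hypothetical.

    class MyFileSystem(BaseFileSystem):
        # Hypothetical subclass, for illustration only.
        scheme = "myfs"

        def ls(self, path_info, detail=False):
            # Shallow listing: only the direct children of path_info.
            for name, info in self._list_dir(path_info.path):  # hypothetical helper
                yield {"name": name, **info} if detail else name

        def find(self, path_info, detail=False):
            # Recursive listing: every file (never a directory) under path_info.
            for root, _, files in self._walk(path_info.path):  # hypothetical helper
                for fname in files:
                    path = f"{root}/{fname}"
                    yield {"name": path} if detail else path

        def walk_files(self, path_info, **kwargs):
            # With find() in place, walk_files() becomes a thin wrapper.
            for path in self.find(path_info):
                yield path_info.replace(path=path)
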
35 changes: 19 additions & 16 deletions dvc/fs/fsspec_wrapper.py
@@ -33,15 +33,13 @@ def _strip_bucket(self, entry):

return path or bucket

def _strip_buckets(self, entries, detail, prefix=None):
def _strip_buckets(self, entries, detail=False):
for entry in entries:
if detail:
entry = self._entry_hook(entry.copy())
entry["name"] = self._strip_bucket(entry["name"])
else:
entry = self._strip_bucket(
f"{prefix}/{entry}" if prefix else entry
)
entry = self._strip_bucket(entry)
yield entry

def _entry_hook(self, entry):
@@ -93,24 +91,29 @@ def copy(self, from_info, to_info):
def exists(self, path_info, use_dvcignore=False):
return self.fs.exists(self._with_bucket(path_info))

def ls(
self, path_info, detail=False, recursive=False
): # pylint: disable=arguments-differ
if self.isdir(path_info) and self.is_empty(path_info):
return None
def ls(self, path_info, detail=False):
path = self._with_bucket(path_info)
files = self.fs.ls(path, detail=detail)
yield from self._strip_buckets(files, detail=detail)

def find(self, path_info, detail=False):
path = self._with_bucket(path_info)
if recursive:
for root, _, files in self.fs.walk(path, detail=detail):
if detail:
files = files.values()
yield from self._strip_buckets(files, detail, prefix=root)
files = self.fs.find(path, detail=detail)
if detail:
files = files.values()

# When calling find() on a file, it returns the same file in a list.
# For object-based storages, the same behavior applies to empty
# directories since they are represented as files. This condition
# checks whether we should yield an empty list (if it is an empty
# directory) or just yield the file itself.
if len(files) == 1 and files[0] == path and self.isdir(path_info):
Review comment (PR author): This check is intentionally omitted from the bare ls(), since its main objective is to support directory listing when calling walk_files(), and I didn't want to repeat it in two different places.

return None

yield from self._strip_buckets(self.ls(path, detail=detail), detail)
yield from self._strip_buckets(files, detail=detail)

def walk_files(self, path_info, **kwargs):
for file in self.ls(path_info, recursive=True):
for file in self.find(path_info):
yield path_info.replace(path=file)

def remove(self, path_info):
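
The special case discussed in the comment above is easier to see against fsspec directly. A small illustration (not part of the PR) using fsspec's in-memory filesystem; the exact info fields returned with detail=True vary per backend:

    import fsspec

    fs = fsspec.filesystem("memory")
    fs.pipe_file("/data/foo", b"foo")

    # find() on a directory yields the files underneath it.
    print(fs.find("/data"))        # expected: ['/data/foo']

    # find() on a single file returns that same file in a one-element list;
    # object storages report empty "directories" the same way, which is what
    # the isdir() check in the wrapper has to disambiguate.
    print(fs.find("/data/foo"))    # expected: ['/data/foo']

    # With detail=True, find() returns a mapping of path -> info dict,
    # hence the files.values() call in the wrapper.
    print(fs.find("/data", detail=True))
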
14 changes: 4 additions & 10 deletions dvc/fs/gdrive.py
@@ -532,7 +532,7 @@ def _gdrive_list_ids(self, query_ids):
query = f"({query}) and trashed=false"
return self._gdrive_list(query)

def _ls_recursive(self, path_info, detail=False):
def find(self, path_info, detail=False):
root_path = path_info.path
seen_paths = set()

@@ -569,13 +569,7 @@ def _ls_recursive(self, path_info, detail=False):
else:
yield item_path

def ls(
self, path_info, detail=False, recursive=False
): # pylint: disable=arguments-differ
if recursive:
yield from self._ls_recursive(path_info, detail=detail)
return None

def ls(self, path_info, detail=False):
cached = path_info.path in self._ids_cache["dirs"]
if cached:
dir_ids = self._ids_cache["dirs"][path_info.path]
@@ -605,8 +599,8 @@ def ls(
self._cache_path_id(root_path, *dir_ids)

def walk_files(self, path_info, **kwargs):
for filename in self.ls(path_info, recursive=True):
yield path_info.replace(path=filename)
for file in self.find(path_info):
yield path_info.replace(path=file)

def remove(self, path_info):
item_id = self._get_item_id(path_info)
6 changes: 1 addition & 5 deletions dvc/fs/s3.py
@@ -293,11 +293,7 @@ def walk_files(self, path_info, **kwargs):

yield path_info.replace(path=fname)

def ls(
self, path_info, detail=False, recursive=False
): # pylint: disable=arguments-differ
assert recursive

def find(self, path_info, detail=False):
with self._get_bucket(path_info.bucket) as bucket:
for obj_summary in bucket.objects.filter(Prefix=path_info.path):
if detail:
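
For reference, a hedged sketch of what the loop above iterates over: with boto3's resource API, each ObjectSummary exposes key, size and e_tag, which is presumably where the detail fields come from (the rest of the function is collapsed in this view). The bucket name and prefix below are made up.

    import boto3

    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")  # hypothetical bucket
    for obj_summary in bucket.objects.filter(Prefix="data/"):
        print(obj_summary.key, obj_summary.size, obj_summary.e_tag)
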
46 changes: 16 additions & 30 deletions dvc/fs/webdav.py
@@ -151,33 +151,27 @@ def isdir(self, path_info):
# Use webdav is_dir to test whether path points to a directory
return self._client.is_dir(path_info.path)

# Yields path info to all files
def walk_files(self, path_info, **kwargs):
# Check whether directory exists
if not self.exists(path_info):
Review comment (PR author): This seemed like a legacy check, since none of the other implementations call exists() before walk_files(), even the ones that don't raise proper errors, like s3. I am not sure whether it is still needed or not.

return

# Collect directories
dirs = deque([path_info.path])
for path in self.find(path_info):
yield path_info.replace(path=path)

# Iterate all directories found so far
while dirs:
# Iterate directory content
for entry in self._client.list(dirs.pop(), get_info=True):
# Construct path_info to entry
info = path_info.replace(path=entry["path"])

# Check whether entry is a directory
def ls(self, path_info, detail=False):
for entry in self._client.list(path_info.path):
path = entry["path"]
if detail:
if entry["isdir"]:
# Append new found directory to directory list
dirs.append(info.path)
yield {"type": "directory", "name": path}
else:
# Yield path info to non directory
yield info
yield {
"type": "file",
"name": path,
"size": entry["size"],
"etag": entry["etag"],
}
else:
yield path

def ls(
self, path_info, detail=False, recursive=False
): # pylint: disable=arguments-differ
def find(self, path_info, detail=False):
dirs = deque([path_info.path])

while dirs:
@@ -197,14 +191,6 @@ def ls(
else:
yield path

if not recursive:
for entry in dirs:
if detail:
yield {"type": "directory", "name": entry}
else:
yield entry
return None

# Removes file/directory
def remove(self, path_info):
# Use webdav client clean (DELETE) method to remove file/directory
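
Putting the reworked WebDAV methods together, a hypothetical usage sketch (assuming fs is an already configured WebDAVFileSystem and path_info points at an existing remote directory):

    def show_listing(fs, path_info):
        # Illustration only: exercise the reworked ls()/find()/walk_files().
        # Shallow listing of direct children; with detail=True, file entries
        # also carry size and etag, directory entries just type and name.
        for entry in fs.ls(path_info, detail=True):
            print(entry["type"], entry["name"])

        # Recursive listing: find() drains the directory queue and yields files only.
        for path in fs.find(path_info):
            print(path)

        # walk_files() is now just find() wrapped back into path_info objects.
        for file_info in fs.walk_files(path_info):
            print(file_info)
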
2 changes: 1 addition & 1 deletion dvc/objects/stage.py
@@ -101,7 +101,7 @@ def _build_objects(path_info, fs, name, odb, state, upload, **kwargs):

def _iter_objects(path_info, fs, name, odb, state, upload, **kwargs):
if not upload and name in fs.DETAIL_FIELDS:
for details in fs.ls(path_info, recursive=True, detail=True):
for details in fs.find(path_info, detail=True):
file_info = path_info.replace(path=details["name"])
hash_info = HashInfo(
name, details[name], size=details.get("size"),
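
To make the consumption side concrete, here is a hypothetical detail entry as yielded by fs.find(..., detail=True) for an object storage whose checksum field is "etag" (values made up; import path assumed):

    from dvc.hash_info import HashInfo  # import path assumed

    details = {
        "name": "data/foo",
        "etag": "acbd18db4cc2f85cedef654fccc4a4d8",  # made-up value
        "size": 3,
    }
    hash_info = HashInfo("etag", details["etag"], size=details.get("size"))
    print(hash_info)
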
9 changes: 4 additions & 5 deletions tests/func/test_fs.py
@@ -319,14 +319,13 @@ def test_fs_ls(dvc, cloud):
pytest.lazy_fixture("gdrive"),
],
)
def test_fs_ls_recursive(dvc, cloud):
def test_fs_find_recursive(dvc, cloud):
cloud.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}, "quux": "quux"}})
fs = get_cloud_fs(dvc, **cloud.config)
path_info = fs.path_info

assert {
os.path.basename(file_key)
for file_key in fs.ls(path_info / "data", recursive=True)
os.path.basename(file_key) for file_key in fs.find(path_info / "data")
} == {"foo", "baz", "quux"}


@@ -339,12 +338,12 @@ def test_fs_ls_recursive(dvc, cloud):
pytest.lazy_fixture("webdav"),
],
)
def test_fs_ls_with_etag(dvc, cloud):
def test_fs_find_with_etag(dvc, cloud):
cloud.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}, "quux": "quux"}})
fs = get_cloud_fs(dvc, **cloud.config)
path_info = fs.path_info

for details in fs.ls(path_info / "data", recursive=True, detail=True):
for details in fs.find(path_info / "data", detail=True):
assert (
fs.info(path_info.replace(path=details["name"]))["etag"]
== details["etag"]