Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching file/dir metadata for GSClient #275

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 71 additions & 22 deletions cloudpathlib/gs/gsclient.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
import dataclasses
from datetime import datetime
import mimetypes
import os
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Optional, TYPE_CHECKING, Tuple, Union
from typing import (
Any,
Callable,
Dict,
Iterable,
Optional,
TYPE_CHECKING,
Tuple,
Union,
MutableMapping,
)
from weakref import WeakKeyDictionary

from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
Expand All @@ -13,13 +25,18 @@
from google.auth.credentials import Credentials

from google.auth.exceptions import DefaultCredentialsError
from google.cloud.storage import Client as StorageClient
from google.cloud.storage import Client as StorageClient, Bucket


except ModuleNotFoundError:
implementation_registry["gs"].dependencies_loaded = False


@dataclasses.dataclass
class PathMetadata:
    """Cached knowledge about a single cloud path."""

    # "file", "dir", or None when the path is known not to exist.
    is_file_or_dir: Optional[str]


@register_client_class("gs")
class GSClient(Client):
"""Client class for Google Cloud Storage which handles authentication with GCP for
Expand Down Expand Up @@ -85,15 +102,17 @@ def __init__(
except DefaultCredentialsError:
self.client = StorageClient.create_anonymous_client()

self._metadata_cache: MutableMapping[GSPath, PathMetadata] = WeakKeyDictionary()
super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)

def _get_metadata(self, cloud_path: GSPath) -> Optional[Dict[str, Any]]:
bucket = self.client.bucket(cloud_path.bucket)
bucket: Bucket = self.client.bucket(cloud_path.bucket)
blob = bucket.get_blob(cloud_path.blob)

if blob is None:
return None
else:
self._set_metadata_cache(cloud_path, "file")
return {
"etag": blob.etag,
"size": blob.size,
Expand All @@ -102,23 +121,25 @@ def _get_metadata(self, cloud_path: GSPath) -> Optional[Dict[str, Any]]:
}

def _download_file(self, cloud_path: GSPath, local_path: Union[str, os.PathLike]) -> Path:
    """Download the object at ``cloud_path`` to ``local_path``.

    Args:
        cloud_path: Path of the object to fetch.
        local_path: Destination on the local filesystem.

    Returns:
        ``local_path`` converted to a ``Path``.
    """
    bucket: Bucket = self.client.bucket(cloud_path.bucket)
    blob = bucket.get_blob(cloud_path.blob)

    destination = Path(local_path)
    blob.download_to_filename(destination)

    # Record the path as a file only after the download succeeded: if ``get_blob``
    # returned None or the transfer failed, the line above has already raised and the
    # metadata cache is left untouched instead of holding a wrong "file" entry.
    self._set_metadata_cache(cloud_path, "file")
    return destination

def _is_file_or_dir(self, cloud_path: GSPath) -> Optional[str]:
# short-circuit the root-level bucket
if not cloud_path.blob:
return "dir"
if cloud_path in self._metadata_cache:
return self._metadata_cache[cloud_path].is_file_or_dir

bucket = self.client.bucket(cloud_path.bucket)
bucket: Bucket = self.client.bucket(cloud_path.bucket)
blob = bucket.get_blob(cloud_path.blob)

if blob is not None:
self._set_metadata_cache(cloud_path, "file")
return "file"
else:
prefix = cloud_path.blob
Expand All @@ -130,15 +151,17 @@ def _is_file_or_dir(self, cloud_path: GSPath) -> Optional[str]:

# at least one key with the prefix of the directory
if bool(list(f)):
self._set_metadata_cache(cloud_path, "dir")
return "dir"
else:
self._set_metadata_cache(cloud_path, None)
return None

def _exists(self, cloud_path: GSPath) -> bool:
return self._is_file_or_dir(cloud_path) in ["file", "dir"]
return self._is_file_or_dir(cloud_path) is not None

def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPath, bool]]:
bucket = self.client.bucket(cloud_path.bucket)
bucket: Bucket = self.client.bucket(cloud_path.bucket)

prefix = cloud_path.blob
if prefix and not prefix.endswith("/"):
Expand All @@ -154,13 +177,15 @@ def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPat

# if we haven't surfaced this directory already
if parent not in yielded_dirs and str(parent) != ".":

# skip if not recursive and this is beyond our depth
if not recursive and "/" in str(parent):
continue

path = self.CloudPath(f"gs://{cloud_path.bucket}/{prefix}{parent}")
self._set_metadata_cache(path, "dir")

yield (
self.CloudPath(f"gs://{cloud_path.bucket}/{prefix}{parent}"),
path,
True, # is a directory
)
yielded_dirs.add(parent)
Expand All @@ -169,12 +194,15 @@ def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPat
if not recursive and "/" in o.name[len(prefix) :]:
continue

yield (self.CloudPath(f"gs://{cloud_path.bucket}/{o.name}"), False) # is a file
path = self.CloudPath(f"gs://{cloud_path.bucket}/{o.name}")
self._set_metadata_cache(path, "file")

yield path, False # is a file

def _move_file(self, src: GSPath, dst: GSPath, remove_src: bool = True) -> GSPath:
# just a touch, so "REPLACE" metadata
if src == dst:
bucket = self.client.bucket(src.bucket)
bucket: Bucket = self.client.bucket(src.bucket)
blob = bucket.get_blob(src.blob)

# See https://github.com/googleapis/google-cloud-python/issues/1185#issuecomment-431537214
Expand All @@ -185,36 +213,41 @@ def _move_file(self, src: GSPath, dst: GSPath, remove_src: bool = True) -> GSPat
blob.patch()

else:
src_bucket = self.client.bucket(src.bucket)
dst_bucket = self.client.bucket(dst.bucket)
src_bucket: Bucket = self.client.bucket(src.bucket)
dst_bucket: Bucket = self.client.bucket(dst.bucket)

src_blob = src_bucket.get_blob(src.blob)
src_bucket.copy_blob(src_blob, dst_bucket, dst.blob)
self._set_metadata_cache(dst, "file")

if remove_src:
self._set_metadata_cache(src, None)
src_blob.delete()

return dst

def _remove(self, cloud_path: GSPath, missing_ok: bool = True) -> None:
    """Delete the file or directory at ``cloud_path`` and update the metadata cache.

    A directory is removed by deleting every file listed recursively beneath it.

    Args:
        cloud_path: Path to delete.
        missing_ok: When False, a path that does not exist raises instead of being ignored.

    Raises:
        FileNotFoundError: If nothing exists at ``cloud_path`` and ``missing_ok`` is False.
    """
    file_or_dir = self._is_file_or_dir(cloud_path)

    if file_or_dir == "dir":
        files = [
            path for path, is_dir in self._list_dir(cloud_path, recursive=True) if not is_dir
        ]
        bucket: Bucket = self.client.bucket(cloud_path.bucket)
        for path in files:
            blob = bucket.get_blob(path.blob)
            # ``get_blob`` returns None when the object vanished after it was listed;
            # there is nothing left to delete, but the cache still has to be updated.
            if blob is not None:
                blob.delete()
            self._set_metadata_cache(path, None)
        self._set_metadata_cache(cloud_path, None)
        return

    if file_or_dir == "file":
        blob = self.client.bucket(cloud_path.bucket).get_blob(cloud_path.blob)
        if blob is not None:
            blob.delete()
            self._set_metadata_cache(cloud_path, None)
            return
        # The "file" answer was stale (it can come from the metadata cache): correct the
        # cache and fall through to the does-not-exist handling below.
        self._set_metadata_cache(cloud_path, None)

    # Does not exist
    if not missing_ok:
        raise FileNotFoundError(f"File does not exist: {cloud_path}")

def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: GSPath) -> GSPath:
bucket = self.client.bucket(cloud_path.bucket)
bucket: Bucket = self.client.bucket(cloud_path.bucket)
blob = bucket.blob(cloud_path.blob)

extra_args = {}
Expand All @@ -223,7 +256,23 @@ def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: GSPath)
extra_args["content_type"] = content_type

blob.upload_from_filename(str(local_path), **extra_args)
self._set_metadata_cache(cloud_path, "file")
return cloud_path

def _set_metadata_cache(self, cloud_path: GSPath, is_file_or_dir: Optional[str]) -> None:
    """Record what is known about ``cloud_path`` and drop ancestor entries it invalidates.

    Args:
        cloud_path: Path whose state has just been observed or changed.
        is_file_or_dir: "file", "dir", or None when the path is known not to exist.
    """
    self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir)

    for parent in cloud_path.parents:
        cached = self._metadata_cache.get(parent)
        if cached is None:
            continue

        # If a file/dir is now known to not exist, its parent directories may no longer
        # exist either, since cloud directories only exist if they have a file in them.
        # Their state is no longer known, so they are removed from the cache.
        #
        # Conversely, if the path is now known to exist, an ancestor cached as
        # non-existent is stale and must be forgotten as well.
        if is_file_or_dir is None or cached.is_file_or_dir is None:
            # pop() with a default tolerates the weakly-held key having disappeared.
            self._metadata_cache.pop(parent, None)

def clear_metadata_cache(self) -> None:
    """Forget every cached file/dir lookup held by this client."""
    self._metadata_cache.clear()


GSClient.GSPath = GSClient.CloudPath # type: ignore
3 changes: 3 additions & 0 deletions cloudpathlib/local/implementations/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class LocalGSClient(LocalClient):

_cloud_meta = local_gs_implementation

def clear_metadata_cache(self):
    """Do nothing; provided so callers can invoke it on this client as well.

    NOTE(review): presumably mirrors ``GSClient.clear_metadata_cache`` because the local
    client keeps no metadata cache -- confirm against ``LocalClient``.
    """
    return None


LocalGSClient.GSPath = LocalGSClient.CloudPath # type: ignore

Expand Down