Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Anonymous access to Google Storage #4518

Merged
merged 14 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-google.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
apache-libcloud>=2.2.1,<3
google-cloud-storage>=2,<=2.8.0
google-auth>=2.18.1,<3
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ markers =
docker_cuda
encryption
fetchable_appliance
google
google-project
google-storage
gridengine
htcondor
integrative
Expand Down
40 changes: 34 additions & 6 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,27 +1266,55 @@ def importFile(self,
def import_file(self,
src_uri: str,
shared_file_name: str,
symlink: bool = True) -> None: ...
symlink: bool = True,
check_existence: bool = True) -> None: ...

@overload
def import_file(self,
src_uri: str,
shared_file_name: None = None,
symlink: bool = True) -> FileID: ...
symlink: bool = True,
check_existence: bool = True) -> FileID: ...

def import_file(self,
src_uri: str,
shared_file_name: Optional[str] = None,
symlink: bool = True) -> Optional[FileID]:
symlink: bool = True,
check_existence: bool = True) -> Optional[FileID]:
"""
Import the file at the given URL into the job store.

By default, returns None if the file does not exist.

:param check_existence: If true, raise FileNotFoundError if the file
does not exist. If false, return None when the file does not
exist.

See :func:`toil.jobStores.abstractJobStore.AbstractJobStore.importFile` for a
full description
"""
self._assertContextManagerUsed()
src_uri = self.normalize_uri(src_uri, check_existence=True)
return self._jobStore.import_file(src_uri, shared_file_name=shared_file_name, symlink=symlink)
full_uri = self.normalize_uri(src_uri, check_existence=check_existence)
try:
imported = self._jobStore.import_file(full_uri, shared_file_name=shared_file_name, symlink=symlink)
except FileNotFoundError:
# TODO: I thought we refactored the different job store import
# methods to not raise and instead return None, but that looks to
# not currently be the case.
if check_existence:
raise
else:
# So translate the raise-based API if needed.
# TODO: If check_existence is false but a shared file name is
# specified, we have no way to report the lack of file
# existence, since we also return None on success!
return None
if imported is None and shared_file_name is None and check_existence:
# We need to protect the caller from missing files.
# We think a file was missing, and we got None becasuse of it.
# We didn't get None instead because of usign a shared file name.
raise FileNotFoundError(f'Could not find file {src_uri}')
return imported

@deprecated(new_function_name='export_file')
def exportFile(self, jobStoreFileID: FileID, dstUrl: str) -> None:
Expand All @@ -1308,7 +1336,7 @@ def normalize_uri(uri: str, check_existence: bool = False) -> str:
"""
Given a URI, if it has no scheme, prepend "file:".

:param check_existence: If set, raise an error if a URI points to
:param check_existence: If set, raise FileNotFoundError if a URI points to
a local file that does not exist.
"""
if urlparse(uri).scheme == 'file':
Expand Down
51 changes: 34 additions & 17 deletions src/toil/jobStores/abstractJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@
from typing_extensions import Literal

from urllib.parse import ParseResult, urlparse
from urllib.error import HTTPError
from urllib.request import urlopen
from uuid import uuid4

from requests.exceptions import HTTPError

from toil.common import Config, getNodeID, safeUnpickleFromStream
from toil.fileStores import FileID
from toil.job import (CheckpointJobDescription,
Expand Down Expand Up @@ -420,6 +419,8 @@ def import_file(self,
- 'gs'
e.g. gs://bucket/file

Raises FileNotFoundError if the file does not exist.

:param str src_uri: URL that points to a file or object in the storage mechanism of a
supported URL scheme e.g. a blob in an AWS s3 bucket. It must be a file, not a
directory or prefix.
Expand Down Expand Up @@ -453,6 +454,8 @@ def _import_file(self,
asks the other job store class for a stream and writes that stream as either a regular or
a shared file.

Raises FileNotFoundError if the file does not exist.

:param AbstractJobStore otherCls: The concrete subclass of AbstractJobStore that supports
reading from the given URL and getting the file size from the URL.

Expand Down Expand Up @@ -587,6 +590,8 @@ def read_from_url(cls, src_uri: str, writable: IO[bytes]) -> Tuple[int, bool]:
"""
Read the given URL and write its content into the given writable stream.

Raises FileNotFoundError if the URL doesn't exist.

:return: The size of the file in bytes and whether the executable permission bit is set
:rtype: Tuple[int, bool]
"""
Expand Down Expand Up @@ -618,8 +623,12 @@ def _read_from_url(cls, url: ParseResult, writable: IO[bytes]) -> Tuple[int, boo
Reads the contents of the object at the specified location and writes it to the given
writable stream.

Raises FileNotFoundError if the URL doesn't exist.

Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes.

Raises FileNotFoundError if the thing at the URL is not found.

:param ParseResult url: URL that points to a file or object in the storage
mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket.

Expand All @@ -635,7 +644,7 @@ def _read_from_url(cls, url: ParseResult, writable: IO[bytes]) -> Tuple[int, boo
def _write_to_url(cls, readable: Union[IO[bytes], IO[str]], url: ParseResult, executable: bool = False) -> None:
"""
Reads the contents of the given readable stream and writes it to the object at the
specified location.
specified location. Raises FileNotFoundError if the URL doesn't exist..

Refer to AbstractJobStore.importFile documentation for currently supported URL schemes.

Expand Down Expand Up @@ -1707,20 +1716,28 @@ def _read_from_url(
# We can only retry on errors that happen as responses to the request.
# If we start getting file data, and the connection drops, we fail.
# So we don't have to worry about writing the start of the file twice.
with closing(urlopen(url.geturl())) as readable:
# Make something to count the bytes we get
# We need to put the actual count in a container so our
# nested function can modify it without creating its own
# local with the same name.
size = [0]
def count(l: int) -> None:
size[0] += l
counter = WriteWatchingStream(writable)
counter.onWrite(count)

# Do the download
shutil.copyfileobj(readable, counter)
return size[0], False
try:
with closing(urlopen(url.geturl())) as readable:
# Make something to count the bytes we get
# We need to put the actual count in a container so our
# nested function can modify it without creating its own
# local with the same name.
size = [0]
def count(l: int) -> None:
size[0] += l
counter = WriteWatchingStream(writable)
counter.onWrite(count)

# Do the download
shutil.copyfileobj(readable, counter)
return size[0], False
except HTTPError as e:
if e.code == 404:
# Translate into a FileNotFoundError for detecting
# un-importable files
raise FileNotFoundError(str(url)) from e
else:
raise

@classmethod
def _get_is_directory(cls, url: ParseResult) -> bool:
Expand Down
63 changes: 47 additions & 16 deletions src/toil/jobStores/googleJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
InternalServerError,
ServiceUnavailable)
from google.cloud import exceptions, storage
from google.auth.exceptions import DefaultCredentialsError

from toil.jobStores.abstractJobStore import (AbstractJobStore,
JobStoreExistsException,
Expand Down Expand Up @@ -112,25 +113,54 @@ def __init__(self, locator: str) -> None:
self.readStatsBaseID = self.statsReadPrefix+self.statsBaseID

self.sseKey = None
self.storageClient = self.create_client()

# Determine if we have an override environment variable for our credentials.
# We don't pull out the filename; we just see if a name is there.
self.credentialsFromEnvironment = bool(os.getenv('GOOGLE_APPLICATION_CREDENTIALS', False))

if self.credentialsFromEnvironment and not os.path.exists(os.getenv('GOOGLE_APPLICATION_CREDENTIALS')):
@classmethod
def create_client(cls) -> storage.Client:
"""
Produce a client for Google Sotrage with the highest level of access we can get.

Fall back to anonymous access if no project is available, unlike the
Google Storage module's behavior.

Warn if GOOGLE_APPLICATION_CREDENTIALS is set but not actually present.
"""

# Determine if we have an override environment variable for our credentials.
# We get the path to check existence, but Google Storage works out what
# to use later by looking at the environment again.
credentials_path: Optional[str] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS', None)
if credentials_path is not None and not os.path.exists(credentials_path):
# If the file is missing, complain.
# This variable holds a file name and not any sensitive data itself.
log.warning("File '%s' from GOOGLE_APPLICATION_CREDENTIALS is unavailable! "
"We may not be able to authenticate!",
os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))

if not self.credentialsFromEnvironment and os.path.exists(self.nodeServiceAccountJson):
# load credentials from a particular file on GCE nodes if an override path is not set
self.storageClient = storage.Client.from_service_account_json(self.nodeServiceAccountJson)
else:
# Either a filename is specified, or our fallback file isn't there.
credentials_path)

if credentials_path is None and os.path.exists(cls.nodeServiceAccountJson):
try:
# load credentials from a particular file on GCE nodes if an override path is not set
return storage.Client.from_service_account_json(cls.nodeServiceAccountJson)
except OSError:
# Probably we don't have permission to use the file.
log.warning("File '%s' exists but didn't work to authenticate!",
cls.nodeServiceAccountJson)
pass

# Either a filename is specified, or our fallback file isn't there.
try:
# See if Google can work out how to authenticate.
self.storageClient = storage.Client()
return storage.Client()
except (DefaultCredentialsError, EnvironmentError):
# Depending on which Google codepath or module version (???)
# realizes we have no credentials, we can get an EnvironemntError,
# or the new DefaultCredentialsError we are supposedly specced to
# get.

# Google can't find credentials, fall back to being anonymous.
# This is likely to happen all the time so don't warn.
return storage.Client.create_anonymous_client()


@google_retry
Expand Down Expand Up @@ -244,10 +274,11 @@ def get_env(self):

env = {}

if self.credentialsFromEnvironment:
credentials_path: Optional[str] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS', None)
if credentials_path is not None:
# Send along the environment variable that points to the credentials file.
# It must be available in the same place on all nodes.
env['GOOGLE_APPLICATION_CREDENTIALS'] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
env['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path

return env

Expand Down Expand Up @@ -351,8 +382,8 @@ def _get_blob_from_url(cls, url, exists=False):
if fileName.startswith('/'):
fileName = fileName[1:]

storageClient = storage.Client()
bucket = storageClient.get_bucket(bucketName)
storageClient = cls.create_client()
bucket = storageClient.bucket(bucket_name=bucketName)
blob = bucket.blob(compat_bytes(fileName))

if exists:
Expand Down
30 changes: 26 additions & 4 deletions src/toil/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,25 @@ def needs_aws_batch(test_item: MT) -> MT:
)
return test_item


def needs_google(test_item: MT) -> MT:
def needs_google_storage(test_item: MT) -> MT:
"""
Use as a decorator before test classes or methods to run only if Google
Cloud is usable.
Cloud is installed and we ought to be able to access public Google Storage
URIs.
"""
test_item = _mark_test('google', test_item)
test_item = _mark_test('google-storage', test_item)
try:
from google.cloud import storage # noqa
except ImportError:
return unittest.skip("Install Toil with the 'google' extra to include this test.")(test_item)

return test_item

def needs_google_project(test_item: MT) -> MT:
"""
Use as a decorator before test classes or methods to run only if we have a Google Cloud project set.
"""
test_item = _mark_test('google-project', test_item)
test_item = needs_env_var('TOIL_GOOGLE_PROJECTID', "a Google project ID")(test_item)
return test_item

Expand Down Expand Up @@ -582,6 +589,21 @@ def needs_singularity(test_item: MT) -> MT:
return test_item
else:
return unittest.skip("Install singularity to include this test.")(test_item)

def needs_singularity_or_docker(test_item: MT) -> MT:
"""
Use as a decorator before test classes or methods to only run them if
docker is installed and docker-based tests are enabled, or if Singularity
is installed.
"""

# TODO: Is there a good way to OR decorators?
if which('singularity'):
# Singularity is here, say it's a Singularity test
return needs_singularity(test_item)
else:
# Otherwise say it's a Docker test.
return needs_docker(test_item)

def needs_local_cuda(test_item: MT) -> MT:
"""
Expand Down
16 changes: 8 additions & 8 deletions src/toil/test/jobStores/jobStoreTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,17 @@
make_tests,
needs_aws_s3,
needs_encryption,
needs_google,
needs_google_project,
needs_google_storage,
slow)

# noinspection PyPackageRequirements
# (installed by `make prepare`)

# Need google_retry decorator even if google is not available, so make one up.
# Unconventional use of decorator to determine if google is enabled by seeing if
# it returns the parameter passed in.
if needs_google(needs_google) is needs_google:
try:
from toil.jobStores.googleJobStore import google_retry
else:
except ImportError:
# Need google_retry decorator even if google is not available, so make one up.
def google_retry(x):
return x

Expand Down Expand Up @@ -1239,7 +1238,8 @@ def test_file_link_imports(self):
os.remove(srcUrl[7:])


@needs_google
@needs_google_project
@needs_google_storage
@pytest.mark.xfail
class GoogleJobStoreTest(AbstractJobStoreTest.Test):
projectID = os.getenv('TOIL_GOOGLE_PROJECTID')
Expand Down Expand Up @@ -1528,4 +1528,4 @@ def do_GET(self):
self.wfile.write(self.fileContents)


AbstractJobStoreTest.Test.makeImportExportTests()
AbstractJobStoreTest.Test.makeImportExportTests()
Loading