Skip to content

Commit

Permalink
feat: add token_info_url to external account credentials (#1168)
Browse files Browse the repository at this point in the history
* feat: add token_info_url to external account credentials

Also some minor refactoring

* chore: Refresh system test creds.

* Changes requested by @clundin

* Changes requested by Leo

* validating token info url

* comment

* chore: Refresh system test creds.

* tests

* secrets

* tests

* lint

* 2.7

Co-authored-by: Carl Lundin <[email protected]>
  • Loading branch information
ScruffyProdigy and clundin25 authored Oct 27, 2022
1 parent 1d278c3 commit 9adee75
Show file tree
Hide file tree
Showing 5 changed files with 695 additions and 193 deletions.
117 changes: 46 additions & 71 deletions google/auth/external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
service_account_impersonation_options=None,
client_id=None,
client_secret=None,
token_info_url=None,
quota_project_id=None,
scopes=None,
default_scopes=None,
Expand All @@ -94,6 +95,7 @@ def __init__(
impersonation generateAccessToken URL.
client_id (Optional[str]): The optional client ID.
client_secret (Optional[str]): The optional client secret.
token_info_url (str): The optional STS endpoint URL for token introspection.
quota_project_id (Optional[str]): The optional quota project ID.
scopes (Optional[Sequence[str]]): Optional scopes to request during the
authorization grant.
Expand All @@ -112,6 +114,7 @@ def __init__(
self._audience = audience
self._subject_token_type = subject_token_type
self._token_url = token_url
self._token_info_url = token_info_url
self._credential_source = credential_source
self._service_account_impersonation_url = service_account_impersonation_url
self._service_account_impersonation_options = (
Expand All @@ -125,6 +128,8 @@ def __init__(
self._workforce_pool_user_project = workforce_pool_user_project

Credentials.validate_token_url(token_url)
if token_info_url:
Credentials.validate_token_url(token_info_url, url_type="token info")
if service_account_impersonation_url:
Credentials.validate_service_account_impersonation_url(
service_account_impersonation_url
Expand Down Expand Up @@ -161,13 +166,25 @@ def info(self):
useful for serializing the current credentials so it can deserialized
later.
"""
config_info = {
"type": _EXTERNAL_ACCOUNT_JSON_TYPE,
config_info = self._constructor_args()
config_info.update(
type=_EXTERNAL_ACCOUNT_JSON_TYPE,
service_account_impersonation=config_info.pop(
"service_account_impersonation_options", None
),
)
config_info.pop("scopes", None)
config_info.pop("default_scopes", None)
return {key: value for key, value in config_info.items() if value is not None}

def _constructor_args(self):
args = {
"audience": self._audience,
"subject_token_type": self._subject_token_type,
"token_url": self._token_url,
"token_info_url": self._token_info_url,
"service_account_impersonation_url": self._service_account_impersonation_url,
"service_account_impersonation": copy.deepcopy(
"service_account_impersonation_options": copy.deepcopy(
self._service_account_impersonation_options
)
or None,
Expand All @@ -176,8 +193,12 @@ def info(self):
"client_id": self._client_id,
"client_secret": self._client_secret,
"workforce_pool_user_project": self._workforce_pool_user_project,
"scopes": self._scopes,
"default_scopes": self._default_scopes,
}
return {key: value for key, value in config_info.items() if value is not None}
if not self.is_workforce_pool:
args.pop("workforce_pool_user_project")
return args

@property
def service_account_email(self):
Expand Down Expand Up @@ -255,25 +276,17 @@ def project_number(self):
except ValueError:
return None

@property
def token_info_url(self):
"""Optional[str]: The STS token introspection endpoint."""

return self._token_info_url

@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes, default_scopes=None):
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)
kwargs = self._constructor_args()
kwargs.update(scopes=scopes, default_scopes=default_scopes)
return self.__class__(**kwargs)

@abc.abstractmethod
def retrieve_subject_token(self, request):
Expand Down Expand Up @@ -368,43 +381,15 @@ def refresh(self, request):
@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
# Return copy of instance with the provided quota project ID.
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)
kwargs = self._constructor_args()
kwargs.update(quota_project_id=quota_project_id)
return self.__class__(**kwargs)

@_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
def with_token_uri(self, token_uri):
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=token_uri,
credential_source=self._credential_source,
service_account_impersonation_url=self._service_account_impersonation_url,
service_account_impersonation_options=self._service_account_impersonation_options,
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
return self.__class__(**d)
kwargs = self._constructor_args()
kwargs.update(token_url=token_uri)
return self.__class__(**kwargs)

def _initialize_impersonated_credentials(self):
"""Generates an impersonated credentials.
Expand All @@ -422,23 +407,12 @@ def _initialize_impersonated_credentials(self):
endpoint returned an error.
"""
# Return copy of instance with no service account impersonation.
d = dict(
audience=self._audience,
subject_token_type=self._subject_token_type,
token_url=self._token_url,
credential_source=self._credential_source,
kwargs = self._constructor_args()
kwargs.update(
service_account_impersonation_url=None,
service_account_impersonation_options={},
client_id=self._client_id,
client_secret=self._client_secret,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
workforce_pool_user_project=self._workforce_pool_user_project,
)
if not self.is_workforce_pool:
d.pop("workforce_pool_user_project")
source_credentials = self.__class__(**d)
source_credentials = self.__class__(**kwargs)

# Determine target_principal.
target_principal = self.service_account_email
Expand All @@ -461,7 +435,7 @@ def _initialize_impersonated_credentials(self):
)

@staticmethod
def validate_token_url(token_url):
def validate_token_url(token_url, url_type="token"):
_TOKEN_URL_PATTERNS = [
"^[^\\.\\s\\/\\\\]+\\.sts\\.googleapis\\.com$",
"^sts\\.googleapis\\.com$",
Expand All @@ -471,7 +445,7 @@ def validate_token_url(token_url):
]

if not Credentials.is_valid_url(_TOKEN_URL_PATTERNS, token_url):
raise ValueError("The provided token URL is invalid.")
raise ValueError("The provided {} URL is invalid.".format(url_type))

@staticmethod
def validate_service_account_impersonation_url(url):
Expand Down Expand Up @@ -530,6 +504,7 @@ def from_info(cls, info, **kwargs):
audience=info.get("audience"),
subject_token_type=info.get("subject_token_type"),
token_url=info.get("token_url"),
token_info_url=info.get("token_info_url"),
service_account_impersonation_url=info.get(
"service_account_impersonation_url"
),
Expand Down
Loading

0 comments on commit 9adee75

Please sign in to comment.