Skip to content

Commit

Permalink
feat(ingest/dbt-cloud): update metadata_endpoint inference (#11041)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jul 31, 2024
1 parent b13d990 commit 89933fe
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/cli/get_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def urn(ctx: Any, urn: Optional[str], aspect: List[str], details: bool) -> None:
entity_urn=urn,
aspects=aspect,
typed=False,
details=details,
),
sort_keys=True,
indent=2,
Expand Down
37 changes: 31 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ class DBTCloudConfig(DBTCommonConfig):

metadata_endpoint: str = Field(
default="https://metadata.cloud.getdbt.com/graphql",
description="The dbt Cloud metadata API endpoint. This is deprecated, and will be removed in a future release. Please use access_url instead.",
deprecated=True,
description="The dbt Cloud metadata API endpoint. If not provided, we will try to infer it from the access_url.",
)

token: str = Field(
Expand All @@ -66,13 +65,39 @@ class DBTCloudConfig(DBTCommonConfig):
@root_validator(pre=True)
def set_metadata_endpoint(cls, values: dict) -> dict:
    """Populate ``metadata_endpoint`` from ``access_url`` when it was not supplied.

    Registered as a pre root validator, so it operates on the raw input
    dict. An explicitly provided ``metadata_endpoint`` is never overridden.

    Args:
        values: The raw config values being validated.

    Returns:
        The same dict, with ``metadata_endpoint`` filled in when it could
        be inferred.

    Raises:
        ValueError: If ``access_url`` is set, ``metadata_endpoint`` is not,
            and no endpoint can be inferred from the access URL.
    """
    if values.get("access_url") and not values.get("metadata_endpoint"):
        # Delegate entirely to infer_metadata_endpoint: it knows the
        # per-deployment hostname layouts and reports unparseable URLs by
        # returning None instead of raising.
        metadata_endpoint = infer_metadata_endpoint(values["access_url"])
        if metadata_endpoint is None:
            raise ValueError(
                "Unable to infer the metadata endpoint from the access URL. Please provide a metadata endpoint."
            )
        values["metadata_endpoint"] = metadata_endpoint
    return values


def infer_metadata_endpoint(access_url: str) -> Optional[str]:
    """Infer the dbt Cloud metadata GraphQL endpoint from an access URL.

    Args:
        access_url: The dbt Cloud access URL, e.g. ``https://cloud.getdbt.com``.

    Returns:
        The inferred ``<scheme>://.../graphql`` endpoint, or ``None`` when the
        URL cannot be parsed or has no scheme or hostname.
    """
    # See https://docs.getdbt.com/docs/cloud/about-cloud/access-regions-ip-addresses#api-access-urls
    # and https://docs.getdbt.com/docs/dbt-cloud-apis/discovery-querying#discovery-api-endpoints

    try:
        parsed_uri = urlparse(access_url)
        # Reading .hostname is kept inside the try so that any error raised
        # while parsing is reported as "cannot infer" rather than propagated.
        hostname = parsed_uri.hostname
    except Exception as e:
        # Deliberately broad: inference is best-effort and the caller turns
        # a None result into a clear configuration error.
        logger.debug("Unable to parse access URL %s: %s", access_url, e, exc_info=e)
        return None

    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    # urlparse reports a missing scheme as "" (never None), so test truthiness.
    if not parsed_uri.scheme or not hostname:
        logger.debug(
            "Unable to parse access URL %s: missing scheme or hostname", access_url
        )
        return None

    if hostname.endswith(".dbt.com"):
        # For cell-based deployments.
        # prefix.region.dbt.com -> prefix.metadata.region.dbt.com
        prefix, _, remainder = hostname.partition(".")
        return f"{parsed_uri.scheme}://{prefix}.metadata.{remainder}/graphql"

    # Both *.getdbt.com hosts and the self-hosted variants use the
    # "metadata." prefix in front of the original network location.
    return f"{parsed_uri.scheme}://metadata.{parsed_uri.netloc}/graphql"


_DBT_GRAPHQL_COMMON_FIELDS = """
runId
accountId
Expand Down
25 changes: 21 additions & 4 deletions metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.dbt.dbt_cloud import DBTCloudConfig
from datahub.ingestion.source.dbt.dbt_cloud import (
DBTCloudConfig,
infer_metadata_endpoint,
)
from datahub.ingestion.source.dbt.dbt_core import (
DBTCoreConfig,
DBTCoreSource,
Expand Down Expand Up @@ -366,7 +369,7 @@ def test_dbt_entity_emission_configuration_helpers():

def test_dbt_cloud_config_access_url():
config_dict = {
"access_url": "https://my-dbt-cloud.dbt.com",
"access_url": "https://emea.getdbt.com",
"token": "dummy_token",
"account_id": "123456",
"project_id": "1234567",
Expand All @@ -375,8 +378,8 @@ def test_dbt_cloud_config_access_url():
"target_platform": "dummy_platform",
}
config = DBTCloudConfig.parse_obj(config_dict)
assert config.access_url == "https://my-dbt-cloud.dbt.com"
assert config.metadata_endpoint == "https://metadata.my-dbt-cloud.dbt.com/graphql"
assert config.access_url == "https://emea.getdbt.com"
assert config.metadata_endpoint == "https://metadata.emea.getdbt.com/graphql"


def test_dbt_cloud_config_with_defined_metadata_endpoint():
Expand All @@ -398,6 +401,20 @@ def test_dbt_cloud_config_with_defined_metadata_endpoint():
)


def test_infer_metadata_endpoint() -> None:
    """Check endpoint inference for each supported access-URL layout."""
    # Maps an access URL to the metadata endpoint expected for it; the cases
    # are checked in insertion order.
    expected_by_access_url = {
        # Multi-tenant host under getdbt.com.
        "https://cloud.getdbt.com": "https://metadata.cloud.getdbt.com/graphql",
        # Cell-based host under dbt.com: "metadata" goes after the prefix.
        "https://prefix.us1.dbt.com": "https://prefix.metadata.us1.dbt.com/graphql",
        # Any other host: "metadata." is prepended.
        "http://dbt.corp.internal": "http://metadata.dbt.corp.internal/graphql",
    }
    for access_url, expected_endpoint in expected_by_access_url.items():
        assert infer_metadata_endpoint(access_url) == expected_endpoint


def test_dbt_time_parsing() -> None:
time_formats = [
"2024-03-28T05:56:15.236210Z",
Expand Down

0 comments on commit 89933fe

Please sign in to comment.