Merge branch 'master' into feat/lineageViaResponse
RyanHolstien authored Apr 2, 2024
2 parents b873f6b + 5c06f7a commit 4706533
Showing 8 changed files with 254 additions and 7 deletions.
@@ -752,8 +752,17 @@ def _create_lineage_map(

         # Try the sql parser first.
         if self.config.lineage_use_sql_parser:
+            if e.statementType == "SELECT":
+                # We wrap SELECT statements in a CREATE TABLE ... AS statement to make them parseable as an INSERT.
+                # This is a workaround for the sql parser to support the case where the user runs a query and inserts the result into a table.
+                query = f"""create table `{destination_table.table_identifier.get_table_name()}` AS
+(
+{e.query}
+)"""
+            else:
+                query = e.query
             raw_lineage = sqlglot_lineage(
-                e.query,
+                query,
                 schema_resolver=sql_parser_schema_resolver,
                 default_db=e.project_id,
             )
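For illustration, this is roughly what the rewrite produces for a toy query (the destination table name here is hypothetical; the source table is borrowed from the unit test further down):

# Sketch of the SELECT-to-CTAS rewrite above; a bare SELECT becomes a statement
# that writes to a table, so the parser can attribute lineage to a destination.
select_query = "SELECT first.a, second.b FROM `my_project.my_dataset.my_source_table1` first"
wrapped = f"""create table `my_project.my_dataset.my_destination_table` AS
(
{select_query}
)"""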
135 changes: 133 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -1,10 +1,13 @@
 import datetime
 import logging
+import re
+import time
 from dataclasses import dataclass
-from typing import Iterable
+from typing import Dict, Iterable

 from pydantic import Field

-from datahub.configuration.common import ConfigModel
+from datahub.configuration.common import ConfigModel, OperationalError
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
@@ -15,12 +18,30 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit

logger = logging.getLogger(__name__)


class DataHubGcSourceConfig(ConfigModel):
    cleanup_expired_tokens: bool = Field(
        default=True,
        description="Whether to clean up expired tokens or not",
    )
    truncate_indices: bool = Field(
        default=True,
        description="Whether to truncate Elasticsearch indices that can be safely truncated",
    )
    truncate_index_older_than_days: int = Field(
        default=30,
        description="Indices older than this number of days will be truncated",
    )
    truncation_watch_until: int = Field(
        default=10000,
        description="Watch truncation progress until fewer than this many documents remain to delete",
    )
    truncation_sleep_between_seconds: int = Field(
        default=30,
        description="Seconds to sleep between truncation monitoring checks",
    )
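A rough sketch of how these settings interact, using only the field names and defaults defined above: watch mode re-issues dry runs every truncation_sleep_between_seconds seconds for as long as at least truncation_watch_until documents remain to be deleted.

# Minimal sketch; all field names and values are the defaults from the class above.
config = DataHubGcSourceConfig(
    cleanup_expired_tokens=True,
    truncate_indices=True,
    truncate_index_older_than_days=30,
    truncation_watch_until=10000,
    truncation_sleep_between_seconds=30,
)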


@dataclass
@@ -51,8 +72,118 @@ def get_workunits_internal(
    ) -> Iterable[MetadataWorkUnit]:
        if self.config.cleanup_expired_tokens:
            self.revoke_expired_tokens()
        if self.config.truncate_indices:
            self.truncate_indices()
        yield from []

    def truncate_indices(self) -> None:
        self._truncate_timeseries_helper(aspect_name="operation", entity_type="dataset")
        self._truncate_timeseries_helper(
            aspect_name="datasetusagestatistics", entity_type="dataset"
        )
        self._truncate_timeseries_helper(
            aspect_name="chartUsageStatistics", entity_type="chart"
        )
        self._truncate_timeseries_helper(
            aspect_name="dashboardUsageStatistics", entity_type="dashboard"
        )

    def _truncate_timeseries_helper(self, aspect_name: str, entity_type: str) -> None:
        self._truncate_timeseries_with_watch_optional(
            aspect_name=aspect_name, entity_type=entity_type, watch=False
        )
        self._truncate_timeseries_with_watch_optional(
            aspect_name=aspect_name, entity_type=entity_type, watch=True
        )

    def _truncate_timeseries_with_watch_optional(
        self, aspect_name: str, entity_type: str, watch: bool
    ) -> None:
        graph = self.graph
        assert graph is not None
        if watch:
            to_delete = 1
            while to_delete > 0:
                response = self.truncate_timeseries_util(
                    aspect=aspect_name,
                    dry_run=watch,
                    days_ago=self.config.truncate_index_older_than_days,
                    entity_type=entity_type,
                )
                val = response.get("value", "")
                if "This was a dry run" not in val or "out of" not in val:
                    return
                prev_to_delete = to_delete
                to_delete, total = re.findall(r"\d+", val)[:2]
                to_delete = int(to_delete)
                if to_delete <= 0:
                    logger.info("Nothing to delete.")
                    return
                logger.info(f"to_delete {to_delete} / {total}")
                if to_delete == prev_to_delete:
                    logger.info("Seems to be stuck. Ending the loop.")
                    break
                elif to_delete < self.config.truncation_watch_until:
                    logger.info("Too small truncation. Not going to watch.")
                    return
                else:
                    time.sleep(self.config.truncation_sleep_between_seconds)
        else:
            self.truncate_timeseries_util(
                aspect=aspect_name,
                dry_run=watch,
                days_ago=self.config.truncate_index_older_than_days,
                entity_type=entity_type,
            )
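The watch loop above only assumes that a dry-run response's "value" string contains the markers "This was a dry run" and "out of", with the remaining and total document counts as its first two integers. A self-contained sketch of that parsing, with a hypothetical response wording:

import re

# Hypothetical dry-run summary string; only the marker substrings and the first
# two integers are relied upon by the loop above.
val = "This was a dry run. 12000 out of 54321 documents would be deleted."
if "This was a dry run" in val and "out of" in val:
    to_delete, total = (int(x) for x in re.findall(r"\d+", val)[:2])
    assert (to_delete, total) == (12000, 54321)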

    def x_days_ago_millis(self, days: int) -> int:
        x_days_ago_datetime = datetime.datetime.now(
            datetime.timezone.utc
        ) - datetime.timedelta(days=days)
        return int(x_days_ago_datetime.timestamp() * 1000)

    def truncate_timeseries_util(
        self,
        aspect: str,
        days_ago: int,
        dry_run: bool = True,
        entity_type: str = "dataset",
    ) -> Dict:
        graph = self.graph
        assert graph is not None

        gms_url = graph._gms_server
        if not dry_run:
            logger.info(
                f"Going to truncate timeseries for {aspect} for {gms_url} older than {days_ago} days"
            )
        days_ago_millis = self.x_days_ago_millis(days_ago)
        url = f"{gms_url}/operations?action=truncateTimeseriesAspect"
        try:
            response = graph._post_generic(
                url=url,
                payload_dict={
                    "entityType": entity_type,
                    "aspect": aspect,
                    "endTimeMillis": days_ago_millis,
                    "dryRun": dry_run,
                },
            )
        except OperationalError:
            # Retry with a force delete-by-query if the plain truncation call fails.
            response = graph._post_generic(
                url=url,
                payload_dict={
                    "entityType": entity_type,
                    "aspect": aspect,
                    "endTimeMillis": days_ago_millis,
                    "dryRun": dry_run,
                    "forceDeleteByQuery": True,
                },
            )
        return response

    def revoke_expired_tokens(self) -> None:
        total = 1
        while total > 0:
@@ -573,10 +573,15 @@ def _get_schema_field(

         fieldTags: List[str] = self.get_field_tags(fieldName, field)

+        description = self._get_field_description(field, customField)
+
+        # Escape descriptions that start with `#`.
+        description = "\\" + description if description.startswith("#") else description
+
         schemaField = SchemaFieldClass(
             fieldPath=fieldPath,
             type=SchemaFieldDataTypeClass(type=TypeClass()),  # type:ignore
-            description=self._get_field_description(field, customField),
+            description=description,
             # nativeDataType is set to the data type shown on the Salesforce user
             # interface, not the corresponding API data type names.
             nativeDataType=field["FieldDefinition"]["DataType"],
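The escaping above presumably exists because descriptions are rendered as Markdown, where a leading # would turn the text into a heading. A minimal sketch of the rule, with the sample value taken from the golden files below:

# Sketch of the escaping rule; the input value comes from the test data below.
description = "# Unique_Account"
escaped = "\\" + description if description.startswith("#") else description
assert escaped == "\\# Unique_Account"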
@@ -2508,6 +2508,74 @@
      },
      "RelationshipName": null,
      "IsNillable": true
    },
    {
      "attributes": {
        "type": "EntityParticle",
        "url": "/services/data/v54.0/tooling/sobjects/EntityParticle/Account.Unique_Account"
      },
      "QualifiedApiName": "Unique_Account",
      "DeveloperName": "Unique_Account",
      "Label": "# Unique_Account",
      "FieldDefinition": {
        "attributes": {
          "type": "FieldDefinition",
          "url": "/services/data/v54.0/tooling/sobjects/FieldDefinition/Account.Unique_Account"
        },
        "DataType": "Text(80)",
        "LastModifiedDate": null,
        "LastModifiedBy": null,
        "IsIndexed": false,
        "ComplianceGroup": null,
        "SecurityClassification": null
      },
      "DataType": "string",
      "Precision": 0,
      "Scale": 0,
      "Length": 80,
      "Digits": 0,
      "IsUnique": false,
      "IsCompound": false,
      "IsComponent": false,
      "ReferenceTo": {
        "referenceTo": null
      },
      "RelationshipName": null,
      "IsNillable": true
    },
    {
      "attributes": {
        "type": "EntityParticle",
        "url": "/services/data/v54.0/tooling/sobjects/EntityParticle/Account.Unique_Number"
      },
      "QualifiedApiName": "Unique_Number",
"DeveloperName": "Unique_Account",
"Label": "#Unique_Number",
"FieldDefinition": {
"attributes": {
"type": "FieldDefinition",
"url": "/services/data/v54.0/tooling/sobjects/FieldDefinition/Account.Unique_Number"
},
"DataType": "Text(80)",
"LastModifiedDate": null,
"LastModifiedBy": null,
"IsIndexed": false,
"ComplianceGroup": null,
"SecurityClassification": null
},
"DataType": "string",
"Precision": 0,
"Scale": 0,
"Length": 80,
"Digits": 0,
"IsUnique": false,
"IsCompound": false,
"IsComponent": false,
"ReferenceTo": {
"referenceTo": null
},
"RelationshipName": null,
"IsNillable": true
}
]
}
@@ -1350,6 +1350,40 @@
      },
      "isPartOfKey": false,
      "jsonProps": "{}"
    },
    {
      "fieldPath": "Unique_Account",
      "nullable": true,
      "description": "\\# Unique_Account",
      "type": {
        "type": {
          "com.linkedin.schema.StringType": {}
        }
      },
      "nativeDataType": "Text(80)",
      "recursive": false,
      "globalTags": {
        "tags": []
      },
      "isPartOfKey": false,
      "jsonProps": "{}"
    },
    {
      "fieldPath": "Unique_Number",
      "nullable": true,
      "description": "\\#Unique_Number",
      "type": {
        "type": {
          "com.linkedin.schema.StringType": {}
        }
      },
      "nativeDataType": "Text(80)",
      "recursive": false,
      "globalTags": {
        "tags": []
      },
      "isPartOfKey": false,
      "jsonProps": "{}"
    }
  ],
  "primaryKeys": [
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_bigquery_lineage.py
@@ -28,7 +28,7 @@ def lineage_entries() -> List[QueryEvent]:
             SELECT first.a, second.b FROM `my_project.my_dataset.my_source_table1` first
             LEFT JOIN `my_project.my_dataset.my_source_table2` second ON first.id = second.id
             """,
-            statementType="SELECT",
+            statementType="INSERT",
             project_id="proj_12344",
             end_time=None,
             referencedTables=[
@@ -930,7 +930,7 @@ public IngestResult ingestProposal(
             AspectsBatchImpl.builder().mcps(List.of(proposal), auditStamp, this).build(), async)
         .stream()
         .findFirst()
-        .get();
+        .orElse(null);
   }

   /**
@@ -349,7 +349,7 @@ private void updateAspect(
     final IngestResult ingestProposalResult =
         _entityService.ingestProposal(proposal, auditStamp, false);

-    if (!ingestProposalResult.isSqlCommitted()) {
+    if (ingestProposalResult != null && !ingestProposalResult.isSqlCommitted()) {
       log.error(
           "Failed to ingest aspect with references removed. Before {}, after: {}, please check MCP processor"
               + " logs for more information",
