feat: [great_expectations] supporting in-memory (Pandas) Data Assets

ref #9810
Achraf BOUAOUDA committed Feb 25, 2024
1 parent c1332c6 commit 0c96f40
Showing 1 changed file with 29 additions and 6 deletions.
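
For orientation, here is a rough sketch (not part of the commit) of the in-memory flow this change targets: a Pandas DataFrame validated through a GE v3 RuntimeBatchRequest, with DataHubValidationAction attached to a checkpoint. The datasource, data connector, expectation suite, and DataHub server names below are illustrative assumptions and would need to exist in your GE project and deployment.

```python
# Sketch only: assumes a GE v3 project with a Pandas datasource named
# "my_pandas_datasource" exposing a RuntimeDataConnector named
# "default_runtime_data_connector_name", an existing expectation suite
# "orders_suite", and a DataHub GMS reachable at http://localhost:8080.
import pandas as pd
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest

df = pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.5, 30.1]})

context = ge.get_context()

# In-memory (Pandas) Data Asset passed as a runtime batch.
batch_request = RuntimeBatchRequest(
    datasource_name="my_pandas_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="in_memory_orders",
    runtime_parameters={"batch_data": df},
    batch_identifiers={"default_identifier_name": "2024-02-25"},
)

context.add_checkpoint(
    name="in_memory_checkpoint",
    config_version=1.0,
    class_name="Checkpoint",
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "datahub_action",
            "action": {
                "module_name": "datahub.integrations.great_expectations.action",
                "class_name": "DataHubValidationAction",
                "server_url": "http://localhost:8080",  # assumed GMS URL
            },
        },
    ],
)

context.run_checkpoint(
    checkpoint_name="in_memory_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "orders_suite",
        }
    ],
)
```

With this commit, such a run maps the in-memory batch to a dataset URN derived from the GE datasource name via the new RuntimeDataBatchSpec branch in the diff below; previously only SqlAlchemy-backed batches were recognized.
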
@@ -24,6 +24,7 @@
     ExpectationSuiteIdentifier,
     ValidationResultIdentifier,
 )
+from great_expectations.execution_engine import PandasExecutionEngine
 from great_expectations.execution_engine.sqlalchemy_execution_engine import (
     SqlAlchemyExecutionEngine,
 )
@@ -566,10 +567,12 @@ def get_dataset_partitions(self, batch_identifier, data_asset):

         logger.debug("Finding datasets being validated")

-        # for now, we support only v3-api and sqlalchemy execution engine
-        if isinstance(data_asset, Validator) and isinstance(
-            data_asset.execution_engine, SqlAlchemyExecutionEngine
-        ):
+        # for now, we support only the v3-api with the SQLAlchemy and Pandas execution engines
+        is_sql_alchemy = isinstance(data_asset, Validator) and (
+            isinstance(data_asset.execution_engine, SqlAlchemyExecutionEngine)
+        )
+        is_pandas = isinstance(data_asset.execution_engine, PandasExecutionEngine)
+        if is_sql_alchemy or is_pandas:
             ge_batch_spec = data_asset.active_batch_spec
             partitionSpec = None
             batchSpecProperties = {
@@ -581,10 +584,10 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
                 ),
             }
             sqlalchemy_uri = None
-            if isinstance(data_asset.execution_engine.engine, Engine):
+            if is_sql_alchemy and isinstance(data_asset.execution_engine.engine, Engine):
                 sqlalchemy_uri = data_asset.execution_engine.engine.url
             # For snowflake sqlalchemy_execution_engine.engine is actually instance of Connection
-            elif isinstance(data_asset.execution_engine.engine, Connection):
+            elif is_sql_alchemy and isinstance(data_asset.execution_engine.engine, Connection):
                 sqlalchemy_uri = data_asset.execution_engine.engine.engine.url

             if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec):
@@ -680,6 +683,26 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
                         "batchSpec": batchSpec,
                     }
                 )
+            elif isinstance(ge_batch_spec, RuntimeDataBatchSpec):
+                data_platform = self.get_platform_instance(data_asset.active_batch_definition.datasource_name)
+                dataset_urn = builder.make_dataset_urn_with_platform_instance(
+                    platform=data_platform if self.platform_alias is None else self.platform_alias,
+                    name=data_asset.active_batch_definition.datasource_name,
+                    platform_instance="",
+                    env=self.env,
+                )
+                batchSpec = BatchSpec(
+                    nativeBatchId=batch_identifier,
+                    query="",
+                    customProperties=batchSpecProperties,
+                )
+                dataset_partitions.append(
+                    {
+                        "dataset_urn": dataset_urn,
+                        "partitionSpec": partitionSpec,
+                        "batchSpec": batchSpec,
+                    }
+                )
             else:
                 warn(
                     "DataHubValidationAction does not recognize this GE batch spec type- {batch_spec_type}.".format(
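
For reference, a small sketch (not part of the commit) of the URN shape produced by the new RuntimeDataBatchSpec branch: the in-memory dataset is keyed by the GE datasource name, and the platform comes from get_platform_instance() unless platform_alias is set. The platform, datasource name, and env values below are assumptions.

```python
# Illustrative only: mirrors the builder call in the new branch above.
# "pandas", "my_pandas_datasource", and "PROD" are assumed values.
import datahub.emitter.mce_builder as builder

dataset_urn = builder.make_dataset_urn_with_platform_instance(
    platform="pandas",            # assumed result of get_platform_instance(...)
    name="my_pandas_datasource",  # GE datasource name of the in-memory asset
    platform_instance="",         # the branch passes an empty platform instance
    env="PROD",                   # assumed self.env
)
print(dataset_urn)
# Expected to look like:
# urn:li:dataset:(urn:li:dataPlatform:pandas,my_pandas_datasource,PROD)
```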
