diff --git a/.changes/unreleased/Features-20230118-233801.yaml b/.changes/unreleased/Features-20230118-233801.yaml new file mode 100644 index 000000000..38affa143 --- /dev/null +++ b/.changes/unreleased/Features-20230118-233801.yaml @@ -0,0 +1,6 @@ +kind: Features +body: add adapter_response to dbt test and freshness result +time: 2023-01-18T23:38:01.857342+08:00 +custom: + Author: aezomz + Issue: "2964" diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 98b78217c..8234f9091 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -17,7 +17,6 @@ Iterator, Set, ) - import agate import pytz @@ -54,7 +53,7 @@ CodeExecutionStatus, CatalogGenerationError, ) -from dbt.utils import filter_null_values, executor, cast_to_str +from dbt.utils import filter_null_values, executor, cast_to_str, AttrDict from dbt.adapters.base.connections import Connection, AdapterResponse from dbt.adapters.base.meta import AdapterMeta, available @@ -943,7 +942,7 @@ def execute_macro( context_override: Optional[Dict[str, Any]] = None, kwargs: Dict[str, Any] = None, text_only_columns: Optional[Iterable[str]] = None, - ) -> agate.Table: + ) -> AttrDict: """Look macro_name up in the manifest and execute its results. :param macro_name: The name of the macro to execute. @@ -1028,7 +1027,7 @@ def _get_one_catalog( manifest=manifest, ) - results = self._catalog_filter_table(table, manifest) + results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type] return results def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]: @@ -1060,7 +1059,7 @@ def calculate_freshness( loaded_at_field: str, filter: Optional[str], manifest: Optional[Manifest] = None, - ) -> Dict[str, Any]: + ) -> Tuple[AdapterResponse, Dict[str, Any]]: """Calculate the freshness of sources in dbt, and return it""" kwargs: Dict[str, Any] = { "source": source, @@ -1069,7 +1068,8 @@ def calculate_freshness( } # run the macro - table = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest) + result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest) + adapter_response, table = result.response, result.table # type: ignore[attr-defined] # now we have a 1-row table of the maximum `loaded_at_field` value and # the current time according to the db. if len(table) != 1 or len(table[0]) != 2: @@ -1083,11 +1083,12 @@ def calculate_freshness( snapshotted_at = _utc(table[0][1], source, loaded_at_field) age = (snapshotted_at - max_loaded_at).total_seconds() - return { + freshness = { "max_loaded_at": max_loaded_at, "snapshotted_at": snapshotted_at, "age": age, } + return adapter_response, freshness def pre_model_hook(self, config: Mapping[str, Any]) -> Any: """A hook for running some operation before the model materialization diff --git a/core/dbt/include/global_project/macros/adapters/freshness.sql b/core/dbt/include/global_project/macros/adapters/freshness.sql index 6a5bd79d1..f18499a23 100644 --- a/core/dbt/include/global_project/macros/adapters/freshness.sql +++ b/core/dbt/include/global_project/macros/adapters/freshness.sql @@ -12,5 +12,5 @@ where {{ filter }} {% endif %} {% endcall %} - {{ return(load_result('collect_freshness').table) }} + {{ return(load_result('collect_freshness')) }} {% endmacro %} diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 819bc4164..95ff76083 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -105,10 +105,10 @@ def execute(self, compiled_node, manifest): ) relation = self.adapter.Relation.create_from_source(compiled_node) - # given a Source, calculate its fresnhess. + # given a Source, calculate its freshness. with self.adapter.connection_for(compiled_node): self.adapter.clear_transaction() - freshness = self.adapter.calculate_freshness( + adapter_response, freshness = self.adapter.calculate_freshness( relation, compiled_node.loaded_at_field, compiled_node.freshness.filter, @@ -124,7 +124,7 @@ def execute(self, compiled_node, manifest): timing=[], execution_time=0, message=None, - adapter_response={}, + adapter_response=adapter_response.to_dict(omit_none=True), failures=None, **freshness, ) diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index 48422b5e7..6d9dc2bd0 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -5,6 +5,7 @@ from dbt.events.format import pluralize from dbt.dataclass_schema import dbtClassMixin import threading +from typing import Dict, Any from .compile import CompileRunner from .run import RunTask @@ -38,6 +39,7 @@ class TestResultData(dbtClassMixin): failures: int should_warn: bool should_error: bool + adapter_response: Dict[str, Any] @classmethod def validate(cls, data): @@ -137,6 +139,7 @@ def execute_test( map(_coerce_decimal, table.rows[0]), ) ) + test_result_dct["adapter_response"] = result["response"].to_dict(omit_none=True) TestResultData.validate(test_result_dct) return TestResultData.from_dict(test_result_dct) @@ -171,7 +174,7 @@ def execute(self, test: TestNode, manifest: Manifest): thread_id=thread_id, execution_time=0, message=message, - adapter_response={}, + adapter_response=result.adapter_response, failures=failures, ) diff --git a/tests/functional/custom_singular_tests/test_custom_singular_tests.py b/tests/functional/custom_singular_tests/test_custom_singular_tests.py index 9a8df3393..aec0586b8 100644 --- a/tests/functional/custom_singular_tests/test_custom_singular_tests.py +++ b/tests/functional/custom_singular_tests/test_custom_singular_tests.py @@ -103,3 +103,8 @@ def test_data_tests(self, project, tests): assert result.status == "fail" assert not result.skipped assert result.failures > 0 + assert result.adapter_response == { + "_message": "SELECT 1", + "code": "SELECT", + "rows_affected": 1, + } diff --git a/tests/functional/sources/test_source_fresher_state.py b/tests/functional/sources/test_source_fresher_state.py index a97694a9c..3ad69d97e 100644 --- a/tests/functional/sources/test_source_fresher_state.py +++ b/tests/functional/sources/test_source_fresher_state.py @@ -112,7 +112,7 @@ def _assert_freshness_results(self, path, state): "warn_after": {"count": 10, "period": "hour"}, "error_after": {"count": 18, "period": "hour"}, }, - "adapter_response": {}, + "adapter_response": {"_message": "SELECT 1", "code": "SELECT", "rows_affected": 1}, "thread_id": AnyStringWith("Thread-"), "execution_time": AnyFloat(), "timing": [ diff --git a/tests/functional/sources/test_source_freshness.py b/tests/functional/sources/test_source_freshness.py index 630f59a02..e7e1f08eb 100644 --- a/tests/functional/sources/test_source_freshness.py +++ b/tests/functional/sources/test_source_freshness.py @@ -103,7 +103,7 @@ def _assert_freshness_results(self, path, state): "warn_after": {"count": 10, "period": "hour"}, "error_after": {"count": 18, "period": "hour"}, }, - "adapter_response": {}, + "adapter_response": {"_message": "SELECT 1", "code": "SELECT", "rows_affected": 1}, "thread_id": AnyStringWith("Thread-"), "execution_time": AnyFloat(), "timing": [