Skip to content

Commit

Permalink
add adapter_response for test (#6645)
Browse files Browse the repository at this point in the history
  • Loading branch information
aezomz authored and Hasyimi Bahrudin committed Mar 5, 2023
1 parent d2c9ee4 commit 19e0916
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 14 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230118-233801.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: add adapter_response to dbt test and freshness result
time: 2023-01-18T23:38:01.857342+08:00
custom:
Author: aezomz
Issue: "2964"
15 changes: 8 additions & 7 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
Iterator,
Set,
)

import agate
import pytz

Expand Down Expand Up @@ -54,7 +53,7 @@
CodeExecutionStatus,
CatalogGenerationError,
)
from dbt.utils import filter_null_values, executor, cast_to_str
from dbt.utils import filter_null_values, executor, cast_to_str, AttrDict

from dbt.adapters.base.connections import Connection, AdapterResponse
from dbt.adapters.base.meta import AdapterMeta, available
Expand Down Expand Up @@ -943,7 +942,7 @@ def execute_macro(
context_override: Optional[Dict[str, Any]] = None,
kwargs: Dict[str, Any] = None,
text_only_columns: Optional[Iterable[str]] = None,
) -> agate.Table:
) -> AttrDict:
"""Look macro_name up in the manifest and execute its results.
:param macro_name: The name of the macro to execute.
Expand Down Expand Up @@ -1028,7 +1027,7 @@ def _get_one_catalog(
manifest=manifest,
)

results = self._catalog_filter_table(table, manifest)
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
Expand Down Expand Up @@ -1060,7 +1059,7 @@ def calculate_freshness(
loaded_at_field: str,
filter: Optional[str],
manifest: Optional[Manifest] = None,
) -> Dict[str, Any]:
) -> Tuple[AdapterResponse, Dict[str, Any]]:
"""Calculate the freshness of sources in dbt, and return it"""
kwargs: Dict[str, Any] = {
"source": source,
Expand All @@ -1069,7 +1068,8 @@ def calculate_freshness(
}

# run the macro
table = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
result = self.execute_macro(FRESHNESS_MACRO_NAME, kwargs=kwargs, manifest=manifest)
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
Expand All @@ -1083,11 +1083,12 @@ def calculate_freshness(

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
return {
freshness = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
}
return adapter_response, freshness

def pre_model_hook(self, config: Mapping[str, Any]) -> Any:
"""A hook for running some operation before the model materialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('collect_freshness').table) }}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}
6 changes: 3 additions & 3 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ def execute(self, compiled_node, manifest):
)

relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its fresnhess.
# given a Source, calculate its freshness.
with self.adapter.connection_for(compiled_node):
self.adapter.clear_transaction()
freshness = self.adapter.calculate_freshness(
adapter_response, freshness = self.adapter.calculate_freshness(
relation,
compiled_node.loaded_at_field,
compiled_node.freshness.filter,
Expand All @@ -124,7 +124,7 @@ def execute(self, compiled_node, manifest):
timing=[],
execution_time=0,
message=None,
adapter_response={},
adapter_response=adapter_response.to_dict(omit_none=True),
failures=None,
**freshness,
)
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dbt.events.format import pluralize
from dbt.dataclass_schema import dbtClassMixin
import threading
from typing import Dict, Any

from .compile import CompileRunner
from .run import RunTask
Expand Down Expand Up @@ -38,6 +39,7 @@ class TestResultData(dbtClassMixin):
failures: int
should_warn: bool
should_error: bool
adapter_response: Dict[str, Any]

@classmethod
def validate(cls, data):
Expand Down Expand Up @@ -137,6 +139,7 @@ def execute_test(
map(_coerce_decimal, table.rows[0]),
)
)
test_result_dct["adapter_response"] = result["response"].to_dict(omit_none=True)
TestResultData.validate(test_result_dct)
return TestResultData.from_dict(test_result_dct)

Expand Down Expand Up @@ -171,7 +174,7 @@ def execute(self, test: TestNode, manifest: Manifest):
thread_id=thread_id,
execution_time=0,
message=message,
adapter_response={},
adapter_response=result.adapter_response,
failures=failures,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ def test_data_tests(self, project, tests):
assert result.status == "fail"
assert not result.skipped
assert result.failures > 0
assert result.adapter_response == {
"_message": "SELECT 1",
"code": "SELECT",
"rows_affected": 1,
}
2 changes: 1 addition & 1 deletion tests/functional/sources/test_source_fresher_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def _assert_freshness_results(self, path, state):
"warn_after": {"count": 10, "period": "hour"},
"error_after": {"count": 18, "period": "hour"},
},
"adapter_response": {},
"adapter_response": {"_message": "SELECT 1", "code": "SELECT", "rows_affected": 1},
"thread_id": AnyStringWith("Thread-"),
"execution_time": AnyFloat(),
"timing": [
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _assert_freshness_results(self, path, state):
"warn_after": {"count": 10, "period": "hour"},
"error_after": {"count": 18, "period": "hour"},
},
"adapter_response": {},
"adapter_response": {"_message": "SELECT 1", "code": "SELECT", "rows_affected": 1},
"thread_id": AnyStringWith("Thread-"),
"execution_time": AnyFloat(),
"timing": [
Expand Down

0 comments on commit 19e0916

Please sign in to comment.