diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql index fd7eb2b9624..7847117e005 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql @@ -82,10 +82,30 @@ {% endmacro %} +{% macro snapshot_string_as_time(timestamp) -%} + {{ adapter_macro('snapshot_string_as_time', timestamp) }} +{%- endmacro %} + + +{% macro default__snapshot_string_as_time(timestamp) %} + {% do exceptions.raise_not_implemented( + 'snapshot_string_as_time macro not implemented for adapter '+adapter.type() + ) %} +{% endmacro %} + {% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set check_cols_config = config['check_cols'] %} {% set primary_key = config['unique_key'] %} - {% set updated_at = snapshot_get_time() %} + {% set select_current_time -%} + select {{ snapshot_get_time() }} as snapshot_start + {%- endset %} + + {# don't access the column by name, to avoid dealing with casing issues on snowflake #} + {%- set now = run_query(select_current_time)[0][0] -%} + {% if now is none or now is undefined -%} + {%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%} + {%- endif %} + {% set updated_at = snapshot_string_as_time(now) %} {% if check_cols_config == 'all' %} {% set check_cols = get_columns_in_query(node['injected_sql']) %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql index 944127024f5..b683bbd26c2 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/adapters.sql @@ -103,6 +103,12 @@ {%- endmacro %} +{% macro bigquery__snapshot_string_as_time(timestamp) -%} + {%- set result = 'TIMESTAMP("' ~ timestamp ~ '")' -%} + {{ return(result) }} +{%- endmacro %} + + {% macro bigquery__list_schemas(database) -%} {{ return(adapter.list_schemas()) }} {% endmacro %} diff --git a/plugins/postgres/dbt/include/postgres/macros/adapters.sql b/plugins/postgres/dbt/include/postgres/macros/adapters.sql index 82484b34978..9f7ac647693 100644 --- a/plugins/postgres/dbt/include/postgres/macros/adapters.sql +++ b/plugins/postgres/dbt/include/postgres/macros/adapters.sql @@ -104,6 +104,12 @@ now() {%- endmacro %} +{% macro postgres__snapshot_string_as_time(timestamp) -%} + {%- set result = "'" ~ timestamp ~ "'::timestamp without time zone" -%} + {{ return(result) }} +{%- endmacro %} + + {% macro postgres__snapshot_get_time() -%} {{ current_timestamp() }}::timestamp without time zone {%- endmacro %} diff --git a/plugins/redshift/dbt/include/redshift/macros/adapters.sql b/plugins/redshift/dbt/include/redshift/macros/adapters.sql index fa75765c0a3..d274b9ba7d6 100644 --- a/plugins/redshift/dbt/include/redshift/macros/adapters.sql +++ b/plugins/redshift/dbt/include/redshift/macros/adapters.sql @@ -172,6 +172,12 @@ {{ current_timestamp() }}::timestamp {%- endmacro %} + +{% macro redshift__snapshot_string_as_time(timestamp) -%} + {%- set result = "'" ~ timestamp ~ "'::timestamp" -%} + {{ return(result) }} +{%- endmacro %} + {% macro redshift__make_temp_relation(base_relation, suffix) %} {% do return(postgres__make_temp_relation(base_relation, suffix)) %} {% endmacro %} diff --git a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql index 5cf8b7b63cd..8c7f48bcdd4 100644 --- a/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql +++ b/plugins/snowflake/dbt/include/snowflake/macros/adapters.sql @@ -108,6 +108,13 @@ convert_timezone('UTC', current_timestamp()) {%- endmacro %} + +{% macro snowflake__snapshot_string_as_time(timestamp) -%} + {%- set result = "to_timestamp_ntz('" ~ timestamp ~ "')" -%} + {{ return(result) }} +{%- endmacro %} + + {% macro snowflake__snapshot_get_time() -%} to_timestamp_ntz({{ current_timestamp() }}) {%- endmacro %} diff --git a/test/integration/004_simple_snapshot_test/macros/test_no_overlaps.sql b/test/integration/004_simple_snapshot_test/macros/test_no_overlaps.sql new file mode 100644 index 00000000000..136c6efe5c8 --- /dev/null +++ b/test/integration/004_simple_snapshot_test/macros/test_no_overlaps.sql @@ -0,0 +1,85 @@ +{% macro get_snapshot_unique_id() -%} + {{ return(adapter_macro('get_snapshot_unique_id', )) }} +{%- endmacro %} + +{% macro default__get_snapshot_unique_id() -%} + {% do return("id || '-' || first_name") %} +{%- endmacro %} + + +{% macro bigquery__get_snapshot_unique_id() -%} + {%- do return('concat(cast(id as string), "-", first_name)') -%} +{%- endmacro %} + +{# + mostly copy+pasted from dbt_utils, but I removed some parameters and added + a query that calls get_snapshot_unique_id +#} +{% macro test_mutually_exclusive_ranges(model) %} + +with base as ( + select {{ get_snapshot_unique_id() }} as dbt_unique_id, + * + from {{ model }} +), +window_functions as ( + + select + dbt_valid_from as lower_bound, + coalesce(dbt_valid_to, '2099-1-1T00:00:01') as upper_bound, + + lead(dbt_valid_from) over ( + partition by dbt_unique_id + order by dbt_valid_from + ) as next_lower_bound, + + row_number() over ( + partition by dbt_unique_id + order by dbt_valid_from desc + ) = 1 as is_last_record + + from base + +), + +calc as ( + -- We want to return records where one of our assumptions fails, so we'll use + -- the `not` function with `and` statements so we can write our assumptions nore cleanly + select + *, + + -- For each record: lower_bound should be < upper_bound. + -- Coalesce it to return an error on the null case (implicit assumption + -- these columns are not_null) + coalesce( + lower_bound < upper_bound, + is_last_record + ) as lower_bound_less_than_upper_bound, + + -- For each record: upper_bound {{ allow_gaps_operator }} the next lower_bound. + -- Coalesce it to handle null cases for the last record. + coalesce( + upper_bound = next_lower_bound, + is_last_record, + false + ) as upper_bound_equal_to_next_lower_bound + + from window_functions + +), + +validation_errors as ( + + select + * + from calc + + where not( + -- THE FOLLOWING SHOULD BE TRUE -- + lower_bound_less_than_upper_bound + and upper_bound_equal_to_next_lower_bound + ) +) + +select count(*) from validation_errors +{% endmacro %} diff --git a/test/integration/004_simple_snapshot_test/models/schema.yml b/test/integration/004_simple_snapshot_test/models/schema.yml new file mode 100644 index 00000000000..187a9d13ab2 --- /dev/null +++ b/test/integration/004_simple_snapshot_test/models/schema.yml @@ -0,0 +1,5 @@ +version: 2 +models: + - name: snapshot_actual + tests: + - mutually_exclusive_ranges diff --git a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py index 683e70afcef..b6a42bdc753 100644 --- a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py +++ b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py @@ -33,6 +33,7 @@ def assert_case_tables_equal(self, actual, expected): self.assertTablesEqual(actual, expected) def assert_expected(self): + self.run_dbt(['test']) self.assert_case_tables_equal('snapshot_actual', 'snapshot_expected') @@ -42,6 +43,7 @@ def project_config(self): return { "data-paths": ['data'], "snapshot-paths": ['test-snapshots-pg'], + 'macro-paths': ['macros'], } @use_profile('postgres') @@ -98,7 +100,7 @@ class TestCustomSnapshotFiles(BaseSimpleSnapshotTest): def project_config(self): return { 'data-paths': ['data'], - 'macro-paths': ['custom-snapshot-macros'], + 'macro-paths': ['custom-snapshot-macros', 'macros'], 'snapshot-paths': ['test-snapshots-pg-custom'], } @@ -128,7 +130,7 @@ class TestNamespacedCustomSnapshotFiles(BaseSimpleSnapshotTest): def project_config(self): return { 'data-paths': ['data'], - 'macro-paths': ['custom-snapshot-macros'], + 'macro-paths': ['custom-snapshot-macros', 'macros'], 'snapshot-paths': ['test-snapshots-pg-custom-namespaced'], } @@ -152,7 +154,7 @@ class TestInvalidNamespacedCustomSnapshotFiles(BaseSimpleSnapshotTest): def project_config(self): return { 'data-paths': ['data'], - 'macro-paths': ['custom-snapshot-macros'], + 'macro-paths': ['custom-snapshot-macros', 'macros'], 'snapshot-paths': ['test-snapshots-pg-custom-invalid'], } @@ -179,6 +181,7 @@ def project_config(self): "data-paths": ['data'], "snapshot-paths": ['test-snapshots-select', 'test-snapshots-pg'], + 'macro-paths': ['macros'], } @use_profile('postgres') @@ -236,7 +239,8 @@ def project_config(self): 'strategy': 'timestamp', 'updated_at': 'updated_at', } - } + }, + 'macro-paths': ['macros'], } @@ -253,16 +257,15 @@ def models(self): def project_config(self): return { "snapshot-paths": ['test-snapshots-bq'], + 'macro-paths': ['macros'], } def assert_expected(self): + self.run_dbt(['test']) self.assertTablesEqual('snapshot_actual', 'snapshot_expected') @use_profile('bigquery') def test__bigquery__simple_snapshot(self): - self.use_default_project() - self.use_profile('bigquery') - self.run_sql_file("seed_bq.sql") self.run_dbt(["snapshot"]) @@ -276,11 +279,8 @@ def test__bigquery__simple_snapshot(self): self.assert_expected() - @use_profile('bigquery') def test__bigquery__snapshot_with_new_field(self): - self.use_default_project() - self.use_profile('bigquery') self.run_sql_file("seed_bq.sql") @@ -341,6 +341,7 @@ def project_config(self): paths = ['test-snapshots-bq'] return { 'snapshot-paths': paths, + 'macro-paths': ['macros'], } def run_snapshot(self): @@ -395,6 +396,7 @@ def project_config(self): paths = ['test-snapshots-pg'] return { 'snapshot-paths': paths, + 'macro-paths': ['macros'], } def target_schema(self): @@ -427,6 +429,7 @@ def models(self): def project_config(self): return { "snapshot-paths": ['test-snapshots-invalid'], + 'macro-paths': ['macros'], } @use_profile('postgres') @@ -456,6 +459,7 @@ def project_config(self): return { "data-paths": ['data'], "snapshot-paths": ['test-check-col-snapshots'], + 'macro-paths': ['macros'], } @@ -472,7 +476,8 @@ def project_config(self): "strategy": "check", "check_cols": ["email"], } - } + }, + 'macro-paths': ['macros'], } @@ -493,6 +498,7 @@ def project_config(self): return { "data-paths": ['data'], "snapshot-paths": ['test-check-col-snapshots-bq'], + 'macro-paths': ['macros'], } @use_profile('bigquery') @@ -562,6 +568,7 @@ def run_snapshot(self): def project_config(self): return { "snapshot-paths": ['test-snapshots-longtext'], + 'macro-paths': ['macros'], } @use_profile('postgres') diff --git a/test/integration/base.py b/test/integration/base.py index 3ebf503b4cb..f8546cd7061 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -545,10 +545,6 @@ def run_dbt_and_check(self, args=None, strict=True, parser=False, profiles_dir=T final_args.append('--log-cache-events') logger.info("Invoking dbt with {}".format(final_args)) - if args is None: - args = ["run"] - - logger.info("Invoking dbt with {}".format(args)) return dbt.handle_and_check(final_args) def run_sql_file(self, path, kwargs=None):