Add the check archive strategy (#706) #1394

Merged: 4 commits, Apr 12, 2019
4 changes: 2 additions & 2 deletions core/dbt/api/object.py
@@ -1,6 +1,6 @@
import copy
from collections import Mapping
from jsonschema import Draft4Validator
from jsonschema import Draft7Validator

from dbt.exceptions import JSONValidationException
from dbt.utils import deep_merge
@@ -79,7 +79,7 @@ def validate(self):
of this instance. If any attributes are missing or
invalid, raise a ValidationException.
"""
validator = Draft4Validator(self.SCHEMA)
validator = Draft7Validator(self.SCHEMA)

errors = set() # make errors a set to avoid duplicates

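The swap is drop-in: `Draft7Validator` keeps the `iter_errors()` interface that `validate()` relies on. A minimal sketch, assuming the `jsonschema>=3.0.1` pin from `core/setup.py` below (the schema here is a stand-in, not dbt's real `SCHEMA`):

```python
# Minimal sketch of the Draft4 -> Draft7 swap; SCHEMA is a stand-in.
from jsonschema import Draft7Validator

SCHEMA = {'type': 'object', 'required': ['strategy']}
validator = Draft7Validator(SCHEMA)

errors = set()  # a set to avoid duplicates, as in validate() above
for error in validator.iter_errors({}):  # instance missing 'strategy'
    errors.add(error.message)
print(errors)  # {"'strategy' is a required property"}
```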
41 changes: 38 additions & 3 deletions core/dbt/contracts/graph/parsed.py
@@ -452,9 +452,44 @@ def config(self, value):
'unique_key': {
'type': 'string',
},
'strategy': {
'enum': ['timestamp'],
},
'anyOf': [
{
'properties': {
'strategy': {
'enum': ['timestamp'],
},
'updated_at': {
'type': 'string',
'description': (
'The column name with the timestamp to compare'
),
},
},
'required': ['updated_at'],
},
{
'properties': {
'strategy': {
'enum': ['check'],
},
'check_cols': {
'oneOf': [
{
'type': 'array',
'items': {'type': 'string'},
'description': 'The columns to check',
'minLength': 1,
},
{
'enum': ['all'],
'description': 'Check all columns',
},
],
},
},
'required': ['check_cols'],
}
]
},
'required': [
'target_database', 'target_schema', 'unique_key', 'strategy',
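The `anyOf` above means an archive config must satisfy at least one strategy branch. A hedged sketch of how the two branches validate, with the schema abbreviated and the config values purely hypothetical:

```python
# Abbreviated copy of the anyOf branch above; config values are hypothetical.
from jsonschema import Draft7Validator

strategy_schema = {
    'anyOf': [
        {   # 'timestamp' requires an updated_at column name
            'properties': {
                'strategy': {'enum': ['timestamp']},
                'updated_at': {'type': 'string'},
            },
            'required': ['updated_at'],
        },
        {   # 'check' requires a column list, or the literal 'all'
            'properties': {
                'strategy': {'enum': ['check']},
                'check_cols': {
                    'oneOf': [
                        {'type': 'array', 'items': {'type': 'string'}},
                        {'enum': ['all']},
                    ],
                },
            },
            'required': ['check_cols'],
        },
    ],
}

validator = Draft7Validator(strategy_schema)
assert validator.is_valid({'strategy': 'timestamp', 'updated_at': 'updated_at'})
assert validator.is_valid({'strategy': 'check', 'check_cols': ['status', 'plan']})
assert validator.is_valid({'strategy': 'check', 'check_cols': 'all'})
assert not validator.is_valid({'strategy': 'check'})  # satisfies neither branch
```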
@@ -2,12 +2,12 @@
Create SCD Hash SQL fields cross-db
#}

{% macro archive_scd_hash() %}
{{ adapter_macro('archive_scd_hash') }}
{% macro archive_hash_arguments(args) %}
{{ adapter_macro('archive_hash_arguments', args) }}
{% endmacro %}

{% macro default__archive_scd_hash() %}
md5("dbt_pk" || '|' || "dbt_updated_at")
{% macro default__archive_hash_arguments(args) %}
md5({% for arg in args %}{{ arg }}{% if not loop.last %} || '|' || {% endif %}{% endfor %})
{% endmacro %}
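Generalizing the hash from the hard-coded `dbt_pk`/`dbt_updated_at` pair to an argument list is what lets the `check` strategy hash arbitrary columns. A hedged sketch of the rendered output, inlining the macro body as a plain `jinja2` template (column names hypothetical):

```python
# Renders the default__archive_hash_arguments body for two argument lists.
from jinja2 import Template

hash_template = Template(
    "md5({% for arg in args %}{{ arg }}"
    "{% if not loop.last %} || '|' || {% endif %}{% endfor %})"
)

# timestamp strategy: primary key + updated_at, as before
print(hash_template.render(args=['dbt_pk', 'dbt_updated_at']))
# -> md5(dbt_pk || '|' || dbt_updated_at)

# check strategy: primary key + every checked column
print(hash_template.render(args=['dbt_pk', 'status', 'plan']))
# -> md5(dbt_pk || '|' || status || '|' || plan)
```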

{% macro create_temporary_table(sql, relation) %}
@@ -48,46 +48,74 @@

{% macro default__archive_update(target_relation, tmp_relation) %}
update {{ target_relation }}
set {{ adapter.quote('dbt_valid_to') }} = tmp.{{ adapter.quote('dbt_valid_to') }}
set dbt_valid_to = tmp.dbt_valid_to
from {{ tmp_relation }} as tmp
where tmp.{{ adapter.quote('dbt_scd_id') }} = {{ target_relation }}.{{ adapter.quote('dbt_scd_id') }}
and {{ adapter.quote('change_type') }} = 'update';
where tmp.dbt_scd_id = {{ target_relation }}.dbt_scd_id
and change_type = 'update';
{% endmacro %}


{#
Cross-db compatible archival implementation
#}
{% macro archive_select(source_sql, target_relation, source_columns, unique_key, updated_at) %}
{% macro archive_get_time() -%}
{{ adapter_macro('archive_get_time') }}
{%- endmacro %}

{% set timestamp_column = api.Column.create('_', 'timestamp') %}
{% macro default__archive_get_time() -%}
{{ current_timestamp() }}
{%- endmacro %}

{% macro snowflake__archive_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}
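`adapter_macro` resolves to an adapter-specific implementation when one exists (here, Snowflake's `to_timestamp_ntz` wrapper) and falls back to `default__` otherwise. A hedged Python mirror of that lookup, with illustrative stand-ins for the macro registry:

```python
# Illustrative mirror of adapter_macro dispatch: prefer '<adapter>__<name>',
# fall back to 'default__<name>'. Not dbt's real resolution code.
def adapter_macro(name, macros, adapter_type):
    specific = '{}__{}'.format(adapter_type, name)
    impl = macros.get(specific, macros['default__{}'.format(name)])
    return impl()

macros = {
    'default__archive_get_time': lambda: 'current_timestamp()',
    'snowflake__archive_get_time': lambda: 'to_timestamp_ntz(current_timestamp())',
}
print(adapter_macro('archive_get_time', macros, 'snowflake'))
# -> to_timestamp_ntz(current_timestamp())
print(adapter_macro('archive_get_time', macros, 'postgres'))
# -> current_timestamp()
```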


{% macro archive_select_generic(source_sql, target_relation, transforms, scd_hash) -%}
with source as (
{{ source_sql }}
),
{{ transforms }}
merged as (

select *, 'update' as change_type from updates
union all
select *, 'insert' as change_type from insertions

)

select *,
{{ scd_hash }} as dbt_scd_id
from merged

{%- endmacro %}

{#
Cross-db compatible archival implementation
#}
{% macro archive_select_timestamp(source_sql, target_relation, source_columns, unique_key, updated_at) -%}
{% set timestamp_column = api.Column.create('_', 'timestamp') %}
{% set transforms -%}
current_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }} {% if not loop.last %},{% endif %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor %},
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ updated_at }} as {{ adapter.quote('dbt_valid_from') }},
{{ timestamp_column.literal('null') }} as {{ adapter.quote('tmp_valid_to') }}
{{ updated_at }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
{{ updated_at }} as dbt_valid_from,
{{ timestamp_column.literal('null') }} as tmp_valid_to
from source
),

archived_data as (

select
{% for col in source_columns %}
{{ adapter.quote(col.name) }},
{{ col.name }},
{% endfor %}
{{ updated_at }} as {{ adapter.quote('dbt_updated_at') }},
{{ unique_key }} as {{ adapter.quote('dbt_pk') }},
{{ adapter.quote('dbt_valid_from') }},
{{ adapter.quote('dbt_valid_to') }} as {{ adapter.quote('tmp_valid_to') }}
{{ updated_at }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
dbt_valid_from,
dbt_valid_to as tmp_valid_to
from {{ target_relation }}

),
@@ -96,44 +96,110 @@

select
current_data.*,
{{ timestamp_column.literal('null') }} as {{ adapter.quote('dbt_valid_to') }}
{{ timestamp_column.literal('null') }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
where archived_data.{{ adapter.quote('dbt_pk') }} is null or (
archived_data.{{ adapter.quote('dbt_pk') }} is not null and
current_data.{{ adapter.quote('dbt_updated_at') }} > archived_data.{{ adapter.quote('dbt_updated_at') }} and
archived_data.{{ adapter.quote('tmp_valid_to') }} is null
on archived_data.dbt_pk = current_data.dbt_pk
where
archived_data.dbt_pk is null
or (
archived_data.dbt_pk is not null
and archived_data.dbt_updated_at < current_data.dbt_updated_at
and archived_data.tmp_valid_to is null
)
),

updates as (

select
archived_data.*,
current_data.{{ adapter.quote('dbt_updated_at') }} as {{ adapter.quote('dbt_valid_to') }}
current_data.dbt_updated_at as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.{{ adapter.quote('dbt_pk') }} = current_data.{{ adapter.quote('dbt_pk') }}
where archived_data.{{ adapter.quote('dbt_pk') }} is not null
and archived_data.{{ adapter.quote('dbt_updated_at') }} < current_data.{{ adapter.quote('dbt_updated_at') }}
and archived_data.{{ adapter.quote('tmp_valid_to') }} is null
on archived_data.dbt_pk = current_data.dbt_pk
where archived_data.dbt_pk is not null
and archived_data.dbt_updated_at < current_data.dbt_updated_at
and archived_data.tmp_valid_to is null
),
{%- endset %}
{%- set scd_hash = archive_hash_arguments(['dbt_pk', 'dbt_updated_at']) -%}
{{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }}
{%- endmacro %}


{% macro archive_select_check_cols(source_sql, target_relation, source_columns, unique_key, check_cols) -%}
{%- set timestamp_column = api.Column.create('_', 'timestamp') -%}

{# if we recognize the primary key, it's the newest record, and anything we care about has changed, it's an update candidate #}
{%- set update_candidate -%}
archived_data.dbt_pk is not null
and (
{%- for col in check_cols %}
current_data.{{ col }} <> archived_data.{{ col }}
{%- if not loop.last %} or {% endif %}
{% endfor -%}
)
and archived_data.tmp_valid_to is null
{%- endset %}

merged as (
{% set transforms -%}
current_data as (

select *, 'update' as {{ adapter.quote('change_type') }} from updates
union all
select *, 'insert' as {{ adapter.quote('change_type') }} from insertions
select
{% for col in source_columns %}
{{ col.name }} {% if not loop.last %},{% endif %}
{% endfor %},
{{ archive_get_time() }} as dbt_updated_at,
{{ unique_key }} as dbt_pk,
{{ archive_get_time() }} as dbt_valid_from,
{{ timestamp_column.literal('null') }} as tmp_valid_to
from source
),

)
archived_data as (

select *,
{{ archive_scd_hash() }} as {{ adapter.quote('dbt_scd_id') }}
from merged
select
{% for col in source_columns %}
{{ col.name }},
{% endfor %}
dbt_updated_at,
{{ unique_key }} as dbt_pk,
dbt_valid_from,
dbt_valid_to as tmp_valid_to
from {{ target_relation }}

{% endmacro %}
),

insertions as (

select
current_data.*,
{{ timestamp_column.literal('null') }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.dbt_pk = current_data.dbt_pk
where
archived_data.dbt_pk is null
or ( {{ update_candidate }} )
),

updates as (

select
archived_data.*,
{{ archive_get_time() }} as dbt_valid_to
from current_data
left outer join archived_data
on archived_data.dbt_pk = current_data.dbt_pk
where {{ update_candidate }}
),
{%- endset %}

{%- set hash_components = ['dbt_pk'] %}
{%- do hash_components.extend(check_cols) -%}
{%- set scd_hash = archive_hash_arguments(hash_components) -%}
{{ archive_select_generic(source_sql, target_relation, transforms, scd_hash) }}
{%- endmacro %}
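For concreteness, a hedged render of the `update_candidate` predicate for a hypothetical `check_cols=['status', 'plan']`; the matching `scd_hash` would then be `md5(dbt_pk || '|' || status || '|' || plan)`:

```python
# Renders the update_candidate predicate above for hypothetical check_cols.
from jinja2 import Template

predicate = Template(
    "archived_data.dbt_pk is not null and ("
    "{% for col in check_cols %}"
    "current_data.{{ col }} <> archived_data.{{ col }}"
    "{% if not loop.last %} or {% endif %}"
    "{% endfor %}"
    ") and archived_data.tmp_valid_to is null"
)
print(predicate.render(check_cols=['status', 'plan']))
# -> archived_data.dbt_pk is not null and (current_data.status <>
#    archived_data.status or current_data.plan <> archived_data.plan)
#    and archived_data.tmp_valid_to is null
```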

{# this is gross #}
{% macro create_empty_table_as(sql) %}
@@ -157,6 +251,7 @@
{%- set target_database = config.get('target_database') -%}
{%- set target_schema = config.get('target_schema') -%}
{%- set target_table = model.get('alias', model.get('name')) -%}
{%- set strategy = config.get('strategy') -%}

{{ create_schema(target_database, target_schema) }}

@@ -179,15 +274,13 @@
{%- set source_columns = adapter.get_columns_in_relation(source_info_model) -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set updated_at = config.get('updated_at') -%}
{%- set dest_columns = source_columns + [
api.Column.create('dbt_valid_from', 'timestamp'),
api.Column.create('dbt_valid_to', 'timestamp'),
api.Column.create('dbt_scd_id', 'string'),
api.Column.create('dbt_updated_at', 'timestamp'),
] -%}


{% call statement() %}
{{ create_archive_table(target_relation, dest_columns) }}
{% endcall %}
@@ -204,7 +297,19 @@
{% set tmp_table_sql -%}

with dbt_archive_sbq as (
{{ archive_select(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }}

{% if strategy == 'timestamp' %}
{%- set updated_at = config.get('updated_at') -%}
{{ archive_select_timestamp(model['injected_sql'], target_relation, source_columns, unique_key, updated_at) }}
{% elif strategy == 'check' %}
{%- set check_cols = config.get('check_cols') -%}
{% if check_cols == 'all' %}
{% set check_cols = source_columns | map(attribute='name') | list %}
{% endif %}
{{ archive_select_check_cols(model['injected_sql'], target_relation, source_columns, unique_key, check_cols)}}
{% else %}
{{ exceptions.raise_compiler_error('Got invalid strategy "{}"'.format(strategy)) }}
{% endif %}
)
select * from dbt_archive_sbq

@@ -226,7 +331,7 @@
{{ column_list(dest_columns) }}
)
select {{ column_list(dest_columns) }} from {{ tmp_relation }}
where {{ adapter.quote('change_type') }} = 'insert';
where change_type = 'insert';
{% endcall %}

{{ adapter.commit() }}
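The materialization now branches on the configured strategy before building the temp table. A hedged Python mirror of that dispatch, with `config` as a plain dict and hypothetical column names:

```python
# Illustrative mirror of the strategy dispatch above; not dbt internals.
def select_archive_strategy(config, source_column_names):
    strategy = config.get('strategy')
    if strategy == 'timestamp':
        return ('archive_select_timestamp', config['updated_at'])
    if strategy == 'check':
        check_cols = config['check_cols']
        if check_cols == 'all':  # expand 'all' to every source column
            check_cols = list(source_column_names)
        return ('archive_select_check_cols', check_cols)
    raise RuntimeError('Got invalid strategy "{}"'.format(strategy))

print(select_archive_strategy(
    {'strategy': 'check', 'check_cols': 'all'},
    ['id', 'status', 'plan'],  # hypothetical source columns
))
# -> ('archive_select_check_cols', ['id', 'status', 'plan'])
```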
2 changes: 2 additions & 0 deletions core/dbt/parser/archives.py
@@ -36,6 +36,8 @@ def parse_archives_from_project(cls, config):

source_schema = archive_config['source_schema']
cfg['target_schema'] = archive_config.get('target_schema')
# project-defined archives always use the 'timestamp' strategy.
cfg['strategy'] = 'timestamp'

fake_path = [cfg['target_database'], cfg['target_schema'],
cfg['target_table']]
2 changes: 1 addition & 1 deletion core/setup.py
@@ -51,7 +51,7 @@ def read(fname):
'requests>=2.18.0,<3',
'colorama==0.3.9',
'agate>=1.6,<2',
'jsonschema==2.6.0',
'jsonschema>=3.0.1,<4',
'json-rpc>=1.12,<2',
'werkzeug>=0.14.1,<0.15',
]