Skip to content

Commit

Permalink
Merge pull request #1409 from bastienboutonnet/snowflake_create_or_re…
Browse files Browse the repository at this point in the history
…place

Snowflake create or replace
  • Loading branch information
drewbanin authored May 15, 2019
2 parents d5774b3 + 90abc2d commit 0a2e4f7
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 67 deletions.
1 change: 0 additions & 1 deletion core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
);
{% endmacro %}


{% macro create_view_as(relation, sql) -%}
{{ adapter_macro('create_view_as', relation, sql) }}
{%- endmacro %}
Expand Down
8 changes: 2 additions & 6 deletions plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
{% macro snowflake__create_table_as(temporary, relation, sql) -%}
{% if temporary %}
use schema {{ adapter.quote_as_configured(schema, 'schema') }};
{% endif %}

{%- set transient = config.get('transient', default=true) -%}

create {% if temporary -%}
create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation.include(database=(not temporary), schema=(not temporary)) }}
{%- endif %} table {{ relation }}
as (
{{ sql }}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

{% materialization incremental, adapter='snowflake' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set identifier = model['alias'] -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(database=database,
schema=schema,
identifier=identifier,
type='table') -%}

{%- set tmp_relation = api.Relation.create(database=database,
schema=schema,
identifier=identifier ~ "__dbt_tmp",
type='table') -%}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{# -- If the destination is a view, then we have no choice but to drop it #}
{% if old_relation is not none and old_relation.type == 'view' %}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ adapter.drop_relation(old_relation) }}
{% set old_relation = none %}
{% endif %}

-- build model
{% if full_refresh_mode or old_relation is none -%}

{%- call statement('main') -%}
{{ create_table_as(false, target_relation, sql) }}
{%- endcall -%}

{%- else -%}

{%- call statement() -%}
{{ create_table_as(true, tmp_relation, sql) }}
{%- endcall -%}

{{ adapter.expand_target_column_types(temp_table=tmp_relation.identifier,
to_relation=target_relation) }}
{% set incremental_sql %}
(
select * from {{ tmp_relation }}
)
{% endset %}

{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{%- call statement('main') -%}
{{ get_merge_sql(target_relation, incremental_sql, unique_key, dest_columns) }}
{% endcall %}

{%- endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
{% macro snowflake__get_merge_sql(target, source, unique_key, dest_columns) %}
{{ common_get_merge_sql(target, source, unique_key, dest_columns) }}
{% macro snowflake__get_merge_sql(target, source_sql, unique_key, dest_columns) %}
{%- set dest_cols_csv = dest_columns | map(attribute="name") | join(', ') -%}
{%- if unique_key is none -%}
{# workaround for Snowflake not being happy with "on false" merge.
when no unique key is provided we'll do a regular insert, other times we'll
use the preferred merge. #}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source_sql }}
);
{%- else -%}
{# call regular merge when a unique key is present. #}
{{ common_get_merge_sql(target, source_sql, unique_key, dest_columns) }}
{%- endif -%}

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,68 +1,32 @@
{% materialization table, adapter='snowflake' %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema,
database=database, type='table') -%}

/*
See ../view/view.sql for more information about this relation.
*/

-- drop the backup relation if it exists, then make a new one that uses the old relation's type
{%- set backup_relation = adapter.get_relation(database=database, schema=schema, identifier=backup_identifier) -%}
{% if backup_relation is not none -%}
{{ adapter.drop_relation(backup_relation) }}
{%- endif %}
{%- set backup_relation_type = 'table' if old_relation is none else old_relation.type -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema,
database=database,
type=backup_relation_type) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

-- drop the temp relations if they exists for some reason
{{ adapter.drop_relation(intermediate_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

-- build model
--build model
{% call statement('main') -%}
{{ create_table_as(False, intermediate_relation, sql) }}
{{ create_table_as(false, target_relation, sql) }}
{%- endcall %}

-- cleanup
{% if old_relation is not none %}
{% if old_relation.type == 'view' %}
{#-- This is the primary difference between Snowflake and Redshift. Renaming this view
-- would cause an error if the view has become invalid due to upstream schema changes #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% else %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,3 @@ def test_postgres_delete__dbt_tmp_relation(self):

self.assertTableDoesNotExist('view__dbt_tmp')
self.assertTablesEqual("seed","view")


@use_profile('snowflake')
def test_snowflake_backup_different_type(self):
self.run_sql_file(
'test/integration/017_runtime_materialization_tests/create_backup_and_original.sql'
)
results = self.run_dbt(['run', '--model', 'materialized'])
self.assertEqual(len(results), 1)

self.assertTableDoesNotExist('materialized__dbt_tmp')
self.assertTableDoesNotExist('materialized__dbt_backup')
self.assertTablesEqual("seed", "materialized")

0 comments on commit 0a2e4f7

Please sign in to comment.