Skip to content

Commit

Permalink
Implement pre/post hooks and incremental tables on BigQuery. dbt-labs…
Browse files Browse the repository at this point in the history
  • Loading branch information
lewish committed May 31, 2018
1 parent c19a426 commit 6068a28
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ target/

# Sublime Text
*.sublime-*

#PyCharm/IntelliJ
.idea/
22 changes: 5 additions & 17 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,11 @@ def get_table_from_response(cls, resp):

@classmethod
def add_begin_query(cls, profile, name):
raise dbt.exceptions.NotImplementedException(
'`add_begin_query` is not implemented for this adapter!')
pass

@classmethod
def add_commit_query(cls, profile, name):
pass

@classmethod
def create_schema(cls, profile, project_cfg, schema, model_name=None):
Expand Down Expand Up @@ -465,21 +468,6 @@ def get_dataset(cls, profile, project_cfg, dataset_name, model_name=None):
dataset_ref = client.dataset(dataset_name)
return google.cloud.bigquery.Dataset(dataset_ref)

@classmethod
def warning_on_hooks(cls, hook_type):
msg = "{} is not supported in bigquery and will be ignored"
dbt.ui.printer.print_timestamped_line(msg.format(hook_type),
dbt.ui.printer.COLOR_FG_YELLOW)

@classmethod
def add_query(cls, profile, sql, model_name=None, auto_begin=True,
bindings=None, abridge_sql_log=False):
if model_name in ['on-run-start', 'on-run-end']:
cls.warning_on_hooks(model_name)
else:
raise dbt.exceptions.NotImplementedException(
'`add_query` is not implemented for this adapter!')

@classmethod
def is_cancelable(cls):
return False
Expand Down
5 changes: 5 additions & 0 deletions dbt/include/global_project/macros/adapters/bigquery.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

create or replace table {{ relation }}
{{ partition_by(raw_partition_by) }}
{% if temporary %}
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
)
{% endif %}
as (
{{ sql }}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
{% endfor %}
{% endmacro %}

{% macro run_bigquery_hooks(hooks) %}
{% for hook in hooks %}
{% call statement(auto_begin=inside_transaction) %}
{{ hook.get('sql') }}
{% endcall %}
{% endfor %}
{% endmacro %}

{% macro column_list(columns) %}
{%- for col in columns %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{% macro dbt_bigquery__incremental_delete(target_relation, tmp_relation) -%}

{%- set unique_key = config.require('unique_key') -%}
{%- set identifier = model['name'] -%}

delete
from {{ target_relation }}
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ tmp_relation.include(schema=True) }}
);

{%- endmacro %}

{% materialization incremental, default -%}
{%- set sql_where = config.require('sql_where') -%}
{%- set unique_key = config.get('unique_key') -%}

{%- set identifier = model['name'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_incremental_tmp' -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
{%- set old_relation = adapter.get_relation(relations_list=existing_relations,
schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}
{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='table') -%}

{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

{%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
{%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
{%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

-- setup
{% if old_relation is none -%}
-- noop
{%- elif should_truncate -%}
{{ adapter.truncate_relation(old_relation) }}
{%- elif should_drop -%}
{{ adapter.drop_relation(old_relation) }}
{%- set old_relation = none -%}
{%- endif %}

{{ run_bigquery_hooks(pre_hooks) }}

-- build model
{% if force_create or old_relation is none -%}
{%- call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall -%}
{%- else -%}
{%- call statement() -%}

{% set tmp_table_sql -%}
with dbt_incr_sbq as (
{{ sql }}
)
select * from dbt_incr_sbq
where ({{ sql_where }})
or ({{ sql_where }}) is null
{%- endset %}

{{ dbt.create_table_as(True, tmp_relation, tmp_table_sql) }}

{%- endcall -%}

{{ adapter.expand_target_column_types(temp_table=tmp_identifier,
to_schema=schema,
to_table=identifier) }}


{% if unique_key is not none -%}
{%- call statement() -%}
{{ dbt_bigquery__incremental_delete(target_relation, tmp_relation) }}
{%- endcall -%}

{%- endif %}
{%- call statement('main') -%}
{% set dest_columns = adapter.get_columns_in_table(schema, identifier) %}
{% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation.include(schema=True) }}
);
{% endcall %}

-- clean up temp table in bigquery
{{ drop_relation_if_exists(tmp_relation) }}

{%- endif %}

{{ run_bigquery_hooks(post_hooks) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
{% endif %}
{% endif %}

{{ run_bigquery_hooks(pre_hooks) }}

{#
Since dbt uses WRITE_TRUNCATE mode for tables, we only need to drop this thing
if it is not a table. If it _is_ already a table, then we can overwrite it without downtime
Expand All @@ -71,5 +73,6 @@
{% endcall -%}
{% endif %}

{{ run_bigquery_hooks(pre_hooks) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
identifier=identifier, schema=schema,
type='view') -%}

{{ run_bigquery_hooks(pre_hooks) }}

-- drop if exists
{%- if old_relation is not none -%}
{%- if old_relation.is_table and not flags.FULL_REFRESH -%}
Expand All @@ -36,4 +38,6 @@
{%- endcall %}
{%- endif %}

{{ run_bigquery_hooks(post_hooks) }}

{%- endmaterialization %}

0 comments on commit 6068a28

Please sign in to comment.