From 4eb75ec5b621eb9ddaf289552d2c30c675553d2f Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 12 Feb 2018 16:09:59 -0500 Subject: [PATCH] Bq date partitioning (#641) * first cut of date partitioning * cleanup, implement partitioning in materialization * update requirements.txt * wip for date partitioning with range * log data * arg handling, logging, cleanup + view compat for new bq version * add partitioning tests, compatibility with bq 0.29.0 release * pep8 * fix for strange error in appveyor * debug appveyor... * dumb * debugging weird bq adapter use in pg test * do not use read_project in bq tests * cleanup connections, initialize bq tests * remove debug lines * fix integration tests (actually) * warning for view creation which clobbers tables * add query timeout example for bq * no need to release connections in the adapter * partition_date interface change (wip) * list of dates for bq dp tables * tiny fixes for crufty dbt_project.yml files * rm debug line * fix tests --- dbt/adapters/bigquery.py | 156 ++++++++++-------- dbt/context/common.py | 20 ++- dbt/exceptions.py | 8 +- .../global_project/macros/etc/bigquery.sql | 4 + .../global_project/macros/etc/datetime.sql | 56 +++++++ .../macros/materializations/bigquery.sql | 67 +++++++- dbt/project.py | 6 + requirements.txt | 2 +- sample.profiles.yml | 1 + setup.py | 2 +- .../dp-models/confirmation.sql | 20 +++ .../dp-models/events_20180101.sql | 4 + .../dp-models/events_20180102.sql | 4 + .../dp-models/events_20180103.sql | 4 + .../dp-models/partitioned.sql | 16 ++ .../dp-models/partitioned_simple.sql | 14 ++ .../022_bigquery_test/dp-models/schema.yml | 18 ++ .../test_bigquery_date_partitioning.py | 33 ++++ .../test_simple_bigquery_view.py | 18 +- .../023_exit_codes_test/test_exit_codes.py | 3 - test/integration/base.py | 1 + 21 files changed, 363 insertions(+), 94 deletions(-) create mode 100644 dbt/include/global_project/macros/etc/bigquery.sql create mode 100644 dbt/include/global_project/macros/etc/datetime.sql create mode 100644 test/integration/022_bigquery_test/dp-models/confirmation.sql create mode 100644 test/integration/022_bigquery_test/dp-models/events_20180101.sql create mode 100644 test/integration/022_bigquery_test/dp-models/events_20180102.sql create mode 100644 test/integration/022_bigquery_test/dp-models/events_20180103.sql create mode 100644 test/integration/022_bigquery_test/dp-models/partitioned.sql create mode 100644 test/integration/022_bigquery_test/dp-models/partitioned_simple.sql create mode 100644 test/integration/022_bigquery_test/dp-models/schema.yml create mode 100644 test/integration/022_bigquery_test/test_bigquery_date_partitioning.py diff --git a/dbt/adapters/bigquery.py b/dbt/adapters/bigquery.py index 4e3664986ea..9d2e6eb7b97 100644 --- a/dbt/adapters/bigquery.py +++ b/dbt/adapters/bigquery.py @@ -17,7 +17,6 @@ import google.cloud.bigquery import time -import uuid class BigQueryAdapter(PostgresAdapter): @@ -27,7 +26,8 @@ class BigQueryAdapter(PostgresAdapter): "execute_model", "drop", "execute", - "quote_schema_and_table" + "quote_schema_and_table", + "make_date_partitioned_table" ] SCOPE = ('https://www.googleapis.com/auth/bigquery', @@ -150,27 +150,33 @@ def query_for_existing(cls, profile, schemas, model_name=None): if not isinstance(schemas, (list, tuple)): schemas = [schemas] + conn = cls.get_connection(profile, model_name) + client = conn.get('handle') + all_tables = [] for schema in schemas: dataset = cls.get_dataset(profile, schema, model_name) - all_tables.extend(dataset.list_tables()) + 
all_tables.extend(client.list_tables(dataset)) - relation_type_lookup = { + relation_types = { 'TABLE': 'table', 'VIEW': 'view', 'EXTERNAL': 'external' } - existing = [(table.name, relation_type_lookup.get(table.table_type)) + existing = [(table.table_id, relation_types.get(table.table_type)) for table in all_tables] return dict(existing) @classmethod def drop(cls, profile, schema, relation, relation_type, model_name=None): + conn = cls.get_connection(profile, model_name) + client = conn.get('handle') + dataset = cls.get_dataset(profile, schema, model_name) relation_object = dataset.table(relation) - relation_object.delete() + client.delete_table(relation_object) @classmethod def rename(cls, profile, schema, from_name, to_name, model_name=None): @@ -183,19 +189,22 @@ def get_timeout(cls, conn): return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT) @classmethod - def materialize_as_view(cls, profile, dataset, model_name, model_sql): - view = dataset.table(model_name) + def materialize_as_view(cls, profile, dataset, model): + model_name = model.get('name') + model_sql = model.get('injected_sql') + + conn = cls.get_connection(profile, model_name) + client = conn.get('handle') + + view_ref = dataset.table(model_name) + view = google.cloud.bigquery.Table(view_ref) view.view_query = model_sql view.view_use_legacy_sql = False logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql)) with cls.exception_handler(profile, model_sql, model_name, model_name): - view.create() - - if view.created is None: - msg = "Error creating view {}".format(model_name) - raise dbt.exceptions.RuntimeException(msg) + client.create_table(view) return "CREATE VIEW" @@ -215,86 +224,94 @@ def poll_until_job_completes(cls, job, timeout): raise job.exception() @classmethod - def materialize_as_table(cls, profile, dataset, model_name, model_sql): + def make_date_partitioned_table(cls, profile, dataset_name, identifier, + model_name=None): conn = cls.get_connection(profile, model_name) client = conn.get('handle') - table = dataset.table(model_name) - job_id = 'dbt-create-{}-{}'.format(model_name, uuid.uuid4()) - job = client.run_async_query(job_id, model_sql) - job.use_legacy_sql = False - job.destination = table - job.write_disposition = 'WRITE_TRUNCATE' - job.begin() + dataset = cls.get_dataset(profile, dataset_name, identifier) + table_ref = dataset.table(identifier) + table = google.cloud.bigquery.Table(table_ref) + table.partitioning_type = 'DAY' - cls.release_connection(profile, model_name) + return client.create_table(table) - logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql)) + @classmethod + def materialize_as_table(cls, profile, dataset, model, model_sql, + decorator=None): + model_name = model.get('name') + + conn = cls.get_connection(profile, model_name) + client = conn.get('handle') + + if decorator is None: + table_name = model_name + else: + table_name = "{}${}".format(model_name, decorator) + + table_ref = dataset.table(table_name) + job_config = google.cloud.bigquery.QueryJobConfig() + job_config.destination = table_ref + job_config.write_disposition = 'WRITE_TRUNCATE' + + logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql)) + query_job = client.query(model_sql, job_config=job_config) + # this waits for the job to complete with cls.exception_handler(profile, model_sql, model_name, model_name): - cls.poll_until_job_completes(job, cls.get_timeout(conn)) + query_job.result(timeout=cls.get_timeout(conn)) return "CREATE TABLE" @classmethod - def execute_model(cls, profile, 
model, materialization, model_name=None): + def execute_model(cls, profile, model, materialization, sql_override=None, + decorator=None, model_name=None): + + if sql_override is None: + sql_override = model.get('injected_sql') if flags.STRICT_MODE: connection = cls.get_connection(profile, model.get('name')) validate_connection(connection) - cls.release_connection(profile, model.get('name')) model_name = model.get('name') model_schema = model.get('schema') - model_sql = model.get('injected_sql') dataset = cls.get_dataset(profile, model_schema, model_name) if materialization == 'view': - res = cls.materialize_as_view(profile, dataset, model_name, - model_sql) + res = cls.materialize_as_view(profile, dataset, model) elif materialization == 'table': - res = cls.materialize_as_table(profile, dataset, model_name, - model_sql) + res = cls.materialize_as_table(profile, dataset, model, + sql_override, decorator) else: msg = "Invalid relation type: '{}'".format(materialization) raise dbt.exceptions.RuntimeException(msg, model) return res - @classmethod - def fetch_query_results(cls, query): - all_rows = [] - - rows = query.rows - token = query.page_token - - while True: - all_rows.extend(rows) - if token is None: - break - rows, total_count, token = query.fetch_data(page_token=token) - return all_rows - @classmethod def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs): conn = cls.get_connection(profile, model_name) client = conn.get('handle') - query = client.run_sync_query(sql) - query.timeout_ms = cls.get_timeout(conn) * 1000 - query.use_legacy_sql = False - debug_message = "Fetching data for query {}:\n{}" logger.debug(debug_message.format(model_name, sql)) - query.run() + job_config = google.cloud.bigquery.QueryJobConfig() + job_config.use_legacy_sql = False + query_job = client.query(sql, job_config) + + # this blocks until the query has completed + with cls.exception_handler(profile, 'create dataset', model_name): + iterator = query_job.result() res = [] if fetch: - res = cls.fetch_query_results(query) + res = list(iterator) - status = 'ERROR' if query.errors else 'OK' + # If we get here, the query succeeded + status = 'OK' return status, res @classmethod @@ -310,15 +327,20 @@ def add_begin_query(cls, profile, name): def create_schema(cls, profile, schema, model_name=None): logger.debug('Creating schema "%s".', schema) - dataset = cls.get_dataset(profile, schema, model_name) + conn = cls.get_connection(profile, model_name) + client = conn.get('handle') + dataset = cls.get_dataset(profile, schema, model_name) with cls.exception_handler(profile, 'create dataset', model_name): - dataset.create() + client.create_dataset(dataset) @classmethod - def drop_tables_in_schema(cls, dataset): - for table in dataset.list_tables(): - table.delete() + def drop_tables_in_schema(cls, profile, dataset): + conn = cls.get_connection(profile) + client = conn.get('handle') + + for table in client.list_tables(dataset): + client.delete_table(table.reference) @classmethod def drop_schema(cls, profile, schema, model_name=None): @@ -327,21 +349,22 @@ def drop_schema(cls, profile, schema, model_name=None): if not cls.check_schema_exists(profile, schema, model_name): return - dataset = cls.get_dataset(profile, schema, model_name) + conn = cls.get_connection(profile) + client = conn.get('handle') + dataset = cls.get_dataset(profile, schema, model_name) with cls.exception_handler(profile, 'drop dataset', model_name): - cls.drop_tables_in_schema(dataset) - dataset.delete() + 
cls.drop_tables_in_schema(profile, dataset) + client.delete_dataset(dataset) @classmethod def get_existing_schemas(cls, profile, model_name=None): conn = cls.get_connection(profile, model_name) - client = conn.get('handle') with cls.exception_handler(profile, 'list dataset', model_name): all_datasets = client.list_datasets() - return [ds.name for ds in all_datasets] + return [ds.dataset_id for ds in all_datasets] @classmethod def get_columns_in_table(cls, profile, schema_name, table_name, @@ -352,20 +375,19 @@ def get_columns_in_table(cls, profile, schema_name, table_name, @classmethod def check_schema_exists(cls, profile, schema, model_name=None): conn = cls.get_connection(profile, model_name) - client = conn.get('handle') with cls.exception_handler(profile, 'get dataset', model_name): all_datasets = client.list_datasets() - return any([ds.name == schema for ds in all_datasets]) + return any([ds.dataset_id == schema for ds in all_datasets]) @classmethod def get_dataset(cls, profile, dataset_name, model_name=None): conn = cls.get_connection(profile, model_name) - client = conn.get('handle') - dataset = client.dataset(dataset_name) - return dataset + + dataset_ref = client.dataset(dataset_name) + return google.cloud.bigquery.Dataset(dataset_ref) @classmethod def warning_on_hooks(cls, hook_type): diff --git a/dbt/context/common.py b/dbt/context/common.py index 072cb62a7e4..64d62c04b45 100644 --- a/dbt/context/common.py +++ b/dbt/context/common.py @@ -1,6 +1,5 @@ import json import os -import pytz import voluptuous from dbt.adapters.factory import get_adapter @@ -17,6 +16,12 @@ from dbt.logger import GLOBAL_LOGGER as logger # noqa +# These modules are added to the context. Consider alternative +# approaches which will extend well to potentially many modules +import pytz +import datetime + + class DatabaseWrapper(object): """ Wrapper for runtime database interaction. Should only call adapter @@ -248,6 +253,15 @@ def tojson(value, default=None): return default +def try_or_compiler_error(model): + def impl(message_if_exception, func, *args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + dbt.exceptions.raise_compiler_error(message_if_exception, model) + return impl + + def _return(value): raise dbt.exceptions.MacroReturn(value) @@ -291,6 +305,7 @@ def generate(model, project, flat_graph, provider=None): "model": model, "modules": { "pytz": pytz, + "datetime": datetime }, "post_hooks": post_hooks, "pre_hooks": pre_hooks, @@ -302,7 +317,8 @@ def generate(model, project, flat_graph, provider=None): "fromjson": fromjson, "tojson": tojson, "target": target, - "this": dbt.utils.Relation(profile, adapter, model, use_temp=True) + "this": dbt.utils.Relation(profile, adapter, model, use_temp=True), + "try_or_compiler_error": try_or_compiler_error(model) }) context = _add_tracking(context) diff --git a/dbt/exceptions.py b/dbt/exceptions.py index 56c65928b91..a827967f237 100644 --- a/dbt/exceptions.py +++ b/dbt/exceptions.py @@ -65,9 +65,13 @@ def __str__(self, prefix="! 
"): if self.node is not None: node_string = " in {}".format(self.node_to_string(self.node)) + if hasattr(self.msg, 'split'): + split_msg = self.msg.split("\n") + else: + split_msg = basestring(self.msg).split("\n") + lines = ["{}{}".format(self.type + ' Error', - node_string)] + \ - self.msg.split("\n") + node_string)] + split_msg lines += self.process_stack() diff --git a/dbt/include/global_project/macros/etc/bigquery.sql b/dbt/include/global_project/macros/etc/bigquery.sql new file mode 100644 index 00000000000..7bd0cba9086 --- /dev/null +++ b/dbt/include/global_project/macros/etc/bigquery.sql @@ -0,0 +1,4 @@ + +{% macro date_sharded_table(base_name) %} + {{ return(base_name ~ "[DBT__PARTITION_DATE]") }} +{% endmacro %} diff --git a/dbt/include/global_project/macros/etc/datetime.sql b/dbt/include/global_project/macros/etc/datetime.sql new file mode 100644 index 00000000000..28a7654110b --- /dev/null +++ b/dbt/include/global_project/macros/etc/datetime.sql @@ -0,0 +1,56 @@ + +{% macro convert_datetime(date_str, date_fmt) %} + + {% set error_msg -%} + The provided partition date '{{ date_str }}' does not match the expected format '{{ date_fmt }}' + {%- endset %} + + {% set res = try_or_compiler_error(error_msg, modules.datetime.datetime.strptime, date_str.strip(), date_fmt) %} + {{ return(res) }} + +{% endmacro %} + +{% macro dates_in_range(start_date_str, end_date_str=none, in_fmt="%Y%m%d", out_fmt="%Y%m%d") %} + {% set end_date_str = start_date_str if end_date_str is none else end_date_str %} + + {% set start_date = convert_datetime(start_date_str, in_fmt) %} + {% set end_date = convert_datetime(end_date_str, in_fmt) %} + + {% set day_count = (end_date - start_date).days %} + {% if day_count < 0 %} + {% set msg -%} + Partiton start date is after the end date ({{ start_date }}, {{ end_date }}) + {%- endset %} + + {{ exceptions.raise_compiler_error(msg, model) }} + {% endif %} + + {% set date_list = [] %} + {% for i in range(0, day_count + 1) %} + {% set the_date = (modules.datetime.timedelta(days=i) + start_date) %} + {% if not out_fmt %} + {% set _ = date_list.append(the_date) %} + {% else %} + {% set _ = date_list.append(the_date.strftime(out_fmt)) %} + {% endif %} + {% endfor %} + + {{ return(date_list) }} +{% endmacro %} + +{% macro partition_range(raw_partition_date, date_fmt='%Y%m%d') %} + {% set partition_range = (raw_partition_date | string).split(",") %} + + {% if (partition_range | length) == 1 %} + {% set start_date = partition_range[0] %} + {% set end_date = none %} + {% elif (partition_range | length) == 2 %} + {% set start_date = partition_range[0] %} + {% set end_date = partition_range[1] %} + {% else %} + {{ dbt.exceptions.raise_compiler_error("Invalid partition time. Expected format: {Start Date}[,{End Date}]. 
Got: " ~ raw_partition_date) }} + {% endif %} + + {{ return(dates_in_range(start_date, end_date, in_fmt=date_fmt)) }} +{% endmacro %} + diff --git a/dbt/include/global_project/macros/materializations/bigquery.sql b/dbt/include/global_project/macros/materializations/bigquery.sql index e61373a4634..f616412a593 100644 --- a/dbt/include/global_project/macros/materializations/bigquery.sql +++ b/dbt/include/global_project/macros/materializations/bigquery.sql @@ -1,12 +1,20 @@ {% materialization view, adapter='bigquery' -%} {%- set identifier = model['name'] -%} - {%- set tmp_identifier = identifier + '__dbt_tmp' -%} {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} {%- set existing = adapter.query_for_existing(schema) -%} {%- set existing_type = existing.get(identifier) -%} {%- if existing_type is not none -%} + {%- if existing_type == 'table' and not flags.FULL_REFRESH -%} + {# this is only intended for date partitioned tables, but we cant see that field in the context #} + {% set error_message -%} + Trying to create model '{{ identifier }}' as a view, but it already exists as a table. + Either drop the '{{ schema }}.{{ identifier }}' table manually, or use --full-refresh + {%- endset %} + {{ exceptions.raise_compiler_error(error_message) }} + {%- endif -%} + {{ adapter.drop(schema, identifier, existing_type) }} {%- endif -%} @@ -16,20 +24,69 @@ {%- endmaterialization %} + +{% macro make_date_partitioned_table(model, dates, should_create, verbose=False) %} + + {% if should_create %} + {{ adapter.make_date_partitioned_table(model.schema, model.name) }} + {% endif %} + + {% for date in dates %} + {% set date = (date | string) %} + {% if verbose %} + {% set table_start_time = modules.datetime.datetime.now().strftime("%H:%M:%S") %} + {{ log(table_start_time ~ ' | -> Running for day ' ~ date, info=True) }} + {% endif %} + + {% set fixed_sql = model['injected_sql'] | replace('[DBT__PARTITION_DATE]', date) %} + {% set _ = adapter.execute_model(model, 'table', fixed_sql, decorator=date) %} + {% endfor %} + + {% set num_days = dates | length %} + {% if num_days == 1 %} + {% set result_str = 'CREATED 1 PARTITION' %} + {% else %} + {% set result_str = 'CREATED ' ~ num_days ~ ' PARTITIONS' %} + {% endif %} + + {{ return(result_str) }} + +{% endmacro %} + {% materialization table, adapter='bigquery' -%} {%- set identifier = model['name'] -%} - {%- set tmp_identifier = identifier + '__dbt_tmp' -%} {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} {%- set existing = adapter.query_for_existing(schema) -%} {%- set existing_type = existing.get(identifier) -%} + {%- set verbose = config.get('verbose', False) -%} + {%- set partitions = config.get('partitions') -%} - {%- if existing_type is not none -%} - {{ adapter.drop(schema, identifier, existing_type) }} + {% if partitions %} + {% if partitions is number or partitions is string %} + {% set partitions = [(partitions | string)] %} + {% endif %} + + {% if partitions is not iterable %} + {{ exceptions.raise_compiler_error("Provided `partitions` configuration is not a list. Got: " ~ partitions, model) }} + {% endif %} + {% endif %} + + {# + Since dbt uses WRITE_TRUNCATE mode for tables, we only need to drop this thing + if it is not a table. 
If it _is_ already a table, then we can overwrite it without downtime + #} + {%- if existing_type is not none and existing_type != 'table' -%} + {{ adapter.drop(schema, identifier, existing_type) }} {%- endif -%} -- build model - {% set result = adapter.execute_model(model, 'table') %} + {% if partitions %} + {% set result = make_date_partitioned_table(model, partitions, (existing_type != 'table'), verbose) %} + {% else %} + {% set result = adapter.execute_model(model, 'table') %} + {% endif %} + {{ store_result('main', status=result) }} {% endmaterialization %} diff --git a/dbt/project.py b/dbt/project.py index bdb1f811076..2fb7b0bb156 100644 --- a/dbt/project.py +++ b/dbt/project.py @@ -87,6 +87,12 @@ def __init__(self, cfg, profiles, profiles_dir, profile_to_load=None, "Could not find profile named '{}'" .format(self.profile_to_load), self) + if self.cfg.get('models') is None: + self.cfg['models'] = {} + + if self.cfg['models'].get('vars') is None: + self.cfg['models']['vars'] = {} + global_vars = dbt.utils.parse_cli_vars(getattr(args, 'vars', '{}')) if 'vars' not in self.cfg['models']: self.cfg['models']['vars'] = {} diff --git a/requirements.txt b/requirements.txt index 9c2735f1523..670aa77121b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,5 +11,5 @@ celery==3.1.23 voluptuous==0.10.5 snowflake-connector-python>=1.4.9 colorama==0.3.9 -google-cloud-bigquery==0.26.0 +google-cloud-bigquery==0.29.0 agate>=1.6,<2 diff --git a/sample.profiles.yml b/sample.profiles.yml index 7106e6814da..cd898ed5a59 100644 --- a/sample.profiles.yml +++ b/sample.profiles.yml @@ -65,6 +65,7 @@ config: # project: [GCP project id] # schema: [dbt schema] # threads: [between 1 and 8] +# timeout_seconds: 300 # # # 2. use a service account keyfile # [target-name-2]: diff --git a/setup.py b/setup.py index 23ecc1bc669..b85ed9fd709 100644 --- a/setup.py +++ b/setup.py @@ -43,7 +43,7 @@ 'voluptuous==0.10.5', 'snowflake-connector-python>=1.4.9', 'colorama==0.3.9', - 'google-cloud-bigquery==0.26.0', + 'google-cloud-bigquery==0.29.0', 'agate>=1.6,<2', ] ) diff --git a/test/integration/022_bigquery_test/dp-models/confirmation.sql b/test/integration/022_bigquery_test/dp-models/confirmation.sql new file mode 100644 index 00000000000..858c0e2e32f --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/confirmation.sql @@ -0,0 +1,20 @@ + +-- This model checks to confirm that each date partition was created correctly. 
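+-- (_PARTITIONTIME is the pseudo-column BigQuery adds to date-partitioned tables;
+-- for each row it holds the day of the partition that row was written into.)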
+-- Columns day_1, day_2, and day_3 should have a value of 1, and count_days should be 3 + +with base as ( + + select + case when _PARTITIONTIME = '2018-01-01' then 1 else 0 end as day_1, + case when _PARTITIONTIME = '2018-01-02' then 1 else 0 end as day_2, + case when _PARTITIONTIME = '2018-01-03' then 1 else 0 end as day_3 + from {{ ref('partitioned') }} + +) + +select distinct + sum(day_1) over () as day_1, + sum(day_2) over () as day_2, + sum(day_3) over () as day_3, + count(*) over () as count_days +from base diff --git a/test/integration/022_bigquery_test/dp-models/events_20180101.sql b/test/integration/022_bigquery_test/dp-models/events_20180101.sql new file mode 100644 index 00000000000..9a8f54d5bcb --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/events_20180101.sql @@ -0,0 +1,4 @@ + +{{ config(materialized='table') }} + +select 1 as id diff --git a/test/integration/022_bigquery_test/dp-models/events_20180102.sql b/test/integration/022_bigquery_test/dp-models/events_20180102.sql new file mode 100644 index 00000000000..63bfcdc13fe --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/events_20180102.sql @@ -0,0 +1,4 @@ + +{{ config(materialized='table') }} + +select 2 as id diff --git a/test/integration/022_bigquery_test/dp-models/events_20180103.sql b/test/integration/022_bigquery_test/dp-models/events_20180103.sql new file mode 100644 index 00000000000..09a9f02c7b1 --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/events_20180103.sql @@ -0,0 +1,4 @@ + +{{ config(materialized='table') }} + +select 3 as id diff --git a/test/integration/022_bigquery_test/dp-models/partitioned.sql b/test/integration/022_bigquery_test/dp-models/partitioned.sql new file mode 100644 index 00000000000..5d77021d30c --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/partitioned.sql @@ -0,0 +1,16 @@ + +{{ + config( + materialized='table', + partitions=['20180101', '20180102', '20180103'], + verbose=True + ) +}} + +-- Hack to make sure our events models run first. +-- In practice, these would be source data +-- {{ ref('events_20180101') }} +-- {{ ref('events_20180102') }} +-- {{ ref('events_20180103') }} + +select * from `{{ this.schema }}`.`{{ date_sharded_table('events_') }}` diff --git a/test/integration/022_bigquery_test/dp-models/partitioned_simple.sql b/test/integration/022_bigquery_test/dp-models/partitioned_simple.sql new file mode 100644 index 00000000000..af65072bead --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/partitioned_simple.sql @@ -0,0 +1,14 @@ + +{{ + config( + materialized='table', + partition_date='20180101', + verbose=True + ) +}} + +-- Hack to make sure our events models run first. 
+-- In practice, these would be source data +-- {{ ref('events_20180101') }} + +select * from `{{ this.schema }}`.`events_20180101` diff --git a/test/integration/022_bigquery_test/dp-models/schema.yml b/test/integration/022_bigquery_test/dp-models/schema.yml new file mode 100644 index 00000000000..d699498135c --- /dev/null +++ b/test/integration/022_bigquery_test/dp-models/schema.yml @@ -0,0 +1,18 @@ + +# check that this exists +partitioned_simple: + constraints: + unique: + - id + not_null: + - id + +confirmation: + constraints: + accepted_values: + - {field: cast(day_1 as string), values:[1] } + - {field: cast(day_2 as string), values:[1] } + - {field: cast(day_3 as string), values:[1] } + - {field: cast(count_days as string), values:[3] } + + diff --git a/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py b/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py new file mode 100644 index 00000000000..3277c70f490 --- /dev/null +++ b/test/integration/022_bigquery_test/test_bigquery_date_partitioning.py @@ -0,0 +1,33 @@ +from nose.plugins.attrib import attr +from test.integration.base import DBTIntegrationTest, FakeArgs + + +class TestBigqueryDatePartitioning(DBTIntegrationTest): + + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "test/integration/022_bigquery_test/dp-models" + + @property + def profile_config(self): + return self.bigquery_profile() + + @attr(type='bigquery') + def test__bigquery_date_partitioning(self): + self.use_profile('bigquery') + self.use_default_project() + self.run_dbt() + + test_results = self.run_dbt(['test']) + + self.assertTrue(len(test_results) > 0) + for result in test_results: + self.assertFalse(result.errored) + self.assertFalse(result.skipped) + # status = # of failing rows + self.assertEqual(result.status, 0) + diff --git a/test/integration/022_bigquery_test/test_simple_bigquery_view.py b/test/integration/022_bigquery_test/test_simple_bigquery_view.py index c49a3c64670..c14b23c37c6 100644 --- a/test/integration/022_bigquery_test/test_simple_bigquery_view.py +++ b/test/integration/022_bigquery_test/test_simple_bigquery_view.py @@ -1,15 +1,9 @@ from nose.plugins.attrib import attr from test.integration.base import DBTIntegrationTest, FakeArgs -from dbt.task.test import TestTask -from dbt.project import read_project - class TestSimpleBigQueryRun(DBTIntegrationTest): - def setUp(self): - pass - @property def schema(self): return "bigquery_test_022" @@ -24,12 +18,9 @@ def project_config(self): 'macro-paths': ['test/integration/022_bigquery_test/macros'], } - def run_schema_validations(self): - project = read_project('dbt_project.yml') - args = FakeArgs() - - test_task = TestTask(args, project) - return test_task.run() + @property + def profile_config(self): + return self.bigquery_profile() @attr(type='bigquery') def test__bigquery_simple_run(self): @@ -37,7 +28,8 @@ def test__bigquery_simple_run(self): self.use_default_project() self.run_dbt() - test_results = self.run_schema_validations() + # The 'dupe' model should fail, but all others should pass + test_results = self.run_dbt(['test'], expect_pass=False) for result in test_results: if 'dupe' in result.node.get('name'): diff --git a/test/integration/023_exit_codes_test/test_exit_codes.py b/test/integration/023_exit_codes_test/test_exit_codes.py index dfc4a9a2931..01af65ce2ec 100644 --- a/test/integration/023_exit_codes_test/test_exit_codes.py +++ b/test/integration/023_exit_codes_test/test_exit_codes.py @@ -6,9 +6,6 @@ 
class TestExitCodes(DBTIntegrationTest): - def setUp(self): - pass - @property def schema(self): return "exit_codes_test_023" diff --git a/test/integration/base.py b/test/integration/base.py index 05d48487809..c7066f4c62d 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -173,6 +173,7 @@ def setUp(self): # it's important to use a different connection handle here so # we don't look into an incomplete transaction + adapter.cleanup_connections() connection = adapter.acquire_connection(profile, '__test') self.handle = connection.get('handle') self.adapter_type = profile.get('type')
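
For reference, a date-partitioned model as exercised by the new integration tests looks like the sketch below. The `events_` table prefix and the literal date list come straight from the dp-models fixtures; the new `partition_range('20180101, 20180103')` macro would expand to the same three dates.

    {{
        config(
            materialized='table',
            partitions=['20180101', '20180102', '20180103'],
            verbose=True
        )
    }}

    -- For each date in `partitions`, dbt replaces [DBT__PARTITION_DATE] with that date
    -- (here via date_sharded_table('events_')) and writes the result to the matching
    -- <model>$YYYYMMDD partition decorator with WRITE_TRUNCATE, so re-runs overwrite
    -- each partition in place.
    select * from `{{ this.schema }}`.`{{ date_sharded_table('events_') }}`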