diff --git a/core/dbt/logger.py b/core/dbt/logger.py index f658769172f..9840d8c8fc8 100644 --- a/core/dbt/logger.py +++ b/core/dbt/logger.py @@ -67,6 +67,10 @@ def notice(self, msg, *args, **kwargs): # provide this for the cache. CACHE_LOGGER = logging.getLogger('dbt.cache') +# add a dummy handler to avoid `No handlers could be found for logger` +nothing_handler = logging.StreamHandler() +nothing_handler.setLevel(CRITICAL) +CACHE_LOGGER.addHandler(nothing_handler) # provide this for RPC connection logging RPC_LOGGER = logging.getLogger('dbt.rpc') diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 3e7903a3736..53a769c6b6d 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,54 +1,126 @@ from __future__ import print_function +import functools +import time + from dbt.logger import GLOBAL_LOGGER as logger from dbt.node_types import NodeType, RunHookType from dbt.node_runners import ModelRunner, RPCExecuteRunner import dbt.exceptions import dbt.flags -import dbt.ui.printer from dbt.contracts.graph.parsed import Hook from dbt.hooks import get_hook_dict +from dbt.ui.printer import \ + print_hook_start_line, \ + print_hook_end_line, \ + print_timestamped_line, \ + print_run_end_messages, \ + get_counts from dbt.compilation import compile_node from dbt.task.compile import CompileTask, RemoteCompileTask from dbt.utils import get_nodes_by_tags +class Timer(object): + def __init__(self): + self.start = None + self.end = None + + @property + def elapsed(self): + if self.start is None or self.end is None: + return None + return self.end - self.start + + def __enter__(self): + self.start = time.time() + return self + + def __exit__(self, exc_type, exc_value, exc_tracebck): + self.end = time.time() + + +@functools.total_ordering +class BiggestName(object): + def __lt__(self, other): + return True + + def __eq__(self, other): + return isinstance(other, self.__class__) + + class RunTask(CompileTask): + def __init__(self, args, config): + super(RunTask, self).__init__(args, config) + self.ran_hooks = [] + def raise_on_first_error(self): return False def populate_adapter_cache(self, adapter): adapter.set_relations_cache(self.manifest) - def run_hooks(self, adapter, hook_type, extra_context): - + def get_hook_sql(self, adapter, hook, idx, num_hooks, extra_context): + compiled = compile_node(adapter, self.config, hook, self.manifest, + extra_context) + statement = compiled.wrapped_sql + hook_index = hook.get('index', num_hooks) + hook_dict = get_hook_dict(statement, index=hook_index) + if dbt.flags.STRICT_MODE: + Hook(**hook_dict) + return hook_dict.get('sql', '') + + def _hook_keyfunc(self, hook): + package_name = hook.package_name + if package_name == self.config.project_name: + package_name = BiggestName() + return package_name, hook.index + + def get_hooks_by_type(self, hook_type): nodes = self.manifest.nodes.values() + # find all hooks defined in the manifest (could be multiple projects) hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation) + hooks.sort(key=self._hook_keyfunc) + return hooks - ordered_hooks = sorted(hooks, key=lambda h: h.get('index', len(hooks))) + def run_hooks(self, adapter, hook_type, extra_context): + ordered_hooks = self.get_hooks_by_type(hook_type) # on-run-* hooks should run outside of a transaction. This happens # b/c psycopg2 automatically begins a transaction when a connection # is created. adapter.clear_transaction() + if not ordered_hooks: + return + num_hooks = len(ordered_hooks) + + plural = 'hook' if num_hooks == 1 else 'hooks' + print_timestamped_line("") + print_timestamped_line( + 'Running {} {} {}'.format(num_hooks, hook_type, plural) + ) - for i, hook in enumerate(ordered_hooks): - compiled = compile_node(adapter, self.config, hook, - self.manifest, extra_context) - statement = compiled.wrapped_sql + for idx, hook in enumerate(ordered_hooks, start=1): + sql = self.get_hook_sql(adapter, hook, idx, num_hooks, + extra_context) - hook_index = hook.get('index', len(hooks)) - hook_dict = get_hook_dict(statement, index=hook_index) + hook_text = '{}.{}.{}'.format(hook.package_name, hook_type, + hook.index) + print_hook_start_line(hook_text, idx, num_hooks) + status = 'OK' - if dbt.flags.STRICT_MODE: - Hook(**hook_dict) + with Timer() as timer: + if len(sql.strip()) > 0: + status, _ = adapter.execute(sql, auto_begin=False, + fetch=False) + self.ran_hooks.append(hook) - sql = hook_dict.get('sql', '') + print_hook_end_line(hook_text, status, idx, num_hooks, + timer.elapsed) - if len(sql.strip()) > 0: - adapter.execute(sql, auto_begin=False, fetch=False) + print_timestamped_line("") def safe_run_hooks(self, adapter, hook_type, extra_context): try: @@ -57,10 +129,9 @@ def safe_run_hooks(self, adapter, hook_type, extra_context): logger.info("Database error while running {}".format(hook_type)) raise - @classmethod - def print_results_line(cls, results, execution_time): - nodes = [r.node for r in results] - stat_line = dbt.ui.printer.get_counts(nodes) + def print_results_line(self, results, execution_time): + nodes = [r.node for r in results] + self.ran_hooks + stat_line = get_counts(nodes) execution = "" @@ -68,8 +139,8 @@ def print_results_line(cls, results, execution_time): execution = " in {execution_time:0.2f}s".format( execution_time=execution_time) - dbt.ui.printer.print_timestamped_line("") - dbt.ui.printer.print_timestamped_line( + print_timestamped_line("") + print_timestamped_line( "Finished running {stat_line}{execution}." .format(stat_line=stat_line, execution=execution)) @@ -107,7 +178,7 @@ def get_runner_type(self): def task_end_messages(self, results): if results: - dbt.ui.printer.print_run_end_messages(results) + print_run_end_messages(results) class RemoteRunTask(RemoteCompileTask, RunTask): diff --git a/core/dbt/ui/printer.py b/core/dbt/ui/printer.py index cc02bd50d7a..8bbdd294ae8 100644 --- a/core/dbt/ui/printer.py +++ b/core/dbt/ui/printer.py @@ -49,7 +49,8 @@ def print_timestamped_line(msg, use_color=None): logger.info("{} | {}".format(get_timestamp(), msg)) -def print_fancy_output_line(msg, status, index, total, execution_time=None): +def print_fancy_output_line(msg, status, index, total, execution_time=None, + truncate=False): if index is None or total is None: progress = '' else: @@ -60,6 +61,8 @@ def print_fancy_output_line(msg, status, index, total, execution_time=None): message=msg) justified = prefix.ljust(80, ".") + if truncate and len(justified) > 77: + justified = justified[:77] + '...' if execution_time is None: status_time = "" @@ -83,6 +86,8 @@ def get_counts(flat_nodes): if node.get('resource_type') == NodeType.Model: t = '{} {}'.format(get_materialization(node), t) + elif node.get('resource_type') == NodeType.Operation: + t = 'hook' counts[t] = counts.get(t, 0) + 1 @@ -97,6 +102,18 @@ def print_start_line(description, index, total): print_fancy_output_line(msg, 'RUN', index, total) +def print_hook_start_line(statement, index, total): + msg = 'START hook: {}'.format(statement) + print_fancy_output_line(msg, 'RUN', index, total, truncate=True) + + +def print_hook_end_line(statement, status, index, total, execution_time): + msg = 'OK hook: {}'.format(statement) + # hooks don't fail into this path, so always green + print_fancy_output_line(msg, green(status), index, total, + execution_time=execution_time, truncate=True) + + def print_skip_line(model, schema, relation, index, num_models): msg = 'SKIP relation {}.{}'.format(schema, relation) print_fancy_output_line(msg, yellow('SKIP'), index, num_models) diff --git a/test/integration/006_simple_dependency_test/early_hook_dependency/dbt_project.yml b/test/integration/006_simple_dependency_test/early_hook_dependency/dbt_project.yml new file mode 100644 index 00000000000..7d83235e470 --- /dev/null +++ b/test/integration/006_simple_dependency_test/early_hook_dependency/dbt_project.yml @@ -0,0 +1,5 @@ +name: early_hooks +version: '1.0' +on-run-start: + - create table {{ var('test_create_table') }} as (select 1 as id) + - create table {{ var('test_create_second_table') }} as (select 3 as id) diff --git a/test/integration/006_simple_dependency_test/hook_models/actual.sql b/test/integration/006_simple_dependency_test/hook_models/actual.sql new file mode 100644 index 00000000000..65c31900590 --- /dev/null +++ b/test/integration/006_simple_dependency_test/hook_models/actual.sql @@ -0,0 +1,3 @@ +select * from {{ var('test_create_table') }} +union all +select * from {{ var('test_create_second_table') }} diff --git a/test/integration/006_simple_dependency_test/hook_models/expected.sql b/test/integration/006_simple_dependency_test/hook_models/expected.sql new file mode 100644 index 00000000000..152fc5d286f --- /dev/null +++ b/test/integration/006_simple_dependency_test/hook_models/expected.sql @@ -0,0 +1,6 @@ +{# surely there is a better way to do this! #} + +{% for _ in range(1, 5) %} +select {{ loop.index }} as id +{% if not loop.last %}union all{% endif %} +{% endfor %} diff --git a/test/integration/006_simple_dependency_test/late_hook_dependency/dbt_project.yml b/test/integration/006_simple_dependency_test/late_hook_dependency/dbt_project.yml new file mode 100644 index 00000000000..d67b089273f --- /dev/null +++ b/test/integration/006_simple_dependency_test/late_hook_dependency/dbt_project.yml @@ -0,0 +1,5 @@ +name: late_hooks +version: '1.0' +on-run-start: + - insert into {{ var('test_create_table') }} values (2) + - insert into {{ var('test_create_second_table') }} values (4) diff --git a/test/integration/006_simple_dependency_test/test_local_dependency.py b/test/integration/006_simple_dependency_test/test_local_dependency.py index b7a231aa3e1..27b7a9719a5 100644 --- a/test/integration/006_simple_dependency_test/test_local_dependency.py +++ b/test/integration/006_simple_dependency_test/test_local_dependency.py @@ -1,4 +1,5 @@ from test.integration.base import DBTIntegrationTest, use_profile +import json import mock import dbt.semver @@ -97,3 +98,53 @@ def test_postgres_local_dependency_out_of_date_no_check(self, mock_get): self.run_dbt(['deps']) results = self.run_dbt(['run', '--no-version-check']) self.assertEqual(len(results), 3) + + +class TestSimpleDependencyHooks(DBTIntegrationTest): + @property + def schema(self): + return "hooks_dependency_006" + + @property + def models(self): + return "test/integration/006_simple_dependency_test/hook_models" + + @property + def project_config(self): + # these hooks should run first, so nothing to drop + return { + 'on-run-start': [ + "drop table if exists {{ var('test_create_table') }}", + "drop table if exists {{ var('test_create_second_table') }}", + ] + } + + @property + def packages_config(self): + return { + "packages": [ + { + 'local': 'test/integration/006_simple_dependency_test/early_hook_dependency' + }, + { + 'local': 'test/integration/006_simple_dependency_test/late_hook_dependency' + } + ] + } + + def base_schema(self): + return self.unique_schema() + + def configured_schema(self): + return self.unique_schema() + '_configured' + + @use_profile('postgres') + def test_postgres_hook_dependency(self): + cli_vars = json.dumps({ + 'test_create_table': '"{}"."hook_test"'.format(self.unique_schema()), + 'test_create_second_table': '"{}"."hook_test_2"'.format(self.unique_schema()) + }) + self.run_dbt(["deps", '--vars', cli_vars]) + results = self.run_dbt(["run", '--vars', cli_vars]) + self.assertEqual(len(results), 2) + self.assertTablesEqual('actual', 'expected')