From 63069090293f269956724cf4f05ba8f83f2f7277 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 8 May 2017 18:58:58 -0400 Subject: [PATCH 1/6] mostly working --- dbt/compilation.py | 39 +++++++++-- dbt/contracts/graph/unparsed.py | 3 +- dbt/parser.py | 46 +++++++++++++ dbt/runner.py | 65 ++++++++---------- dbt/tracking.py | 6 +- dbt/utils.py | 15 +++++ .../macros/before-and-after.sql | 32 +++++++++ .../014_hook_tests/models/hooks.sql | 7 -- .../014_hook_tests/test_run_hooks.py | 66 ++----------------- 9 files changed, 163 insertions(+), 116 deletions(-) mode change 100644 => 100755 dbt/parser.py create mode 100644 test/integration/014_hook_tests/macros/before-and-after.sql diff --git a/dbt/compilation.py b/dbt/compilation.py index 7493ff9deaf..9f33ed67292 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -6,6 +6,7 @@ import dbt.utils import dbt.include import dbt.wrapper +import dbt.tracking from dbt.model import Model from dbt.utils import This, Var, is_enabled, get_materialization, NodeType, \ @@ -38,6 +39,7 @@ def print_compile_stats(stats): NodeType.Archive: 'archives', NodeType.Analysis: 'analyses', NodeType.Macro: 'macros', + NodeType.Operation: 'operations', } results = { @@ -46,6 +48,7 @@ def print_compile_stats(stats): NodeType.Archive: 0, NodeType.Analysis: 0, NodeType.Macro: 0, + NodeType.Operation: 0, } results.update(stats) @@ -235,8 +238,8 @@ def get_compiler_context(self, model, flat_graph): context.update(wrapper.get_context_functions()) - context['run_started_at'] = '{{ run_started_at }}' - context['invocation_id'] = '{{ invocation_id }}' + context['run_started_at'] = dbt.tracking.active_user.run_started_at + context['invocation_id'] = dbt.tracking.active_user.invocation_id context['sql_now'] = adapter.date_function() for unique_id, macro in flat_graph.get('macros').items(): @@ -280,7 +283,8 @@ def compile_node(self, node, flat_graph): injected_node, _ = prepend_ctes(compiled_node, flat_graph) if compiled_node.get('resource_type') in [NodeType.Test, - NodeType.Analysis]: + NodeType.Analysis, + NodeType.Operation]: # data tests get wrapped in count(*) # TODO : move this somewhere more reasonable if 'data' in injected_node['tags'] and \ @@ -351,11 +355,13 @@ def link_graph(self, linker, flat_graph): linked_graph = { 'nodes': {}, 'macros': flat_graph.get('macros'), + 'operations': flat_graph.get('operations'), } - for name, node in flat_graph.get('nodes').items(): - self.link_node(linker, node, flat_graph) - linked_graph['nodes'][name] = node + for node_type in ['nodes', 'operations']: + for name, node in flat_graph.get(node_type).items(): + self.link_node(linker, node, flat_graph) + linked_graph[node_type][name] = node cycle = linker.find_cycles() @@ -393,6 +399,17 @@ def get_parsed_macros(self, root_project, all_projects): return parsed_macros + def get_parsed_operations(self, root_project, all_projects): + parsed_operations = {} + + for name, project in all_projects.items(): + parsed_operations.update( + dbt.parser.load_and_parse_run_hooks(root_project, all_projects, 'on-run-start')) + parsed_operations.update( + dbt.parser.load_and_parse_run_hooks(root_project, all_projects, 'on-run-end')) + + return parsed_operations + def get_parsed_models(self, root_project, all_projects): parsed_models = {} @@ -456,6 +473,9 @@ def get_parsed_schema_tests(self, root_project, all_projects): def load_all_macros(self, root_project, all_projects): return self.get_parsed_macros(root_project, all_projects) + def load_all_operations(self, root_project, all_projects): + return self.get_parsed_operations(root_project, all_projects) + def load_all_nodes(self, root_project, all_projects): all_nodes = {} @@ -479,10 +499,12 @@ def compile(self): all_macros = self.load_all_macros(root_project, all_projects) all_nodes = self.load_all_nodes(root_project, all_projects) + all_operations = self.load_all_operations(root_project, all_projects) flat_graph = { 'nodes': all_nodes, - 'macros': all_macros + 'macros': all_macros, + 'operations': all_operations } flat_graph = dbt.parser.process_refs(flat_graph, @@ -498,6 +520,9 @@ def compile(self): for node_name, node in linked_graph.get('macros').items(): stats[node.get('resource_type')] += 1 + for node_name, node in linked_graph.get('operations').items(): + stats[node.get('resource_type')] += 1 + print_compile_stats(stats) return linked_graph, linker diff --git a/dbt/contracts/graph/unparsed.py b/dbt/contracts/graph/unparsed.py index 25415891755..f24c262337c 100644 --- a/dbt/contracts/graph/unparsed.py +++ b/dbt/contracts/graph/unparsed.py @@ -19,7 +19,8 @@ unparsed_node_contract = unparsed_base_contract.extend({ Required('resource_type'): Any(NodeType.Model, NodeType.Test, - NodeType.Analysis) + NodeType.Analysis, + NodeType.Operation) }) unparsed_nodes_contract = Schema([unparsed_node_contract]) diff --git a/dbt/parser.py b/dbt/parser.py old mode 100644 new mode 100755 index 7f749655523..21fa8dc5005 --- a/dbt/parser.py +++ b/dbt/parser.py @@ -328,6 +328,52 @@ def load_and_parse_sql(package_name, root_project, all_projects, root_dir, return parse_sql_nodes(result, root_project, all_projects, tags) +def get_hooks_from_project(project_cfg, hook_type): + hooks = project_cfg.get(hook_type, []) + + if type(hooks) not in (list, tuple): + hooks = [hooks] + + return hooks + + +def get_hooks(all_projects, hook_type): + project_hooks = {} + + for project_name, project in all_projects.items(): + hooks = get_hooks_from_project(project, hook_type) + + if len(hooks) > 0: + project_hooks[project_name] = ";\n".join(hooks) + + return project_hooks + + +def load_and_parse_run_hooks(root_project, all_projects, hook_type): + + if dbt.flags.STRICT_MODE: + dbt.contracts.project.validate_list(all_projects) + + project_hooks = get_hooks(all_projects, hook_type) + + result = [] + for project_name, hooks in project_hooks.items(): + project = all_projects[project_name] + + hook_path = dbt.utils.get_pseudo_hook_path(hook_type) + + result.append({ + 'name': hook_type, + 'root_path': "{}/dbt_project.yml".format(project_name), + 'resource_type': NodeType.Operation, + 'path': hook_path, + 'package_name': project_name, + 'raw_sql': hooks + }) + + return parse_sql_nodes(result, root_project, all_projects, tags={hook_type}) + + def load_and_parse_macros(package_name, root_project, all_projects, root_dir, relative_dirs, resource_type, tags=None): extension = "[!.#~]*.sql" diff --git a/dbt/runner.py b/dbt/runner.py index f77058704e9..9fa8e3c08c9 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -5,7 +5,6 @@ import os import time import itertools -from datetime import datetime from dbt.adapters.factory import get_adapter from dbt.logger import GLOBAL_LOGGER as logger @@ -370,24 +369,13 @@ def execute_archive(profile, node, context): return result -def run_hooks(profile, hooks, context, source): - if type(hooks) not in (list, tuple): - hooks = [hooks] - - ctx = { - "target": profile, - "state": "start", - "invocation_id": context['invocation_id'], - "run_started_at": context['run_started_at'] - } - - compiled_hooks = [ - dbt.clients.jinja.get_rendered(hook, ctx) for hook in hooks - ] - +def run_hooks(profile, hooks): adapter = get_adapter(profile) - return adapter.execute_all(profile=profile, sqls=compiled_hooks) + master_connection = adapter.begin(profile) + compiled_hooks = [hook['wrapped_sql'] for hook in hooks] + adapter.execute_all(profile=profile, sqls=compiled_hooks) + master_connection = adapter.commit(master_connection) def track_model_run(index, num_nodes, run_model_result): @@ -461,10 +449,8 @@ def call_table_exists(schema, table): return adapter.table_exists( profile, schema, table, node.get('name')) - self.run_started_at = datetime.now() - return { - "run_started_at": datetime.now(), + "run_started_at": dbt.tracking.active_user.run_started_at, "invocation_id": dbt.tracking.active_user.invocation_id, "get_columns_in_table": call_get_columns_in_table, "get_missing_columns": call_get_missing_columns, @@ -513,7 +499,6 @@ def execute_node(self, node, flat_graph, existing, profile, adapter): return node, result def compile_node(self, node, flat_graph): - compiler = dbt.compilation.Compiler(self.project) node = compiler.compile_node(node, flat_graph) return node @@ -687,12 +672,9 @@ def execute_nodes(self, flat_graph, node_dependency_list, on_failure, start_time = time.time() if should_run_hooks: - master_connection = adapter.begin(profile) - run_hooks(self.project.get_target(), - self.project.cfg.get('on-run-start', []), - self.node_context({}), - 'on-run-start hooks') - master_connection = adapter.commit(master_connection) + start_hooks = dbt.utils.get_nodes_by_tags(flat_graph, {'on-run-start'}, "operations") + hooks = [self.compile_node(hook, flat_graph) for hook in start_hooks] + run_hooks(profile, hooks) def get_idx(node): return node_id_to_index_map.get(node.get('unique_id')) @@ -739,12 +721,9 @@ def get_idx(node): pool.join() if should_run_hooks: - adapter.begin(profile) - run_hooks(self.project.get_target(), - self.project.cfg.get('on-run-end', []), - self.node_context({}), - 'on-run-end hooks') - adapter.commit(master_connection) + end_hooks = dbt.utils.get_nodes_by_tags(flat_graph, {'on-run-end'}, "operations") + hooks = [self.compile_node(hook, flat_graph) for hook in end_hooks] + run_hooks(profile, hooks) execution_time = time.time() - start_time @@ -755,11 +734,21 @@ def get_idx(node): def get_ancestor_ephemeral_nodes(self, flat_graph, linked_graph, selected_nodes): + node_names = { + node: flat_graph['nodes'].get(node).get('name') + for node in selected_nodes + if node in flat_graph['nodes'] + } + + include_spec = [ + '+{}'.format(node_names[node]) + for node in selected_nodes if node in node_names + ] + all_ancestors = dbt.graph.selector.select_nodes( self.project, linked_graph, - ['+{}'.format(flat_graph.get('nodes').get(node).get('name')) - for node in selected_nodes], + include_spec, []) return set([ancestor for ancestor in all_ancestors @@ -874,7 +863,8 @@ def compile_models(self, include_spec, exclude_spec): NodeType.Model, NodeType.Test, NodeType.Archive, - NodeType.Analysis + NodeType.Analysis, + NodeType.Operation ] return self.run_types_from_graph(include_spec, @@ -882,7 +872,8 @@ def compile_models(self, include_spec, exclude_spec): resource_types=resource_types, tags=set(), should_run_hooks=False, - should_execute=False) + should_execute=False, + flatten_graph=True) def run_models(self, include_spec, exclude_spec): return self.run_types_from_graph(include_spec, diff --git a/dbt/tracking.py b/dbt/tracking.py index 6901c947693..39e5b7b0436 100644 --- a/dbt/tracking.py +++ b/dbt/tracking.py @@ -2,6 +2,7 @@ from dbt import version as dbt_version from snowplow_tracker import Subject, Tracker, Emitter, logger as sp_logger from snowplow_tracker import SelfDescribingJson, disable_contracts +from datetime import datetime import platform import uuid @@ -42,7 +43,8 @@ def __init__(self): self.do_not_track = True self.id = None - self.invocation_id = None + self.invocation_id = str(uuid.uuid4()) + self.run_started_at = datetime.now() def state(self): return "do not track" if self.do_not_track else "tracking" @@ -50,8 +52,6 @@ def state(self): def initialize(self): self.do_not_track = False - self.invocation_id = str(uuid.uuid4()) - cookie = self.get_cookie() self.id = cookie.get('id') diff --git a/dbt/utils.py b/dbt/utils.py index 97b8bd89ac5..d38ec6ce210 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -28,6 +28,7 @@ class NodeType(object): Test = 'test' Archive = 'archive' Macro = 'macro' + Operation = 'operation' class This(object): @@ -263,6 +264,11 @@ def get_pseudo_test_path(node_name, source_path, test_type): return os.path.join(*pseudo_path_parts) +def get_pseudo_hook_path(hook_name): + path_parts = ['hooks', "{}.sql".format(hook_name)] + return os.path.join(*path_parts) + + def get_run_status_line(results): total = len(results) errored = len([r for r in results if r.errored or r.failed]) @@ -277,3 +283,12 @@ def get_run_status_line(results): errored=errored, skipped=skipped )) + + +def get_nodes_by_tags(flat_graph, match_tags, resource_type): + nodes = [] + for node_name, node in flat_graph[resource_type].items(): + node_tags = node.get('tags', set()) + if len(node_tags & match_tags): + nodes.append(node) + return nodes diff --git a/test/integration/014_hook_tests/macros/before-and-after.sql b/test/integration/014_hook_tests/macros/before-and-after.sql new file mode 100644 index 00000000000..14df56f2627 --- /dev/null +++ b/test/integration/014_hook_tests/macros/before-and-after.sql @@ -0,0 +1,32 @@ + +{% macro custom_run_hook(state, target, run_started_at, invocation_id) %} + + insert into {{ target.schema }}.on_run_hook ( + "state", + "target.dbname", + "target.host", + "target.name", + "target.schema", + "target.type", + "target.user", + "target.pass", + "target.port", + "target.threads", + "run_started_at", + "invocation_id" + ) VALUES ( + '{{ state }}', + '{{ target.dbname }}', + '{{ target.host }}', + '{{ target.name }}', + '{{ target.schema }}', + '{{ target.type }}', + '{{ target.user }}', + '{{ target.pass }}', + {{ target.port }}, + {{ target.threads }}, + '{{ run_started_at }}', + '{{ invocation_id }}' + ) + +{% endmacro %} diff --git a/test/integration/014_hook_tests/models/hooks.sql b/test/integration/014_hook_tests/models/hooks.sql index 8593206696b..2cd691ea7b4 100644 --- a/test/integration/014_hook_tests/models/hooks.sql +++ b/test/integration/014_hook_tests/models/hooks.sql @@ -1,9 +1,2 @@ -{{ - config({ - 'materialized': 'table', - 'post-hook': [ '{{ hook() }}' ] - }) -}} - select 1 as id diff --git a/test/integration/014_hook_tests/test_run_hooks.py b/test/integration/014_hook_tests/test_run_hooks.py index a1a070f4a74..9ae3da3a860 100644 --- a/test/integration/014_hook_tests/test_run_hooks.py +++ b/test/integration/014_hook_tests/test_run_hooks.py @@ -1,66 +1,8 @@ from nose.plugins.attrib import attr from test.integration.base import DBTIntegrationTest +import os.path -RUN_START_HOOK = """ - insert into run_hooks_014.on_run_hook ( - "state", - "target.dbname", - "target.host", - "target.name", - "target.schema", - "target.type", - "target.user", - "target.pass", - "target.port", - "target.threads", - "run_started_at", - "invocation_id" - ) VALUES ( - 'start', - '{{ target.dbname }}', - '{{ target.host }}', - '{{ target.name }}', - '{{ target.schema }}', - '{{ target.type }}', - '{{ target.user }}', - '{{ target.pass }}', - {{ target.port }}, - {{ target.threads }}, - '{{ run_started_at }}', - '{{ invocation_id }}' - ) -""" - -RUN_END_HOOK = """ - insert into run_hooks_014.on_run_hook ( - "state", - "target.dbname", - "target.host", - "target.name", - "target.schema", - "target.type", - "target.user", - "target.pass", - "target.port", - "target.threads", - "run_started_at", - "invocation_id" - ) VALUES ( - 'end', - '{{ target.dbname }}', - '{{ target.host }}', - '{{ target.name }}', - '{{ target.schema }}', - '{{ target.type }}', - '{{ target.user }}', - '{{ target.pass }}', - {{ target.port }}, - {{ target.threads }}, - '{{ run_started_at }}', - '{{ invocation_id }}' - ) -""" class TestPrePostRunHooks(DBTIntegrationTest): @@ -91,8 +33,10 @@ def schema(self): @property def project_config(self): return { - "on-run-start": RUN_START_HOOK, - "on-run-end": RUN_END_HOOK + 'macro-paths': ['test/integration/014_hook_tests/macros'], + + "on-run-start": "{{ custom_run_hook('start', target, run_started_at, invocation_id) }}", + "on-run-end": "{{ custom_run_hook('end', target, run_started_at, invocation_id) }}", } @property From 821256bf6eba3bbcc52e2f5db7c09cbe3e7f6230 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 8 May 2017 21:36:59 -0400 Subject: [PATCH 2/6] fix bug for test --- dbt/runner.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index 9fa8e3c08c9..6380e355f54 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -751,11 +751,18 @@ def get_ancestor_ephemeral_nodes(self, flat_graph, linked_graph, include_spec, []) - return set([ancestor for ancestor in all_ancestors - if(flat_graph['nodes'][ancestor].get( - 'resource_type') == NodeType.Model and - get_materialization( - flat_graph['nodes'][ancestor]) == 'ephemeral')]) + res = [] + + for ancestor in all_ancestors: + if ancestor not in flat_graph['nodes']: + continue + ancestor_node = flat_graph['nodes'][ancestor] + is_model = ancestor_node.get('resource_type') == NodeType.Model + is_ephemeral = get_materialization(ancestor_node) == 'ephemeral' + if is_model and is_ephemeral: + res.append(ancestor) + + return set(res) def get_nodes_to_run(self, graph, include_spec, exclude_spec, resource_types, tags): From 0572e9d670ab06d751d5ec5fbc739942dc7ab521 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 9 May 2017 11:20:47 -0400 Subject: [PATCH 3/6] code cleanup, pep8 --- dbt/compilation.py | 34 ++++++++-------------------------- dbt/parser.py | 12 +++++++++++- dbt/runner.py | 34 ++++++++++++++++------------------ dbt/utils.py | 10 +++++----- 4 files changed, 40 insertions(+), 50 deletions(-) mode change 100755 => 100644 dbt/parser.py diff --git a/dbt/compilation.py b/dbt/compilation.py index 9f33ed67292..1e3e3d8dea8 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -354,14 +354,12 @@ def link_node(self, linker, node, flat_graph): def link_graph(self, linker, flat_graph): linked_graph = { 'nodes': {}, - 'macros': flat_graph.get('macros'), - 'operations': flat_graph.get('operations'), + 'macros': flat_graph.get('macros') } - for node_type in ['nodes', 'operations']: - for name, node in flat_graph.get(node_type).items(): - self.link_node(linker, node, flat_graph) - linked_graph[node_type][name] = node + for name, node in flat_graph.get('nodes').items(): + self.link_node(linker, node, flat_graph) + linked_graph['nodes'][name] = node cycle = linker.find_cycles() @@ -399,17 +397,6 @@ def get_parsed_macros(self, root_project, all_projects): return parsed_macros - def get_parsed_operations(self, root_project, all_projects): - parsed_operations = {} - - for name, project in all_projects.items(): - parsed_operations.update( - dbt.parser.load_and_parse_run_hooks(root_project, all_projects, 'on-run-start')) - parsed_operations.update( - dbt.parser.load_and_parse_run_hooks(root_project, all_projects, 'on-run-end')) - - return parsed_operations - def get_parsed_models(self, root_project, all_projects): parsed_models = {} @@ -473,9 +460,6 @@ def get_parsed_schema_tests(self, root_project, all_projects): def load_all_macros(self, root_project, all_projects): return self.get_parsed_macros(root_project, all_projects) - def load_all_operations(self, root_project, all_projects): - return self.get_parsed_operations(root_project, all_projects) - def load_all_nodes(self, root_project, all_projects): all_nodes = {} @@ -488,6 +472,8 @@ def load_all_nodes(self, root_project, all_projects): all_nodes.update( dbt.parser.parse_archives_from_projects(root_project, all_projects)) + all_nodes.update( + dbt.parser.load_and_parse_run_hooks(root_project, all_projects)) return all_nodes @@ -498,13 +484,12 @@ def compile(self): all_projects = self.get_all_projects() all_macros = self.load_all_macros(root_project, all_projects) + all_nodes = self.load_all_nodes(root_project, all_projects) - all_operations = self.load_all_operations(root_project, all_projects) flat_graph = { 'nodes': all_nodes, - 'macros': all_macros, - 'operations': all_operations + 'macros': all_macros } flat_graph = dbt.parser.process_refs(flat_graph, @@ -520,9 +505,6 @@ def compile(self): for node_name, node in linked_graph.get('macros').items(): stats[node.get('resource_type')] += 1 - for node_name, node in linked_graph.get('operations').items(): - stats[node.get('resource_type')] += 1 - print_compile_stats(stats) return linked_graph, linker diff --git a/dbt/parser.py b/dbt/parser.py old mode 100755 new mode 100644 index 77d7e528c73..dee11eee930 --- a/dbt/parser.py +++ b/dbt/parser.py @@ -352,7 +352,7 @@ def get_hooks(all_projects, hook_type): return project_hooks -def load_and_parse_run_hooks(root_project, all_projects, hook_type): +def load_and_parse_run_hook_type(root_project, all_projects, hook_type): if dbt.flags.STRICT_MODE: dbt.contracts.project.validate_list(all_projects) @@ -377,6 +377,16 @@ def load_and_parse_run_hooks(root_project, all_projects, hook_type): return parse_sql_nodes(result, root_project, all_projects, tags={hook_type}) +def load_and_parse_run_hooks(root_project, all_projects): + hook_nodes = {} + for hook_type in ['on-run-start', 'on-run-end']: + project_hooks = load_and_parse_run_hook_type(root_project, all_projects, + hook_type) + hook_nodes.update(project_hooks) + + return hook_nodes + + def load_and_parse_macros(package_name, root_project, all_projects, root_dir, relative_dirs, resource_type, tags=None): extension = "[!.#~]*.sql" diff --git a/dbt/runner.py b/dbt/runner.py index 6380e355f54..95234a667ed 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -9,7 +9,7 @@ from dbt.adapters.factory import get_adapter from dbt.logger import GLOBAL_LOGGER as logger -from dbt.utils import get_materialization, NodeType, is_type +from dbt.utils import get_materialization, NodeType, is_type, get_nodes_by_tags import dbt.clients.jinja import dbt.compilation @@ -369,15 +369,6 @@ def execute_archive(profile, node, context): return result -def run_hooks(profile, hooks): - adapter = get_adapter(profile) - - master_connection = adapter.begin(profile) - compiled_hooks = [hook['wrapped_sql'] for hook in hooks] - adapter.execute_all(profile=profile, sqls=compiled_hooks) - master_connection = adapter.commit(master_connection) - - def track_model_run(index, num_nodes, run_model_result): invocation_id = dbt.tracking.active_user.invocation_id dbt.tracking.track_model_run({ @@ -619,6 +610,18 @@ def as_concurrent_dep_list(self, linker, nodes_to_run): return concurrent_dependency_list + def run_hooks(self, profile, flat_graph, hook_type): + adapter = get_adapter(profile) + + nodes = flat_graph.get('nodes', {}).values() + start_hooks = get_nodes_by_tags(nodes, {hook_type}, NodeType.Operation) + hooks = [self.compile_node(hook, flat_graph) for hook in start_hooks] + + master_connection = adapter.begin(profile) + compiled_hooks = [hook['wrapped_sql'] for hook in hooks] + adapter.execute_all(profile=profile, sqls=compiled_hooks) + master_connection = adapter.commit(master_connection) + def on_model_failure(self, linker, selected_nodes): def skip_dependent(node): dependent_nodes = linker.get_dependent_nodes(node.get('unique_id')) @@ -672,9 +675,7 @@ def execute_nodes(self, flat_graph, node_dependency_list, on_failure, start_time = time.time() if should_run_hooks: - start_hooks = dbt.utils.get_nodes_by_tags(flat_graph, {'on-run-start'}, "operations") - hooks = [self.compile_node(hook, flat_graph) for hook in start_hooks] - run_hooks(profile, hooks) + self.run_hooks(profile, flat_graph, 'on-run-start') def get_idx(node): return node_id_to_index_map.get(node.get('unique_id')) @@ -721,9 +722,7 @@ def get_idx(node): pool.join() if should_run_hooks: - end_hooks = dbt.utils.get_nodes_by_tags(flat_graph, {'on-run-end'}, "operations") - hooks = [self.compile_node(hook, flat_graph) for hook in end_hooks] - run_hooks(profile, hooks) + self.run_hooks(profile, flat_graph, 'on-run-end') execution_time = time.time() - start_time @@ -879,8 +878,7 @@ def compile_models(self, include_spec, exclude_spec): resource_types=resource_types, tags=set(), should_run_hooks=False, - should_execute=False, - flatten_graph=True) + should_execute=False) def run_models(self, include_spec, exclude_spec): return self.run_types_from_graph(include_spec, diff --git a/dbt/utils.py b/dbt/utils.py index d38ec6ce210..5966cb4b746 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -285,10 +285,10 @@ def get_run_status_line(results): )) -def get_nodes_by_tags(flat_graph, match_tags, resource_type): - nodes = [] - for node_name, node in flat_graph[resource_type].items(): +def get_nodes_by_tags(nodes, match_tags, resource_type): + matched_nodes = [] + for node in nodes: node_tags = node.get('tags', set()) if len(node_tags & match_tags): - nodes.append(node) - return nodes + matched_nodes.append(node) + return matched_nodes From 6296e8ef1d57046cd25d2ddb683bf31e58fc6c76 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 9 May 2017 11:25:01 -0400 Subject: [PATCH 4/6] code cleanup --- dbt/parser.py | 8 +++++--- dbt/runner.py | 4 ++-- dbt/utils.py | 6 ++++++ 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/dbt/parser.py b/dbt/parser.py index dee11eee930..cdab4c1bca1 100644 --- a/dbt/parser.py +++ b/dbt/parser.py @@ -374,13 +374,15 @@ def load_and_parse_run_hook_type(root_project, all_projects, hook_type): 'raw_sql': hooks }) - return parse_sql_nodes(result, root_project, all_projects, tags={hook_type}) + tags = {hook_type} + return parse_sql_nodes(result, root_project, all_projects, tags=tags) def load_and_parse_run_hooks(root_project, all_projects): hook_nodes = {} - for hook_type in ['on-run-start', 'on-run-end']: - project_hooks = load_and_parse_run_hook_type(root_project, all_projects, + for hook_type in dbt.utils.RunHookTypes.Both: + project_hooks = load_and_parse_run_hook_type(root_project, + all_projects, hook_type) hook_nodes.update(project_hooks) diff --git a/dbt/runner.py b/dbt/runner.py index 95234a667ed..0e8c57e99de 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -675,7 +675,7 @@ def execute_nodes(self, flat_graph, node_dependency_list, on_failure, start_time = time.time() if should_run_hooks: - self.run_hooks(profile, flat_graph, 'on-run-start') + self.run_hooks(profile, flat_graph, dbt.utils.RunHookTypes.Start) def get_idx(node): return node_id_to_index_map.get(node.get('unique_id')) @@ -722,7 +722,7 @@ def get_idx(node): pool.join() if should_run_hooks: - self.run_hooks(profile, flat_graph, 'on-run-end') + self.run_hooks(profile, flat_graph, dbt.utils.RunHookTypes.End) execution_time = time.time() - start_time diff --git a/dbt/utils.py b/dbt/utils.py index 5966cb4b746..0e4487be62b 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -31,6 +31,12 @@ class NodeType(object): Operation = 'operation' +class RunHookTypes: + Start = 'on-run-start' + End = 'on-run-end' + Both = [RunHookTypes.Start, RunHookTypes.End] + + class This(object): def __init__(self, schema, table, name): self.schema = schema From 061678cf631ff1808e09966370033912c310de0d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 9 May 2017 11:27:45 -0400 Subject: [PATCH 5/6] typo --- dbt/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/utils.py b/dbt/utils.py index 0e4487be62b..a621ddfeca4 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -34,7 +34,7 @@ class NodeType(object): class RunHookTypes: Start = 'on-run-start' End = 'on-run-end' - Both = [RunHookTypes.Start, RunHookTypes.End] + Both = [Start, End] class This(object): From 01bb4d7a36b8e38bb55519884dbff8bf4c171b21 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 9 May 2017 13:44:11 -0400 Subject: [PATCH 6/6] RunHookTypes --> RunHookType --- dbt/parser.py | 2 +- dbt/runner.py | 4 ++-- dbt/utils.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbt/parser.py b/dbt/parser.py index cdab4c1bca1..f1ba1c21e3b 100644 --- a/dbt/parser.py +++ b/dbt/parser.py @@ -380,7 +380,7 @@ def load_and_parse_run_hook_type(root_project, all_projects, hook_type): def load_and_parse_run_hooks(root_project, all_projects): hook_nodes = {} - for hook_type in dbt.utils.RunHookTypes.Both: + for hook_type in dbt.utils.RunHookType.Both: project_hooks = load_and_parse_run_hook_type(root_project, all_projects, hook_type) diff --git a/dbt/runner.py b/dbt/runner.py index 0e8c57e99de..cf2d759246f 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -675,7 +675,7 @@ def execute_nodes(self, flat_graph, node_dependency_list, on_failure, start_time = time.time() if should_run_hooks: - self.run_hooks(profile, flat_graph, dbt.utils.RunHookTypes.Start) + self.run_hooks(profile, flat_graph, dbt.utils.RunHookType.Start) def get_idx(node): return node_id_to_index_map.get(node.get('unique_id')) @@ -722,7 +722,7 @@ def get_idx(node): pool.join() if should_run_hooks: - self.run_hooks(profile, flat_graph, dbt.utils.RunHookTypes.End) + self.run_hooks(profile, flat_graph, dbt.utils.RunHookType.End) execution_time = time.time() - start_time diff --git a/dbt/utils.py b/dbt/utils.py index a621ddfeca4..9a32fb59544 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -31,7 +31,7 @@ class NodeType(object): Operation = 'operation' -class RunHookTypes: +class RunHookType: Start = 'on-run-start' End = 'on-run-end' Both = [Start, End]