diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8cf3be48daa..4162e9b9d22 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 - Support composite unique key in archivals ([#324](https://github.com/fishtown-analytics/dbt/pull/324))
 - Fix target paths ([#331](https://github.com/fishtown-analytics/dbt/pull/331), [#329](https://github.com/fishtown-analytics/dbt/issues/329))
 - Ignore commented-out schema tests ([#330](https://github.com/fishtown-analytics/dbt/pull/330), [#328](https://github.com/fishtown-analytics/dbt/issues/328))
+- Fix run levels ([#343](https://github.com/fishtown-analytics/dbt/pull/343), [#340](https://github.com/fishtown-analytics/dbt/issues/340), [#338](https://github.com/fishtown-analytics/dbt/issues/338))
 
 ### Changes
diff --git a/dbt/compilation.py b/dbt/compilation.py
index aa0a6391996..65fb402fe96 100644
--- a/dbt/compilation.py
+++ b/dbt/compilation.py
@@ -6,7 +6,8 @@
 import dbt.utils
 
 from dbt.model import Model
-from dbt.utils import This, Var, is_enabled, get_materialization, NodeType
+from dbt.utils import This, Var, is_enabled, get_materialization, NodeType, \
+    is_type
 
 from dbt.linker import Linker
 from dbt.runtime import RuntimeContext
@@ -256,7 +257,7 @@ def wrapped_do_ref(*args):
                 logger.info("Compiler error in {}".format(model.get('path')))
                 logger.info("Enabled models:")
                 for n, m in all_models.items():
-                    if m.get('resource_type') == NodeType.Model:
+                    if is_type(m, NodeType.Model):
                         logger.info(" - {}".format(m.get('unique_id')))
                 raise e
 
@@ -382,7 +383,7 @@ def compile_graph(self, linker, flat_graph):
             # data tests get wrapped in count(*)
             # TODO : move this somewhere more reasonable
             if 'data' in injected_node['tags'] and \
-               injected_node.get('resource_type') == NodeType.Test:
+               is_type(injected_node, NodeType.Test):
                 injected_node['wrapped_sql'] = (
                     "select count(*) from (\n{test_sql}\n) sbq").format(
                         test_sql=injected_node['injected_sql'])
@@ -393,7 +394,7 @@ def compile_graph(self, linker, flat_graph):
 
                 wrapped_graph['nodes'][name] = injected_node
 
-            elif injected_node.get('resource_type') == NodeType.Archive:
+            elif is_type(injected_node, NodeType.Archive):
                 # unfortunately we do everything automagically for
                 # archives. in the future it'd be nice to generate
                 # the SQL at the parser level.
diff --git a/dbt/linker.py b/dbt/linker.py
index 0ff9640e99d..4604cd6622e 100644
--- a/dbt/linker.py
+++ b/dbt/linker.py
@@ -1,8 +1,7 @@
 import networkx as nx
 from collections import defaultdict
 
-import dbt.compilation
-from dbt.utils import NodeType
+import dbt.utils
 
 
 def from_file(graph_file):
@@ -58,16 +57,6 @@ def as_topological_ordering(self, limit_to=None):
                 "{}".format(cycle)
             )
 
-    def is_blocking_dependency(self, node_data):
-        # sorting by # ancestors works, but only if we strictly consider
-        # non-ephemeral models
-
-        if 'dbt_run_type' not in node_data or 'materialized' not in node_data:
-            return False
-
-        return node_data['dbt_run_type'] == NodeType.Model \
-            and node_data['materialized'] != 'ephemeral'
-
     def as_dependency_list(self, limit_to=None):
         """returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]].
         Each element contains nodes whose dependenices are subsumed by the union of
@@ -92,7 +81,7 @@ def as_dependency_list(self, limit_to=None):
             num_ancestors = len([
                 ancestor for ancestor in
                 nx.ancestors(self.graph, node)
-                # if self.is_blocking_dependency(self.graph[ancestor])
+                if dbt.utils.is_blocking_dependency(self.get_node(ancestor))
             ])
 
             depth_nodes[num_ancestors].append(node)
diff --git a/dbt/runner.py b/dbt/runner.py
index e6c36d420cb..9b811e7966a 100644
--- a/dbt/runner.py
+++ b/dbt/runner.py
@@ -10,7 +10,7 @@
 from dbt.adapters.factory import get_adapter
 from dbt.logger import GLOBAL_LOGGER as logger
 
-from dbt.utils import get_materialization, NodeType
+from dbt.utils import get_materialization, NodeType, is_type
 
 import dbt.clients.jinja
 import dbt.compilation
@@ -85,11 +85,11 @@ def print_counts(flat_nodes):
 
 
 def print_start_line(node, schema_name, index, total):
-    if node.get('resource_type') == NodeType.Model:
+    if is_type(node, NodeType.Model):
         print_model_start_line(node, schema_name, index, total)
-    if node.get('resource_type') == NodeType.Test:
+    if is_type(node, NodeType.Test):
         print_test_start_line(node, schema_name, index, total)
-    if node.get('resource_type') == NodeType.Archive:
+    if is_type(node, NodeType.Archive):
         print_archive_start_line(node, index, total)
 
 
@@ -120,11 +120,11 @@ def print_archive_start_line(model, index, total):
 
 
 def print_result_line(result, schema_name, index, total):
     node = result.node
-    if node.get('resource_type') == NodeType.Model:
+    if is_type(node, NodeType.Model):
         print_model_result_line(result, schema_name, index, total)
-    elif node.get('resource_type') == NodeType.Test:
+    elif is_type(node, NodeType.Test):
         print_test_result_line(result, schema_name, index, total)
-    elif node.get('resource_type') == NodeType.Archive:
+    elif is_type(node, NodeType.Archive):
         print_archive_result_line(result, index, total)
@@ -416,7 +416,6 @@ def __init__(self, project, target_path, args):
         self.threads = self.args.threads
 
         adapter = get_adapter(profile)
-        schema_name = adapter.get_default_schema(profile)
 
         def call_get_columns_in_table(schema_name, table_name):
             return adapter.get_columns_in_table(
@@ -469,11 +468,11 @@ def execute_node(self, node, existing):
 
         node = self.inject_runtime_config(node)
 
-        if node.get('resource_type') == NodeType.Model:
+        if is_type(node, NodeType.Model):
             result = execute_model(profile, node, existing)
-        elif node.get('resource_type') == NodeType.Test:
+        elif is_type(node, NodeType.Test):
             result = execute_test(profile, node)
-        elif node.get('resource_type') == NodeType.Archive:
+        elif is_type(node, NodeType.Archive):
             result = execute_archive(profile, node, self.context)
 
         return result
@@ -516,6 +515,9 @@ def safe_execute_node(self, data):
             status=status,
             execution_time=execution_time)
 
+    def as_flat_dep_list(self, linker, nodes_to_run):
+        return [[linker.get_node(node) for node in nodes_to_run]]
+
     def as_concurrent_dep_list(self, linker, nodes_to_run):
         dependency_list = linker.as_dependency_list(nodes_to_run)
@@ -640,11 +642,10 @@ def on_complete(run_model_results):
 
             map_result = pool.map_async(
                 self.safe_execute_node,
-                [(node, existing,) for node in local_nodes],
+                [(local_node, existing,) for local_node in local_nodes],
                 callback=on_complete
            )
             map_result.wait()
-            run_model_results = map_result.get()
 
             node_index += threads
@@ -711,7 +712,8 @@ def try_create_schema(self):
             raise
 
     def run_types_from_graph(self, include_spec, exclude_spec,
-                             resource_types, tags, should_run_hooks=False):
+                             resource_types, tags, should_run_hooks=False,
+                             flatten_graph=False):
 
         linker = self.deserialize_graph()
         selected_nodes = self.get_nodes_to_run(
@@ -721,9 +723,14 @@ def run_types_from_graph(self, include_spec, exclude_spec,
             resource_types,
             tags)
 
-        dependency_list = self.as_concurrent_dep_list(
-            linker,
-            selected_nodes)
+        dependency_list = []
+
+        if flatten_graph is False:
+            dependency_list = self.as_concurrent_dep_list(linker,
+                                                          selected_nodes)
+        else:
+            dependency_list = self.as_flat_dep_list(linker,
+                                                    selected_nodes)
 
         self.try_create_schema()
@@ -746,11 +753,13 @@ def run_models(self, include_spec, exclude_spec):
 
     def run_tests(self, include_spec, exclude_spec, tags):
         return self.run_types_from_graph(include_spec, exclude_spec,
-                                         [NodeType.Test],
-                                         tags)
+                                         resource_types=[NodeType.Test],
+                                         tags=tags,
+                                         flatten_graph=True)
 
     def run_archives(self, include_spec, exclude_spec):
         return self.run_types_from_graph(include_spec, exclude_spec,
-                                         [NodeType.Archive],
-                                         set())
+                                         resource_types=[NodeType.Archive],
+                                         tags=set(),
+                                         flatten_graph=True)
diff --git a/dbt/utils.py b/dbt/utils.py
index 3a2de8e0714..51a9e6e2921 100644
--- a/dbt/utils.py
+++ b/dbt/utils.py
@@ -207,6 +207,11 @@ def to_string(s):
         return s
 
 
+def is_blocking_dependency(node):
+    return (is_type(node, NodeType.Model) and
+            get_materialization(node) != 'ephemeral')
+
+
 def get_materialization(node):
     return node.get('config', {}).get('materialized')
@@ -215,6 +220,10 @@ def is_enabled(node):
     return node.get('config', {}).get('enabled') is True
 
 
+def is_type(node, _type):
+    return node.get('resource_type') == _type
+
+
 def get_pseudo_test_path(node_name, source_path, test_type):
     "schema tests all come from schema.yml files. fake a source sql file"
     source_path_parts = split_path(source_path)
diff --git a/test/unit/test_linker.py b/test/unit/test_linker.py
index 0cbf81645a2..9321a207dd1 100644
--- a/test/unit/test_linker.py
+++ b/test/unit/test_linker.py
@@ -1,12 +1,22 @@
+import mock
 import unittest
 
+import dbt.utils
+
 from dbt.compilation import Linker
 
+
 class LinkerTest(unittest.TestCase):
     def setUp(self):
+        self.real_is_blocking_dependency = dbt.utils.is_blocking_dependency
         self.linker = Linker()
 
+        dbt.utils.is_blocking_dependency = mock.MagicMock(return_value=True)
+
+    def tearDown(self):
+        dbt.utils.is_blocking_dependency = self.real_is_blocking_dependency
+
     def test_linker_add_node(self):
         expected_nodes = ['A', 'B', 'C']
         for node in expected_nodes:
@@ -69,7 +79,8 @@ def test_linker_bad_limit_throws_runtime_error(self):
         for (l, r) in actual_deps:
             self.linker.dependency(l, r)
 
-        self.assertRaises(RuntimeError, self.linker.as_dependency_list, ['ZZZ'])
+        self.assertRaises(RuntimeError,
+                          self.linker.as_dependency_list, ['ZZZ'])
 
     def test__find_cycles__cycles(self):
         actual_deps = [('A', 'B'), ('B', 'C'), ('C', 'A')]
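For context on what these hunks add up to, here is a minimal sketch of the run-level computation after this patch. It is illustrative only, not dbt's actual API surface: `as_dependency_list` is reduced to a free function over a bare `networkx` graph, the `limit_to` filtering and `Linker` plumbing from the real code are omitted, and the demo graph at the bottom is invented. Only `is_type` and `is_blocking_dependency` mirror the helpers this diff adds to `dbt/utils.py`.

```python
from collections import defaultdict

import networkx as nx


class NodeType:
    # the string value is hypothetical; the diff only shows the enum name
    Model = 'model'


def is_type(node, _type):
    # mirrors the helper added to dbt/utils.py in this diff
    return node.get('resource_type') == _type


def get_materialization(node):
    return node.get('config', {}).get('materialized')


def is_blocking_dependency(node):
    # Only non-ephemeral models force a later run level: ephemeral models
    # are inlined into their consumers, so they never gate execution.
    return (is_type(node, NodeType.Model) and
            get_materialization(node) != 'ephemeral')


def as_dependency_list(graph, get_node):
    # Bucket each node by its count of *blocking* ancestors; every bucket
    # may run concurrently once the buckets before it have completed.
    depth_nodes = defaultdict(list)

    for node in graph.nodes():
        num_ancestors = len([
            ancestor for ancestor in nx.ancestors(graph, node)
            if is_blocking_dependency(get_node(ancestor))
        ])
        depth_nodes[num_ancestors].append(node)

    return [depth_nodes[depth] for depth in sorted(depth_nodes)]


if __name__ == '__main__':
    # invented three-node graph: ephemeral model -> table model -> test
    nodes = {
        'eph': {'resource_type': 'model',
                'config': {'materialized': 'ephemeral'}},
        'tbl': {'resource_type': 'model',
                'config': {'materialized': 'table'}},
        'tst': {'resource_type': 'test', 'config': {}},
    }
    graph = nx.DiGraph([('eph', 'tbl'), ('tbl', 'tst')])

    # 'eph' blocks nothing, so 'tbl' shares level 0 with it, while
    # 'tst' waits for 'tbl': prints [['eph', 'tbl'], ['tst']]
    print(as_dependency_list(graph, nodes.get))
```

With `flatten_graph=True`, which `run_tests` and `run_archives` now pass, this bucketing is skipped entirely: `as_flat_dep_list` returns a single level containing every selected node, so all tests (or archives) are dispatched as one concurrent batch.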