Commit
fix run levels for tests, models that depend on ephemeral models (#343)
cmcarthur authored Mar 20, 2017
1 parent de06d65 commit 321600b
Showing 6 changed files with 59 additions and 39 deletions.
CHANGELOG.md: 1 addition, 0 deletions

@@ -6,6 +6,7 @@
 - Support composite unique key in archivals ([#324](https://github.com/fishtown-analytics/dbt/pull/324))
 - Fix target paths ([#331](https://github.com/fishtown-analytics/dbt/pull/331), [#329](https://github.com/fishtown-analytics/dbt/issues/329))
 - Ignore commented-out schema tests ([#330](https://github.com/fishtown-analytics/dbt/pull/330), [#328](https://github.com/fishtown-analytics/dbt/issues/328))
+- Fix run levels ([#343](https://github.com/fishtown-analytics/dbt/pull/343), [#340](https://github.com/fishtown-analytics/dbt/issues/340), [#338](https://github.com/fishtown-analytics/dbt/issues/338))
 
 ### Changes
 
dbt/compilation.py: 5 additions, 4 deletions
@@ -6,7 +6,8 @@
 import dbt.utils
 
 from dbt.model import Model
-from dbt.utils import This, Var, is_enabled, get_materialization, NodeType
+from dbt.utils import This, Var, is_enabled, get_materialization, NodeType, \
+    is_type
 
 from dbt.linker import Linker
 from dbt.runtime import RuntimeContext
@@ -256,7 +257,7 @@ def wrapped_do_ref(*args):
                 logger.info("Compiler error in {}".format(model.get('path')))
                 logger.info("Enabled models:")
                 for n, m in all_models.items():
-                    if m.get('resource_type') == NodeType.Model:
+                    if is_type(m, NodeType.Model):
                         logger.info(" - {}".format(m.get('unique_id')))
                 raise e
 
@@ -382,7 +383,7 @@ def compile_graph(self, linker, flat_graph):
             # data tests get wrapped in count(*)
             # TODO : move this somewhere more reasonable
             if 'data' in injected_node['tags'] and \
-               injected_node.get('resource_type') == NodeType.Test:
+               is_type(injected_node, NodeType.Test):
                 injected_node['wrapped_sql'] = (
                     "select count(*) from (\n{test_sql}\n) sbq").format(
                         test_sql=injected_node['injected_sql'])
@@ -393,7 +394,7 @@
 
                 wrapped_graph['nodes'][name] = injected_node
 
-            elif injected_node.get('resource_type') == NodeType.Archive:
+            elif is_type(injected_node, NodeType.Archive):
                 # unfortunately we do everything automagically for
                 # archives. in the future it'd be nice to generate
                 # the SQL at the parser level.
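For context on the count(*) wrapping above: a data test is an arbitrary SELECT that is expected to return no rows, so the compiler wraps the test's injected SQL in a count. A minimal sketch of the transformation, using a hypothetical injected_node dict:

injected_node = {
    'tags': ['data'],
    'resource_type': 'test',
    'injected_sql': "select * from payments where amount < 0",
}

wrapped_sql = (
    "select count(*) from (\n{test_sql}\n) sbq").format(
        test_sql=injected_node['injected_sql'])

print(wrapped_sql)
# select count(*) from (
# select * from payments where amount < 0
# ) sbq

A non-zero count marks the test as failed; the sbq alias keeps the derived table valid on databases that require named subqueries.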
dbt/linker.py: 2 additions, 13 deletions
@@ -1,8 +1,7 @@
 import networkx as nx
 from collections import defaultdict
 
-import dbt.compilation
-from dbt.utils import NodeType
+import dbt.utils
 
 
 def from_file(graph_file):
@@ -58,16 +57,6 @@ def as_topological_ordering(self, limit_to=None):
                 "{}".format(cycle)
             )
 
-    def is_blocking_dependency(self, node_data):
-        # sorting by # ancestors works, but only if we strictly consider
-        # non-ephemeral models
-
-        if 'dbt_run_type' not in node_data or 'materialized' not in node_data:
-            return False
-
-        return node_data['dbt_run_type'] == NodeType.Model \
-            and node_data['materialized'] != 'ephemeral'
-
     def as_dependency_list(self, limit_to=None):
         """returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]]. Each
         element contains nodes whose dependencies are subsumed by the union of
@@ -92,7 +81,7 @@ def as_dependency_list(self, limit_to=None):
             num_ancestors = len([
                 ancestor for ancestor in
                 nx.ancestors(self.graph, node)
-                # if self.is_blocking_dependency(self.graph[ancestor])
+                if dbt.utils.is_blocking_dependency(self.get_node(ancestor))
             ])
             depth_nodes[num_ancestors].append(node)
 
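This hunk is the core of the fix. A node's run level is the number of its blocking ancestors, but the filter had been commented out, so every ancestor counted, including ephemeral models that never execute. A sketch of the corrected counting, with hypothetical node dicts standing in for the graph's metadata (plain strings stand in for the NodeType constants):

import networkx as nx

# a table model, an ephemeral model built on it, and a table model
# that selects from the ephemeral one
graph = nx.DiGraph([('raw', 'eph'), ('eph', 'mart')])

nodes = {
    'raw': {'resource_type': 'model', 'config': {'materialized': 'table'}},
    'eph': {'resource_type': 'model', 'config': {'materialized': 'ephemeral'}},
    'mart': {'resource_type': 'model', 'config': {'materialized': 'table'}},
}

def is_blocking_dependency(node):
    # paraphrase of the helper this commit adds to dbt/utils.py
    return (node.get('resource_type') == 'model' and
            node.get('config', {}).get('materialized') != 'ephemeral')

for name in sorted(graph.nodes()):
    level = len([a for a in nx.ancestors(graph, name)
                 if is_blocking_dependency(nodes[a])])
    print(name, level)
# eph 1, mart 1, raw 0

With the filter in place, 'mart' runs one level after 'raw'; without it, 'mart' would sit at level 2, gated on a level occupied only by the ephemeral model, which never actually runs. The same distortion hit tests on models that depend on ephemeral models (#340, #338).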
dbt/runner.py: 30 additions, 21 deletions
@@ -10,7 +10,7 @@
 from dbt.adapters.factory import get_adapter
 from dbt.logger import GLOBAL_LOGGER as logger
 
-from dbt.utils import get_materialization, NodeType
+from dbt.utils import get_materialization, NodeType, is_type
 
 import dbt.clients.jinja
 import dbt.compilation
@@ -85,11 +85,11 @@ def print_counts(flat_nodes):
 
 
 def print_start_line(node, schema_name, index, total):
-    if node.get('resource_type') == NodeType.Model:
+    if is_type(node, NodeType.Model):
         print_model_start_line(node, schema_name, index, total)
-    if node.get('resource_type') == NodeType.Test:
+    if is_type(node, NodeType.Test):
         print_test_start_line(node, schema_name, index, total)
-    if node.get('resource_type') == NodeType.Archive:
+    if is_type(node, NodeType.Archive):
         print_archive_start_line(node, index, total)
 
 
@@ -120,11 +120,11 @@ def print_archive_start_line(model, index, total):
 def print_result_line(result, schema_name, index, total):
     node = result.node
 
-    if node.get('resource_type') == NodeType.Model:
+    if is_type(node, NodeType.Model):
         print_model_result_line(result, schema_name, index, total)
-    elif node.get('resource_type') == NodeType.Test:
+    elif is_type(node, NodeType.Test):
         print_test_result_line(result, schema_name, index, total)
-    elif node.get('resource_type') == NodeType.Archive:
+    elif is_type(node, NodeType.Archive):
         print_archive_result_line(result, index, total)
 
 
@@ -416,7 +416,6 @@ def __init__(self, project, target_path, args):
         self.threads = self.args.threads
 
         adapter = get_adapter(profile)
-        schema_name = adapter.get_default_schema(profile)
 
         def call_get_columns_in_table(schema_name, table_name):
             return adapter.get_columns_in_table(
@@ -469,11 +468,11 @@ def execute_node(self, node, existing):
 
         node = self.inject_runtime_config(node)
 
-        if node.get('resource_type') == NodeType.Model:
+        if is_type(node, NodeType.Model):
             result = execute_model(profile, node, existing)
-        elif node.get('resource_type') == NodeType.Test:
+        elif is_type(node, NodeType.Test):
             result = execute_test(profile, node)
-        elif node.get('resource_type') == NodeType.Archive:
+        elif is_type(node, NodeType.Archive):
             result = execute_archive(profile, node, self.context)
 
         return result
@@ -516,6 +515,9 @@ def safe_execute_node(self, data):
             status=status,
             execution_time=execution_time)
 
+    def as_flat_dep_list(self, linker, nodes_to_run):
+        return [[linker.get_node(node) for node in nodes_to_run]]
+
     def as_concurrent_dep_list(self, linker, nodes_to_run):
         dependency_list = linker.as_dependency_list(nodes_to_run)
 
@@ -640,11 +642,10 @@ def on_complete(run_model_results):
 
             map_result = pool.map_async(
                 self.safe_execute_node,
-                [(node, existing,) for node in local_nodes],
+                [(local_node, existing,) for local_node in local_nodes],
                 callback=on_complete
             )
             map_result.wait()
-            run_model_results = map_result.get()
 
             node_index += threads
 
@@ -711,7 +712,8 @@ def try_create_schema(self):
             raise
 
     def run_types_from_graph(self, include_spec, exclude_spec,
-                             resource_types, tags, should_run_hooks=False):
+                             resource_types, tags, should_run_hooks=False,
+                             flatten_graph=False):
         linker = self.deserialize_graph()
 
         selected_nodes = self.get_nodes_to_run(
@@ -721,9 +723,14 @@ def run_types_from_graph(self, include_spec, exclude_spec,
             resource_types,
             tags)
 
-        dependency_list = self.as_concurrent_dep_list(
-            linker,
-            selected_nodes)
+        dependency_list = []
+
+        if flatten_graph is False:
+            dependency_list = self.as_concurrent_dep_list(linker,
+                                                          selected_nodes)
+        else:
+            dependency_list = self.as_flat_dep_list(linker,
+                                                    selected_nodes)
 
         self.try_create_schema()
 
@@ -746,11 +753,13 @@ def run_models(self, include_spec, exclude_spec):
     def run_tests(self, include_spec, exclude_spec, tags):
         return self.run_types_from_graph(include_spec,
                                          exclude_spec,
-                                         [NodeType.Test],
-                                         tags)
+                                         resource_types=[NodeType.Test],
+                                         tags=tags,
+                                         flatten_graph=True)
 
     def run_archives(self, include_spec, exclude_spec):
         return self.run_types_from_graph(include_spec,
                                          exclude_spec,
-                                         [NodeType.Archive],
-                                         set())
+                                         resource_types=[NodeType.Archive],
+                                         tags=set(),
+                                         flatten_graph=True)
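With flatten_graph=True, tests and archives skip level computation entirely: as_flat_dep_list puts every selected node into a single level, which is safe because tests and archives never depend on one another. Models keep the concurrent, level-by-level plan. A sketch of how a runner consumes such a dependency list, assuming a hypothetical execute(node) that runs one compiled node:

from multiprocessing.dummy import Pool  # thread-backed Pool

def run_levels(dependency_list, execute, threads=4):
    # Each inner list is one run level: its members depend only on
    # earlier levels, so they may run concurrently. pool.map blocks
    # until the whole level finishes before the next level starts.
    pool = Pool(threads)
    results = []
    for level in dependency_list:
        results.extend(pool.map(execute, level))
    pool.close()
    pool.join()
    return results

# flat plan for tests:        [[test_a, test_b, test_c]]
# concurrent plan for models: [[staging], [mart_a, mart_b]]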
dbt/utils.py: 9 additions, 0 deletions
@@ -207,6 +207,11 @@ def to_string(s):
     return s
 
 
+def is_blocking_dependency(node):
+    return (is_type(node, NodeType.Model) and
+            get_materialization(node) != 'ephemeral')
+
+
 def get_materialization(node):
     return node.get('config', {}).get('materialized')
 
@@ -215,6 +220,10 @@ def is_enabled(node):
     return node.get('config', {}).get('enabled') is True
 
 
+def is_type(node, _type):
+    return node.get('resource_type') == _type
+
+
 def get_pseudo_test_path(node_name, source_path, test_type):
     "schema tests all come from schema.yml files. fake a source sql file"
     source_path_parts = split_path(source_path)
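These helpers centralize checks that were previously inlined at each call site; is_blocking_dependency also replaces the Linker method removed above, which still read the stale 'dbt_run_type' key. Their behavior on a hypothetical node dict:

node = {
    'resource_type': NodeType.Model,
    'config': {'materialized': 'ephemeral', 'enabled': True},
}

is_type(node, NodeType.Model)   # True
is_enabled(node)                # True
get_materialization(node)       # 'ephemeral'
is_blocking_dependency(node)    # False: ephemeral models never block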
test/unit/test_linker.py: 12 additions, 1 deletion
@@ -1,12 +1,22 @@
+import mock
 import unittest
 
+import dbt.utils
+
 from dbt.compilation import Linker
 
 
 class LinkerTest(unittest.TestCase):
 
     def setUp(self):
+        self.real_is_blocking_dependency = dbt.utils.is_blocking_dependency
         self.linker = Linker()
+
+        dbt.utils.is_blocking_dependency = mock.MagicMock(return_value=True)
+
+    def tearDown(self):
+        dbt.utils.is_blocking_dependency = self.real_is_blocking_dependency
+
     def test_linker_add_node(self):
         expected_nodes = ['A', 'B', 'C']
         for node in expected_nodes:
@@ -69,7 +79,8 @@ def test_linker_bad_limit_throws_runtime_error(self):
         for (l, r) in actual_deps:
             self.linker.dependency(l, r)
 
-        self.assertRaises(RuntimeError, self.linker.as_dependency_list, ['ZZZ'])
+        self.assertRaises(RuntimeError,
+                          self.linker.as_dependency_list, ['ZZZ'])
 
     def test__find_cycles__cycles(self):
         actual_deps = [('A', 'B'), ('B', 'C'), ('C', 'A')]
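The setUp/tearDown pair swaps dbt.utils.is_blocking_dependency for a MagicMock by hand so that every node counts as blocking, keeping the level tests independent of node metadata, and restores the real function afterwards. A sketch of an equivalent setup using mock.patch.object, which handles the restore automatically (an alternative, not part of this commit):

import mock
import unittest

import dbt.utils


class LinkerTest(unittest.TestCase):
    def setUp(self):
        patcher = mock.patch.object(
            dbt.utils, 'is_blocking_dependency', return_value=True)
        patcher.start()
        # addCleanup runs even if setUp or a test fails partway
        self.addCleanup(patcher.stop)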
