Skip to content

Commit

Permalink
move more graph selection stuff into the NodeSelector so we can use the manifest as the source of node information
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Nov 28, 2018
1 parent e04422d commit 139518d
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 127 deletions.
191 changes: 92 additions & 99 deletions dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,122 +120,116 @@ def _node_is_match(qualified_name, package_names, fqn):
return False


def get_nodes_by_qualified_name(graph, qualified_name_selector):
    """Lazily yield each graph node whose fqn matches the selector.

    The selector is split on "." and handed, together with the package
    names obtained from ``get_package_names(graph)``, to ``_node_is_match``
    along with the node's ``'fqn'`` graph attribute.

    :param graph: graph exposing ``nodes()`` and a ``node`` attribute mapping
    :param str qualified_name_selector: The selector or node name
    """
    selector_parts = qualified_name_selector.split(".")
    known_packages = get_package_names(graph)

    for node_id in graph.nodes():
        # Skip anything the matcher rejects; only matches are yielded.
        if not _node_is_match(selector_parts, known_packages,
                              graph.node[node_id]['fqn']):
            continue
        yield node_id


def get_nodes_by_tag(graph, tag_name):
    """Yield the nodes of ``graph`` that carry the tag ``tag_name``.

    Tags are read from each node's attribute dict in ``graph.node``. A node
    whose attributes have no ``'tags'`` entry (or whose entry is ``None``)
    is treated as untagged rather than raising, matching how tags are read
    with ``node.get('tags', [])`` elsewhere in this module.

    :param graph: graph exposing ``nodes()`` and a ``node`` attribute mapping
    :param tag_name: the tag to look for
    """
    for node in graph.nodes():
        # `or ()` also covers an explicit None stored under 'tags'.
        tags = graph.node[node].get('tags') or ()

        if tag_name in tags:
            yield node


def get_nodes_from_spec(graph, spec):
select_parents = spec['select_parents']
select_children = spec['select_children']

filter_map = {
SELECTOR_FILTERS.FQN: get_nodes_by_qualified_name,
SELECTOR_FILTERS.TAG: get_nodes_by_tag,
}

node_filter = spec['filter']
filter_func = filter_map.get(node_filter['type'])
def warn_if_useless_spec(spec, nodes):
if len(nodes) > 0:
return

if filter_func is None:
valid_selectors = ", ".join(filter_map.keys())
logger.info("The '{}' selector specified in {} is invalid. Must be "
"one of [{}]".format(
node_filter['type'],
spec['raw'],
valid_selectors))
logger.info(
"* Spec='{}' does not identify any models and was ignored\n"
.format(spec['raw'])
)

selected_nodes = set()

else:
selected_nodes = set(filter_func(graph, node_filter['value']))

additional_nodes = set()
test_nodes = set()
class NodeSelector(object):
def __init__(self, linker, manifest):
self.linker = linker
self.manifest = manifest

if select_parents:
for node in selected_nodes:
parent_nodes = nx.ancestors(graph, node)
additional_nodes.update(parent_nodes)
def get_nodes_by_qualified_name(self, graph, qualified_name_selector):
    """Lazily yield each graph node whose fqn matches the selector.

    The selector is split on "." and handed, together with the package
    names obtained from ``get_package_names(graph)``, to ``_node_is_match``
    along with the ``fqn`` of the node's entry in ``self.manifest.nodes``.

    :param graph: graph exposing ``nodes()``
    :param str qualified_name_selector: The selector or node name
    """
    selector_parts = qualified_name_selector.split(".")
    known_packages = get_package_names(graph)
    for node_id in graph.nodes():
        # The manifest, not the graph, is the source of node information.
        if not _node_is_match(selector_parts, known_packages,
                              self.manifest.nodes[node_id].fqn):
            continue
        yield node_id

def get_nodes_by_tag(self, graph, tag_name):
    """Lazily yield the nodes of ``graph`` tagged with ``tag_name``.

    Each node's tags are read from the ``tags`` attribute of its entry in
    ``self.manifest.nodes``.

    :param graph: graph exposing ``nodes()``
    :param tag_name: the tag to look for
    """
    for node_id in graph.nodes():
        if tag_name not in self.manifest.nodes[node_id].tags:
            continue
        yield node_id

def get_nodes_from_spec(self, graph, spec):
select_parents = spec['select_parents']
select_children = spec['select_children']

filter_map = {
SELECTOR_FILTERS.FQN: self.get_nodes_by_qualified_name,
SELECTOR_FILTERS.TAG: self.get_nodes_by_tag,
}

if select_children:
for node in selected_nodes:
child_nodes = nx.descendants(graph, node)
additional_nodes.update(child_nodes)
node_filter = spec['filter']
filter_method = filter_map.get(node_filter['type'])

model_nodes = selected_nodes | additional_nodes
if filter_method is None:
valid_selectors = ", ".join(filter_map.keys())
logger.info("The '{}' selector specified in {} is invalid. Must "
"be one of [{}]".format(
node_filter['type'],
spec['raw'],
valid_selectors))

for node in model_nodes:
# include tests that depend on this node. if we aren't running tests,
# they'll be filtered out later.
child_tests = [n for n in graph.successors(node)
if graph.node.get(n).get('resource_type') ==
NodeType.Test]
test_nodes.update(child_tests)
selected_nodes = set()

return model_nodes | test_nodes
else:
selected_nodes = set(filter_method(graph, node_filter['value']))

additional_nodes = set()
test_nodes = set()

def warn_if_useless_spec(spec, nodes):
if len(nodes) > 0:
return
if select_parents:
for node in selected_nodes:
parent_nodes = nx.ancestors(graph, node)
additional_nodes.update(parent_nodes)

logger.info(
"* Spec='{}' does not identify any models and was ignored\n"
.format(spec['raw'])
)
if select_children:
for node in selected_nodes:
child_nodes = nx.descendants(graph, node)
additional_nodes.update(child_nodes)

model_nodes = selected_nodes | additional_nodes

def select_nodes(graph, raw_include_specs, raw_exclude_specs):
selected_nodes = set()
for node in model_nodes:
# include tests that depend on this node. if we aren't running
# tests, they'll be filtered out later.
child_tests = [n for n in graph.successors(node)
if self.manifest.nodes[n].resource_type ==
NodeType.Test]
test_nodes.update(child_tests)

split_include_specs = split_specs(raw_include_specs)
split_exclude_specs = split_specs(raw_exclude_specs)
return model_nodes | test_nodes

include_specs = [parse_spec(spec) for spec in split_include_specs]
exclude_specs = [parse_spec(spec) for spec in split_exclude_specs]
def select_nodes(self, graph, raw_include_specs, raw_exclude_specs):
selected_nodes = set()

for spec in include_specs:
included_nodes = get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, included_nodes)
selected_nodes = selected_nodes | included_nodes
split_include_specs = split_specs(raw_include_specs)
split_exclude_specs = split_specs(raw_exclude_specs)

for spec in exclude_specs:
excluded_nodes = get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, excluded_nodes)
selected_nodes = selected_nodes - excluded_nodes
include_specs = [parse_spec(spec) for spec in split_include_specs]
exclude_specs = [parse_spec(spec) for spec in split_exclude_specs]

return selected_nodes
for spec in include_specs:
included_nodes = self.get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, included_nodes)
selected_nodes = selected_nodes | included_nodes

for spec in exclude_specs:
excluded_nodes = self.get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, excluded_nodes)
selected_nodes = selected_nodes - excluded_nodes

class NodeSelector(object):
def __init__(self, linker, manifest):
self.linker = linker
self.manifest = manifest
return selected_nodes

def get_valid_nodes(self, graph):
valid = []
for node_name in graph.nodes():
node = graph.node.get(node_name)
node = self.manifest.nodes[node_name]

if not node.get('empty') and is_enabled(node):
valid.append(node_name)
Expand All @@ -250,15 +244,15 @@ def get_selected(self, include, exclude, resource_types, tags):

to_run = self.get_valid_nodes(graph)
filtered_graph = graph.subgraph(to_run)
selected_nodes = select_nodes(filtered_graph, include, exclude)
selected_nodes = self.select_nodes(filtered_graph, include, exclude)

filtered_nodes = set()
for node_name in selected_nodes:
node = graph.node.get(node_name)
node = self.manifest.nodes[node_name]

matched_resource = node.get('resource_type') in resource_types
matched_resource = node.resource_type in resource_types
matched_tags = (len(tags) == 0 or
bool(set(node.get('tags', [])) & set(tags)))
bool(set(node.tags) & set(tags)))

if matched_resource and matched_tags:
filtered_nodes.add(node_name)
Expand All @@ -271,9 +265,8 @@ def is_ephemeral_model(self, node):
return is_model and is_ephemeral

def get_ancestor_ephemeral_nodes(self, selected_nodes):

node_names = {
node: self.manifest.nodes.get(node).name
node: self.manifest.nodes[node].name
for node in selected_nodes
if node in self.manifest.nodes
}
Expand All @@ -283,7 +276,7 @@ def get_ancestor_ephemeral_nodes(self, selected_nodes):
for node in selected_nodes if node in node_names
]

all_ancestors = select_nodes(self.linker.graph, include_spec, [])
all_ancestors = self.select_nodes(self.linker.graph, include_spec, [])

res = []
for ancestor in all_ancestors:
Expand Down
60 changes: 32 additions & 28 deletions test/unit/test_graph_selection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
import mock

import os
import string
Expand All @@ -19,21 +20,26 @@ def setUp(self):

# Edges: [(X.a, Y.b), (X.a, X.c), (Y.b, Y.d), (Y.b, X.e), (X.c, Y.f), (X.c, X.g)]
self.package_graph = nx.relabel_nodes(integer_graph, package_mapping)
nodes = {
node: mock.MagicMock(fqn=node.split('.')[1:], tags=[])
for node in self.package_graph
}

for node in self.package_graph:
self.package_graph.node[node]['fqn'] = node.split('.')[1:]
nodes['m.X.a'].tags = ['abc']
nodes['m.Y.b'].tags = ['abc']
nodes['m.X.c'].tags = ['abc']
nodes['m.Y.d'].tags = []
nodes['m.X.e'].tags = ['efg']
nodes['m.Y.f'].tags = ['efg']
nodes['m.X.g'].tags = ['efg']

self.package_graph.node['m.X.a']['tags'] = ['abc']
self.package_graph.node['m.Y.b']['tags'] = ['abc']
self.package_graph.node['m.X.c']['tags'] = ['abc']
self.package_graph.node['m.Y.d']['tags'] = []
self.package_graph.node['m.X.e']['tags'] = ['efg']
self.package_graph.node['m.Y.f']['tags'] = ['efg']
self.package_graph.node['m.X.g']['tags'] = ['efg']
self.manifest = mock.MagicMock(nodes=nodes)
self.linker = mock.MagicMock(graph=self.package_graph)
self.selector = graph_selector.NodeSelector(self.linker, self.manifest)


def run_specs_and_assert(self, graph, include, exclude, expected):
selected = graph_selector.select_nodes(
selected = self.selector.select_nodes(
graph,
include,
exclude
Expand Down Expand Up @@ -152,21 +158,19 @@ def assert_is_selected_node(self, node, spec, should_work):
)

def test__is_selected_node(self):
test = self.assert_is_selected_node

test(('X', 'a'), ('a'), True)
test(('X', 'a'), ('X', 'a'), True)
test(('X', 'a'), ('*'), True)
test(('X', 'a'), ('X', '*'), True)

test(('X', 'a', 'b', 'c'), ('X', '*'), True)
test(('X', 'a', 'b', 'c'), ('X', 'a', '*'), True)
test(('X', 'a', 'b', 'c'), ('X', 'a', 'b', '*'), True)
test(('X', 'a', 'b', 'c'), ('X', 'a', 'b', 'c'), True)
test(('X', 'a', 'b', 'c'), ('X', 'a'), True)
test(('X', 'a', 'b', 'c'), ('X', 'a', 'b'), True)

test(('X', 'a'), ('b'), False)
test(('X', 'a'), ('X', 'b'), False)
test(('X', 'a'), ('X', 'a', 'b'), False)
test(('X', 'a'), ('Y', '*'), False)
self.assert_is_selected_node(('X', 'a'), ('a'), True)
self.assert_is_selected_node(('X', 'a'), ('X', 'a'), True)
self.assert_is_selected_node(('X', 'a'), ('*'), True)
self.assert_is_selected_node(('X', 'a'), ('X', '*'), True)

self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', '*'), True)
self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', '*'), True)
self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', 'b', '*'), True)
self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', 'b', 'c'), True)
self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a'), True)
self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', 'b'), True)

self.assert_is_selected_node(('X', 'a'), ('b'), False)
self.assert_is_selected_node(('X', 'a'), ('X', 'b'), False)
self.assert_is_selected_node(('X', 'a'), ('X', 'a', 'b'), False)
self.assert_is_selected_node(('X', 'a'), ('Y', '*'), False)

0 comments on commit 139518d

Please sign in to comment.