Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle concurrent job ordering via a graph iterator (#813) #1157

Merged
merged 6 commits into from
Dec 5, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbt/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
if WHICH_PYTHON == 2:
from SimpleHTTPServer import SimpleHTTPRequestHandler
from SocketServer import TCPServer
from Queue import PriorityQueue
else:
from http.server import SimpleHTTPRequestHandler
from socketserver import TCPServer
from queue import PriorityQueue


def to_unicode(s):
Expand Down
10 changes: 3 additions & 7 deletions dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,14 @@ def write_manifest_file(self, manifest):
manifest_path = os.path.join(self.config.target_path, filename)
write_json(manifest_path, manifest.serialize())

def write_graph_file(self, linker):
def write_graph_file(self, linker, manifest):
filename = graph_file_name
graph_path = os.path.join(self.config.target_path, filename)
linker.write_graph(graph_path)
linker.write_graph(graph_path, manifest)

def link_node(self, linker, node, manifest):
linker.add_node(node.unique_id)

linker.update_node_data(
node.unique_id,
node.to_dict())

for dependency in node.depends_on_nodes:
if manifest.nodes.get(dependency):
linker.dependency(
Expand Down Expand Up @@ -260,7 +256,7 @@ def compile(self):
manifest.macros.items()):
stats[node.resource_type] += 1

self.write_graph_file(linker)
self.write_graph_file(linker, manifest)
print_compile_stats(stats)

return manifest, linker
211 changes: 92 additions & 119 deletions dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,122 +120,116 @@ def _node_is_match(qualified_name, package_names, fqn):
return False


def get_nodes_by_qualified_name(graph, qualified_name_selector):
"""Yield all nodes in the graph that match the qualified_name_selector.

:param str qualified_name_selector: The selector or node name
"""
qualified_name = qualified_name_selector.split(".")
package_names = get_package_names(graph)

for node in graph.nodes():
fqn_ish = graph.node[node]['fqn']
if _node_is_match(qualified_name, package_names, fqn_ish):
yield node


def get_nodes_by_tag(graph, tag_name):
""" yields nodes from graph that have the specified tag """

for node in graph.nodes():
tags = graph.node[node]['tags']

if tag_name in tags:
yield node


def get_nodes_from_spec(graph, spec):
select_parents = spec['select_parents']
select_children = spec['select_children']

filter_map = {
SELECTOR_FILTERS.FQN: get_nodes_by_qualified_name,
SELECTOR_FILTERS.TAG: get_nodes_by_tag,
}

node_filter = spec['filter']
filter_func = filter_map.get(node_filter['type'])

if filter_func is None:
valid_selectors = ", ".join(filter_map.keys())
logger.info("The '{}' selector specified in {} is invalid. Must be "
"one of [{}]".format(
node_filter['type'],
spec['raw'],
valid_selectors))
def warn_if_useless_spec(spec, nodes):
if len(nodes) > 0:
return

selected_nodes = set()
logger.info(
"* Spec='{}' does not identify any models and was ignored\n"
.format(spec['raw'])
)

else:
selected_nodes = set(filter_func(graph, node_filter['value']))

additional_nodes = set()
test_nodes = set()
class NodeSelector(object):
def __init__(self, linker, manifest):
self.linker = linker
self.manifest = manifest

if select_parents:
for node in selected_nodes:
parent_nodes = nx.ancestors(graph, node)
additional_nodes.update(parent_nodes)
def get_nodes_by_qualified_name(self, graph, qualified_name_selector):
"""Yield all nodes in the graph that match the qualified_name_selector.

:param str qualified_name_selector: The selector or node name
"""
qualified_name = qualified_name_selector.split(".")
package_names = get_package_names(graph)
for node in graph.nodes():
fqn_ish = self.manifest.nodes[node].fqn
if _node_is_match(qualified_name, package_names, fqn_ish):
yield node

def get_nodes_by_tag(self, graph, tag_name):
""" yields nodes from graph that have the specified tag """
for node in graph.nodes():
tags = self.manifest.nodes[node].tags

if tag_name in tags:
yield node

def get_nodes_from_spec(self, graph, spec):
select_parents = spec['select_parents']
select_children = spec['select_children']

filter_map = {
SELECTOR_FILTERS.FQN: self.get_nodes_by_qualified_name,
SELECTOR_FILTERS.TAG: self.get_nodes_by_tag,
}

if select_children:
for node in selected_nodes:
child_nodes = nx.descendants(graph, node)
additional_nodes.update(child_nodes)
node_filter = spec['filter']
filter_method = filter_map.get(node_filter['type'])

model_nodes = selected_nodes | additional_nodes
if filter_method is None:
valid_selectors = ", ".join(filter_map.keys())
logger.info("The '{}' selector specified in {} is invalid. Must "
"be one of [{}]".format(
node_filter['type'],
spec['raw'],
valid_selectors))

for node in model_nodes:
# include tests that depend on this node. if we aren't running tests,
# they'll be filtered out later.
child_tests = [n for n in graph.successors(node)
if graph.node.get(n).get('resource_type') ==
NodeType.Test]
test_nodes.update(child_tests)
selected_nodes = set()

return model_nodes | test_nodes
else:
selected_nodes = set(filter_method(graph, node_filter['value']))

additional_nodes = set()
test_nodes = set()

def warn_if_useless_spec(spec, nodes):
if len(nodes) > 0:
return
if select_parents:
for node in selected_nodes:
parent_nodes = nx.ancestors(graph, node)
additional_nodes.update(parent_nodes)

logger.info(
"* Spec='{}' does not identify any models and was ignored\n"
.format(spec['raw'])
)
if select_children:
for node in selected_nodes:
child_nodes = nx.descendants(graph, node)
additional_nodes.update(child_nodes)

model_nodes = selected_nodes | additional_nodes

def select_nodes(graph, raw_include_specs, raw_exclude_specs):
selected_nodes = set()
for node in model_nodes:
# include tests that depend on this node. if we aren't running
# tests, they'll be filtered out later.
child_tests = [n for n in graph.successors(node)
if self.manifest.nodes[n].resource_type ==
NodeType.Test]
test_nodes.update(child_tests)

split_include_specs = split_specs(raw_include_specs)
split_exclude_specs = split_specs(raw_exclude_specs)
return model_nodes | test_nodes

include_specs = [parse_spec(spec) for spec in split_include_specs]
exclude_specs = [parse_spec(spec) for spec in split_exclude_specs]
def select_nodes(self, graph, raw_include_specs, raw_exclude_specs):
selected_nodes = set()

for spec in include_specs:
included_nodes = get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, included_nodes)
selected_nodes = selected_nodes | included_nodes
split_include_specs = split_specs(raw_include_specs)
split_exclude_specs = split_specs(raw_exclude_specs)

for spec in exclude_specs:
excluded_nodes = get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, excluded_nodes)
selected_nodes = selected_nodes - excluded_nodes
include_specs = [parse_spec(spec) for spec in split_include_specs]
exclude_specs = [parse_spec(spec) for spec in split_exclude_specs]

return selected_nodes
for spec in include_specs:
included_nodes = self.get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, included_nodes)
selected_nodes = selected_nodes | included_nodes

for spec in exclude_specs:
excluded_nodes = self.get_nodes_from_spec(graph, spec)
warn_if_useless_spec(spec, excluded_nodes)
selected_nodes = selected_nodes - excluded_nodes

class NodeSelector(object):
def __init__(self, linker, manifest):
self.linker = linker
self.manifest = manifest
return selected_nodes

def get_valid_nodes(self, graph):
valid = []
for node_name in graph.nodes():
node = graph.node.get(node_name)
node = self.manifest.nodes[node_name]

if not node.get('empty') and is_enabled(node):
valid.append(node_name)
Expand All @@ -250,15 +244,15 @@ def get_selected(self, include, exclude, resource_types, tags):

to_run = self.get_valid_nodes(graph)
filtered_graph = graph.subgraph(to_run)
selected_nodes = select_nodes(filtered_graph, include, exclude)
selected_nodes = self.select_nodes(filtered_graph, include, exclude)

filtered_nodes = set()
for node_name in selected_nodes:
node = graph.node.get(node_name)
node = self.manifest.nodes[node_name]

matched_resource = node.get('resource_type') in resource_types
matched_resource = node.resource_type in resource_types
matched_tags = (len(tags) == 0 or
bool(set(node.get('tags', [])) & set(tags)))
bool(set(node.tags) & set(tags)))

if matched_resource and matched_tags:
filtered_nodes.add(node_name)
Expand All @@ -271,9 +265,8 @@ def is_ephemeral_model(self, node):
return is_model and is_ephemeral

def get_ancestor_ephemeral_nodes(self, selected_nodes):

node_names = {
node: self.manifest.nodes.get(node).name
node: self.manifest.nodes[node].name
for node in selected_nodes
if node in self.manifest.nodes
}
Expand All @@ -283,7 +276,7 @@ def get_ancestor_ephemeral_nodes(self, selected_nodes):
for node in selected_nodes if node in node_names
]

all_ancestors = select_nodes(self.linker.graph, include_spec, [])
all_ancestors = self.select_nodes(self.linker.graph, include_spec, [])

res = []
for ancestor in all_ancestors:
Expand All @@ -304,23 +297,3 @@ def select(self, query):
addins = self.get_ancestor_ephemeral_nodes(selected)

return selected | addins

def as_node_list(self, selected_nodes, ephemeral_only=False):
dependency_list = self.linker.as_dependency_list(
selected_nodes,
ephemeral_only=ephemeral_only)

concurrent_dependency_list = []
for level in dependency_list:
node_level = [
ParsedNode(**self.linker.get_node(node)) for node in level
]
concurrent_dependency_list.append(node_level)

return concurrent_dependency_list


class FlatNodeSelector(NodeSelector):
def as_node_list(self, selected_nodes):
return super(FlatNodeSelector, self).as_node_list(selected_nodes,
ephemeral_only=True)
Loading