diff --git a/dbt/compat.py b/dbt/compat.py index bbd76f5cb76..d3fec5227d0 100644 --- a/dbt/compat.py +++ b/dbt/compat.py @@ -20,9 +20,11 @@ if WHICH_PYTHON == 2: from SimpleHTTPServer import SimpleHTTPRequestHandler from SocketServer import TCPServer + from Queue import PriorityQueue else: from http.server import SimpleHTTPRequestHandler from socketserver import TCPServer + from queue import PriorityQueue def to_unicode(s): diff --git a/dbt/compilation.py b/dbt/compilation.py index 76a56bffd1d..b0f7bbb6f33 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -169,18 +169,14 @@ def write_manifest_file(self, manifest): manifest_path = os.path.join(self.config.target_path, filename) write_json(manifest_path, manifest.serialize()) - def write_graph_file(self, linker): + def write_graph_file(self, linker, manifest): filename = graph_file_name graph_path = os.path.join(self.config.target_path, filename) - linker.write_graph(graph_path) + linker.write_graph(graph_path, manifest) def link_node(self, linker, node, manifest): linker.add_node(node.unique_id) - linker.update_node_data( - node.unique_id, - node.to_dict()) - for dependency in node.depends_on_nodes: if manifest.nodes.get(dependency): linker.dependency( @@ -260,7 +256,7 @@ def compile(self): manifest.macros.items()): stats[node.resource_type] += 1 - self.write_graph_file(linker) + self.write_graph_file(linker, manifest) print_compile_stats(stats) return manifest, linker diff --git a/dbt/graph/selector.py b/dbt/graph/selector.py index d8b6a40984b..19c699eaea0 100644 --- a/dbt/graph/selector.py +++ b/dbt/graph/selector.py @@ -120,122 +120,116 @@ def _node_is_match(qualified_name, package_names, fqn): return False -def get_nodes_by_qualified_name(graph, qualified_name_selector): - """Yield all nodes in the graph that match the qualified_name_selector. 
- - :param str qualified_name_selector: The selector or node name - """ - qualified_name = qualified_name_selector.split(".") - package_names = get_package_names(graph) - - for node in graph.nodes(): - fqn_ish = graph.node[node]['fqn'] - if _node_is_match(qualified_name, package_names, fqn_ish): - yield node - - -def get_nodes_by_tag(graph, tag_name): - """ yields nodes from graph that have the specified tag """ - - for node in graph.nodes(): - tags = graph.node[node]['tags'] - - if tag_name in tags: - yield node - - -def get_nodes_from_spec(graph, spec): - select_parents = spec['select_parents'] - select_children = spec['select_children'] - - filter_map = { - SELECTOR_FILTERS.FQN: get_nodes_by_qualified_name, - SELECTOR_FILTERS.TAG: get_nodes_by_tag, - } - - node_filter = spec['filter'] - filter_func = filter_map.get(node_filter['type']) - - if filter_func is None: - valid_selectors = ", ".join(filter_map.keys()) - logger.info("The '{}' selector specified in {} is invalid. Must be " - "one of [{}]".format( - node_filter['type'], - spec['raw'], - valid_selectors)) +def warn_if_useless_spec(spec, nodes): + if len(nodes) > 0: + return - selected_nodes = set() + logger.info( + "* Spec='{}' does not identify any models and was ignored\n" + .format(spec['raw']) + ) - else: - selected_nodes = set(filter_func(graph, node_filter['value'])) - additional_nodes = set() - test_nodes = set() +class NodeSelector(object): + def __init__(self, linker, manifest): + self.linker = linker + self.manifest = manifest - if select_parents: - for node in selected_nodes: - parent_nodes = nx.ancestors(graph, node) - additional_nodes.update(parent_nodes) + def get_nodes_by_qualified_name(self, graph, qualified_name_selector): + """Yield all nodes in the graph that match the qualified_name_selector. 
+ + :param str qualified_name_selector: The selector or node name + """ + qualified_name = qualified_name_selector.split(".") + package_names = get_package_names(graph) + for node in graph.nodes(): + fqn_ish = self.manifest.nodes[node].fqn + if _node_is_match(qualified_name, package_names, fqn_ish): + yield node + + def get_nodes_by_tag(self, graph, tag_name): + """ yields nodes from graph that have the specified tag """ + for node in graph.nodes(): + tags = self.manifest.nodes[node].tags + + if tag_name in tags: + yield node + + def get_nodes_from_spec(self, graph, spec): + select_parents = spec['select_parents'] + select_children = spec['select_children'] + + filter_map = { + SELECTOR_FILTERS.FQN: self.get_nodes_by_qualified_name, + SELECTOR_FILTERS.TAG: self.get_nodes_by_tag, + } - if select_children: - for node in selected_nodes: - child_nodes = nx.descendants(graph, node) - additional_nodes.update(child_nodes) + node_filter = spec['filter'] + filter_method = filter_map.get(node_filter['type']) - model_nodes = selected_nodes | additional_nodes + if filter_method is None: + valid_selectors = ", ".join(filter_map.keys()) + logger.info("The '{}' selector specified in {} is invalid. Must " + "be one of [{}]".format( + node_filter['type'], + spec['raw'], + valid_selectors)) - for node in model_nodes: - # include tests that depend on this node. if we aren't running tests, - # they'll be filtered out later. 
- child_tests = [n for n in graph.successors(node) - if graph.node.get(n).get('resource_type') == - NodeType.Test] - test_nodes.update(child_tests) + selected_nodes = set() - return model_nodes | test_nodes + else: + selected_nodes = set(filter_method(graph, node_filter['value'])) + additional_nodes = set() + test_nodes = set() -def warn_if_useless_spec(spec, nodes): - if len(nodes) > 0: - return + if select_parents: + for node in selected_nodes: + parent_nodes = nx.ancestors(graph, node) + additional_nodes.update(parent_nodes) - logger.info( - "* Spec='{}' does not identify any models and was ignored\n" - .format(spec['raw']) - ) + if select_children: + for node in selected_nodes: + child_nodes = nx.descendants(graph, node) + additional_nodes.update(child_nodes) + model_nodes = selected_nodes | additional_nodes -def select_nodes(graph, raw_include_specs, raw_exclude_specs): - selected_nodes = set() + for node in model_nodes: + # include tests that depend on this node. if we aren't running + # tests, they'll be filtered out later. 
+ child_tests = [n for n in graph.successors(node) + if self.manifest.nodes[n].resource_type == + NodeType.Test] + test_nodes.update(child_tests) - split_include_specs = split_specs(raw_include_specs) - split_exclude_specs = split_specs(raw_exclude_specs) + return model_nodes | test_nodes - include_specs = [parse_spec(spec) for spec in split_include_specs] - exclude_specs = [parse_spec(spec) for spec in split_exclude_specs] + def select_nodes(self, graph, raw_include_specs, raw_exclude_specs): + selected_nodes = set() - for spec in include_specs: - included_nodes = get_nodes_from_spec(graph, spec) - warn_if_useless_spec(spec, included_nodes) - selected_nodes = selected_nodes | included_nodes + split_include_specs = split_specs(raw_include_specs) + split_exclude_specs = split_specs(raw_exclude_specs) - for spec in exclude_specs: - excluded_nodes = get_nodes_from_spec(graph, spec) - warn_if_useless_spec(spec, excluded_nodes) - selected_nodes = selected_nodes - excluded_nodes + include_specs = [parse_spec(spec) for spec in split_include_specs] + exclude_specs = [parse_spec(spec) for spec in split_exclude_specs] - return selected_nodes + for spec in include_specs: + included_nodes = self.get_nodes_from_spec(graph, spec) + warn_if_useless_spec(spec, included_nodes) + selected_nodes = selected_nodes | included_nodes + for spec in exclude_specs: + excluded_nodes = self.get_nodes_from_spec(graph, spec) + warn_if_useless_spec(spec, excluded_nodes) + selected_nodes = selected_nodes - excluded_nodes -class NodeSelector(object): - def __init__(self, linker, manifest): - self.linker = linker - self.manifest = manifest + return selected_nodes def get_valid_nodes(self, graph): valid = [] for node_name in graph.nodes(): - node = graph.node.get(node_name) + node = self.manifest.nodes[node_name] if not node.get('empty') and is_enabled(node): valid.append(node_name) @@ -250,15 +244,15 @@ def get_selected(self, include, exclude, resource_types, tags): to_run = 
self.get_valid_nodes(graph) filtered_graph = graph.subgraph(to_run) - selected_nodes = select_nodes(filtered_graph, include, exclude) + selected_nodes = self.select_nodes(filtered_graph, include, exclude) filtered_nodes = set() for node_name in selected_nodes: - node = graph.node.get(node_name) + node = self.manifest.nodes[node_name] - matched_resource = node.get('resource_type') in resource_types + matched_resource = node.resource_type in resource_types matched_tags = (len(tags) == 0 or - bool(set(node.get('tags', [])) & set(tags))) + bool(set(node.tags) & set(tags))) if matched_resource and matched_tags: filtered_nodes.add(node_name) @@ -271,9 +265,8 @@ def is_ephemeral_model(self, node): return is_model and is_ephemeral def get_ancestor_ephemeral_nodes(self, selected_nodes): - node_names = { - node: self.manifest.nodes.get(node).name + node: self.manifest.nodes[node].name for node in selected_nodes if node in self.manifest.nodes } @@ -283,7 +276,7 @@ def get_ancestor_ephemeral_nodes(self, selected_nodes): for node in selected_nodes if node in node_names ] - all_ancestors = select_nodes(self.linker.graph, include_spec, []) + all_ancestors = self.select_nodes(self.linker.graph, include_spec, []) res = [] for ancestor in all_ancestors: @@ -304,23 +297,3 @@ def select(self, query): addins = self.get_ancestor_ephemeral_nodes(selected) return selected | addins - - def as_node_list(self, selected_nodes, ephemeral_only=False): - dependency_list = self.linker.as_dependency_list( - selected_nodes, - ephemeral_only=ephemeral_only) - - concurrent_dependency_list = [] - for level in dependency_list: - node_level = [ - ParsedNode(**self.linker.get_node(node)) for node in level - ] - concurrent_dependency_list.append(node_level) - - return concurrent_dependency_list - - -class FlatNodeSelector(NodeSelector): - def as_node_list(self, selected_nodes): - return super(FlatNodeSelector, self).as_node_list(selected_nodes, - ephemeral_only=True) diff --git a/dbt/linker.py 
b/dbt/linker.py index a2669c586b0..ad1a3bc9e6f 100644 --- a/dbt/linker.py +++ b/dbt/linker.py @@ -1,7 +1,10 @@ import networkx as nx from collections import defaultdict +import threading import dbt.utils +from dbt.compat import PriorityQueue +from dbt.node_types import NodeType GRAPH_SERIALIZE_BLACKLIST = [ @@ -16,6 +19,161 @@ def from_file(graph_file): return linker +def is_blocking_dependency(node): + return node.resource_type == NodeType.Model + + +class GraphQueue(object): + """A fancy queue that is backed by the dependency graph. + Note: this will mutate input! + + This queue is thread-safe for `mark_done` calls, though you must ensure + that separate threads do not call `.empty()` or `__len__()` and `.get()` at + the same time, as there is an unlocked race! + """ + def __init__(self, graph, manifest): + self.graph = graph + self.manifest = manifest + # store the queue as a priority queue. + self.inner = PriorityQueue() + # things that have been popped off the queue but not finished + # and worker thread reservations + self.in_progress = set() + # things that are in the queue + self.queued = set() + # this lock controls most things + self.lock = threading.Lock() + # store the 'score' of each node as a number. Lower is higher priority. + self._scores = self._calculate_scores() + # populate the initial queue + self._find_new_additions() + + def get_node(self, node_id): + return self.manifest.nodes[node_id] + + def _include_in_cost(self, node_id): + node = self.get_node(node_id) + if not is_blocking_dependency(node): + return False + if node.get_materialization() == 'ephemeral': + return False + return True + + def _calculate_scores(self): + """Calculate the 'value' of each node in the graph based on how many + blocking descendants it has. We use this score for the internal + priority queue's ordering, so the quality of this metric is important. + + The score is stored as a negative number because the internal + PriorityQueue picks lowest values first. 
+ + We could do this in one pass over the graph instead of len(self.graph) + passes but this is easy. For large graphs this may hurt performance. + + This operates on the graph, so it would require a lock if called from + outside __init__. + + :return Dict[str, int]: The score dict, mapping unique IDs to integer + scores. Lower scores are higher priority. + """ + scores = {} + for node in self.graph.nodes(): + score = -1 * len([ + d for d in nx.descendants(self.graph, node) + if self._include_in_cost(d) + ]) + scores[node] = score + return scores + + def get(self, block=True, timeout=None): + """Get a node off the inner priority queue. By default, this blocks. + + This takes the lock, but only for part of it. + + :param bool block: If True, block until the inner queue has data + :param Optional[float] timeout: If set, block for timeout seconds + waiting for data. + :return ParsedNode: The node as present in the manifest. + + See `queue.PriorityQueue` for more information on `get()` behavior and + exceptions. + """ + _, node_id = self.inner.get(block=block, timeout=timeout) + with self.lock: + self._mark_in_progress(node_id) + return self.get_node(node_id) + + def __len__(self): + """The length of the queue is the number of tasks left for the queue to + give out, regardless of where they are. Incomplete tasks are not part + of the length. + + This takes the lock. + """ + with self.lock: + return len(self.graph) - len(self.in_progress) + + def empty(self): + """The graph queue is 'empty' if all remaining nodes in the graph + are in progress. + + This takes the lock. + """ + return len(self) == 0 + + def _already_known(self, node): + """Decide if a node is already known (either handed out as a task, or + in the queue). + + Callers must hold the lock. + + :param str node: The node ID to check + :returns bool: If the node is in progress/queued. 
+ """ + return node in self.in_progress or node in self.queued + + def _find_new_additions(self): + """Find any nodes in the graph that need to be added to the internal + queue and add them. + + Callers must hold the lock. + """ + for node, in_degree in self.graph.in_degree_iter(): + if not self._already_known(node) and in_degree == 0: + self.inner.put((self._scores[node], node)) + self.queued.add(node) + + def mark_done(self, node_id): + """Given a node's unique ID, mark it as done. + + This method takes the lock. + + :param str node_id: The node ID to mark as complete. + """ + with self.lock: + self.in_progress.remove(node_id) + self.graph.remove_node(node_id) + self._find_new_additions() + self.inner.task_done() + + def _mark_in_progress(self, node_id): + """Mark the node as 'in progress'. + + Callers must hold the lock. + + :param str node_id: The node ID to mark as in progress. + """ + self.queued.remove(node_id) + self.in_progress.add(node_id) + + def join(self): + """Join the queue. Blocks until all tasks are marked as done. + + Make sure not to call this before the queue reports that it is empty. + """ + self.inner.join() + + class Linker(object): def __init__(self, data=None): if data is None: @@ -28,9 +186,6 @@ def edges(self): def nodes(self): return self.graph.nodes() - def get_node(self, node): - return self.graph.node[node] - def find_cycles(self): # There's a networkx find_cycle function, but there's a bug in the # nx 1.11 release that prevents us from using it. We should use that @@ -45,44 +200,33 @@ def find_cycles(self): return None - def as_dependency_list(self, limit_to=None, ephemeral_only=False): - """returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]]. Each - element contains nodes whose dependenices are subsumed by the union of - all lists before it. 
In this way, all nodes in list `i` can be run - simultaneously assuming that all lists before list `i` have been - completed""" - - depth_nodes = defaultdict(list) - + def as_graph_queue(self, manifest, limit_to=None): + """Returns a queue over nodes in the graph that tracks progress of + dependencies. + """ if limit_to is None: graph_nodes = self.graph.nodes() else: graph_nodes = limit_to + new_graph = nx.DiGraph(self.graph) + + to_remove = [] + graph_nodes_lookup = set(graph_nodes) + for node in new_graph.nodes(): + if node not in graph_nodes_lookup: + to_remove.append(node) + + for node in to_remove: + new_graph.remove_node(node) + for node in graph_nodes: - if node not in self.graph: + if node not in new_graph: raise RuntimeError( "Couldn't find model '{}' -- does it exist or is " "it disabled?".format(node) ) - - num_ancestors = len([ - ancestor for ancestor in - nx.ancestors(self.graph, node) - if (dbt.utils.is_blocking_dependency( - self.get_node(ancestor)) and - (ephemeral_only is False or - dbt.utils.get_materialization( - self.get_node(ancestor)) == 'ephemeral')) - ]) - - depth_nodes[num_ancestors].append(node) - - dependency_list = [] - for depth in sorted(depth_nodes.keys()): - dependency_list.append(depth_nodes[depth]) - - return dependency_list + return GraphQueue(new_graph, manifest) def get_dependent_nodes(self, node): return nx.descendants(self.graph, node) @@ -101,24 +245,24 @@ def remove_node(self, node): self.graph.remove_node(node) return children - def update_node_data(self, node, data): - self.graph.add_node(node, data) - - def write_graph(self, outfile): - out_graph = self.remove_blacklisted_attributes_from_nodes(self.graph) + def write_graph(self, outfile, manifest): + """Write the graph to a gpickle file. Before doing so, serialize and + include all nodes in their corresponding graph entries. 
+ """ + out_graph = _updated_graph(self.graph, manifest) nx.write_gpickle(out_graph, outfile) def read_graph(self, infile): self.graph = nx.read_gpickle(infile) - @classmethod - def remove_blacklisted_attributes_from_nodes(cls, graph): - graph = graph.copy() - for node_name, node in graph.node.items(): - slim_node = node.copy() - for key in GRAPH_SERIALIZE_BLACKLIST: - if key in slim_node: - del slim_node[key] - - graph.node[node_name] = slim_node - return graph + +def _updated_graph(graph, manifest): + graph = graph.copy() + for node_id in graph.nodes(): + # serialize() removes the agate table + data = manifest.nodes[node_id].serialize() + for key in GRAPH_SERIALIZE_BLACKLIST: + if key in data: + del data[key] + graph.add_node(node_id, data) + return graph diff --git a/dbt/runner.py b/dbt/runner.py index 8231406e5de..8b6e2f87b47 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -25,49 +25,50 @@ class RunManager(object): - def __init__(self, config): + def __init__(self, config, query, Runner): + """ + Runner is a type (not instance!) 
derived from + dbt.node_runners.BaseRunner + """ self.config = config + self.query = query + self.Runner = Runner - def deserialize_graph(self): - logger.info("Loading dependency graph file.") - - base_target_path = self.config.target_path - graph_file = os.path.join( - base_target_path, - dbt.compilation.graph_file_name - ) - - return dbt.linker.from_file(graph_file) + manifest, linker = self.compile(self.config) + self.manifest = manifest + self.linker = linker - def get_dependent(self, linker, node_id): - dependent_nodes = linker.get_dependent_nodes(node_id) - for node_id in dependent_nodes: - yield node_id + selector = dbt.graph.selector.NodeSelector(linker, manifest) + selected_nodes = selector.select(query) + self.job_queue = self.linker.as_graph_queue(manifest, selected_nodes) - def get_runners(self, Runner, adapter, node_dependency_list): - all_nodes = dbt.utils.flatten_nodes(node_dependency_list) + # we use this a couple times. order does not matter. + self._flattened_nodes = [ + self.manifest.nodes[uid] for uid in selected_nodes + ] - num_nodes = len([ - n for n in all_nodes if not Runner.is_ephemeral_model(n) + self.run_count = 0 + self.num_nodes = len([ + n for n in self._flattened_nodes + if not Runner.is_ephemeral_model(n) ]) + self.node_results = [] + self._skipped_children = {} - node_runners = {} - i = 0 - for node in all_nodes: - uid = node.get('unique_id') - if Runner.is_ephemeral_model(node): - runner = Runner(self.config, adapter, node, 0, 0) - else: - i += 1 - runner = Runner(self.config, adapter, node, i, num_nodes) - node_runners[uid] = runner + def get_runner(self, node): + adapter = get_adapter(self.config) - return node_runners + if self.Runner.is_ephemeral_model(node): + run_count = 0 + num_nodes = 0 + else: + self.run_count += 1 + run_count = self.run_count + num_nodes = self.num_nodes - def call_runner(self, data): - runner = data['runner'] - manifest = data['manifest'] + return self.Runner(self.config, adapter, node, run_count, 
num_nodes) + def call_runner(self, runner): if runner.skip: return runner.on_skip() @@ -75,7 +76,7 @@ def call_runner(self, data): if not runner.is_ephemeral_model(runner.node): runner.before_execute() - result = runner.safe_run(manifest) + result = runner.safe_run(self.manifest) if not runner.is_ephemeral_model(runner.node): runner.after_execute(result) @@ -85,29 +86,63 @@ def call_runner(self, data): return result - def get_relevant_runners(self, node_runners, node_subset): - runners = [] - for node in node_subset: - unique_id = node.get('unique_id') - if unique_id in node_runners: - runners.append(node_runners[unique_id]) - return runners - - def map_run(self, pool, args): - """If the caller has passed the magic 'single-threaded' flag, use map() - instead of the pool.imap_unordered. The single-threaded flag is - intended for gathering more useful performance information about what - happens beneath `call_runner`, since python's default profiling tools - ignore child threads. + def _submit(self, pool, args, callback): + """If the caller has passed the magic 'single-threaded' flag, call the + function directly instead of pool.apply_async. The single-threaded flag + is intended for gathering more useful performance information about + what happens beneath `call_runner`, since python's default profiling + tools ignore child threads. + + This does still go through the callback path for result collection. """ if self.config.args.single_threaded: - return map(self.call_runner, args) + callback(self.call_runner(*args)) else: - return pool.imap_unordered(self.call_runner, args) + pool.apply_async(self.call_runner, args=args, callback=callback) - def execute_nodes(self, linker, Runner, manifest, node_dependency_list): - adapter = get_adapter(self.config) + def run_queue(self, pool): + """Given a pool, submit jobs from the queue to the pool. 
+ """ + def callback(result): + """A callback to handle results.""" + self._handle_result(result) + self.job_queue.mark_done(result.node.unique_id) + + while not self.job_queue.empty(): + node = self.job_queue.get() + runner = self.get_runner(node) + # we finally know what we're running! Make sure we haven't decided + # to skip it due to upstream failures + if runner.node.unique_id in self._skipped_children: + cause = self._skipped_children.pop(runner.node.unique_id) + runner.do_skip(cause=cause) + args = (runner,) + self._submit(pool, args, callback) + # block on completion + self.job_queue.join() + + return + + def _handle_result(self, result): + """Note: this happens inside an apply_async() callback, so it must be + "fast". (The pool worker thread will block!) + """ + is_ephemeral = self.Runner.is_ephemeral_model(result.node) + if not is_ephemeral: + self.node_results.append(result) + + node = CompileResultNode(**result.node) + node_id = node.unique_id + self.manifest.nodes[node_id] = node + + if result.errored: + if is_ephemeral: + cause = result + else: + cause = None + self._mark_dependent_errors(node_id, result, cause) + def execute_nodes(self): num_threads = self.config.threads target_name = self.config.target_name @@ -116,76 +151,44 @@ def execute_nodes(self, linker, Runner, manifest, node_dependency_list): dbt.ui.printer.print_timestamped_line(concurrency_line) dbt.ui.printer.print_timestamped_line("") - schemas = list(Runner.get_model_schemas(manifest)) - node_runners = self.get_runners(Runner, adapter, node_dependency_list) + schemas = list(self.Runner.get_model_schemas(self.manifest)) pool = ThreadPool(num_threads) - node_results = [] - for node_list in node_dependency_list: - runners = self.get_relevant_runners(node_runners, node_list) - - args_list = [] - for runner in runners: - args_list.append({ - 'manifest': manifest, - 'runner': runner - }) - - try: - for result in self.map_run(pool, args_list): - is_ephemeral = 
Runner.is_ephemeral_model(result.node) - if not is_ephemeral: - node_results.append(result) - - node = CompileResultNode(**result.node) - node_id = node.unique_id - manifest.nodes[node_id] = node - - if result.errored: - dependents = self.get_dependent(linker, node_id) - self._mark_dependent_errors(node_runners, dependents, - result, is_ephemeral) - - except KeyboardInterrupt: - pool.close() - pool.terminate() - - adapter = get_adapter(self.config) - - if not adapter.is_cancelable(): - msg = ("The {} adapter does not support query " - "cancellation. Some queries may still be " - "running!".format(adapter.type())) - - yellow = dbt.ui.printer.COLOR_FG_YELLOW - dbt.ui.printer.print_timestamped_line(msg, yellow) - raise - - for conn_name in adapter.cancel_open_connections(): - dbt.ui.printer.print_cancel_line(conn_name) - - dbt.ui.printer.print_run_end_messages(node_results, - early_exit=True) - - pool.join() + try: + self.run_queue(pool) + + except KeyboardInterrupt: + pool.close() + pool.terminate() + + adapter = get_adapter(self.config) + + if not adapter.is_cancelable(): + msg = ("The {} adapter does not support query " + "cancellation. 
Some queries may still be " + "running!".format(adapter.type())) + + yellow = dbt.ui.printer.COLOR_FG_YELLOW + dbt.ui.printer.print_timestamped_line(msg, yellow) raise + for conn_name in adapter.cancel_open_connections(): + dbt.ui.printer.print_cancel_line(conn_name) + + dbt.ui.printer.print_run_end_messages(self.node_results, + early_exit=True) + + pool.join() + raise + pool.close() pool.join() - return node_results + return self.node_results - @staticmethod - def _mark_dependent_errors(node_runners, dependents, result, is_ephemeral): - for dep_node_id in dependents: - runner = node_runners.get(dep_node_id) - if not runner: - continue - if is_ephemeral: - cause = result - else: - cause = None - runner.do_skip(cause=result) + def _mark_dependent_errors(self, node_id, result, cause): + for dep_node_id in self.linker.get_dependent_nodes(node_id): + self._skipped_children[dep_node_id] = cause def write_results(self, execution_result): filepath = os.path.join(self.config.target_path, RESULT_FILE_NAME) @@ -196,30 +199,18 @@ def compile(self, config): compiler.initialize() return compiler.compile() - def run_from_graph(self, Selector, Runner, query): + def run(self): """ Run dbt for the query, based on the graph. - Selector is a type (not instance!) derived from - dbt.graph.selector.NodeSelector - Runner is a type (not instance!) derived from - dbt.node_runners.BaseRunner - """ - manifest, linker = self.compile(self.config) - - selector = Selector(linker, manifest) - selected_nodes = selector.select(query) - dep_list = selector.as_node_list(selected_nodes) - adapter = get_adapter(self.config) - flat_nodes = dbt.utils.flatten_nodes(dep_list) - if len(flat_nodes) == 0: + if len(self._flattened_nodes) == 0: logger.info("WARNING: Nothing to do. 
Try checking your model " "configs and model specification args") return [] - elif Runner.print_header: - stat_line = dbt.ui.printer.get_counts(flat_nodes) + elif self.Runner.print_header: + stat_line = dbt.ui.printer.get_counts(self._flattened_nodes) logger.info("") dbt.ui.printer.print_timestamped_line(stat_line) dbt.ui.printer.print_timestamped_line("") @@ -227,13 +218,14 @@ def run_from_graph(self, Selector, Runner, query): logger.info("") try: - Runner.before_hooks(self.config, adapter, manifest) + self.Runner.before_hooks(self.config, adapter, self.manifest) started = time.time() - Runner.before_run(self.config, adapter, manifest) - res = self.execute_nodes(linker, Runner, manifest, dep_list) - Runner.after_run(self.config, adapter, res, manifest) + self.Runner.before_run(self.config, adapter, self.manifest) + res = self.execute_nodes() + self.Runner.after_run(self.config, adapter, res, self.manifest) elapsed = time.time() - started - Runner.after_hooks(self.config, adapter, res, manifest, elapsed) + self.Runner.after_hooks(self.config, adapter, res, self.manifest, + elapsed) finally: adapter.cleanup_connections() @@ -246,13 +238,3 @@ def run_from_graph(self, Selector, Runner, query): self.write_results(result) return res - - # ------------------------------------ - - def run(self, query, Runner): - Selector = dbt.graph.selector.NodeSelector - return self.run_from_graph(Selector, Runner, query) - - def run_flat(self, query, Runner): - Selector = dbt.graph.selector.FlatNodeSelector - return self.run_from_graph(Selector, Runner, query) diff --git a/dbt/task/archive.py b/dbt/task/archive.py index 62a1e8cb0b5..49bac14b1d2 100644 --- a/dbt/task/archive.py +++ b/dbt/task/archive.py @@ -10,7 +10,6 @@ class ArchiveTask(RunnableTask): def run(self): - runner = RunManager(self.config) query = { 'include': ['*'], @@ -18,7 +17,7 @@ def run(self): 'resource_types': [NodeType.Archive] } - results = runner.run_flat(query, ArchiveRunner) + results = RunManager(self.config, 
query, ArchiveRunner).run() dbt.ui.printer.print_run_end_messages(results) diff --git a/dbt/task/compile.py b/dbt/task/compile.py index 121ca02e39b..74bbf20eef2 100644 --- a/dbt/task/compile.py +++ b/dbt/task/compile.py @@ -11,7 +11,6 @@ class CompileTask(RunnableTask): def run(self): - runner = RunManager(self.config) query = { "include": self.args.models, @@ -19,8 +18,7 @@ def run(self): "resource_types": NodeType.executable(), "tags": [], } - - results = runner.run(query, CompileRunner) + results = RunManager(self.config, query, CompileRunner).run() dbt.ui.printer.print_timestamped_line('Done.') diff --git a/dbt/task/run.py b/dbt/task/run.py index efcedbe03b4..c8b47c7cc22 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -12,7 +12,6 @@ class RunTask(RunnableTask): def run(self): - runner = RunManager(self.config) query = { "include": self.args.models, @@ -21,7 +20,7 @@ def run(self): "tags": [] } - results = runner.run(query, ModelRunner) + results = RunManager(self.config, query, ModelRunner).run() if results: dbt.ui.printer.print_run_end_messages(results) diff --git a/dbt/task/seed.py b/dbt/task/seed.py index 2463bf5db3e..430479489b7 100644 --- a/dbt/task/seed.py +++ b/dbt/task/seed.py @@ -9,13 +9,12 @@ class SeedTask(RunnableTask): def run(self): - runner = RunManager(self.config) query = { "include": ["*"], "exclude": [], "resource_types": [NodeType.Seed], } - results = runner.run_flat(query, SeedRunner) + results = RunManager(self.config, query, SeedRunner).run() if self.args.show: self.show_tables(results) diff --git a/dbt/task/test.py b/dbt/task/test.py index a276db6dd44..b88441eb5b0 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -18,7 +18,6 @@ class TestTask(RunnableTask): d) accepted value """ def run(self): - runner = RunManager(self.config) include = self.args.models exclude = self.args.exclude @@ -41,7 +40,7 @@ def run(self): raise RuntimeError("unexpected") query['tags'] = tags - results = runner.run_flat(query, TestRunner) + results = 
RunManager(self.config, query, TestRunner).run() dbt.ui.printer.print_run_end_messages(results) diff --git a/dbt/utils.py b/dbt/utils.py index 6ee01c41fc3..252795b3c30 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -338,10 +338,6 @@ def to_string(s): return s -def is_blocking_dependency(node): - return (is_type(node, NodeType.Model)) - - def get_materialization(node): return node.get('config', {}).get('materialized') diff --git a/test/unit/test_graph.py b/test/unit/test_graph.py index 2b8ffaef119..dc63b0e3b83 100644 --- a/test/unit/test_graph.py +++ b/test/unit/test_graph.py @@ -12,6 +12,12 @@ import dbt.templates import dbt.utils +try: + from queue import Empty +except ImportError: + from Queue import Empty + + import networkx as nx from dbt.logger import GLOBAL_LOGGER as logger # noqa @@ -166,7 +172,7 @@ def test__model_materializations(self): } compiler = self.get_compiler(self.get_config(cfg)) - graph, linker = compiler.compile() + manifest, linker = compiler.compile() expected_materialization = { "model_one": "table", @@ -179,8 +185,8 @@ def test__model_materializations(self): for model, expected in expected_materialization.items(): key = 'model.test_models_compile.{}'.format(model) - actual = nodes[key].get('config', {}) \ - .get('materialized') + actual = manifest.nodes[key].get('config', {}) \ + .get('materialized') self.assertEquals(actual, expected) def test__model_incremental(self): @@ -200,8 +206,9 @@ def test__model_incremental(self): } } + compiler = self.get_compiler(self.get_config(cfg)) - graph, linker = compiler.compile() + manifest, linker = compiler.compile() node = 'model.test_models_compile.model_one' @@ -209,8 +216,9 @@ def test__model_incremental(self): self.assertEqual(linker.edges(), []) self.assertEqual( - linker.graph.node[node].get('config', {}).get('materialized'), - 'incremental') + manifest.nodes[node].get('config', {}).get('materialized'), + 'incremental' + ) def test__dependency_list(self): self.use_models({ @@ -227,13 +235,20 @@ def 
test__dependency_list(self): compiler = self.get_compiler(self.get_config({})) graph, linker = compiler.compile() - actual_dep_list = linker.as_dependency_list() - - expected_dep_list = [ - ['model.test_models_compile.model_1'], - ['model.test_models_compile.model_2'], - ['model.test_models_compile.model_3'], - ['model.test_models_compile.model_4'], - ] + models = ('model_1', 'model_2', 'model_3', 'model_4') + model_ids = ['model.test_models_compile.{}'.format(m) for m in models] - self.assertEqual(actual_dep_list, expected_dep_list) + manifest = MagicMock(nodes={ + n: MagicMock(unique_id=n) + for n in model_ids + }) + queue = linker.as_graph_queue(manifest) + + for model_id in model_ids: + self.assertFalse(queue.empty()) + got = queue.get(block=False) + self.assertEqual(got.unique_id, model_id) + with self.assertRaises(Empty): + queue.get(block=False) + queue.mark_done(got.unique_id) + self.assertTrue(queue.empty()) diff --git a/test/unit/test_graph_selection.py b/test/unit/test_graph_selection.py index 518102f9128..0a59c7211d2 100644 --- a/test/unit/test_graph_selection.py +++ b/test/unit/test_graph_selection.py @@ -1,4 +1,5 @@ import unittest +import mock import os import string @@ -19,21 +20,26 @@ def setUp(self): # Edges: [(X.a, Y.b), (X.a, X.c), (Y.b, Y.d), (Y.b, X.e), (X.c, Y.f), (X.c, X.g)] self.package_graph = nx.relabel_nodes(integer_graph, package_mapping) + nodes = { + node: mock.MagicMock(fqn=node.split('.')[1:], tags=[]) + for node in self.package_graph + } - for node in self.package_graph: - self.package_graph.node[node]['fqn'] = node.split('.')[1:] + nodes['m.X.a'].tags = ['abc'] + nodes['m.Y.b'].tags = ['abc'] + nodes['m.X.c'].tags = ['abc'] + nodes['m.Y.d'].tags = [] + nodes['m.X.e'].tags = ['efg'] + nodes['m.Y.f'].tags = ['efg'] + nodes['m.X.g'].tags = ['efg'] - self.package_graph.node['m.X.a']['tags'] = ['abc'] - self.package_graph.node['m.Y.b']['tags'] = ['abc'] - self.package_graph.node['m.X.c']['tags'] = ['abc'] - 
self.package_graph.node['m.Y.d']['tags'] = [] - self.package_graph.node['m.X.e']['tags'] = ['efg'] - self.package_graph.node['m.Y.f']['tags'] = ['efg'] - self.package_graph.node['m.X.g']['tags'] = ['efg'] + self.manifest = mock.MagicMock(nodes=nodes) + self.linker = mock.MagicMock(graph=self.package_graph) + self.selector = graph_selector.NodeSelector(self.linker, self.manifest) def run_specs_and_assert(self, graph, include, exclude, expected): - selected = graph_selector.select_nodes( + selected = self.selector.select_nodes( graph, include, exclude @@ -152,21 +158,19 @@ def assert_is_selected_node(self, node, spec, should_work): ) def test__is_selected_node(self): - test = self.assert_is_selected_node - - test(('X', 'a'), ('a'), True) - test(('X', 'a'), ('X', 'a'), True) - test(('X', 'a'), ('*'), True) - test(('X', 'a'), ('X', '*'), True) - - test(('X', 'a', 'b', 'c'), ('X', '*'), True) - test(('X', 'a', 'b', 'c'), ('X', 'a', '*'), True) - test(('X', 'a', 'b', 'c'), ('X', 'a', 'b', '*'), True) - test(('X', 'a', 'b', 'c'), ('X', 'a', 'b', 'c'), True) - test(('X', 'a', 'b', 'c'), ('X', 'a'), True) - test(('X', 'a', 'b', 'c'), ('X', 'a', 'b'), True) - - test(('X', 'a'), ('b'), False) - test(('X', 'a'), ('X', 'b'), False) - test(('X', 'a'), ('X', 'a', 'b'), False) - test(('X', 'a'), ('Y', '*'), False) + self.assert_is_selected_node(('X', 'a'), ('a'), True) + self.assert_is_selected_node(('X', 'a'), ('X', 'a'), True) + self.assert_is_selected_node(('X', 'a'), ('*'), True) + self.assert_is_selected_node(('X', 'a'), ('X', '*'), True) + + self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', '*'), True) + self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', '*'), True) + self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', 'b', '*'), True) + self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', 'b', 'c'), True) + self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a'), True) + self.assert_is_selected_node(('X', 'a', 'b', 'c'), ('X', 'a', 
'b'), True) + + self.assert_is_selected_node(('X', 'a'), ('b'), False) + self.assert_is_selected_node(('X', 'a'), ('X', 'b'), False) + self.assert_is_selected_node(('X', 'a'), ('X', 'a', 'b'), False) + self.assert_is_selected_node(('X', 'a'), ('Y', '*'), False) diff --git a/test/unit/test_linker.py b/test/unit/test_linker.py index 9321a207dd1..664df9455f6 100644 --- a/test/unit/test_linker.py +++ b/test/unit/test_linker.py @@ -3,19 +3,28 @@ import dbt.utils -from dbt.compilation import Linker +from dbt import linker +try: + from queue import Empty +except KeyError: + from Queue import Empty +def _mock_manifest(nodes): + return mock.MagicMock(nodes={ + n: mock.MagicMock(unique_id=n) for n in nodes + }) + class LinkerTest(unittest.TestCase): def setUp(self): - self.real_is_blocking_dependency = dbt.utils.is_blocking_dependency - self.linker = Linker() - - dbt.utils.is_blocking_dependency = mock.MagicMock(return_value=True) + self.patcher = mock.patch.object(linker, 'is_blocking_dependency') + self.is_blocking_dependency = self.patcher.start() + self.is_blocking_dependency.return_value = True + self.linker = linker.Linker() def tearDown(self): - dbt.utils.is_blocking_dependency = self.real_is_blocking_dependency + self.patcher.stop() def test_linker_add_node(self): expected_nodes = ['A', 'B', 'C'] @@ -28,15 +37,43 @@ def test_linker_add_node(self): self.assertEqual(len(actual_nodes), len(expected_nodes)) + def assert_would_join(self, queue): + """test join() without timeout risk""" + self.assertEqual(queue.inner.unfinished_tasks, 0) + def test_linker_add_dependency(self): actual_deps = [('A', 'B'), ('A', 'C'), ('B', 'C')] for (l, r) in actual_deps: self.linker.dependency(l, r) - expected_dep_list = [['C'], ['B'], ['A']] - actual_dep_list = self.linker.as_dependency_list() - self.assertEqual(expected_dep_list, actual_dep_list) + manifest = _mock_manifest('ABC') + queue = self.linker.as_graph_queue(manifest) + + got = queue.get(block=False) + 
self.assertEqual(got.unique_id, 'C') + with self.assertRaises(Empty): + queue.get(block=False) + self.assertFalse(queue.empty()) + queue.mark_done('C') + self.assertFalse(queue.empty()) + + got = queue.get(block=False) + self.assertEqual(got.unique_id, 'B') + with self.assertRaises(Empty): + queue.get(block=False) + self.assertFalse(queue.empty()) + queue.mark_done('B') + self.assertFalse(queue.empty()) + + got = queue.get(block=False) + self.assertEqual(got.unique_id, 'A') + with self.assertRaises(Empty): + queue.get(block=False) + self.assertTrue(queue.empty()) + queue.mark_done('A') + self.assert_would_join(queue) + self.assertTrue(queue.empty()) def test_linker_add_disjoint_dependencies(self): actual_deps = [('A', 'B')] @@ -46,18 +83,32 @@ def test_linker_add_disjoint_dependencies(self): self.linker.dependency(l, r) self.linker.add_node(additional_node) - # has to be one of these two - possible = [ - [['Z', 'B'], ['A']], - [['B', 'Z'], ['A']], - ] - actual = self.linker.as_dependency_list() - - for expected in possible: - if expected == actual: - return - self.assertTrue(False, actual) + manifest = _mock_manifest('ABZ') + queue = self.linker.as_graph_queue(manifest) + + # the first one we get must be B, it has the longest dep chain + first = queue.get(block=False) + self.assertEqual(first.unique_id, 'B') + self.assertFalse(queue.empty()) + queue.mark_done('B') + self.assertFalse(queue.empty()) + + second = queue.get(block=False) + self.assertIn(second.unique_id, {'A', 'Z'}) + self.assertFalse(queue.empty()) + queue.mark_done(second.unique_id) + self.assertFalse(queue.empty()) + + third = queue.get(block=False) + self.assertIn(third.unique_id, {'A', 'Z'}) + with self.assertRaises(Empty): + queue.get(block=False) + self.assertNotEqual(second.unique_id, third.unique_id) + self.assertTrue(queue.empty()) + queue.mark_done(third.unique_id) + self.assert_would_join(queue) + self.assertTrue(queue.empty()) def test_linker_dependencies_limited_to_some_nodes(self): 
actual_deps = [('A', 'B'), ('B', 'C'), ('C', 'D')] @@ -65,13 +116,30 @@ def test_linker_dependencies_limited_to_some_nodes(self): for (l, r) in actual_deps: self.linker.dependency(l, r) - actual_limit = self.linker.as_dependency_list(['B']) - expected_limit = [['B']] - self.assertEqual(expected_limit, actual_limit) - - actual_limit_2 = self.linker.as_dependency_list(['A', 'B']) - expected_limit_2 = [['B'], ['A']] - self.assertEqual(expected_limit_2, actual_limit_2) + queue = self.linker.as_graph_queue(_mock_manifest('ABCD'), ['B']) + got = queue.get(block=False) + self.assertEqual(got.unique_id, 'B') + self.assertTrue(queue.empty()) + queue.mark_done('B') + self.assert_would_join(queue) + + queue_2 = self.linker.as_graph_queue(_mock_manifest('ABCD'), ['A', 'B']) + got = queue_2.get(block=False) + self.assertEqual(got.unique_id, 'B') + self.assertFalse(queue_2.empty()) + with self.assertRaises(Empty): + queue_2.get(block=False) + queue_2.mark_done('B') + self.assertFalse(queue_2.empty()) + + got = queue_2.get(block=False) + self.assertEqual(got.unique_id, 'A') + self.assertTrue(queue_2.empty()) + with self.assertRaises(Empty): + queue_2.get(block=False) + self.assertTrue(queue_2.empty()) + queue_2.mark_done('A') + self.assert_would_join(queue_2) def test_linker_bad_limit_throws_runtime_error(self): actual_deps = [('A', 'B'), ('B', 'C'), ('C', 'D')] @@ -79,8 +147,8 @@ def test_linker_bad_limit_throws_runtime_error(self): for (l, r) in actual_deps: self.linker.dependency(l, r) - self.assertRaises(RuntimeError, - self.linker.as_dependency_list, ['ZZZ']) + with self.assertRaises(RuntimeError): + self.linker.as_graph_queue(_mock_manifest('ABCD'), ['ZZZ']) def test__find_cycles__cycles(self): actual_deps = [('A', 'B'), ('B', 'C'), ('C', 'A')]