Handle concurrent job ordering via a graph iterator (#813) #1157
Conversation
dbt/linker.py (outdated)

```python
def _include_in_cost(self, node_id):
    node = self.get_node(node_id)
    if not dbt.utils.is_blocking_dependency(node):
```
I believe `is_blocking_dependency` is only used here -- it's probably a good idea to pull the logic from dbt.utils in here and fix the linker test. This `_include_in_cost` function will change if we decide to operate on different graphs, e.g. running seeds + models. I'd rather have all of that logic in one place, and this seems like an appropriate place for it.
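To make the suggestion concrete, here is a hypothetical sketch of the inlined check. Both the `Node` stand-in and the predicate (non-ephemeral models block their dependents) are assumptions for illustration, not the actual `dbt.utils.is_blocking_dependency` implementation:

```python
from collections import namedtuple

# Minimal stand-in for a dbt manifest node; the real node objects carry
# far more fields. The predicate below is an illustrative assumption.
Node = namedtuple('Node', ['resource_type', 'materialization'])

def include_in_cost(node):
    """Sketch of the inlined check: treat a node as a blocking
    dependency only when it is a non-ephemeral model."""
    return (node.resource_type == 'model'
            and node.materialization != 'ephemeral')
```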
```python
class GraphQueue(object):
    """A fancy queue that is backed by the dependency graph.
    Note: this will mutate input!
    """
```
Is the public API for this class thread-safe? It looks like it is; if so, can you add that to the docstring?
It is mostly thread-safe, I will add that fact and the caveats to the docstring.
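To illustrate the thread-safety contract being discussed, here is a minimal sketch of a graph-backed queue with a plain `Lock`; the class shape, method names, and priority metric are assumptions for illustration and differ from the real dbt implementation:

```python
import threading
from queue import PriorityQueue

class GraphQueue:
    """Illustrative sketch (not the real dbt class): public methods take
    the single plain lock; underscore methods assume it is already held.
    Note: this mutates the input dependency mapping!
    """

    def __init__(self, deps):
        # deps: {node_id: set of node_ids it is still blocked on}
        self.deps = deps
        self.lock = threading.Lock()
        self.inner = PriorityQueue()
        self.in_progress = set()
        self.queued = set()
        # __init__ runs before other threads can see us, so no lock here.
        self._find_new_additions()

    def _score(self, node_id):
        # Stand-in priority: more dependents -> smaller score -> runs first.
        return -sum(1 for blocked in self.deps.values() if node_id in blocked)

    def _find_new_additions(self):
        # Requires the lock to be held (except during __init__).
        for node_id, blocked_on in self.deps.items():
            if (not blocked_on and node_id not in self.in_progress
                    and node_id not in self.queued):
                self.inner.put((self._score(node_id), node_id))
                self.queued.add(node_id)

    def get(self):
        # Block on the inner queue *outside* the lock; holding the lock
        # across a blocking get() is the deadlock risk raised later in
        # this thread.
        _, node_id = self.inner.get()
        with self.lock:
            self.queued.discard(node_id)
            self.in_progress.add(node_id)
        return node_id

    def mark_done(self, node_id):
        with self.lock:
            self.in_progress.discard(node_id)
            del self.deps[node_id]
            for blocked_on in self.deps.values():
                blocked_on.discard(node_id)
            self._find_new_additions()
```

Usage: workers loop on `get()`, run the node, then call `mark_done()`, which unblocks downstream nodes.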
dbt/linker.py (outdated)

```python
def _mark_in_progress(self, task_id):
    self.queued.remove(task_id)
    self.in_progress.add(task_id)
```
Similar to our last convo about thread safety, I wonder if an `RLock` would make the lock management easier -- then you could just add `with self.rlock:` blocks around these private fns.
I don't really feel like the lock management here is hard enough to require an `RLock`, but maybe I'm overly reluctant to pull those in. The biggest issue is making sure not to lock around `self.inner.get` and triggering deadlocks, which `RLock` won't really help with. I will add a docstring to this method indicating that it needs to hold the lock, though, as that is important.

I'm happy to discuss my thoughts on `RLock`, as they're complicated and informed by my many threading-related mistakes in life, but in short: it's a dangerous tool that can easily make you complacent about locking, leading to mystery deadlocks. With only 3 places that take the lock (two of which have to be the same thread!), a lack of forward progress is much easier to debug. When they're at all easy to avoid, I prefer to do so.
Also, I'm not sure we want to document it, but all the underscore methods in this class require that you hold the lock (the only ones that don't are called from `__init__`, where the initializing thread has exclusive access to begin with). Once you're inside an `_*` method, you can assume the lock is held and behave accordingly.
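The convention described above can be shown with a tiny toy (not dbt code): public methods acquire the one plain `Lock`, underscore helpers document that the caller must already hold it, so every acquisition point stays visible and a stuck thread is easy to find:

```python
import threading

class Tally:
    """Toy example of the lock convention: public methods take the single
    plain Lock; underscore helpers assume the caller already holds it."""

    def __init__(self):
        self.lock = threading.Lock()
        # Initializing thread has exclusive access, so no lock needed here.
        self.counts = {}

    def record(self, key):
        with self.lock:
            self._bump(key)

    def _bump(self, key):
        # Caller must hold self.lock.
        self.counts[key] = self.counts.get(key, 0) + 1
```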
dbt/runner.py (outdated)

```python
run_count = self.run_count
num_nodes = self.num_nodes

return self.Runner(self.config, adapter, node, run_count, num_nodes)

def deserialize_graph(self):
```
can you remove this fn? it's not used
```python
    if not Runner.is_ephemeral_model(n)
])
self.node_results = []
self._skipped_children = {}
```
I think this is strictly an improvement of the RunManager. But this branch changes it from a reusable thing to a single-use-only thing -- once you call `run()`, the queue is spent and you need a new `RunManager` to run more nodes.

Architecturally, I wonder what the point of the RunManager is... the Runner knows how to execute a specific node type / command, the Queue knows how to execute a series of nodes, and the RunManager just sets up the state and knocks it down. That leads me to believe that the "RunManager" is actually an "Invocation"... it contains all the state for the invocation, and has states like "ready", "in progress", and "complete".
I have mixed feelings about the existence of RunManager. On the one hand, something has to live "above" the individual task runners and maintain overall run state, and making that thing a class makes organization and data sharing across threads easier. On the other hand, with these changes it's hard to decide where the boundary between `__init__` and `run` ought to even be for the RunManager, and it feels like its entry point should just be a single function.

The fact that it isn't re-usable is kind of regrettable, but it's not like we ever re-used RunManager to begin with.
Left you some comments.

Looks good to me. I still think the RunManager needs some attention in the near future, but that should not be part of this branch.
Fixes #813 by replacing the node selector `as_node_list` with `as_graph_iterator` and doing graph operations using it. The current metric is that nodes whose dependencies have finished are placed onto a priority queue, prioritized by the number of dependencies they have in the graph as a whole, instead of creating runlevels and only operating on the pool before them.

This required quite a bit of restructuring in the RunManager, as it needs to feed completeness information back into the queue. A source of confidence in that refactor is commit e7b1a09, which accomplishes a lot of this with (postgres, at least) integration tests still passing. Other intermediates that I never committed also passed tests during the process of writing acddb3b, so I have pretty high confidence in the new structure.

Worker threads block when there are no nodes left to process.
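The priority metric described above can be sketched as a transitive dependent count over the whole graph, so that high-fanout nodes are scheduled first. This uses a plain adjacency dict as a stand-in for the real graph representation:

```python
def descendant_count(children, node):
    """Count all transitive dependents (descendants) of `node`.
    `children` maps node -> set of direct dependents; this is an
    illustrative stand-in for the real dependency graph."""
    seen = set()
    stack = list(children.get(node, ()))
    while stack:
        n = stack.pop()
        if n not in seen:
            seen.add(n)
            # Dependents of a dependent are also descendants.
            stack.extend(children.get(n, ()))
    return len(seen)
```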