New data-oriented task distribution strategy. #116
Conversation
In this change, I divide up the download requests among workers in a data-driven way. Namely, after fanning out the partitions, I group them by both license and request number. Then, I've restructured the fetch step to execute each group of configs in order. This way, we can guarantee that all licenses are utilized to their maximum capacity while staying within rate limits. This is a much better fix for #98, as it also obviates the need to limit the max number of workers (this approach is actually autoscale friendly!). Thanks to [email protected] for the idea and an example of how to implement it.
Now that we have a data-oriented strategy for evenly distributing tasks, we don't need to fight autoscaling anymore. Thus, we can get rid of the code that sets the max number of workers.
In an end-to-end test on Dataflow, I can confirm that eventually – even for smaller downloads – this strategy autoscales up until all licenses / requests are in use. :)
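To make the grouping idea concrete, here is a runnable toy sketch (the data and field names like `license` and `request` are made up for illustration, not the project's real schema): configs are bucketed by (license, request number), and each bucket is then fetched in order.

```python
import collections

# Hypothetical fanned-out download configs; field names are illustrative.
configs = [
    {'license': f'license-{i % 2}', 'request': i % 3, 'url': f'https://example.test/{i}'}
    for i in range(12)
]

# Group by (license, request number): each key represents one unit of a
# license's request capacity.
groups = collections.defaultdict(list)
for cfg in configs:
    groups[(cfg['license'], cfg['request'])].append(cfg)

# Executing one group at a time keeps every license fully utilized while
# never exceeding its per-license rate limit.
for key, group in sorted(groups.items()):
    print(key, [cfg['url'] for cfg in group])
```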
This generally looks good, although there are some sections that seem a bit harder to read than is ideal. I commented on them. Let me know if you need clarification on any of this.
```python
            if isinstance(params, dict)] or [('default', {})]


def prepare_partitions(config: Config, store: t.Optional[Store] = None) -> t.Iterator[Partition]:
```
This seems like a super complicated way to solve this problem. Is there a way to simplify this, perhaps by making it a bit less general?
I can start thinking about a simpler way to solve this...
```python
    params_loop = itertools.cycle(get_subsections(config))

    partition_configs = filter(
        lambda it: new_downloads_only(it, store),
```
Why filter here? If I'm reading this correctly, you're actually hitting storage here to decide what to filter. Why not do that in its own DoFn so it can run in parallel? This could be many orders of magnitude slower than the rest of this method. It seems like you'd want to just generate all possible paths here, with a separate filter step that performs whatever test is required to decide whether to actually download later.
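For concreteness, a rough sketch of what that could look like; `all_candidate_partitions` and the stand-in `new_downloads_only` below are hypothetical stubs, not the project's real implementations:

```python
import apache_beam as beam


def all_candidate_partitions(config):
    # Hypothetical stand-in: cheaply enumerate every possible partition.
    return [{'day': d} for d in range(10)]


def new_downloads_only(partition, store=None):
    # Stand-in for the real (slow) storage check; pretend odd days are new.
    return partition['day'] % 2 == 1


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'GenerateCandidates' >> beam.Create(all_candidate_partitions({}))
        # The storage check now runs in parallel as its own Filter step.
        | 'SkipExistingDownloads' >> beam.Filter(new_downloads_only)
        | 'Fetch' >> beam.Map(print)  # stand-in for the real fetch DoFn
    )
```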
A little context: this filter is here because of a bug we found earlier. If we skipped some downloads in a beam.Filter step, the subsections (licenses) wouldn't be evenly distributed across the non-skipped downloads.
It occurs to me: I could probably perform the cycle step as a separate Map operation that occurs after a Filter, and keep these steps really simple.
Yes, that's what I was going to propose. I think separately filtering and then mapping for the cycle would be the simplest and possibly the most efficient approach.
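A runnable toy model of that restructuring (made-up data); the point is that the cycle only ever sees partitions that survived the filter, so skipped downloads can no longer skew the license distribution:

```python
import itertools

licenses = ['license-a', 'license-b']

# Made-up partitions; every third one pretends to be already downloaded.
partitions = [{'day': d, 'downloaded': d % 3 == 0} for d in range(10)]

# Step 1 (the Filter): drop partitions we already have.
pending = [p for p in partitions if not p['downloaded']]

# Step 2 (the Map): pair each surviving partition with the next license in
# round-robin order. Because the cycle runs after the filter, every license
# stays evenly loaded.
for license_name, part in zip(itertools.cycle(licenses), pending):
    print(license_name, part['day'])
```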
```python
    )

    yield from ((*name_and_params, config) for name_and_params, config in zip(params_loop, partition_configs))
```
This is a subtle enough statement to warrant a comment about what exactly it is doing. I would also accept just splitting this into two lines: a for loop and then a yield statement.
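For example, the one-liner could become something like this (same behavior; the inner `config` is renamed here for clarity, which is a suggestion rather than the PR's actual code):

```python
# Pair each partition config with the next (name, params) entry from the
# license cycle, so licenses are handed out round-robin across partitions.
for name_and_params, partition_config in zip(params_loop, partition_configs):
    yield (*name_and_params, partition_config)
```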
Thanks for the changes.