Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fragmenter): introduce batch plan fragmenter #65

Merged
merged 7 commits into from
Feb 9, 2022

Conversation

BowenXiao1999
Copy link
Contributor

@BowenXiao1999 BowenXiao1999 commented Jan 30, 2022

What's changed and what's your intention?

BatchPlanFragmenter has a split function, split distributed physical plan into QueryStage (Based on the exchange node).

The connection of each QueryStage is recorded in StageGraph. To build a StageGraph, needs StageGraphBuilder. Most patterns are learned from Java. May refine it to be more rusty.

Note that this PR do not includes

  • operator id
  • augment (iterate the stage graph to augment exchange source info, node address etc.)
  • schedule (Wrap Stage as Task and distribute to compute node).

Also cuz I use some plan node in test, I have to remove todo and fill in some impl in Plan Node. For example, BatchHashJoin(#deriveSchema...), BatchSeqScan etc.

Some PR relates:
#20

Main API/idea are ported from Java PlanFragmenter. Check the unittest to help review :).

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests

Refer to a related PR or issue link (optional)

@BowenXiao1999 BowenXiao1999 marked this pull request as draft January 30, 2022 02:47
@BowenXiao1999 BowenXiao1999 linked an issue Jan 30, 2022 that may be closed by this pull request
@BowenXiao1999 BowenXiao1999 marked this pull request as ready for review February 8, 2022 07:19
@BowenXiao1999 BowenXiao1999 force-pushed the bw_add_plan_fragmenter branch 3 times, most recently from 878016e to 78479ac Compare February 8, 2022 09:39
@BowenXiao1999 BowenXiao1999 changed the title feat(fragmenter): introduce plan fragmenter feat(fragmenter): introduce batch plan fragmenter Feb 8, 2022
@codecov
Copy link

codecov bot commented Feb 8, 2022

Codecov Report

Merging #65 (ec0c0ac) into main (c082cb6) will increase coverage by 0.20%.
The diff coverage is 98.11%.

Impacted file tree graph

@@             Coverage Diff              @@
##               main      #65      +/-   ##
============================================
+ Coverage     74.44%   74.65%   +0.20%     
  Complexity     2654     2654              
============================================
  Files           833      834       +1     
  Lines         47835    47883      +48     
  Branches       1562     1562              
============================================
+ Hits          35612    35745     +133     
+ Misses        11424    11339      -85     
  Partials        799      799              
Flag Coverage Δ
java 62.08% <ø> (ø)
rust 80.06% <98.11%> (+0.28%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...frontend/src/optimizer/plan_node/batch_seq_scan.rs 33.33% <ø> (+33.33%) ⬆️
...t/frontend/src/optimizer/plan_node/logical_scan.rs 33.33% <ø> (+33.33%) ⬆️
rust/frontend/src/optimizer/property/order.rs 0.00% <ø> (ø)
rust/frontend/src/scheduler/plan_fragmenter.rs 97.91% <97.91%> (ø)
...frontend/src/optimizer/plan_node/join_predicate.rs 38.88% <100.00%> (+38.88%) ⬆️
...t/frontend/src/optimizer/plan_node/logical_join.rs 32.07% <100.00%> (+32.07%) ⬆️
rust/frontend/src/optimizer/plan_node/mod.rs 29.03% <100.00%> (+9.03%) ⬆️
rust/frontend/src/catalog/column_catalog.rs 28.57% <0.00%> (-3.01%) ⬇️
rust/stream/src/task/barrier_manager.rs 94.44% <0.00%> (-1.07%) ⬇️
rust/meta/src/hummock/compaction.rs 73.85% <0.00%> (-0.66%) ⬇️
... and 15 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c082cb6...ec0c0ac. Read the comment docs.


/// Link parent stage and child stage. Maintain the mappings of parent -> child and child ->
/// parent.
pub fn link_to_child(&mut self, parent_id: StageId, exchange_id: u64, child_id: StageId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the exchange_id mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's operator id of exchange. Got it from L157. Recommend to explain this in rustdoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's operator id of exchange. Got it from L157. Recommend to explain this in rustdoc

Actually I'm not sure whether distinguish operator id (logical) and executor id (physical) in batch? Seems streaming do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I'm not sure whether distinguish operator id (logical) and executor id (physical) in batch? Seems streaming do that.

Looks good to me

rust/frontend/src/scheduler/plan_fragmenter.rs Outdated Show resolved Hide resolved
rust/frontend/src/scheduler/plan_fragmenter.rs Outdated Show resolved Hide resolved
use crate::scheduler::plan_fragmenter::BatchPlanFragmenter;

#[test]
fn test_fragmenter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some internal operators (like Project/Filter) in the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so easy to mock Project Plan Node and it won't affect the results. Let's add it in future.

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM!

let query = fragmenter.split(batch_exchange_node3).unwrap();

assert_eq!(query.stage_graph.id, 0);
assert_eq!(query.stage_graph.stages.len(), 4);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can dump the stage_graph into a json (traversing the graph) and verify the resulting json for correctness.
Otherwise you will have to write much boilerplate code for every case.

Copy link
Contributor Author

@BowenXiao1999 BowenXiao1999 Feb 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! This can be a TODO when the explain of Plan Node is ready.

rust/frontend/src/scheduler/plan_fragmenter.rs Outdated Show resolved Hide resolved
@BowenXiao1999 BowenXiao1999 merged commit bf46b38 into main Feb 9, 2022
@BowenXiao1999 BowenXiao1999 deleted the bw_add_plan_fragmenter branch February 9, 2022 04:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Introduce batch plan fragmenter
4 participants