Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(streaming): better source executor & split change #3669

Merged
merged 5 commits into from
Jul 6, 2022

Conversation

BugenZhao
Copy link
Member

@BugenZhao BugenZhao commented Jul 5, 2022

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

We need to construct a new SourceStreamReaderImpl on split change. The previous implementation uses a hacky way to do this and may lead to bugs (#3411), however, we can directly mutate the source chunk arm of the stream.

This PR also refines some code styles.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Refer to a related PR or issue link (optional)

Signed-off-by: Bugen Zhao <[email protected]>
Signed-off-by: Bugen Zhao <[email protected]>
Signed-off-by: Bugen Zhao <[email protected]>
@codecov
Copy link

codecov bot commented Jul 5, 2022

Codecov Report

Merging #3669 (90fc43a) into main (383199d) will decrease coverage by 0.00%.
The diff coverage is 81.25%.

@@            Coverage Diff             @@
##             main    #3669      +/-   ##
==========================================
- Coverage   74.23%   74.23%   -0.01%     
==========================================
  Files         789      789              
  Lines      111366   111368       +2     
==========================================
+ Hits        82676    82677       +1     
- Misses      28690    28691       +1     
Flag Coverage Δ
rust 74.23% <81.25%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
src/stream/src/executor/source/source_executor.rs 95.24% <81.25%> (-0.17%) ⬇️
src/meta/src/hummock/mock_hummock_meta_client.rs 40.56% <0.00%> (-0.95%) ⬇️
src/storage/src/hummock/local_version_manager.rs 81.38% <0.00%> (-0.12%) ⬇️
src/frontend/src/expr/utils.rs 99.24% <0.00%> (+0.25%) ⬆️
src/meta/src/manager/id.rs 96.06% <0.00%> (+0.56%) ⬆️

📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more

Signed-off-by: Bugen Zhao <[email protected]>
Copy link
Contributor

@skyzh skyzh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Most of the comments are unrelated to your change, I'm just revisiting the previous source implementation.

cc @KeXiangWang I think we should not make the source executor part harder to read and maintain (as seen in #3471). With this PR's change, it would be easier to follow and implement pause barrier. Would you please base your changes on this? What do you think?

while let Some(barrier) = rx.recv().await {
yield barrier;
}
bail!("barrier reader closed unexpectedly");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may also pending here. Not sure whether replacing a terminated stream would cause some confusing result...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or panic here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May fix in further refactors.

for ele in &mut boot_state {
if let Some(recover_state) = self
.split_state_store
.try_recover_from_state_store(ele, epoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should wait_epoch here in the future, need revisiting this part of code.

{
if let Some(target_splits) = mapping.get(&self.actor_id).cloned() {
if let Some(target_state) = self.get_diff(target_splits) {
log::info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracing::info

split_offset_mapping,
} = chunk_with_state?;

if let Some(mapping) = split_offset_mapping {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So every time we receive a chunk, we will have such many things to do... I'm thinking of how to improve this part's efficiency.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better if we store stream_source_splits as a HashMap?

}
self.state_cache.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to clear state cache here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we write state into state store every time a barrier comes. if the state is none we can skip the process.

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! so successful!

@mergify mergify bot merged commit 4b54d91 into main Jul 6, 2022
@mergify mergify bot deleted the bz/cleanup-source-executor branch July 6, 2022 06:31
nasnoisaac pushed a commit to nasnoisaac/risingwave that referenced this pull request Aug 9, 2022
…elabs#3669)

* remove lock replace

Signed-off-by: Bugen Zhao <[email protected]>

* further cleanup

Signed-off-by: Bugen Zhao <[email protected]>

* fix clippy

Signed-off-by: Bugen Zhao <[email protected]>

* refine docs

Signed-off-by: Bugen Zhao <[email protected]>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants