-
Notifications
You must be signed in to change notification settings - Fork 572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: decouple iceberg commit from risingwave commit #15634
Conversation
446f257
to
0d1380d
Compare
fn default_sink_decouple(desc: &SinkDesc) -> bool { | ||
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") { | ||
interval.parse::<u64>().unwrap_or(0) > 1 | ||
} else { | ||
false | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned by @wenym1, when users can set sink_decouple = false
, sink will not be decoupled. If users use commit_checkpoint_interval
at the same time, it could be problematic and cause a deadlock. Please verify this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Waiting for @wenym1 to take a look.
src/connector/src/sink/mod.rs
Outdated
pub trait Sink: TryFrom<SinkParam, Error = SinkError> { | ||
const SINK_NAME: &'static str; | ||
type LogSinker: LogSinker; | ||
type Coordinator: SinkCommitCoordinator; | ||
|
||
fn default_sink_decouple(_desc: &SinkDesc) -> bool { | ||
false | ||
/// Infer the decouple mode from the sink properties. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can directly change the default_sink_decouple
to fn is_sink_decouple(desc: &SinkDesc, user_specified: SinkDecouple) -> bool
and in frontend we can it directly.
For the enum SinkDecouple
we can implement have a method like fn is_decouple(&self) -> Option<bool>
, and then the default implementation of is_sink_decouple
can be user_specified.is_decouple().unwrap_or(false)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If my understanding is correct, the user_specified: SinkDecouple
is the session config. fn is_sink_decouple(desc: &SinkDesc, user_specified: SinkDecouple) -> bool
determines whether the sink is decoupled. It also will check if there are conflicts between them. 🤔 Maybe it should be fn is_sink_decouple(desc: &SinkDesc, user_specified: SinkDecouple) -> Result<bool>
. and return errors when there are conflicts.
I agree with this idea and we can remove the logic here
log_store_type: match self.base.ctx().session_ctx().config().sink_decouple() { |
So that we can determine the decouple logic in one place rather than separate them.
} else { | ||
Ok(None) | ||
} | ||
// Commit every n checkpoints, if n is not set, 0, 1, we will commit every checkpoint. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to specify this special deserialization method? What the difference to the default one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config actually will be deserialize from string. So we need to specify a method from str to other type. Like
risingwave/src/connector/src/sink/kafka.rs
Line 217 in 61c025d
pub max_retry_num: u32, |
src/connector/src/sink/writer.rs
Outdated
@@ -113,13 +113,16 @@ pub trait FormattedSink { | |||
pub struct LogSinkerOf<W> { | |||
writer: W, | |||
sink_metrics: SinkMetrics, | |||
|
|||
commit_checkpoint_interval: u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better implement this special logic in the iceberg sink writer and leave this shared default implementation unchanged. We can implement a new IcebergLogSinker
to consume the log, and then for every commit_checkpoint_interval
number of checkpoint barrier we then commit to iceberg and then truncate the log reader.
61c025d
to
dd0400e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 4950 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2128 | 1 | 2821 | 0 |
Click to see the invalid file list
- src/connector/src/sink/iceberg/log_sink.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
6f21026
to
995f97f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
}; | ||
if is_checkpoint { | ||
current_checkpoint += 1; | ||
if commit_checkpoint_interval <= 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just assert that the commit_checkpoint_interval
must not be 0 and only check current_checkpoint >= commit_checkpoint_interval
? Currently we use unwrap_or_default
to get the commit_checkpoint
, which will pass 0 when not set. If we use 1 as the default value, we can avoid passing the 0 value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we can't prevent the user from setting it as 0 so it means that we should adjust it to 1 if the user sets it to 0. Another concern is that this assumption is not type-safe. We can't prevent others follow this assumption(It's easy to miss), even for now, seems it will not cause something wrong. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the user set it to 0, we should return with error in sink validation. And in the code here, we can assert that it must not be error, or return with error if it's 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked the config and used NonZero
internally to guarantee that the interval is bigger than 0 now.
995f97f
to
adc6317
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM!
Do we have any report on how the checkpoint interval affects the sink performance? How much performance gain can we achieve when we combine the data of multiple checkpoints?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
complete #13899
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.