Skip to content

Commit

Permalink
feat(sink): decouple starrocks commit from risingwave commit (#16816)
Browse files Browse the repository at this point in the history
Co-authored-by: xxhZs <[email protected]>
Co-authored-by: Xinhao Xu <[email protected]>
Co-authored-by: TennyZhuang <[email protected]>
  • Loading branch information
4 people authored Jun 5, 2024
1 parent 030f2fa commit e01eb2e
Show file tree
Hide file tree
Showing 5 changed files with 733 additions and 129 deletions.
3 changes: 3 additions & 0 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};

/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,
/// we delay the checkpoint barrier to make commits less frequent.
pub struct DecoupleCheckpointLogSinkerOf<W> {
writer: W,
sink_metrics: SinkMetrics,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct DeltaLakeCommon {
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
pub commit_checkpoint_interval: Option<u64>,
}
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Sink for DeltaLakeSink {
SinkDecouple::Disable => {
if config_decouple {
return Err(SinkError::Config(anyhow!(
"config conflict: DeltaLake config `commit_checkpoint_interval` bigger than 1 which means that must enable sink decouple, but session config sink decouple is disabled"
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
)));
}
Ok(false)
Expand Down
Loading

0 comments on commit e01eb2e

Please sign in to comment.