fix(test): use rate limit instead in background ddl test (#13179)
kwannoel authored Nov 1, 2023
1 parent 7f5d3f6 commit 7122d6c
Showing 5 changed files with 28 additions and 37 deletions.
16 changes: 0 additions & 16 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -146,14 +146,6 @@ where

         let pk_order = self.upstream_table.pk_serializer().get_order_types();

-        #[cfg(madsim)]
-        let snapshot_read_delay = if let Ok(v) = std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
-            && let Ok(v) = v.parse::<u64>() {
-            v
-        } else {
-            0
-        };
-
         let upstream_table_id = self.upstream_table.table_id().table_id;

         let mut upstream = self.upstream.execute();
@@ -303,14 +295,6 @@ where
                         break 'backfill_loop;
                     }
                     Some(chunk) => {
-                        #[cfg(madsim)]
-                        {
-                            tokio::time::sleep(std::time::Duration::from_millis(
-                                snapshot_read_delay as u64,
-                            ))
-                            .await;
-                        }
-
                         // Raise the current position.
                         // As snapshot read streams are ordered by pk, so we can
                         // just use the last row to update `current_pos`.
8 changes: 5 additions & 3 deletions src/stream/src/executor/flow_control.rs
@@ -54,10 +54,12 @@ impl FlowControlExecutor {
             let msg = msg?;
             match msg {
                 Message::Chunk(chunk) => {
+                    let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else {
+                        // Handle case where chunk is empty
+                        continue;
+                    };
                     if let Some(rate_limiter) = &rate_limiter {
-                        let result = rate_limiter
-                            .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
-                            .await;
+                        let result = rate_limiter.until_n_ready(n).await;
                         if let Err(InsufficientCapacity(n)) = result {
                             tracing::error!(
                                 "Rate Limit {:?} smaller than chunk cardinality {n}",
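The flow_control.rs change above guards against empty chunks: NonZeroU32::new(0) returns None, so the previous .unwrap() could panic whenever a chunk carried zero rows. Below is a minimal standalone sketch of the same guard-then-throttle pattern, assuming the limiter is the governor crate (which the until_n_ready / InsufficientCapacity names suggest) and tokio as the async runtime; the quota value is hypothetical.

```rust
use std::num::NonZeroU32;

use governor::{InsufficientCapacity, Quota, RateLimiter};

#[tokio::main]
async fn main() {
    // Hypothetical quota of 2 rows per second, mirroring SET STREAMING_RATE_LIMIT=2.
    let rate_limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(2).unwrap()));

    for cardinality in [0u32, 1, 2, 5] {
        // Skip empty chunks instead of unwrapping NonZeroU32::new(0).
        let Some(n) = NonZeroU32::new(cardinality) else {
            continue;
        };
        match rate_limiter.until_n_ready(n).await {
            // Resolves once the limiter has accumulated capacity for `n` rows.
            Ok(()) => println!("admitted chunk of {n} rows"),
            // A chunk larger than the whole quota can never be admitted.
            Err(InsufficientCapacity(limit)) => {
                eprintln!("rate limit {limit} smaller than chunk cardinality {n}")
            }
        }
    }
}
```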
4 changes: 2 additions & 2 deletions src/tests/simulation/src/background_ddl.toml
@@ -2,8 +2,8 @@
 telemetry_enabled = false
 metrics_level = "Disabled"

-#[streaming.developer]
-#stream_chunk_size = 1
+[streaming.developer]
+stream_chunk_size = 1

 [system]
 barrier_interval_ms = 1000
6 changes: 3 additions & 3 deletions src/tests/simulation/src/cluster.rs
@@ -151,10 +151,10 @@ impl Configuration {

         Configuration {
             config_path: ConfigPath::Temp(config_path.into()),
-            frontend_nodes: 1,
-            compute_nodes: 1,
+            frontend_nodes: 2,
+            compute_nodes: 3,
             meta_nodes: 3,
-            compactor_nodes: 1,
+            compactor_nodes: 2,
             compute_node_cores: 2,
             etcd_timeout_rate: 0.0,
             etcd_data_path: None,
@@ -21,9 +21,13 @@ use risingwave_simulation::utils::AssertResult;
 use tokio::time::sleep;

 const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
-const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
+const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
 const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
+const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
+const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
+const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;";
 const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
+const WAIT: &str = "WAIT;";

 async fn kill_cn_and_wait_recover(cluster: &Cluster) {
     // Kill it again
@@ -69,6 +73,13 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
     Ok(ids)
 }

+fn init_logger() {
+    let _ = tracing_subscriber::fmt()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .with_ansi(false)
+        .try_init();
+}
+
 #[tokio::test]
 async fn test_background_mv_barrier_recovery() -> Result<()> {
     let mut cluster = Cluster::start(Configuration::for_scale()).await?;
@@ -133,22 +144,17 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {

 #[tokio::test]
 async fn test_background_ddl_cancel() -> Result<()> {
-    env::set_var("RW_BACKFILL_SNAPSHOT_READ_DELAY", "100");
     async fn create_mv(session: &mut Session) -> Result<()> {
         session.run(CREATE_MV1).await?;
         sleep(Duration::from_secs(2)).await;
         Ok(())
     }
-    // FIXME: See if we can use rate limit instead.
-    use std::env;
-    tracing_subscriber::fmt()
-        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-        .with_ansi(false)
-        .init();
-    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
     let mut session = cluster.start_session();
     session.run(CREATE_TABLE).await?;
     session.run(SEED_TABLE).await?;
+    session.run(SET_RATE_LIMIT_2).await?;
     session.run(SET_BACKGROUND_DDL).await?;

     for _ in 0..5 {
@@ -157,6 +163,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
         assert_eq!(ids.len(), 1);
     }

+    session.run(SET_RATE_LIMIT_1).await?;
     create_mv(&mut session).await?;

     // Test cancel after kill cn
@@ -176,16 +183,14 @@ async fn test_background_ddl_cancel() -> Result<()> {
     assert_eq!(ids.len(), 1);

     // Make sure MV can be created after all these cancels
+    session.run(RESET_RATE_LIMIT).await?;
     create_mv(&mut session).await?;

     kill_and_wait_recover(&cluster).await;

     // Wait for job to finish
-    session.run("WAIT;").await?;
+    session.run(WAIT).await?;

     session.run("DROP MATERIALIZED VIEW mv1").await?;
     session.run("DROP TABLE t").await?;

-    env::remove_var("RW_BACKFILL_SNAPSHOT_READ_DELAY");
     Ok(())
 }
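Taken together, the test now throttles backfill through the session-level STREAMING_RATE_LIMIT setting instead of the madsim-only RW_BACKFILL_SNAPSHOT_READ_DELAY environment variable: a low rate limit keeps the materialized view in the creating state long enough for cancellation and recovery to be exercised, and resetting the limit to 0 lets the final MV build complete. What follows is a condensed, non-authoritative sketch of that flow, reusing the constants and helpers defined in this test file (Session, cancel_stream_jobs, the SQL constants above); it is a simplification, not the full test.

```rust
// Sketch only: assumes the constants (SET_RATE_LIMIT_2, RESET_RATE_LIMIT, ...)
// and helpers (cancel_stream_jobs) defined in this test file.
async fn rate_limited_cancel_flow(cluster: &mut Cluster) -> Result<()> {
    let mut session = cluster.start_session();
    session.run(CREATE_TABLE).await?;
    session.run(SEED_TABLE).await?;

    // Throttle backfill to 2 rows/s so the background MV stays in the
    // "creating" state long enough to be cancelled deterministically.
    session.run(SET_RATE_LIMIT_2).await?;
    session.run(SET_BACKGROUND_DDL).await?;
    session.run(CREATE_MV1).await?;
    // Give the background job a moment to register before cancelling it.
    sleep(Duration::from_secs(2)).await;

    let ids = cancel_stream_jobs(&mut session).await?;
    assert_eq!(ids.len(), 1);

    // Lift the throttle and let a fresh MV build run to completion.
    session.run(RESET_RATE_LIMIT).await?;
    session.run(CREATE_MV1).await?;
    session.run(WAIT).await?;
    Ok(())
}
```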
