Skip to content

Commit

Permalink
chore: remove enable_stream_row_count config (#10261)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored Jun 13, 2023
1 parent a6f38d9 commit 54c660b
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 59 deletions.
10 changes: 0 additions & 10 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,6 @@ serde_with::with_prefix!(batch_prefix "batch_");
/// It is put at [`StreamingConfig::developer`].
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct StreamingDeveloperConfig {
/// Set to true to enable per-executor row count metrics. This will produce a lot of timeseries
/// and might affect the prometheus performance. If you only need actor input and output
/// rows data, see `stream_actor_in_record_cnt` and `stream_actor_out_record_cnt` instead.
#[serde(default = "default::developer::stream_enable_executor_row_count")]
pub enable_executor_row_count: bool,

/// The capacity of the chunks in the channel that connects between `ConnectorSource` and
/// `SourceExecutor`.
#[serde(default = "default::developer::connector_message_buffer_size")]
Expand Down Expand Up @@ -795,10 +789,6 @@ mod default {
1024
}

pub fn stream_enable_executor_row_count() -> bool {
false
}

pub fn connector_message_buffer_size() -> usize {
16
}
Expand Down
1 change: 0 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ async_stack_trace = "ReleaseVerbose"
unique_user_stream_errors = 10

[streaming.developer]
stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
stream_unsafe_extreme_cache_size = 10
stream_chunk_size = 1024
Expand Down
36 changes: 5 additions & 31 deletions src/stream/src/executor/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ pub struct WrapperExecutor {
input: BoxedExecutor,

extra: ExtraInfo,

enable_executor_row_count: bool,
}

impl WrapperExecutor {
Expand All @@ -55,7 +53,6 @@ impl WrapperExecutor {
actor_id: ActorId,
executor_id: u64,
metrics: Arc<StreamingMetrics>,
enable_executor_row_count: bool,
) -> Self {
Self {
input,
Expand All @@ -65,20 +62,17 @@ impl WrapperExecutor {
executor_id,
metrics,
},
enable_executor_row_count,
}
}

#[allow(clippy::let_and_return)]
fn wrap_debug(
enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
extra: ExtraInfo,
stream: impl MessageStream + 'static,
) -> impl MessageStream + 'static {
// Trace
let stream = trace::trace(
enable_executor_row_count,
info.clone(),
extra.input_pos,
extra.actor_id,
Expand All @@ -95,25 +89,17 @@ impl WrapperExecutor {

#[allow(clippy::let_and_return)]
fn wrap_release(
enable_executor_row_count: bool,
_info: Arc<ExecutorInfo>,
extra: ExtraInfo,
stream: impl MessageStream + 'static,
) -> impl MessageStream + 'static {
// Metrics
let stream = trace::metrics(
enable_executor_row_count,
extra.actor_id,
extra.executor_id,
extra.metrics,
stream,
);
let stream = trace::metrics(extra.actor_id, extra.executor_id, extra.metrics, stream);

stream
}

fn wrap(
enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
extra: ExtraInfo,
stream: impl MessageStream + 'static,
Expand All @@ -133,34 +119,22 @@ impl WrapperExecutor {
let stream = epoch_provide::epoch_provide(stream);

if cfg!(debug_assertions) {
Self::wrap_debug(enable_executor_row_count, info, extra, stream).boxed()
Self::wrap_debug(info, extra, stream).boxed()
} else {
Self::wrap_release(enable_executor_row_count, info, extra, stream).boxed()
Self::wrap_release(info, extra, stream).boxed()
}
}
}

impl Executor for WrapperExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
let info = Arc::new(self.input.info());
Self::wrap(
self.enable_executor_row_count,
info,
self.extra,
self.input.execute(),
)
.boxed()
Self::wrap(info, self.extra, self.input.execute()).boxed()
}

fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
let info = Arc::new(self.input.info());
Self::wrap(
self.enable_executor_row_count,
info,
self.extra,
self.input.execute_with_epoch(epoch),
)
.boxed()
Self::wrap(info, self.extra, self.input.execute_with_epoch(epoch)).boxed()
}

fn schema(&self) -> &Schema {
Expand Down
26 changes: 10 additions & 16 deletions src/stream/src/executor/wrapper/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::task::ActorId;
/// Streams wrapped by `trace` will print data passing in the stream graph to stdout.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn trace(
enable_executor_row_count: bool,
info: Arc<ExecutorInfo>,
input_pos: usize,
actor_id: ActorId,
Expand All @@ -52,12 +51,10 @@ pub async fn trace(
while let Some(message) = input.next().in_span(span()).await.transpose()? {
if let Message::Chunk(chunk) = &message {
if chunk.cardinality() > 0 {
if enable_executor_row_count {
metrics
.executor_row_count
.with_label_values(&[&actor_id_string, &info.identity])
.inc_by(chunk.cardinality() as u64);
}
metrics
.executor_row_count
.with_label_values(&[&actor_id_string, &info.identity])
.inc_by(chunk.cardinality() as u64);
event!(tracing::Level::TRACE, prev = %info.identity, msg = "chunk", "input = \n{:#?}", chunk);
}
}
Expand All @@ -69,7 +66,6 @@ pub async fn trace(
/// Streams wrapped by `metrics` will update actor metrics.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn metrics(
enable_executor_row_count: bool,
actor_id: ActorId,
executor_id: u64,
metrics: Arc<StreamingMetrics>,
Expand All @@ -80,14 +76,12 @@ pub async fn metrics(
pin_mut!(input);

while let Some(message) = input.next().await.transpose()? {
if enable_executor_row_count {
if let Message::Chunk(chunk) = &message {
if chunk.cardinality() > 0 {
metrics
.executor_row_count
.with_label_values(&[&actor_id_string, &executor_id_string])
.inc_by(chunk.cardinality() as u64);
}
if let Message::Chunk(chunk) = &message {
if chunk.cardinality() > 0 {
metrics
.executor_row_count
.with_label_values(&[&actor_id_string, &executor_id_string])
.inc_by(chunk.cardinality() as u64);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,6 @@ impl LocalStreamManagerCore {
actor_context.id,
executor_id,
self.streaming_metrics.clone(),
self.config.developer.enable_executor_row_count,
)
.boxed();

Expand Down

0 comments on commit 54c660b

Please sign in to comment.