Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(plan_node_fmt): 6 more impls for Distill, refactor all columns_name functions #10344

Merged
merged 3 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl Schema {
self.fields().iter().map(|f| f.name.clone()).collect()
}

pub fn names_str(&self) -> Vec<&str> {
self.fields().iter().map(|f| f.name.as_str()).collect()
}

pub fn data_types(&self) -> Vec<DataType> {
self.fields
.iter()
Expand Down
21 changes: 5 additions & 16 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SourceNode;

use super::utils::Distill;
use super::utils::{column_names_pretty, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch,
};
Expand All @@ -47,11 +47,7 @@ impl BatchSource {
}

pub fn column_names(&self) -> Vec<&str> {
self.schema()
.fields()
.iter()
.map(|f| f.name.as_str())
.collect()
self.schema().names_str()
}

pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
Expand Down Expand Up @@ -87,17 +83,10 @@ impl fmt::Display for BatchSource {

impl Distill for BatchSource {
fn distill<'a>(&self) -> Pretty<'a> {
let columns = self
.column_names()
.into_iter()
.map(|s| Pretty::from(s.to_owned()))
.collect();
let src = Pretty::from(self.source_catalog().unwrap().name.clone());
let fields = vec![
(
"source",
Pretty::from(self.source_catalog().unwrap().name.clone()),
),
("columns", Pretty::Array(columns)),
("source", src),
("columns", column_names_pretty(self.schema())),
("filter", Pretty::debug(&self.kafka_timestamp_range_value())),
];
Pretty::childless_record("BatchSource", fields)
Expand Down
7 changes: 2 additions & 5 deletions src/frontend/src/optimizer/plan_node/logical_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::{
ColPrunable, ColumnPruningContext, ExprRewritable, LogicalFilter, PlanBase, PlanRef,
PredicatePushdown, RewriteStreamContext, StreamNow, ToBatch, ToStream, ToStreamContext,
};
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::property::FunctionalDependencySet;
use crate::utils::ColIndexMapping;
use crate::OptimizerContextRef;
Expand All @@ -51,11 +52,7 @@ impl LogicalNow {
impl Distill for LogicalNow {
fn distill<'a>(&self) -> Pretty<'a> {
let vec = if self.base.ctx.is_explain_verbose() {
let disp = Pretty::debug(&IndicesDisplay {
indices: &(0..self.schema().fields.len()).collect_vec(),
input_schema: self.schema(),
});
vec![("output", disp)]
vec![("output", column_names_pretty(self.schema()))]
} else {
vec![]
};
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::cell::RefCell;
use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_common::error::ErrorCode::NotImplemented;
use risingwave_common::error::Result;

use super::utils::Distill;
use super::{
generic, ColPrunable, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown,
ToBatch, ToStream,
Expand Down Expand Up @@ -74,6 +76,10 @@ impl LogicalShare {
) -> fmt::Result {
write!(f, "{} {{ id = {} }}", name, &base.id.0)
}

pub(super) fn pretty_fields<'a>(base: &PlanBase, name: &'a str) -> Pretty<'a> {
Pretty::childless_record(name, vec![("id", Pretty::debug(&base.id.0))])
}
}

impl PlanTreeNodeUnary for LogicalShare {
Expand Down Expand Up @@ -108,6 +114,11 @@ impl fmt::Display for LogicalShare {
Self::fmt_with_name(&self.base, f, "LogicalShare")
}
}
impl Distill for LogicalShare {
fn distill<'a>(&self) -> Pretty<'a> {
Self::pretty_fields(&self.base, "LogicalShare")
}
}

impl ColPrunable for LogicalShare {
fn prune_col(&self, _required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
Expand Down
29 changes: 20 additions & 9 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ use std::ops::Bound::{Excluded, Included, Unbounded};
use std::rc::Rc;

use itertools::Itertools;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{ColumnCatalog, Schema};
use risingwave_common::error::Result;
use risingwave_connector::source::DataType;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::GeneratedColumnDesc;

use super::stream_watermark_filter::StreamWatermarkFilter;
use super::utils::Distill;
use super::{
generic, BatchProject, BatchSource, ColPrunable, ExprRewritable, LogicalFilter, LogicalProject,
PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, StreamSource, ToBatch,
Expand All @@ -34,6 +36,7 @@ use super::{
use crate::catalog::source_catalog::SourceCatalog;
use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
Expand Down Expand Up @@ -170,14 +173,6 @@ impl LogicalSource {
})
}

pub(super) fn column_names(&self) -> Vec<String> {
self.schema()
.fields()
.iter()
.map(|f| f.name.clone())
.collect()
}

pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
self.core.catalog.clone()
}
Expand Down Expand Up @@ -245,14 +240,30 @@ impl fmt::Display for LogicalSource {
f,
"LogicalSource {{ source: {}, columns: [{}], time_range: [{:?}] }}",
catalog.name,
self.column_names().join(", "),
self.schema().names_str().join(", "),
self.core.kafka_timestamp_range,
)
} else {
write!(f, "LogicalSource")
}
}
}
impl Distill for LogicalSource {
fn distill<'a>(&self) -> Pretty<'a> {
let fields = if let Some(catalog) = self.source_catalog() {
let src = Pretty::from(catalog.name.clone());
let time = Pretty::debug(&self.core.kafka_timestamp_range);
vec![
("source", src),
("columns", column_names_pretty(self.schema())),
("time_range", time),
]
} else {
vec![]
};
Pretty::childless_record("LogicalSource", fields)
}
}

impl ColPrunable for LogicalSource {
fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
Expand Down
27 changes: 26 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
use std::fmt;
use std::ops::BitAnd;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::ColumnDesc;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};

use super::generic::{self};
use super::utils::formatter_debug_plan_node;
use super::utils::{formatter_debug_plan_node, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
use crate::optimizer::plan_node::stream::StreamPlanRef;
Expand Down Expand Up @@ -114,6 +115,30 @@ impl fmt::Display for StreamDeltaJoin {
builder.finish()
}
}
impl Distill for StreamDeltaJoin {
fn distill<'a>(&self) -> Pretty<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));

let concat_schema = self.logical.concat_schema();
vec.push((
"predicate",
Pretty::debug(&EqJoinPredicateDisplay {
eq_join_predicate: self.eq_join_predicate(),
input_schema: &concat_schema,
}),
));

if verbose {
let data = IndicesDisplay::from_join(&self.logical, &concat_schema)
.map_or_else(|| Pretty::from("all"), |id| Pretty::display(&id));
vec.push(("output", data));
}

Pretty::childless_record("StreamDeltaJoin", vec)
}
}

impl PlanTreeNodeBinary for StreamDeltaJoin {
fn left(&self) -> PlanRef {
Expand Down
17 changes: 15 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use std::fmt;

use fixedbitset::FixedBitSet;
use pretty_xmlish::Pretty;
use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use super::utils::Distill;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -48,10 +50,10 @@ impl StreamDml {
}
}

fn column_names(&self) -> Vec<String> {
fn column_names(&self) -> Vec<&str> {
self.column_descs
.iter()
.map(|column_desc| column_desc.name.clone())
.map(|column_desc| column_desc.name.as_str())
.collect()
}
}
Expand All @@ -65,6 +67,17 @@ impl fmt::Display for StreamDml {
)
}
}
impl Distill for StreamDml {
fn distill<'a>(&self) -> Pretty<'a> {
let col = self
.column_names()
.iter()
.map(|n| Pretty::from(n.to_string()))
.collect();
let col = Pretty::Array(col);
Pretty::childless_record("StreamDml", vec![("columns", col)])
}
}

impl PlanTreeNodeUnary for StreamDml {
fn input(&self) -> PlanRef {
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::fmt;

use pretty_xmlish::Pretty;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;

use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode};
use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -49,6 +51,11 @@ impl fmt::Display for StreamShare {
LogicalShare::fmt_with_name(&self.base, f, "StreamShare")
}
}
impl Distill for StreamShare {
fn distill<'a>(&self) -> Pretty<'a> {
LogicalShare::pretty_fields(&self.base, "StreamShare")
}
}

impl PlanTreeNodeUnary for StreamShare {
fn input(&self) -> PlanRef {
Expand Down
28 changes: 18 additions & 10 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use std::rc::Rc;

use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::Pretty;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbStreamSource, SourceNode};

use super::utils::formatter_debug_plan_node;
use super::utils::{formatter_debug_plan_node, Distill};
use super::{generic, ExprRewritable, PlanBase, StreamNode};
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

Expand Down Expand Up @@ -56,14 +58,6 @@ impl StreamSource {
pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
self.logical.catalog.clone()
}

pub fn column_names(&self) -> Vec<String> {
self.schema()
.fields()
.iter()
.map(|f| f.name.clone())
.collect()
}
}

impl_plan_tree_node_for_leaf! { StreamSource }
Expand All @@ -74,11 +68,25 @@ impl fmt::Display for StreamSource {
if let Some(catalog) = self.source_catalog() {
builder
.field("source", &catalog.name)
.field("columns", &self.column_names());
.field("columns", &self.schema().names_str());
}
builder.finish()
}
}
impl Distill for StreamSource {
fn distill<'a>(&self) -> Pretty<'a> {
let fields = if let Some(catalog) = self.source_catalog() {
let src = Pretty::from(catalog.name.clone());
vec![
("source", src),
("columns", column_names_pretty(self.schema())),
]
} else {
vec![]
};
Pretty::childless_record("StreamSource", fields)
}
}

impl StreamNode for StreamSource {
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ macro_rules! impl_distill_by_unit {
}
pub(crate) use impl_distill_by_unit;

pub fn column_names_pretty<'a>(schema: &Schema) -> Pretty<'a> {
let columns = (schema.fields.iter())
.map(|f| f.name.clone())
.map(Pretty::from)
.collect();
Pretty::Array(columns)
}

#[derive(Clone, Copy)]
pub struct IndicesDisplay<'a> {
pub indices: &'a [usize],
Expand Down