Skip to content

Commit

Permalink
refactor: merge to use logical plans (delta-io#1720)
Browse files Browse the repository at this point in the history
# Description
This refactors the merge operation to use DataFusion's DataFrame and
LogicalPlan APIs

The NLJ is eliminated and the query planner can pick the optimal join
operator. This also enables the operation to use multiple threads and
should result in significant speed up.
Merge is still limited to using a single thread in some area. When
collecting benchmarks, I encountered multiple OoM issues with
Datafusion's hash join implementation. There are multiple tickets
upstream open regarding this. For now, I've limited the number of
partitions to just 1 to prevent this.

Predicates passed as SQL are also easier to use now. Manual casting was
required to ensure data types were aligned. Now the logical plan will
perform type coercion when optimizing the plan.

# Related Issues
- enhances delta-io#850
- closes delta-io#1790 
- closes delta-io#1753
  • Loading branch information
Blajda authored and ion-elgreco committed Nov 20, 2023
1 parent c1bc5c6 commit 1e41842
Show file tree
Hide file tree
Showing 6 changed files with 536 additions and 473 deletions.
48 changes: 48 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/logical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Logical Operations for DataFusion

use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};

// Metric Observer is used to update DataFusion metrics from a record batch.
// See MetricObserverExec for the physical implementation

#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) struct MetricObserver {
// id is preserved during conversion to physical node
pub id: String,
pub input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for MetricObserver {
// Predicate push down is not supported for this node. Try to limit usage
// near the end of plan.
fn name(&self) -> &str {
"MetricObserver"
}

fn inputs(&self) -> Vec<&datafusion_expr::LogicalPlan> {
vec![&self.input]
}

fn schema(&self) -> &datafusion_common::DFSchemaRef {
self.input.schema()
}

fn expressions(&self) -> Vec<datafusion_expr::Expr> {
vec![]
}

fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MetricObserver id={}", &self.id)
}

fn from_template(
&self,
_exprs: &[datafusion_expr::Expr],
inputs: &[datafusion_expr::LogicalPlan],
) -> Self {
MetricObserver {
id: self.id.clone(),
input: inputs[0].clone(),
}
}
}
9 changes: 3 additions & 6 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ use crate::{open_table, open_table_with_storage_options, DeltaTable};
const PATH_COLUMN: &str = "__delta_rs_path";

pub mod expr;
pub mod logical;
pub mod physical;

impl From<DeltaTableError> for DataFusionError {
fn from(err: DeltaTableError) -> Self {
Expand Down Expand Up @@ -351,7 +353,7 @@ pub(crate) fn logical_schema(
snapshot: &DeltaTableState,
scan_config: &DeltaScanConfig,
) -> DeltaResult<SchemaRef> {
let input_schema = snapshot.input_schema()?;
let input_schema = snapshot.arrow_schema()?;
let mut fields = Vec::new();
for field in input_schema.fields.iter() {
fields.push(field.to_owned());
Expand Down Expand Up @@ -505,11 +507,6 @@ impl<'a> DeltaScanBuilder<'a> {
self
}

pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}

pub async fn build(self) -> DeltaResult<DeltaScan> {
let config = self.config;
let schema = match self.schema {
Expand Down
180 changes: 180 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/physical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
//! Physical Operations for DataFusion
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::DisplayAs;
use datafusion::physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
use futures::{Stream, StreamExt};

use crate::DeltaTableError;

// Metric Observer is used to update DataFusion metrics from a record batch.
// Typically the null count for a particular column is pulled after performing a
// projection since this count is easy to obtain

pub(crate) type MetricObserverFunction = fn(&RecordBatch, &ExecutionPlanMetricsSet) -> ();

pub(crate) struct MetricObserverExec {
parent: Arc<dyn ExecutionPlan>,
id: String,
metrics: ExecutionPlanMetricsSet,
update: MetricObserverFunction,
}

impl MetricObserverExec {
pub fn new(id: String, parent: Arc<dyn ExecutionPlan>, f: MetricObserverFunction) -> Self {
MetricObserverExec {
parent,
id,
metrics: ExecutionPlanMetricsSet::new(),
update: f,
}
}

pub fn try_new(
id: String,
inputs: &[Arc<dyn ExecutionPlan>],
f: MetricObserverFunction,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
match inputs {
[input] => Ok(Arc::new(MetricObserverExec::new(id, input.clone(), f))),
_ => Err(datafusion_common::DataFusionError::External(Box::new(
DeltaTableError::Generic("MetricObserverExec expects only one child".into()),
))),
}
}

pub fn id(&self) -> &str {
&self.id
}
}

impl std::fmt::Debug for MetricObserverExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricObserverExec")
.field("id", &self.id)
.field("metrics", &self.metrics)
.finish()
}
}

impl DisplayAs for MetricObserverExec {
fn fmt_as(
&self,
_: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "MetricObserverExec id={}", self.id)
}
}

impl ExecutionPlan for MetricObserverExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow_schema::SchemaRef {
self.parent.schema()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
self.parent.output_partitioning()
}

fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
self.parent.output_ordering()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.parent.clone()]
}

fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let res = self.parent.execute(partition, context)?;
Ok(Box::pin(MetricObserverStream {
schema: self.schema(),
input: res,
metrics: self.metrics.clone(),
update: self.update,
}))
}

fn statistics(&self) -> DataFusionResult<datafusion_common::Statistics> {
self.parent.statistics()
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
MetricObserverExec::try_new(self.id.clone(), &children, self.update)
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}

struct MetricObserverStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
metrics: ExecutionPlanMetricsSet,
update: MetricObserverFunction,
}

impl Stream for MetricObserverStream {
type Item = DataFusionResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => {
(self.update)(&batch, &self.metrics);
Some(Ok(batch))
}
other => other,
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.input.size_hint()
}
}

impl RecordBatchStream for MetricObserverStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

pub(crate) fn find_metric_node(
id: &str,
parent: &Arc<dyn ExecutionPlan>,
) -> Option<Arc<dyn ExecutionPlan>> {
//! Used to locate the physical MetricCountExec Node after the planner converts the logical node
if let Some(metric) = parent.as_any().downcast_ref::<MetricObserverExec>() {
if metric.id().eq(id) {
return Some(parent.to_owned());
}
}

for child in &parent.children() {
let res = find_metric_node(id, child);
if res.is_some() {
return res;
}
}

None
}
Loading

0 comments on commit 1e41842

Please sign in to comment.