-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8196] Support pruning based on partition stats index in Hudi Flink #12132
base: master
Are you sure you want to change the base?
Conversation
@@ -378,6 +378,14 @@ private FlinkOptions() { | |||
.withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " | |||
+ "skipping over files"); | |||
|
|||
@AdvancedConfig | |||
public static final ConfigOption<Boolean> READ_PARTITION_DATA_SKIPPING_ENABLED = ConfigOptions | |||
.key("read.partition.data.skipping.enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This option can be eliminated, when data skipping is enabled, we always enable partiton stats pruning if the partition metadata is there.
this.metadataConfig = metadataConfig(conf); | ||
this.dataPruner = isDataSkippingFeasible(conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? dataPruner : null; | ||
this.metadataConfig = StreamerUtil.metadataConfig(conf); | ||
this.dataPruner = isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? dataPruner : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should rename the pruner to colStatsPruner
because it is used for partition pruning now.
Preconditions.checkArgument(rowType != null && basePath != null && conf != null); | ||
columnStatsPruner = new ColumnStatsPartitionPruner(rowType, basePath, StreamerUtil.metadataConfig(conf), dataPruner); | ||
} | ||
List<PartitionPruner> partitionPruners = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use specific pruner if it is singleton.
public Builder conf(Configuration conf) { | ||
this.conf = conf; | ||
return this; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use builder here? The different pruners have disparity in parameters.
} | ||
PartitionPruner dynamicPruner = null; | ||
if (partitionEvaluators != null && !partitionEvaluators.isEmpty()) { | ||
Preconditions.checkArgument(partitionKeys != null && partitionTypes != null && defaultParName != null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add user friendly warnings msgs for these check always.
* | ||
* <p>The {@code filters} must all be simple. | ||
* | ||
* @param dataPruner the data pruner built from push-down filters. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
supplement comments for all the parameters.
*/ | ||
public class ColumnStatsIndices { | ||
private static final DataType METADATA_DATA_TYPE = getMetadataDataType(); | ||
private static final DataType COL_STATS_DATA_TYPE = getColStatsDataType(); | ||
private static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos(); | ||
|
||
// the column schema: | ||
// |- file_name: string | ||
// |- file_name: string -- file name for column stats and partition name for partition stats |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file_name -> target_name?
* Utilities for abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index, | ||
* providing convenient interfaces to read it, transpose, etc. | ||
* Utilities for abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index | ||
* or Partition Stats Index, providing convenient interfaces to read it, transpose, etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The doc has inconsistency with the class name. Maybe we just keep the doc as it is if we agree both file stats
and partition stats
belong to column stats
.
Change Logs
This PR introduces a new partition pruner for Flink source based on the Partition Stats Index.
Before this PR, Flink source (batch or streaming) uses partition filters pushed down to build partition pruner and filter irrelevant partitions. Then, Column Stats Index is used to build data pruner to do the file-level data skipping. HUDI-7144 introduced the partition-level column stats, we can use the stats to prune partitions just like the way files are pruned.
Main changes:
ColumnStatsPartitionPruner
.read.partition.data.skipping.enabled
to enabled pruning based on partition stats,false
by default.Impact
Enhance the data skipping ability for Flink source by introducing a new partition pruner based on Partition Stats Index.
Risk level (write none, low medium or high below)
low
Documentation Update
none
Contributor's checklist