
[HUDI-8203] Make record merge mode the primary merging config #11943

Open
wants to merge 105 commits into base: master

Conversation

jonvex
Contributor

@jonvex jonvex commented Sep 13, 2024

Change Logs

Record merge mode is now the primary way to configure merging logic. Avro payload-based merging is used only when a payload class other than the default payload or the overwrite-with-latest payload is set.

The file group reader now uses payload-based merging whenever the table is configured with a custom Avro payload, i.e., whenever the write side uses it, so read results stay consistent with compaction.
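
For illustration, a minimal sketch of how these options would be set on the write side (the config keys appear in this PR's diff; the surrounding setup and the example payload class are assumptions):

import java.util.HashMap;
import java.util.Map;

public class MergeConfigSketch {
  // Sketch only: build write options where record merge mode is the primary knob.
  public static Map<String, String> writeOptions() {
    Map<String, String> opts = new HashMap<>();
    // Primary merging config: EVENT_TIME_ORDERING, OVERWRITE_WITH_LATEST, or CUSTOM.
    opts.put("hoodie.datasource.write.record.merge.mode", "EVENT_TIME_ORDERING");
    // Payload-based merging is used only if a non-default payload class is set, e.g.:
    // opts.put("hoodie.datasource.write.payload.class", "com.example.MyCustomPayload");  // hypothetical class
    return opts;
  }
}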

Impact

Part of the umbrella effort to simplify merge configs for 1.x.

Risk level (write none, low, medium, or high below)

low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Sep 13, 2024
@jonvex jonvex changed the title first attempt at making merge mode the primary config for merging/pay… [HUDI-8203] Make record merge mode the primary merging config Sep 16, 2024
@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Sep 18, 2024
Comment on lines -68 to -70
/** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
@Deprecated
public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();
Contributor

Regarding this variable for backwards-compatible reads (https://github.com/apache/hudi/pull/11943/files#r1805475755), could we hard-code it to be DefaultHoodieRecordPayload.class.getName()?
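
A minimal sketch of that suggestion, assuming the deprecated constant stays in the same config class:

// Sketch of the reviewer's suggestion: pin the deprecated constant to the legacy
// default payload class instead of deriving it from PAYLOAD_CLASS_NAME's default value.
@Deprecated
public static final String DEFAULT_PAYLOAD_CLASS = DefaultHoodieRecordPayload.class.getName();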

.markAdvanced()
public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
.key("hoodie.datasource.write.record.merge.mode")
.defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
Contributor

I remember that this can be switched back to OVERWRITE_WITH_LATEST. Is there still an issue? I'm OK with EVENT_TIME_ORDERING; I'm just trying to see whether it still strictly requires the precombine/ordering field to be set.

help = "Payload Class (deprecated). Use merge-mode for overwrite or event time merging.") final String payloadClass,
@ShellOption(value = {"--merge-mode", "--record-merge-mode"}, defaultValue = "",
help = "Merge mode to use. 'EVENT_TIME_ORDERING', 'OVERWRITE_WITH_LATEST', or 'CUSTOM' if you want to set a merge strategy") final String recordMergeMode,
@ShellOption(value = {"--merger-strategy", "--record-merger-strategy"}, defaultValue = "",
Contributor

We can make all record merge configs start with --record-merge-, hoodie.write.record.merge., and hoodie.table.record.merge. (e.g., --record-merge-strategy, hoodie.write.record.merge.strategy) to be consistent.
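
For illustration, the strategy flag from the snippet above might then look like this under that convention (names hypothetical, mirroring the existing @ShellOption style):

@ShellOption(value = {"--record-merge-strategy"}, defaultValue = "",
    help = "Record merge strategy to use") final String recordMergeStrategy,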

* @deprecated Use {@link #WRITE_PAYLOAD_CLASS_NAME} and its methods instead
*/
@Deprecated
public static final String DEFAULT_WRITE_PAYLOAD_CLASS = WRITE_PAYLOAD_CLASS_NAME.defaultValue();
Contributor

We can hardcode the legacy default value here.

@@ -1239,16 +1233,19 @@ public String getBasePath() {
}

public HoodieFileFormat getBaseFileFormat() {
return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT));
return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT).toUpperCase());
Contributor

nit: could we handle case sensitivity inside HoodieFileFormat instead?
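
A possible shape for that, as a sketch (the helper name is hypothetical, not an existing HoodieFileFormat method):

// Hypothetical static helper inside HoodieFileFormat, so callers no longer need toUpperCase():
public static HoodieFileFormat fromString(String format) {
  return HoodieFileFormat.valueOf(format.toUpperCase(java.util.Locale.ROOT));
}
// The caller would then become: HoodieFileFormat.fromString(getStringOrDefault(BASE_FILE_FORMAT))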

.collect(Collectors.toList());
String recordMergerStrategy = getString(RECORD_MERGER_STRATEGY);
return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, recordMergerStrategy);
return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, getString(RECORD_MERGER_IMPLS), getString(RECORD_MERGER_STRATEGY));
Contributor

Is this API used only by the custom merge mode, or by the Avro merger as well?

Contributor Author

It is used to get the Avro merger as well.

} else if (config.contains("hoodie.datasource.write.payload.class")) {
payloadClassName = config.getString("hoodie.datasource.write.payload.class");
} else {
payloadClassName = RecordPayloadType.valueOf(PAYLOAD_TYPE.defaultValue()).getClassName();
return DefaultHoodieRecordPayload.class.getName();
Contributor

Use the defined default legacy payload class variable here instead of hardcoding the value.
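
That is, roughly (a sketch; assumes the deprecated constant from the earlier snippet is still accessible here):

// Sketch: reuse the legacy default constant rather than a hard-coded class name.
return DEFAULT_PAYLOAD_CLASS;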

@@ -1747,7 +1744,7 @@ public int getAsyncClusterMaxCommits() {
}

public String getPayloadClass() {
return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
return RecordPayloadType.getPayloadClassName(this);
Contributor

Have you checked whether all existing logic for getting the payload class goes through this method? Since this logic now returns the default payload class from the method instead of from the config default value, we need to make sure any logic using getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME) migrates to this API too, to avoid a possible null or empty payload class name.

.withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled())
.withRecordMergeMode(RecordMergeMode.CUSTOM)
.withRecordMergerStrategy(HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUDID)
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
Contributor

Is this because the payload type was previously set to HOODIE_METADATA, which is now removed?

@@ -127,6 +129,14 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
}

override def convertToAvroRecord(record: InternalRow, schema: Schema): GenericRecord = {
Contributor

Note to myself: revisit for integration and performance impact.

@@ -55,7 +57,7 @@ public abstract class HoodieReaderContext<T> {
private HoodieFileGroupReaderSchemaHandler<T> schemaHandler = null;
private String tablePath = null;
private String latestCommitTime = null;
private HoodieRecordMerger recordMerger = null;
private Option<HoodieRecordMerger> recordMerger = null;
Contributor

Option.empty()?

Contributor Author

We want to make sure it is set. If we get an NPE, we know there is a bug. If we initialize it to Option.empty(), it is possible that the bug happens silently and causes correctness issues.
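
That trade-off could also be made explicit with a guarded getter, e.g. (a sketch, not code from this PR; assumes ValidationUtils is available here):

// Sketch: fail fast with a clear message instead of relying on an NPE,
// while still refusing to silently fall back to Option.empty().
public Option<HoodieRecordMerger> getRecordMerger() {
  ValidationUtils.checkState(recordMerger != null, "Record merger has not been set on the reader context");
  return recordMerger;
}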

Comment on lines +98 to 100
public void setRecordMerger(Option<HoodieRecordMerger> recordMerger) {
this.recordMerger = recordMerger;
}
Contributor

Should the argument avoid Option?

Suggested change
public void setRecordMerger(Option<HoodieRecordMerger> recordMerger) {
this.recordMerger = recordMerger;
}
public void setRecordMerger(HoodieRecordMerger recordMerger) {
this.recordMerger = Option.of(recordMerger);
}

Comment on lines 193 to 195
return persistedOrderingVal == null || ((persistedOrderingVal instanceof String || incomingOrderingVal instanceof String)
? ((Comparable) persistedOrderingVal.toString()).compareTo(incomingOrderingVal.toString()) <= 0
: ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0);
Contributor

To be safe, let's avoid any changes in existing payload classes.

static String getAvroPayloadForMergeMode(RecordMergeMode mergeMode) {
switch (mergeMode) {
//TODO: After we have merge mode working for writing, we should have a dummy payload that will throw exception when used
default:
Contributor

Should CUSTOM mode be handled too? Is it easier to store the payload class name in the RecordMergeMode enum itself?
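
A rough sketch of the enum idea (hypothetical; not how RecordMergeMode is defined in this PR):

// Hypothetical: each mode carries its legacy payload class, so the switch above goes away.
public enum RecordMergeMode {
  EVENT_TIME_ORDERING(DefaultHoodieRecordPayload.class.getName()),
  OVERWRITE_WITH_LATEST(OverwriteWithLatestAvroPayload.class.getName()),
  CUSTOM(null);  // CUSTOM has no single associated payload class

  private final String avroPayloadClass;

  RecordMergeMode(String avroPayloadClass) {
    this.avroPayloadClass = avroPayloadClass;
  }

  public String getAvroPayloadClass() {
    return avroPayloadClass;
  }
}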

Contributor Author

CUSTOM doesn't have an associated payload class. For now we will default to the default payload, but once we fix the experience of using engine-native records in the writer, we should not be using payloads anymore.

return Option.empty();
}
})
.key("hoodie.record.merge.custom.strategy")
Contributor

Let's make sure the config naming is consistent among all write and table configs for merging behavior.

this.recordMerger = readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
readerContext.setRecordMerger(this.recordMerger);
readerContext.setRecordMerger(readerContext.getRecordMerger(tableConfig.getRecordMergeMode(),
tableConfig.getRecordMergerStrategy(), props.getString("hoodie.datasource.write.record.merger.impl","")));
Contributor

Reference the config property declaration of "hoodie.datasource.write.record.merger.impl".
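
In other words, something like this (a sketch; assumes the property declared as RECORD_MERGER_IMPLS elsewhere in this PR is the right one):

// Sketch: use the declared ConfigProperty instead of a hard-coded key and default.
readerContext.setRecordMerger(readerContext.getRecordMerger(tableConfig.getRecordMergeMode(),
    tableConfig.getRecordMergerStrategy(),
    props.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue())));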

Contributor

Could we directly do readerContext.setRecordMerger(tableConfig.getRecordMergeMode(), tableConfig.getRecordMergerStrategy(), props.getString("hoodie.datasource.write.record.merger.impl","")) by changing the setter logic?
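
That could look roughly like the following (a sketch of the suggested setter; the signature is assumed):

// Hypothetical overload: resolve the merger inside the setter so callers only pass the configs.
public void setRecordMerger(RecordMergeMode mergeMode, String mergerStrategy, String mergerImpls) {
  this.recordMerger = getRecordMerger(mergeMode, mergerStrategy, mergerImpls);
}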

Contributor

It's good that all immutable merge configs come from table config now. Should we persist hoodie.datasource.write.record.merger.impl in table config too and make it mutable, so it also comes from the table config?

@@ -162,6 +165,31 @@ private Schema generateRequiredSchema() {
return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
}

private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, Option<HoodieRecordMerger> recordMerger) {
if (cfg.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
return recordMerger.get().getMandatoryFieldsForMerging(cfg);
Contributor

Should the default implementation of getMandatoryFieldsForMerging return all columns, since custom merging may need all columns regardless of the requested schema (to be followed up in a separate fix)? Only DefaultSparkRecordMerger and OverwriteWithLatestSparkRecordMerger (and the counterpart classes for other engines) should override the implementation to return the record key and the optional preCombine field.
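
A rough sketch of that direction (hypothetical default method; assumes the table's create schema is available):

// Hypothetical default in HoodieRecordMerger: be conservative and require every column,
// since a custom merger may read any field regardless of the requested schema.
default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
  return cfg.getTableCreateSchema().get().getFields().stream()
      .map(org.apache.avro.Schema.Field::name)
      .toArray(String[]::new);
}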

@@ -199,7 +199,7 @@ public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable rec
if (recordOpt.isPresent()) {
String recordKey = recordOpt.get().getRecordKey();
records.put(recordPosition, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
recordKey, recordOpt.get().getPartitionPath(), recordOpt.get().getOrderingValue(), orderingFieldType)));
recordKey, recordOpt.get().getPartitionPath(), recordOpt.get().getOrderingValue() == null ? orderingFieldDefault : recordOpt.get().getOrderingValue(), orderingFieldTypeOpt)));
Contributor

We should not set orderingFieldDefault, which can cause confusion and eventually be written to storage.

Contributor Author

Then what should we use as the ordering value when it is null?

@@ -71,6 +71,11 @@ public static HoodieRecordMerger loadRecordMerger(String mergerClass) {
}
}

public static HoodieRecordMerger createRecordMerger(String basePath, EngineType engineType,
String mergerImpls, String recordMergerStrategy) {
return createRecordMerger(basePath, engineType, ConfigUtils.split2List(mergerImpls), recordMergerStrategy);
Contributor

Should HoodieTableConfig parse the custom merge implementation class list, i.e., have HoodieTableConfig#getRecordMergerStrategy return the List<String> mergerClassList instead of passing the config value all the way down, so we can get rid of this new API?
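
A sketch of that direction (hypothetical method on HoodieTableConfig; the table-level property name is assumed):

// Hypothetical: HoodieTableConfig hands back the parsed class list directly,
// so callers never pass the raw comma-separated value around.
public List<String> getRecordMergerImpls() {
  return ConfigUtils.split2List(getStringOrDefault(RECORD_MERGER_IMPLS, ""));
}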

@apache apache deleted a comment from hudi-bot Oct 18, 2024
@jonvex jonvex requested a review from yihua October 20, 2024 00:49
@apache apache deleted a comment from hudi-bot Oct 20, 2024
@hudi-bot

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

Labels
size:XL PR with lines of changes > 1000