-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8203] Make record merge mode the primary merging config #11943
base: master
Are you sure you want to change the base?
Changes from all commits
a010515
d3045da
371fd96
1a64f42
373f67f
8745ea0
f77cb87
60755d4
aac457a
63c9a00
c82c27f
5d466c3
416835f
9795fe8
ab6d567
ae29647
12e8de2
ce04bd5
01f2024
ffc94b8
c789678
88799a2
638d350
3e486d2
1db1fec
8d0d358
4d04980
3ee84d8
da849df
3480af2
1fce20c
2371e19
8af0e40
8dfedc0
5ace882
080f024
f10ab50
5c9783b
5f59bd2
f9aee6e
aeea89b
8a737c0
6e08ef5
6e43f93
2ab97ce
eaaf998
e6ee76a
100e686
3b89dcb
22ebef6
6c2abc9
f6a0da2
80a2e16
f61553c
b07d02b
597677f
7aac16b
080de60
6fc3da5
1751520
da99393
232d0d0
9630cbe
a345c4c
4af7de5
36bd377
0e0775c
6d581c5
7723029
90db422
3ff9e74
699d455
70852c5
c4dc95f
3726f54
8306c68
95f8969
6e96999
b1bd0ca
ab40874
ac45359
cef2687
9f3adb4
709a37a
143dfa7
bdab7de
b0506a2
8fab0f0
57ad75e
25897cb
9119372
addd352
4cdbd16
f619f0b
f55704f
ad0682a
4d11955
b851357
4303223
9a56f78
08b73c0
1bb38d2
413ebc4
75beb3d
3c8df12
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,8 +72,14 @@ public String bootstrap( | |
help = "Class for Full bootstrap input provider") final String fullBootstrapInputProvider, | ||
@ShellOption(value = {"--schemaProviderClass"}, defaultValue = "", | ||
help = "SchemaProvider to attach schemas to bootstrap source data") final String schemaProviderClass, | ||
@ShellOption(value = {"--payloadClass"}, defaultValue = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", | ||
help = "Payload Class") final String payloadClass, | ||
@ShellOption(value = {"--payloadClass"}, defaultValue = "", | ||
help = "Payload Class (deprecated). Use `--merge-mode` to specify the equivalent payload update behavior.") final String payloadClass, | ||
@ShellOption(value = {"--merge-mode", "--record-merge-mode"}, defaultValue = "", | ||
help = "Merge mode to use. 'EVENT_TIME_ORDERING', 'OVERWRITE_WITH_LATEST', or 'CUSTOM' if you want to set a custom merge strategy and implementation.") final String recordMergeMode, | ||
@ShellOption(value = {"--merger-strategy", "--record-merger-strategy"}, defaultValue = "", | ||
jonvex marked this conversation as resolved.
Show resolved
Hide resolved
jonvex marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can make all record merge configs to start with |
||
help = "ID of the merge strategy to use. Only set when using 'CUSTOM' merge mode") final String recordMergerStrategy, | ||
@ShellOption(value = {"--merger-impls"}, defaultValue = "", | ||
help = "Comma separated list of classes that implement the record merger strategy") final String recordMergerImpls, | ||
@ShellOption(value = {"--parallelism"}, defaultValue = "1500", help = "Bootstrap writer parallelism") final int parallelism, | ||
@ShellOption(value = {"--sparkMaster"}, defaultValue = "", help = "Spark Master") String master, | ||
@ShellOption(value = {"--sparkMemory"}, defaultValue = "4G", help = "Spark executor memory") final String sparkMemory, | ||
|
@@ -95,7 +101,8 @@ public String bootstrap( | |
|
||
SparkMain.addAppArgs(sparkLauncher, SparkCommand.BOOTSTRAP, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField, | ||
partitionPathField, String.valueOf(parallelism), schemaProviderClass, bootstrapIndexClass, selectorClass, | ||
keyGeneratorClass, fullBootstrapInputProvider, payloadClass, String.valueOf(enableHiveSync), propsFilePath); | ||
keyGeneratorClass, fullBootstrapInputProvider, recordMergeMode, payloadClass, recordMergerStrategy, recordMergerImpls, | ||
String.valueOf(enableHiveSync), propsFilePath); | ||
UtilHelpers.validateAndAddProperties(configs, sparkLauncher); | ||
Process process = sparkLauncher.launch(); | ||
InputStreamConsumer.captureOutput(process); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,9 +65,6 @@ public class HoodiePayloadConfig extends HoodieConfig { | |
.withDocumentation("Table column/field name to order records that have the same key, before " | ||
+ "merging and writing to storage."); | ||
|
||
/** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */ | ||
@Deprecated | ||
public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue(); | ||
Comment on lines
-68
to
-70
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Regarding this variable for backwards compatible read (https://github.com/apache/hudi/pull/11943/files#r1805475755), could we hard-code it to be |
||
/** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */ | ||
@Deprecated | ||
public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,12 +35,11 @@ | |
import org.apache.hudi.common.config.HoodieStorageConfig; | ||
import org.apache.hudi.common.config.HoodieTableServiceManagerConfig; | ||
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig; | ||
import org.apache.hudi.common.config.RecordMergeMode; | ||
import org.apache.hudi.common.config.TypedProperties; | ||
import org.apache.hudi.common.engine.EngineType; | ||
import org.apache.hudi.common.fs.ConsistencyGuardConfig; | ||
import org.apache.hudi.common.fs.FileSystemRetryConfig; | ||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload; | ||
import org.apache.hudi.common.model.HoodieAvroRecordMerger; | ||
import org.apache.hudi.common.model.HoodieCleaningPolicy; | ||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; | ||
import org.apache.hudi.common.model.HoodieFileFormat; | ||
|
@@ -168,33 +167,35 @@ public class HoodieWriteConfig extends HoodieConfig { | |
|
||
public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = ConfigProperty | ||
.key("hoodie.datasource.write.payload.class") | ||
.defaultValue(DefaultHoodieRecordPayload.class.getName()) | ||
.noDefaultValue() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We may still need the default value for backwards compatible write on 0.x tables cc @vinothchandar @bvaradar @balaji-varadarajan There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think we do. But we might need to add something so that it doesn't remove the payload class from hoodie.properties There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The point is that, for the backwards compatible writer writing table version 6 (0.14 release and above) in Hudi 1.0 release, it should not use the merge mode and only leverage payload class to create the table. |
||
.markAdvanced() | ||
.deprecatedAfter("1.0.0") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see your point that payload class is no longer required in Hudi 1.0 tables so that no default should be set. For Hudi 1.0 release reading 0.x tables, should we still maintain a variable elsewhere to track the default payload class to be used? The backwards compatibility read logic can be handled in a follow-up. |
||
.withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " | ||
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); | ||
|
||
public static final ConfigProperty<String> WRITE_PAYLOAD_TYPE = ConfigProperty | ||
.key("hoodie.datasource.write.payload.type") | ||
jonvex marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.defaultValue(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()) | ||
.markAdvanced() | ||
public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty | ||
.key("hoodie.write.record.merge.mode") | ||
.defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I remember that this can be switched back to |
||
.sinceVersion("1.0.0") | ||
.withDocumentation(RecordPayloadType.class); | ||
.withDocumentation(RecordMergeMode.class); | ||
|
||
public static final ConfigProperty<String> RECORD_MERGER_IMPLS = ConfigProperty | ||
.key("hoodie.datasource.write.record.merger.impls") | ||
.defaultValue(HoodieAvroRecordMerger.class.getName()) | ||
.key("hoodie.write.record.merge.custom.impl.classes") | ||
.noDefaultValue() | ||
.markAdvanced() | ||
.withAlternatives("hoodie.datasource.write.record.merger.impls") | ||
.sinceVersion("0.13.0") | ||
.withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. " | ||
+ "These merger impls will filter by hoodie.datasource.write.record.merger.strategy " | ||
+ "These merger impls will filter by hoodie.write.record.merge.strategy " | ||
+ "Hudi will pick most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc)"); | ||
|
||
public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = ConfigProperty | ||
.key("hoodie.datasource.write.record.merger.strategy") | ||
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID) | ||
public static final ConfigProperty<String> RECORD_MERGER_STRATEGY_ID = ConfigProperty | ||
.key("hoodie.write.record.merge.strategy") | ||
.noDefaultValue() | ||
.markAdvanced() | ||
.withAlternatives("hoodie.datasource.write.record.merger.strategy") | ||
.sinceVersion("0.13.0") | ||
.withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.record.merger.impls which has the same merger strategy id"); | ||
.withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.write.record.merge.custom.impl.classes which has the same merger strategy id"); | ||
|
||
public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty | ||
.key("hoodie.datasource.write.keygenerator.class") | ||
|
@@ -844,11 +845,6 @@ public class HoodieWriteConfig extends HoodieConfig { | |
*/ | ||
@Deprecated | ||
public static final String WRITE_PAYLOAD_CLASS = WRITE_PAYLOAD_CLASS_NAME.key(); | ||
/** | ||
* @deprecated Use {@link #WRITE_PAYLOAD_CLASS_NAME} and its methods instead | ||
*/ | ||
@Deprecated | ||
public static final String DEFAULT_WRITE_PAYLOAD_CLASS = WRITE_PAYLOAD_CLASS_NAME.defaultValue(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can hardcode the legacy default value here. |
||
/** | ||
* @deprecated Use {@link #KEYGENERATOR_CLASS_NAME} and its methods instead | ||
*/ | ||
|
@@ -1239,16 +1235,20 @@ public String getBasePath() { | |
} | ||
|
||
public HoodieFileFormat getBaseFileFormat() { | ||
return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT)); | ||
return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT).toUpperCase()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: could we handle case sensitivity inside |
||
} | ||
|
||
public String getRecordMergerStrategy() { | ||
return getString(RECORD_MERGER_STRATEGY_ID); | ||
} | ||
|
||
public RecordMergeMode getRecordMergeMode() { | ||
return RecordMergeMode.getValue(getString(RECORD_MERGE_MODE)); | ||
} | ||
|
||
public HoodieRecordMerger getRecordMerger() { | ||
List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream() | ||
.map(String::trim) | ||
.distinct() | ||
.collect(Collectors.toList()); | ||
String recordMergerStrategy = getString(RECORD_MERGER_STRATEGY); | ||
return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, recordMergerStrategy); | ||
return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), | ||
engineType, getString(RECORD_MERGER_IMPLS), getString(RECORD_MERGER_STRATEGY_ID)); | ||
} | ||
|
||
public String getSchema() { | ||
|
@@ -1260,7 +1260,7 @@ public void setSchema(String schemaStr) { | |
} | ||
|
||
public void setRecordMergerClass(String recordMergerStrategy) { | ||
setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy); | ||
setValue(RECORD_MERGER_STRATEGY_ID, recordMergerStrategy); | ||
} | ||
|
||
/** | ||
|
@@ -1747,7 +1747,7 @@ public int getAsyncClusterMaxCommits() { | |
} | ||
|
||
public String getPayloadClass() { | ||
return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME); | ||
return RecordPayloadType.getPayloadClassName(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Have you checked if all existing logic of getting the payload class goes through this method? Since this logic now returns the default payload class from the method instead of the config default value, we need to make sure any logic using |
||
} | ||
|
||
public int getTargetPartitionsPerDayBasedCompaction() { | ||
|
@@ -2903,12 +2903,14 @@ public Builder withWritePayLoad(String payload) { | |
} | ||
|
||
public Builder withRecordMergerImpls(String recordMergerImpls) { | ||
writeConfig.setValue(RECORD_MERGER_IMPLS, recordMergerImpls); | ||
if (!StringUtils.isNullOrEmpty(recordMergerImpls)) { | ||
writeConfig.setValue(RECORD_MERGER_IMPLS, recordMergerImpls); | ||
} | ||
return this; | ||
} | ||
|
||
public Builder withRecordMergerStrategy(String recordMergerStrategy) { | ||
writeConfig.setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy); | ||
writeConfig.setValue(RECORD_MERGER_STRATEGY_ID, recordMergerStrategy); | ||
return this; | ||
} | ||
|
||
|
@@ -3099,6 +3101,11 @@ public Builder withPayloadConfig(HoodiePayloadConfig payloadConfig) { | |
return this; | ||
} | ||
|
||
public Builder withRecordMergeMode(RecordMergeMode recordMergeMode) { | ||
writeConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name()); | ||
return this; | ||
} | ||
|
||
public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) { | ||
writeConfig.getProps().putAll(metadataConfig.getProps()); | ||
isMetadataConfigSet = true; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,9 +22,12 @@ | |
import org.apache.hudi.client.FailOnFirstErrorWriteStatus; | ||
import org.apache.hudi.common.config.HoodieMetadataConfig; | ||
import org.apache.hudi.common.config.HoodieStorageConfig; | ||
import org.apache.hudi.common.config.RecordMergeMode; | ||
import org.apache.hudi.common.fs.ConsistencyGuardConfig; | ||
import org.apache.hudi.common.model.HoodieAvroRecordMerger; | ||
import org.apache.hudi.common.model.HoodieCleaningPolicy; | ||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; | ||
import org.apache.hudi.common.model.HoodieRecordMerger; | ||
import org.apache.hudi.common.model.WriteConcurrencyMode; | ||
import org.apache.hudi.common.table.HoodieTableConfig; | ||
import org.apache.hudi.common.table.marker.MarkerType; | ||
|
@@ -33,6 +36,7 @@ | |
import org.apache.hudi.config.HoodieArchivalConfig; | ||
import org.apache.hudi.config.HoodieCleanConfig; | ||
import org.apache.hudi.config.HoodieCompactionConfig; | ||
import org.apache.hudi.config.HoodiePayloadConfig; | ||
import org.apache.hudi.config.HoodieWriteConfig; | ||
import org.apache.hudi.config.metrics.HoodieMetricsConfig; | ||
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; | ||
|
@@ -156,7 +160,12 @@ public static HoodieWriteConfig createMetadataWriteConfig( | |
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) | ||
.withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS) | ||
.withWriteStatusClass(FailOnFirstErrorWriteStatus.class) | ||
.withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled()); | ||
.withReleaseResourceEnabled(writeConfig.areReleaseResourceEnabled()) | ||
.withRecordMergeMode(RecordMergeMode.CUSTOM) | ||
.withRecordMergerStrategy(HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID) | ||
.withPayloadConfig(HoodiePayloadConfig.newBuilder() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why was the payload class not set before? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it because the payload type is set before ( |
||
.withPayloadClass(HoodieMetadataPayload.class.getCanonicalName()).build()) | ||
.withRecordMergerImpls(HoodieAvroRecordMerger.class.getCanonicalName()); | ||
|
||
// RecordKey properties are needed for the metadata table records | ||
final Properties properties = new Properties(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,13 +19,8 @@ | |
package org.apache.hudi.table.upgrade; | ||
|
||
import org.apache.hudi.common.config.ConfigProperty; | ||
import org.apache.hudi.common.config.RecordMergeMode; | ||
import org.apache.hudi.common.engine.HoodieEngineContext; | ||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload; | ||
import org.apache.hudi.common.model.HoodieTableType; | ||
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; | ||
import org.apache.hudi.common.table.HoodieTableConfig; | ||
import org.apache.hudi.common.util.ValidationUtils; | ||
import org.apache.hudi.config.HoodieWriteConfig; | ||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions; | ||
import org.apache.hudi.keygen.constant.KeyGeneratorType; | ||
|
@@ -45,29 +40,6 @@ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngin | |
final HoodieTableConfig tableConfig = upgradeDowngradeHelper.getTable(config, context).getMetaClient().getTableConfig(); | ||
|
||
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>(); | ||
if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We still need to keep the logic of adding the required table configs in table version 8, correct? We should avoid having the Hudi 1.0 writer change table configs as much as possible, i.e., invariant table configs should either be created for a new table or changed during upgrades/downgrades, except for MDT related table configs, to avoid possible corruption. |
||
// Record merge mode is required to dictate the merging behavior in version 8, | ||
// playing the same role as the payload class config in version 7. | ||
// Inferring of record merge mode from payload class here. | ||
String payloadClassName = tableConfig.getPayloadClass(); | ||
String propToAdd; | ||
if (null != payloadClassName) { | ||
if (payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) { | ||
propToAdd = RecordMergeMode.OVERWRITE_WITH_LATEST.toString(); | ||
} else if (payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) { | ||
propToAdd = RecordMergeMode.EVENT_TIME_ORDERING.toString(); | ||
} else { | ||
propToAdd = RecordMergeMode.CUSTOM.toString(); | ||
} | ||
} else { | ||
propToAdd = RecordMergeMode.CUSTOM.toString(); | ||
} | ||
ValidationUtils.checkState(null != propToAdd, String.format("Couldn't infer (%s) from (%s) class name", | ||
HoodieTableConfig.RECORD_MERGE_MODE.key(), payloadClassName)); | ||
|
||
tablePropsToAdd.put(HoodieTableConfig.RECORD_MERGE_MODE, propToAdd); | ||
} | ||
|
||
upgradePartitionFields(config, tableConfig, tablePropsToAdd); | ||
|
||
return tablePropsToAdd; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to keep only one config here instead of aliases?