[HUDI-8203] Make record merge mode the primary merging config #11943

Status: Open. Wants to merge 105 commits into master from create_new_merger_configs (changes shown from 69 commits).

Commits (105):
a010515  first attempt at making merge mode the primary config for merging/pay… (Sep 13, 2024)
d3045da  remove unused methods in meta client (Sep 13, 2024)
371fd96  Merge branch 'master' into create_new_merger_configs (Sep 16, 2024)
1a64f42  get the quickstart to pass (Sep 16, 2024)
373f67f  get rid of defaults for payload in hoodie option config (Sep 16, 2024)
8745ea0  don't infer based on engine type now (Sep 18, 2024)
f77cb87  Merge branch 'master' into create_new_merger_configs (Sep 18, 2024)
60755d4  fix some things and fix the issue with MIT (Sep 19, 2024)
aac457a  fix checkstyle (Sep 19, 2024)
63c9a00  support avro read strategy (Sep 19, 2024)
c82c27f  checkstyle (Sep 19, 2024)
5d466c3  allow payload during read (Sep 19, 2024)
416835f  Merge branch 'master' into create_new_merger_configs (Sep 19, 2024)
9795fe8  fix merge issue (Sep 19, 2024)
ab6d567  validate that we don't try to do payload merging with event time or o… (Sep 19, 2024)
ae29647  fix bugs and tests (Sep 19, 2024)
12e8de2  fix failing tests (Sep 19, 2024)
ce04bd5  fix test (Sep 20, 2024)
01f2024  fix testMergerStrategySet (Sep 20, 2024)
ffc94b8  fix utf8 comparable issue in default record payload (Sep 20, 2024)
c789678  fix getting payload in bootstrap (Sep 20, 2024)
88799a2  fix testCreateHoodieConfigWithAsyncClustering (Sep 20, 2024)
638d350  fix log format capitalization (Sep 20, 2024)
3e486d2  deprecate payload config (Sep 20, 2024)
1db1fec  fix checkstyle and make some more changes like not calling get defaul… (Sep 20, 2024)
8d0d358  infer merge mode if they use default or overwrite payload (Sep 20, 2024)
4d04980  fix TestTableSchemaEvolution.testMORTable (Sep 20, 2024)
3ee84d8  fix bad parens (Sep 20, 2024)
da849df  fix log block casing issue (Sep 20, 2024)
3480af2  fix checkstyle (Sep 20, 2024)
1fce20c  fix more places where the table is not updated (Sep 20, 2024)
2371e19  fix checkstyle (Sep 20, 2024)
8af0e40  need to set strategy when payload is set (Sep 20, 2024)
8dfedc0  fix some errors I made (Sep 20, 2024)
5ace882  fix instantiate server client (Sep 20, 2024)
080f024  problem seems to be with the old read path, not with fg reader (Sep 20, 2024)
f10ab50  fix checkstyle (Sep 20, 2024)
5c9783b  Merge branch 'master' into create_new_merger_configs (Sep 23, 2024)
5f59bd2  TestHoodieMergeHandleWithSparkMerger fix because delete records were … (Sep 23, 2024)
f9aee6e  fix configs for more tests (Sep 23, 2024)
aeea89b  fix bootstrap tests (Sep 23, 2024)
8a737c0  bootstrap executor remove payload defaulting (Sep 23, 2024)
6e08ef5  fix checkstyle (Sep 23, 2024)
6e43f93  fix some more test cases (Sep 23, 2024)
2ab97ce  disable test pruned filtered (Sep 23, 2024)
eaaf998  fix deltastreamer infer merge modes and some other tests (Sep 23, 2024)
e6ee76a  remove comment in tableconfig (Sep 23, 2024)
100e686  overwrite the tableconfig payload as well for mit (Sep 23, 2024)
3b89dcb  get rid of stupid get record type method (Sep 23, 2024)
22ebef6  fix Test Call repair_overwrite_hoodie_props Procedure (Sep 23, 2024)
6c2abc9  fix more test issues (Sep 23, 2024)
f6a0da2  fix checkstyle (Sep 23, 2024)
80a2e16  fix failing test (Sep 23, 2024)
f61553c  fix pruned filtered test (Sep 24, 2024)
b07d02b  add log for bootstrap IT (Sep 24, 2024)
597677f  fix minargs for the cli command (Sep 30, 2024)
7aac16b  Merge branch 'master' into create_new_merger_configs (Sep 30, 2024)
080de60  use get value for record merge mode (Oct 3, 2024)
6fc3da5  Merge branch 'master' into create_new_merger_configs (Oct 3, 2024)
1751520  Merge branch 'apache:master' into create_new_merger_configs (jonvex, Oct 4, 2024)
da99393  Merge branch 'apache:master' into create_new_merger_configs (jonvex, Oct 7, 2024)
232d0d0  add spark write support for avro log block (Oct 8, 2024)
9630cbe  add merger impls to bootstrap and deltastreamer (Oct 8, 2024)
a345c4c  add string casting to allow for case where ordering field is missing (Oct 8, 2024)
4af7de5  Merge branch 'master' into create_new_merger_configs (Oct 13, 2024)
36bd377  merge mode in the fg reader should be consistent (Oct 13, 2024)
0e0775c  set merger configs for bootstrap (Oct 13, 2024)
6d581c5  fix avro payload merger strategy var name (Oct 14, 2024)
7723029  add comments to the merger strategies (Oct 14, 2024)
90db422  Add lokesh changes oct 16th (jonvex, Oct 16, 2024)
3ff9e74  fix checkstyle (Oct 16, 2024)
699d455  Merge branch 'master' into create_new_merger_configs (Oct 16, 2024)
70852c5  get rid of avro block write (Oct 16, 2024)
c4dc95f  get rid of javascalautils (Oct 16, 2024)
3726f54  throw real error message (Oct 16, 2024)
8306c68  fix build errors (Oct 16, 2024)
95f8969  always set values for payload and strategy (Oct 16, 2024)
6e96999  fix up some more things (Oct 16, 2024)
b1bd0ca  fix build errors that didn't appear locally (Oct 16, 2024)
ab40874  remove log block changes (Oct 16, 2024)
ac45359  fix checkstyle (Oct 16, 2024)
cef2687  remove java scala option converters (Oct 16, 2024)
9f3adb4  handle some sql config management (Oct 17, 2024)
709a37a  fix custom merger case (Oct 17, 2024)
143dfa7  fix more tests (Oct 17, 2024)
bdab7de  fix style (Oct 17, 2024)
b0506a2  use config instead of get string (Oct 17, 2024)
8fab0f0  spark sql config issue fix + fix some other tests (Oct 17, 2024)
57ad75e  fix npe issue when creating writer (Oct 17, 2024)
25897cb  fix some more tests (Oct 17, 2024)
9119372  fix testMergeOnReadSnapshotRelationWithDeltaLogsFallback()(TestParque… (Oct 17, 2024)
addd352  fix failing tests (Oct 18, 2024)
4cdbd16  fix azure tests (Oct 18, 2024)
f619f0b  add more validation to infer merge mode (Oct 18, 2024)
f55704f  address review comments and fix tests (Oct 18, 2024)
ad0682a  fix checkstyle (Oct 18, 2024)
4d11955  remove duplicate test case (Oct 18, 2024)
b851357  fix style (Oct 18, 2024)
4303223  throw exception if no valid merger impl is present for custom merger (Oct 18, 2024)
9a56f78  address review comments and fix tests (Oct 18, 2024)
08b73c0  address more review comments (Oct 18, 2024)
1bb38d2  revert config name change (Oct 18, 2024)
413ebc4  fix failing tests (Oct 18, 2024)
75beb3d  change config name to have id in it, but not table config (Oct 18, 2024)
3c8df12  Merge branch 'master' into create_new_merger_configs (Oct 19, 2024)
hudi-cli/src/main/java/org/apache/hudi/cli/commands/BootstrapCommand.java
@@ -72,8 +72,14 @@ public String bootstrap(
help = "Class for Full bootstrap input provider") final String fullBootstrapInputProvider,
@ShellOption(value = {"--schemaProviderClass"}, defaultValue = "",
help = "SchemaProvider to attach schemas to bootstrap source data") final String schemaProviderClass,
@ShellOption(value = {"--payloadClass"}, defaultValue = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
help = "Payload Class") final String payloadClass,
@ShellOption(value = {"--payloadClass"}, defaultValue = "",
help = "Payload Class (deprecated). Use merge-mode for overwrite or event time merging.") final String payloadClass,
@ShellOption(value = {"--merge-mode", "--record-merge-mode"}, defaultValue = "",
Contributor: Do we want to keep only one config here instead of alias?
help = "Merge mode to use. 'EVENT_TIME_ORDERING', 'OVERWRITE_WITH_LATEST', or 'CUSTOM' if you want to set a merge strategy") final String recordMergeMode,
@ShellOption(value = {"--merger-strategy", "--record-merger-strategy"}, defaultValue = "",
Contributor: We can make all record merge configs start with `--record-merge-`, `hoodie.write.record.merge.`, `hoodie.table.record.merge.`, e.g., `--record-merge-strategy`, `hoodie.write.record.merge.strategy`, to be consistent.
help = "Merge strategy to use. Only set when using 'CUSTOM' merge mode") final String recordMergerStrategy,
@ShellOption(value = {"--merger-impls"}, defaultValue = "",
help = "Comma separated list of classes that implement the record merger strategy") final String recordMergerImpls,
@ShellOption(value = {"--parallelism"}, defaultValue = "1500", help = "Bootstrap writer parallelism") final int parallelism,
@ShellOption(value = {"--sparkMaster"}, defaultValue = "", help = "Spark Master") String master,
@ShellOption(value = {"--sparkMemory"}, defaultValue = "4G", help = "Spark executor memory") final String sparkMemory,
@@ -95,7 +101,8 @@ public String bootstrap(

SparkMain.addAppArgs(sparkLauncher, SparkCommand.BOOTSTRAP, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField,
partitionPathField, String.valueOf(parallelism), schemaProviderClass, bootstrapIndexClass, selectorClass,
keyGeneratorClass, fullBootstrapInputProvider, payloadClass, String.valueOf(enableHiveSync), propsFilePath);
keyGeneratorClass, fullBootstrapInputProvider, recordMergeMode, payloadClass, recordMergerStrategy, recordMergerImpls,
String.valueOf(enableHiveSync), propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
17 changes: 11 additions & 6 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -23,6 +23,7 @@
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -34,7 +35,6 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -82,6 +82,7 @@
import java.util.Locale;
import java.util.Map;

import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
@@ -99,7 +100,7 @@ public class SparkMain {
* Commands.
*/
enum SparkCommand {
BOOTSTRAP(18), ROLLBACK(6), DEDUPLICATE(8), ROLLBACK_TO_SAVEPOINT(6), SAVEPOINT(7),
BOOTSTRAP(21), ROLLBACK(6), DEDUPLICATE(8), ROLLBACK_TO_SAVEPOINT(6), SAVEPOINT(7),
IMPORT(13), UPSERT(13), COMPACT_SCHEDULE(7), COMPACT_RUN(10), COMPACT_SCHEDULE_AND_EXECUTE(9),
COMPACT_UNSCHEDULE_PLAN(9), COMPACT_UNSCHEDULE_FILE(10), COMPACT_VALIDATE(7), COMPACT_REPAIR(8),
CLUSTERING_SCHEDULE(7), CLUSTERING_RUN(9), CLUSTERING_SCHEDULE_AND_EXECUTE(8), CLEAN(5),
@@ -129,7 +130,7 @@ List<String> makeConfigs(String[] args) {
}

String getPropsFilePath(String[] args) {
return (args.length >= minArgsCount && !StringUtils.isNullOrEmpty(args[minArgsCount - 1]))
return (args.length >= minArgsCount && !isNullOrEmpty(args[minArgsCount - 1]))
? args[minArgsCount - 1] : null;
}
}
@@ -234,7 +235,7 @@ public static void main(String[] args) {
break;
case BOOTSTRAP:
returnCode = doBootstrap(jsc, args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10],
args[11], args[12], args[13], args[14], args[15], args[16], propsFilePath, configs);
args[11], args[12], args[13], args[14], args[15], args[16], args[17], args[18], args[19], propsFilePath, configs);
break;
case UPGRADE:
case DOWNGRADE:
@@ -488,14 +489,15 @@ private static Map<String, String> getPropsForRewrite(HoodieTableMetaClient meta
private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass,
String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider,
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
String recordMergeMode, String payloadClassName, String recordMergeStrategy, String recordMergerImpls,
String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {

TypedProperties properties = propsFilePath == null ? buildProperties(configs)
: readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);

properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

if (!StringUtils.isNullOrEmpty(keyGenerator) && KeyGeneratorType.getNames().contains(keyGenerator.toUpperCase(Locale.ROOT))) {
if (!isNullOrEmpty(keyGenerator) && KeyGeneratorType.getNames().contains(keyGenerator.toUpperCase(Locale.ROOT))) {
properties.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenerator.toUpperCase(Locale.ROOT));
} else {
properties.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), keyGenerator);
@@ -514,6 +516,9 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
cfg.schemaProviderClassName = schemaProviderClass;
cfg.bootstrapIndexClass = bootstrapIndexClass;
cfg.payloadClassName = payloadClassName;
cfg.recordMergeMode = RecordMergeMode.getValue(recordMergeMode);
cfg.recordMergerStrategy = recordMergeStrategy;
cfg.recordMergerImpls = recordMergerImpls;
cfg.enableHiveSync = Boolean.valueOf(enableHiveSync);

new BootstrapExecutor(cfg, jsc, HadoopFSUtils.getFs(basePath, jsc.hadoopConfiguration()),
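For context on the wiring above: doBootstrap passes the raw --merge-mode string into RecordMergeMode.getValue. A minimal sketch of what that helper plausibly does, assuming the empty CLI default maps to null so the mode can be inferred downstream (the real implementation is not part of this diff and may differ):

// Hypothetical sketch of RecordMergeMode.getValue; empty-string handling is an assumption.
public static RecordMergeMode getValue(String mode) {
  return isNullOrEmpty(mode) ? null : RecordMergeMode.valueOf(mode.toUpperCase(Locale.ROOT));
}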
HoodiePayloadConfig.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;

import java.io.File;
import java.io.FileReader;
@@ -65,9 +66,6 @@ public class HoodiePayloadConfig extends HoodieConfig {
.withDocumentation("Table column/field name to order records that have the same key, before "
+ "merging and writing to storage.");

/** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
@Deprecated
public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();
Comment on lines -68 to -70, Contributor: Regarding this variable for backwards compatible read (https://github.com/apache/hudi/pull/11943/files#r1805475755), could we hard code it to be DefaultHoodieRecordPayload.class.getName()?
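A minimal sketch of that suggestion, pinning the deprecated constant to the legacy class instead of deriving it from the config default that this PR removes:

/** @deprecated kept hardcoded for backwards-compatible reads (reviewer suggestion, not in this PR) */
@Deprecated
public static final String DEFAULT_PAYLOAD_CLASS = DefaultHoodieRecordPayload.class.getName();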

/** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
@Deprecated
public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key();
@@ -107,7 +105,9 @@ public Builder withPayloadEventTimeField(String payloadEventTimeField) {
}

public HoodiePayloadConfig.Builder withPayloadClass(String payloadClassName) {
payloadConfig.setValue(PAYLOAD_CLASS_NAME, payloadClassName);
if (!StringUtils.isNullOrEmpty(payloadClassName)) {
payloadConfig.setValue(PAYLOAD_CLASS_NAME, payloadClassName);
}
return this;
}

HoodieWriteConfig.java
@@ -35,18 +35,18 @@
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTableServiceManagerConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.RecordPayloadType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -168,21 +168,21 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = ConfigProperty
.key("hoodie.datasource.write.payload.class")
.defaultValue(DefaultHoodieRecordPayload.class.getName())
.noDefaultValue()
Contributor: We may still need the default value for backwards compatible write on 0.x tables. cc @vinothchandar @bvaradar @balaji-varadarajan

Contributor Author: I don't think we do. But we might need to add something so that it doesn't remove the payload class from hoodie.properties.

Contributor: The point is that, for the backwards-compatible writer writing table version 6 (0.14 release and above) in Hudi 1.0, it should not use the merge mode and should only leverage the payload class to create the table.

.markAdvanced()
.deprecatedAfter("1.0.0")
Contributor: I see your point that payload class is no longer required in Hudi 1.0 tables, so no default should be set. For Hudi 1.0 reading 0.x tables, should we still maintain a variable elsewhere to track the default payload class to be used? The backwards-compatibility read logic can be handled in a follow-up.

.withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");

public static final ConfigProperty<String> WRITE_PAYLOAD_TYPE = ConfigProperty
.key("hoodie.datasource.write.payload.type")
.defaultValue(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())
.markAdvanced()
public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
.key("hoodie.record.merge.mode")
.defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
Contributor: I remember that this can be switched back to OVERWRITE_WITH_LATEST. Is there still an issue? I'm OK with EVENT_TIME_ORDERING; trying to see if it still strictly requires precombine/ordering field to be set.

.sinceVersion("1.0.0")
.withDocumentation(RecordPayloadType.class);
.withDocumentation(RecordMergeMode.class);
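For illustration, a minimal sketch of a writer opting into the new primary config through the builder method added later in this file (the table path is a placeholder):

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_trips")  // hypothetical base path
    .withRecordMergeMode(RecordMergeMode.EVENT_TIME_ORDERING)
    .build();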

public static final ConfigProperty<String> RECORD_MERGER_IMPLS = ConfigProperty
.key("hoodie.datasource.write.record.merger.impls")
.defaultValue(HoodieAvroRecordMerger.class.getName())
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.13.0")
.withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. "
@@ -191,7 +191,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = ConfigProperty
.key("hoodie.datasource.write.record.merger.strategy")
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.13.0")
.withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.record.merger.impls which has the same merger strategy id");
@@ -844,11 +844,6 @@ public class HoodieWriteConfig extends HoodieConfig {
*/
@Deprecated
public static final String WRITE_PAYLOAD_CLASS = WRITE_PAYLOAD_CLASS_NAME.key();
/**
* @deprecated Use {@link #WRITE_PAYLOAD_CLASS_NAME} and its methods instead
*/
@Deprecated
public static final String DEFAULT_WRITE_PAYLOAD_CLASS = WRITE_PAYLOAD_CLASS_NAME.defaultValue();
Contributor: We can hardcode the legacy default value here.
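A sketch of that suggestion, assuming the legacy value should stay pinned to DefaultHoodieRecordPayload now that WRITE_PAYLOAD_CLASS_NAME carries no default:

/** @deprecated legacy default hardcoded per the review note above; not part of this PR */
@Deprecated
public static final String DEFAULT_WRITE_PAYLOAD_CLASS = DefaultHoodieRecordPayload.class.getName();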

/**
* @deprecated Use {@link #KEYGENERATOR_CLASS_NAME} and its methods instead
*/
@@ -1239,16 +1234,69 @@ public String getBasePath() {
}

public HoodieFileFormat getBaseFileFormat() {
return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT));
return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT).toUpperCase());
Contributor: nit: could we handle case sensitivity inside HoodieFileFormat?

}
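One way the nit could be addressed, sketched as a hypothetical case-insensitive lookup inside the enum (fromName is not an existing API):

// Hypothetical helper on HoodieFileFormat; getBaseFileFormat above would then
// call HoodieFileFormat.fromName(getStringOrDefault(BASE_FILE_FORMAT)).
public static HoodieFileFormat fromName(String name) {
  return HoodieFileFormat.valueOf(name.toUpperCase(Locale.ROOT));
}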

public String getRecordMergerStrategy() {
return getString(RECORD_MERGER_STRATEGY);
}

public RecordMergeMode getRecordMergeMode() {
return RecordMergeMode.getValue(getString(RECORD_MERGE_MODE));
}

public HoodieRecordMerger getRecordMerger() {
List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
List<String> mergers = StringUtils.split(getString(RECORD_MERGER_IMPLS), ",").stream()
.map(String::trim)
.distinct()
.collect(Collectors.toList());
String recordMergerStrategy = getString(RECORD_MERGER_STRATEGY);
return HoodieRecordUtils.createRecordMerger(getString(BASE_PATH), engineType, mergers, recordMergerStrategy);
return getRecordMerger(getString(BASE_PATH), getRecordMergeMode(),
engineType, getLogDataBlockFormat(), mergers, getStringOpt(RECORD_MERGER_STRATEGY));
}

public static HoodieRecordMerger getRecordMerger(String basePath,
RecordMergeMode mergeMode,
EngineType engineType,
HoodieLogBlock.HoodieLogBlockType logBlockType,
List<String> mergers,
Option<String> strategy) {
switch (logBlockType) {
case HFILE_DATA_BLOCK:
return HoodieAvroRecordMerger.INSTANCE;
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
//TODO: [HUDI-8317] return getMergerByMode(basePath, mergeMode, engineType, mergers, strategy);
return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
default:
throw new IllegalStateException("This log block type is not implemented");
}
}

private static HoodieRecordMerger getMergerByMode(String basePath,
RecordMergeMode mergeMode,
EngineType engineType,
List<String> mergers,
Option<String> strategy) {
//TODO: [HUDI-8202] make this custom mergers only
switch (mergeMode) {
case EVENT_TIME_ORDERING:
switch (engineType) {
case SPARK:
return HoodieRecordUtils.loadRecordMerger("org.apache.hudi.DefaultSparkRecordMerger");
default:
return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
}
case OVERWRITE_WITH_LATEST:
switch (engineType) {
case SPARK:
return HoodieRecordUtils.loadRecordMerger("org.apache.hudi.OverwriteWithLatestSparkRecordMerger");
default:
return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
}
case CUSTOM:
default:
return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
}
}
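A usage sketch of the new static resolver above; every argument value here is an assumption for illustration:

// Resolves a merger for a Spark writer with parquet log blocks (sketch only).
HoodieRecordMerger merger = HoodieWriteConfig.getRecordMerger(
    "/tmp/hudi_trips",                                     // hypothetical base path
    RecordMergeMode.EVENT_TIME_ORDERING,
    EngineType.SPARK,
    HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK,
    Collections.singletonList(HoodieAvroRecordMerger.class.getName()),
    Option.empty());                                       // no custom strategy id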

public String getSchema() {
@@ -1746,6 +1794,10 @@ public int getAsyncClusterMaxCommits() {
return getInt(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS);
}

public String getAvroPayloadClass() {
return getStringOpt(HoodiePayloadConfig.PAYLOAD_CLASS_NAME).orElseGet(() -> HoodieRecordPayload.getAvroPayloadForMergeMode(getRecordMergeMode()));
}
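getAvroPayloadClass falls back to HoodieRecordPayload.getAvroPayloadForMergeMode, which is not shown in this diff; a hedged sketch of the presumed mode-to-payload mapping:

// Assumed mapping; verify against HoodieRecordPayload before relying on it.
static String getAvroPayloadForMergeMode(RecordMergeMode mode) {
  switch (mode) {
    case OVERWRITE_WITH_LATEST:
      return OverwriteWithLatestAvroPayload.class.getName();
    case EVENT_TIME_ORDERING:
    default:
      return DefaultHoodieRecordPayload.class.getName();
  }
}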

public String getPayloadClass() {
return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
}
@@ -2200,9 +2252,8 @@ public boolean parquetBloomFilterEnabled() {
return getBooleanOrDefault(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED);
}

public Option<HoodieLogBlock.HoodieLogBlockType> getLogDataBlockFormat() {
return Option.ofNullable(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT))
.map(HoodieLogBlock.HoodieLogBlockType::fromId);
public HoodieLogBlock.HoodieLogBlockType getLogDataBlockFormat() {
return HoodieLogBlock.inferLogBlockWriteFormat(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT), getBaseFileFormat());
}

public long getLogFileMaxSize() {
@@ -2903,12 +2954,16 @@ public Builder withWritePayLoad(String payload) {
}

public Builder withRecordMergerImpls(String recordMergerImpls) {
writeConfig.setValue(RECORD_MERGER_IMPLS, recordMergerImpls);
if (!StringUtils.isNullOrEmpty(recordMergerImpls)) {
writeConfig.setValue(RECORD_MERGER_IMPLS, recordMergerImpls);
}
return this;
}

public Builder withRecordMergerStrategy(String recordMergerStrategy) {
writeConfig.setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy);
if (recordMergerStrategy != null) {
writeConfig.setValue(RECORD_MERGER_STRATEGY, recordMergerStrategy);
}
return this;
}

Expand Down Expand Up @@ -3099,6 +3154,13 @@ public Builder withPayloadConfig(HoodiePayloadConfig payloadConfig) {
return this;
}

public Builder withRecordMergeMode(RecordMergeMode recordMergeMode) {
if (recordMergeMode != null) {
writeConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name());
Contributor: Is there any validation on missing RECORD_MERGE_MODE config, throwing an exception if missing?

Contributor Author: It's not needed in the write config because it has a default value.

}
return this;
}

public Builder withMetadataConfig(HoodieMetadataConfig metadataConfig) {
writeConfig.getProps().putAll(metadataConfig.getProps());
isMetadataConfigSet = true;
@@ -257,11 +257,11 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
* evaluates the merge.
*/
private static Pair<HoodieWriteConfig, Option<BaseKeyGenerator>> getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
if (config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload")) {
if (config.getAvroPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload")) {
TypedProperties typedProperties = new TypedProperties(config.getProps());
// set the payload class to table's payload class and not expression payload. this will be used to read the existing records
typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), tableConfig.getAvroPayloadClass());
typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), tableConfig.getAvroPayloadClass());
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
try {
return Pair.of(writeConfig, Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())));