Commit 8fab0f0

spark sql config issue fix + fix some other tests

Jonathan Vexler committed Oct 17, 2024
1 parent b0506a2 commit 8fab0f0

Showing 10 changed files with 59 additions and 11 deletions.

---

@@ -45,6 +45,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.RecordPayloadType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -173,7 +174,7 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");

public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
.key("hoodie.record.merge.mode")
.key("hoodie.datasource.write.record.merge.mode")
.defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
.sinceVersion("1.0.0")
.withDocumentation(RecordMergeMode.class);
@@ -1743,7 +1744,7 @@ public int getAsyncClusterMaxCommits() {
}

public String getPayloadClass() {
return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
return RecordPayloadType.getPayloadClassName(this);
}

public int getTargetPartitionsPerDayBasedCompaction() {
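For orientation, a minimal sketch (not part of the commit) of how the renamed key surfaces to Spark datasource writers; the session, DataFrame, table name, and path below are hypothetical:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical write using the renamed key: "hoodie.record.merge.mode"
// becomes "hoodie.datasource.write.record.merge.mode" in this commit.
val spark = SparkSession.builder().master("local[2]").appName("merge-mode-sketch").getOrCreate()
val df = spark.range(5).selectExpr("id as uuid", "current_timestamp() as ts", "cast(id as string) as rider")

df.write.format("hudi")
  .option("hoodie.table.name", "merge_mode_demo") // hypothetical table name
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.record.merge.mode", "EVENT_TIME_ORDERING") // the default per the config above
  .mode(SaveMode.Overwrite)
  .save("/tmp/merge_mode_demo") // hypothetical path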

---

@@ -71,6 +71,10 @@ public <T> void setValue(String key, String val) {
props.setProperty(key, val);
}

public <T> void clearValue(ConfigProperty<T> cfg) {
props.remove(cfg.key());
}

public void setAll(Properties properties) {
props.putAll(properties);
}
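A toy model (not the Hudi API) of what the new clearValue enables: once the key is removed, default-aware lookups see the property's default again rather than a stale explicit value. The key and class names below are illustrative.

import java.util.Properties

// Removing the key — which is all clearValue(cfg) does internally —
// makes default-aware reads fall back to the ConfigProperty's default.
val props = new Properties()
val key = "hoodie.compaction.payload.class" // illustrative key
val default = "org.apache.hudi.common.model.DefaultHoodieRecordPayload"

props.setProperty(key, "com.example.CustomPayload") // explicit override
props.remove(key)                                   // the equivalent of clearValue
assert(props.getProperty(key, default) == default)  // the default is visible again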

---

@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -195,8 +194,7 @@ public static HoodieTableMetaClient init(StorageConfiguration<?> storageConf, St
HoodieTableMetaClient.newTableBuilder()
.setDatabaseName(databaseName)
.setTableName(RAW_TRIPS_TEST_NAME)
.setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class);
.setTableType(tableType);

if (properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()) != null) {
builder.setKeyGeneratorType(properties.getProperty(HoodieTableConfig.KEY_GENERATOR_TYPE.key()));

---

@@ -35,6 +35,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
@@ -191,6 +192,10 @@ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String base
return builder.forTable(tblName)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(inlineCompact).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
.withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
.build())
// override above with Hoodie configs specified as options.
.withProps(parameters).build();
}
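A condensed sketch of the precedence this hunk relies on: the payload class and ordering field from the datasource options seed the payload config first, and explicit Hoodie configs in the options still win because withProps is applied last. The options map, path, and table name here are hypothetical.

import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
import scala.collection.JavaConverters._

// Hypothetical writer options, as Spark would pass them in.
val parameters = Map(
  "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
  "hoodie.datasource.write.precombine.field" -> "ts"
)

val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath("/tmp/payload_cfg_demo") // hypothetical base path
  .forTable("payload_cfg_demo")      // hypothetical table name
  .withPayloadConfig(HoodiePayloadConfig.newBuilder()
    .withPayloadClass(parameters("hoodie.datasource.write.payload.class"))
    .withPayloadOrderingField(parameters("hoodie.datasource.write.precombine.field"))
    .build())
  .withProps(parameters.asJava) // explicit Hoodie configs override the seeded values
  .build()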

---

@@ -1020,6 +1020,12 @@ object DataSourceOptionsHelper {
if (!params.contains(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()) && tableConfig.getPayloadClass != null) {
missingWriteConfigs ++= Map(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> tableConfig.getPayloadClass)
}
if (!params.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key()) && tableConfig.getRecordMergeMode != null) {
missingWriteConfigs ++= Map(HoodieWriteConfig.RECORD_MERGE_MODE.key() -> tableConfig.getRecordMergeMode.name())
}
if (!params.contains(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key()) && tableConfig.getRecordMergerStrategy != null) {
missingWriteConfigs ++= Map(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key() -> tableConfig.getRecordMergerStrategy)
}
if (!params.contains(DataSourceWriteOptions.TABLE_TYPE.key())) {
missingWriteConfigs ++= Map(DataSourceWriteOptions.TABLE_TYPE.key() -> tableConfig.getTableType.name())
}
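The two new guards follow the same backfill pattern as their neighbors; reduced to a toy: a table-level value is copied into the write params only when the caller omitted the key. The keys and values below are illustrative.

// Toy illustration (not Hudi code): backfill only the keys the user omitted.
val tableLevel = Map(
  "hoodie.datasource.write.record.merge.mode" -> "EVENT_TIME_ORDERING",
  "hoodie.datasource.write.record.merger.strategy" -> "default-merger-strategy-uuid" // placeholder value
)
val params = Map("hoodie.datasource.write.record.merge.mode" -> "CUSTOM") // user-supplied
val missingWriteConfigs = tableLevel.filter { case (k, _) => !params.contains(k) }
assert(missingWriteConfigs.keySet == Set("hoodie.datasource.write.record.merger.strategy"))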

---

@@ -1130,8 +1130,15 @@ class HoodieSparkSqlWriterInternal {
mergedParams.getOrElse(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), ""),
mergedParams.getOrElse(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), ""))
mergedParams.put(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), inferredMergeConfigs.getLeft.name())
mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), inferredMergeConfigs.getLeft.name())
mergedParams.put(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), inferredMergeConfigs.getMiddle)
mergedParams.put(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), inferredMergeConfigs.getRight)
mergedParams.put(HoodieTableConfig.RECORD_MERGER_STRATEGY.key(), inferredMergeConfigs.getRight)
} else {
mergedParams.put(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), mergedParams(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key()))
mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key(), mergedParams(HoodieWriteConfig.RECORD_MERGE_MODE.key()))
mergedParams.put(HoodieTableConfig.RECORD_MERGER_STRATEGY.key(), mergedParams(HoodieWriteConfig.RECORD_MERGER_STRATEGY.key()))
}

val params = mergedParams.toMap
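In miniature, what the added puts accomplish: each merge setting is recorded under both the datasource key and its table-config counterpart so the two stay consistent. The inferred value here is illustrative.

import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import scala.collection.mutable

// Each merge setting lands under both the write-path key and the table-config key.
val mergedParams = mutable.Map[String, String]()
val inferredMergeMode = "EVENT_TIME_ORDERING" // illustrative inferred value
mergedParams.put(HoodieWriteConfig.RECORD_MERGE_MODE.key, inferredMergeMode)
mergedParams.put(HoodieTableConfig.RECORD_MERGE_MODE.key, inferredMergeMode)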

---

@@ -177,6 +177,9 @@ object HoodieWriterUtils {
|| key.equals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()) || key.equals(RECORD_MERGE_MODE.key())
|| key.equals(RECORD_MERGER_STRATEGY.key())))

// Skip payload-class validation only when INSERT INTO is falling back to the legacy ValidateDuplicateKeyPayload config
ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && value.equals("org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload"))

if (!ignoreConfig) {
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
@@ -293,7 +296,8 @@
PARTITIONPATH_FIELD -> HoodieTableConfig.PARTITION_FIELDS,
RECORDKEY_FIELD -> HoodieTableConfig.RECORDKEY_FIELDS,
PAYLOAD_CLASS_NAME -> HoodieTableConfig.PAYLOAD_CLASS_NAME,
RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY
RECORD_MERGER_STRATEGY -> HoodieTableConfig.RECORD_MERGER_STRATEGY,
RECORD_MERGE_MODE -> HoodieTableConfig.RECORD_MERGE_MODE
)

def mappingSparkDatasourceConfigsToTableConfigs(options: Map[String, String]): Map[String, String] = {

---

@@ -21,8 +21,8 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, TypedProperties}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, WriteOperationType}
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, RecordMergeMode, TypedProperties}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWriteConfig}
@@ -32,6 +32,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.keygen.{BaseKeyGenerator, ComplexKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator}
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.HoodieSyncConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -48,6 +49,7 @@ import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory

import java.util.Locale

import scala.collection.JavaConverters._

trait ProvidesHoodieConfig extends Logging {
@@ -284,8 +286,23 @@
}
}

val (recordMergeMode, recordMergeStrategy) = if (payloadClassName.equals(classOf[ValidateDuplicateKeyPayload].getCanonicalName)) {
(RecordMergeMode.CUSTOM.name(), HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID)
} else {
(RecordMergeMode.EVENT_TIME_ORDERING.name(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
}

if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
tableConfig.getRecordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING)) {
tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
tableConfig.clearValue(HoodieTableConfig.RECORD_MERGE_MODE)
tableConfig.clearValue(HoodieTableConfig.RECORD_MERGER_STRATEGY)
}

val defaultOpts = Map(
PAYLOAD_CLASS_NAME.key -> payloadClassName,
DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> payloadClassName,
DataSourceWriteOptions.RECORD_MERGE_MODE.key -> recordMergeMode,
DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key() -> recordMergeStrategy,
// NOTE: By default insert would try to do deduplication in case that pre-combine column is specified
// for the table
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(combineBeforeInsert),
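The new branch in ProvidesHoodieConfig reduces to a small decision: only the legacy ValidateDuplicateKeyPayload forces CUSTOM mode with the payload-based merger strategy; everything else keeps event-time ordering with the default strategy. A toy version, with placeholder strings standing in for the strategy UUIDs:

// Toy version of the inference above; the strategy IDs are placeholders for
// HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID / DEFAULT_MERGER_STRATEGY_UUID.
def inferMerge(payloadClassName: String): (String, String) =
  if (payloadClassName.endsWith("ValidateDuplicateKeyPayload"))
    ("CUSTOM", "payload-based-merger-strategy-uuid")
  else
    ("EVENT_TIME_ORDERING", "default-merger-strategy-uuid")

assert(inferMerge("org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload")._1 == "CUSTOM")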

---

@@ -20,7 +20,7 @@ package org.apache.hudi.functional

import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.HoodieWriteConfig
@@ -88,6 +88,7 @@ class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "org.apache.hudi.common.model.PartialUpdateAvroPayload")
.option(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
.mode(SaveMode.Overwrite)
.save(basePath)
@@ -121,6 +122,7 @@ class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, "org.apache.hudi.common.model.PartialUpdateAvroPayload")
.option(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.mode(SaveMode.Append)
.save(basePath)

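Distilled from the test updates here and in the CDC suite below: wherever a custom payload class is supplied, the write now also sets the merge mode to CUSTOM explicitly. A minimal options map showing the pairing:

import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.config.HoodieWriteConfig

// The pairing used throughout the updated tests:
// custom payload class + explicit CUSTOM merge mode.
val customPayloadOpts = Map(
  HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> "org.apache.hudi.common.model.PartialUpdateAvroPayload",
  HoodieWriteConfig.RECORD_MERGE_MODE.key -> RecordMergeMode.CUSTOM.name
)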

---

@@ -19,8 +19,9 @@
package org.apache.hudi.functional.cdc

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.DataSourceWriteOptions.{MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, RECORD_MERGE_MODE}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY
import org.apache.hudi.common.table.cdc.HoodieCDCUtils.schemaBySupplementalLoggingMode
import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode}
@@ -641,6 +642,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
"hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
"hoodie.datasource.write.partitionpath.field" -> "",
"hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.AWSDmsAvroPayload",
DataSourceWriteOptions.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(),
"hoodie.table.cdc.enabled" -> "true",
"hoodie.table.cdc.supplemental.logging.mode" -> "data_before_after"
)
@@ -774,6 +776,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
.option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.option("hoodie.table.cdc.enabled", "true")
.option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
.mode(SaveMode.Append).save(basePath)
@@ -794,6 +797,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
.option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.AWSDmsAvroPayload")
.option(DataSourceWriteOptions.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name())
.option("hoodie.table.cdc.enabled", "true")
.option("hoodie.table.cdc.supplemental.logging.mode", loggingMode.name())
.mode(SaveMode.Append).save(basePath)