[HUDI-7147] Fix npe stream sync first batch, empty schema, upsert (#10689)

* fix npe

* add empty table support as well

* use empty relation

* fix failing tests

---------

Co-authored-by: Jonathan Vexler <=>
jonvex authored Feb 17, 2024
1 parent b99bf70 commit 3a97b01
Showing 8 changed files with 105 additions and 12 deletions.
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common;

import org.apache.hudi.internal.schema.HoodieSchemaException;

public class HoodieSchemaNotFoundException extends HoodieSchemaException {
  public HoodieSchemaNotFoundException(String message) {
    super(message);
  }
}
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table;

import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieSchemaNotFoundException;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -588,6 +589,6 @@ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]>
}

private Supplier<Exception> schemaNotFoundError() {
-return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
+return () -> new HoodieSchemaNotFoundException("No schema found for table at " + metaClient.getBasePathV2().toString());
}
}
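
The resolver change above only swaps the exception type, but it lets callers tell "no schema yet" apart from genuine failures. A minimal caller-side sketch, assuming (as the schemaNotFoundError helper suggests) that this hunk is in TableSchemaResolver; the helper method below is hypothetical and not part of this commit:

```scala
// Illustrative sketch, not part of this commit. Assumes the hunk above is in
// org.apache.hudi.common.table.TableSchemaResolver.
import org.apache.avro.Schema
import org.apache.hudi.common.HoodieSchemaNotFoundException
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

// Hypothetical helper: return None when the table has no committed schema yet.
def tableSchemaIfPresent(metaClient: HoodieTableMetaClient): Option[Schema] =
  try {
    Some(new TableSchemaResolver(metaClient).getTableAvroSchema)
  } catch {
    case _: HoodieSchemaNotFoundException => None
  }
```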
@@ -83,7 +83,9 @@ public static Schema convert(InternalSchema internalSchema, String name) {
* @return an avro Schema where null is the first.
*/
public static Schema fixNullOrdering(Schema schema) {
-if (schema.getType() == Schema.Type.NULL) {
+if (schema == null) {
+  return Schema.create(Schema.Type.NULL);
+} else if (schema.getType() == Schema.Type.NULL) {
return schema;
}
return convert(convert(schema), schema.getFullName());
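
The guard above means a null input now resolves to Avro's NULL schema instead of hitting an NPE on schema.getType(). A minimal sketch, assuming fixNullOrdering is the static helper on AvroInternalSchemaConverter (the class name is an inference from this hunk, not shown in it):

```scala
// Illustrative only; assumes the method lives on
// org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.
import org.apache.avro.Schema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter

val fixed: Schema = AvroInternalSchemaConverter.fixNullOrdering(null)
assert(fixed.getType == Schema.Type.NULL) // previously this call path could NPE
```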
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
+import org.apache.hudi.common.HoodieSchemaNotFoundException
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -33,14 +34,13 @@ import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.util.PathUtils

import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.streaming.{HoodieEarliestOffsetRangeLimit, HoodieLatestOffsetRangeLimit, HoodieSpecifiedOffsetRangeLimit, HoodieStreamSource}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions.mapAsJavaMap
@@ -73,7 +73,12 @@ class DefaultSource extends RelationProvider

override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
-createRelation(sqlContext, parameters, null)
+try {
+  createRelation(sqlContext, parameters, null)
+} catch {
+  case _: HoodieSchemaNotFoundException => new EmptyRelation(sqlContext, new StructType())
+  case e => throw e
+}
}

override def createRelation(sqlContext: SQLContext,
@@ -373,7 +378,9 @@ object DefaultSource {
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
} catch {
case _: Exception =>
-require(schema.isDefined, "Fail to resolve source schema")
+if (schema.isEmpty || schema.get == null) {
+  throw new HoodieSchemaNotFoundException("Failed to resolve source schema")
+}
schema.get
}
}
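
End to end, the EmptyRelation fallback above changes what a plain batch read returns when no schema can be resolved. A minimal sketch, assuming a local Spark session and a hypothetical table path (neither is part of this commit): the read now yields an empty DataFrame instead of throwing.

```scala
// Illustrative sketch; basePath and the local session are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-empty-table-read")
  .master("local[1]")
  .getOrCreate()

// Hypothetical table whose only commit never completed, so no schema is resolvable.
val basePath = "/tmp/hudi_trips_cow"

val df = spark.read.format("hudi").load(basePath)
println(df.count())  // 0, served by EmptyRelation rather than an exception
df.printSchema()     // empty StructType
```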
@@ -176,9 +176,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
-case Failure(e) =>
-  logError("Failed to fetch schema from the table", e)
-  throw new HoodieSchemaException("Failed to fetch schema from the table")
+case Failure(e) => throw e
}
}

@@ -101,8 +101,7 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
-case Failure(e) =>
-  throw new HoodieSchemaException("Failed to fetch schema from the table")
+case Failure(e) => throw e
}
}

@@ -18,7 +18,7 @@
package org.apache.hudi.functional

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
@@ -1855,6 +1855,34 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
  })
  assertEquals(3, clusterInstants.size)
}

@Test
def testReadOfAnEmptyTable(): Unit = {
  val (writeOpts, _) = getWriterReaderOpts(HoodieRecordType.AVRO)

  // Insert Operation
  val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
  val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
  inputDF.write.format("hudi")
    .options(writeOpts)
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)

  val fileStatuses = fs.listStatus(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter {
    override def accept(path: Path): Boolean = {
      path.getName.endsWith(HoodieTimeline.COMMIT_ACTION)
    }
  })

  // delete completed instant
  fs.delete(fileStatuses.toList.get(0).getPath)
  // try reading the empty table
  val count = spark.read.format("hudi").load(basePath).count()
  assertEquals(count, 0)
}

}

object TestCOWDataSource {
@@ -2111,6 +2111,36 @@ public void testEmptyBatchWithNullSchemaValue() throws Exception {
  deltaStreamer2.shutdownGracefully();
}

@Test
public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {
  PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
  int parquetRecordsCount = 10;
  prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
  prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
      PARQUET_SOURCE_ROOT, false, "partition_path", "0");

  String tableBasePath = basePath + "/test_parquet_table" + testNum;
  HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
      null, PROPS_FILENAME_TEST_PARQUET, false,
      false, 100000, false, null, null, "timestamp", null);

  config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
  config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
  HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
  deltaStreamer1.sync();
  deltaStreamer1.shutdownGracefully();
  assertRecordCount(0, tableBasePath, sqlContext);

  config.schemaProviderClassName = null;
  config.sourceClassName = ParquetDFSSource.class.getName();
  prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
  HoodieDeltaStreamer deltaStreamer2 = new HoodieDeltaStreamer(config, jsc);
  deltaStreamer2.sync();
  deltaStreamer2.shutdownGracefully();
  // since first batch has empty schema, only records from the second batch should be written
  assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
}

@Test
public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
  testDeltaStreamerRestartAfterMissingHoodieProps(true);
