[HUDI-7190] Fix nested columns vectorized read for spark33+ legacy formats (#10265)

* [HUDI-7190] Fix vectorized reads of nested columns for the legacy Parquet file format on Spark 3.3+
* Fix implicit schema evolution for nested types
* Fix batch-read support detection for the legacy format
* Add a clear exception message when nested types with type changes are read in vectorized mode
stream2000 authored Dec 20, 2023
1 parent 0fb0850 commit d0916cb
Showing 7 changed files with 193 additions and 56 deletions.
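
The user-facing workaround referenced by the new exception message, and exercised in the test changes below, is to disable vectorized reading of nested columns before querying a table whose nested columns carry an implicit type change. What follows is a minimal sketch of that read path, not part of the commit itself: the SparkSession named spark and the table path are illustrative assumptions, while the config key and read pattern are taken from the tests in this commit.

// Sketch: read a Hudi table whose nested columns underwent an implicit
// type change (e.g. a map value promoted from int to long).
// `tablePath` is a hypothetical location used only for illustration.
val tablePath = "/tmp/hudi/record_tbl"

// Fall back to non-vectorized reading of nested columns; with this fix,
// vectorized reads of nested types that carry type changes are rejected
// with a clear error instead of a confusing reader failure.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")

val readDf = spark.read.format("hudi").load(tablePath)
readDf.printSchema()
readDf.show(false)
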
@@ -38,12 +38,8 @@ class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterS
override def toString: String = "Hoodie-Parquet"

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if (HoodieSparkUtils.gteqSpark3_4) {
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
} else {
super.supportBatch(sparkSession, schema)
}
sparkAdapter
.createLegacyHoodieParquetFileFormat(true).get.supportBatch(sparkSession, schema)
}

override def buildReaderWithPartitionValues(sparkSession: SparkSession,
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.SparkException
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
@@ -386,15 +387,17 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// upsert
upsertData(df2, tempRecordPath, isCow)

// read out the table
val readDf = spark.read.format("hudi")
// NOTE: long to int type change is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
// read out the table
val readDf = spark.read.format("hudi")
// NOTE: long to int type change is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
}
}

@ParameterizedTest
@@ -482,15 +485,17 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// upsert
upsertData(df2, tempRecordPath, isCow)

// read out the table
val readDf = spark.read.format("hudi")
// NOTE: type promotion is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
// read out the table
val readDf = spark.read.format("hudi")
// NOTE: type promotion is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
}
}

@ParameterizedTest
@@ -548,15 +553,17 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
// upsert
upsertData(df2, tempRecordPath, isCow)

// read out the table
val readDf = spark.read.format("hudi")
// NOTE: type promotion is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
// read out the table
val readDf = spark.read.format("hudi")
// NOTE: type promotion is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(tempRecordPath)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
}
}

@ParameterizedTest
@@ -828,4 +835,88 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
readDf.show(false)
readDf.foreach(_ => {})
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testNestedTypeVectorizedReadWithTypeChange(isCow: Boolean): Unit = {
// test to change the value type of a MAP in a column of ARRAY< MAP<k,v> > type
val tempRecordPath = basePath + "/record_tbl/"
val arrayMapData = Seq(
Row(1, 100, List(Map("2022-12-01" -> 120), Map("2022-12-02" -> 130)), "aaa")
)
val arrayMapSchema = new StructType()
.add("id", IntegerType)
.add("userid", IntegerType)
.add("salesMap", ArrayType(
new MapType(StringType, IntegerType, true)))
.add("name", StringType)
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(arrayMapData), arrayMapSchema)
df1.printSchema()
df1.show(false)

// recreate table
initialiseTable(df1, tempRecordPath, isCow)

// read out the table, will not throw any exception
readTable(tempRecordPath)

// change value type from integer to long
val newArrayMapData = Seq(
Row(2, 200, List(Map("2022-12-01" -> 220L), Map("2022-12-02" -> 230L)), "bbb")
)
val newArrayMapSchema = new StructType()
.add("id", IntegerType)
.add("userid", IntegerType)
.add("salesMap", ArrayType(
new MapType(StringType, LongType, true)))
.add("name", StringType)
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(newArrayMapData), newArrayMapSchema)
df2.printSchema()
df2.show(false)
// upsert
upsertData(df2, tempRecordPath, isCow)

// after implicit type change, read the table with vectorized read enabled
if (HoodieSparkUtils.gteqSpark3_3) {
assertThrows(classOf[SparkException]){
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
readTable(tempRecordPath)
}
}
}

withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
readTable(tempRecordPath)
}
}


private def readTable(path: String): Unit = {
// read out the table
val readDf = spark.read.format("hudi")
// NOTE: type promotion is not supported for the custom file format and the filegroup reader
// HUDI-7045 and PR#10007 in progress to fix the issue
.option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false")
.load(path)
readDf.printSchema()
readDf.show(false)
readDf.foreach(_ => {})
}

protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
val conf = spark.sessionState.conf
val currentValues = pairs.unzip._1.map { k =>
if (conf.contains(k)) {
Some(conf.getConfString(k))
} else None
}
pairs.foreach { case (k, v) => conf.setConfString(k, v) }
try f finally {
pairs.unzip._1.zip(currentValues).foreach {
case (key, Some(value)) => conf.setConfString(key, value)
case (key, None) => conf.unsetConf(key)
}
}
}

}
@@ -2173,6 +2173,43 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}

test("Test vectorized read nested columns for LegacyHoodieParquetFileFormat") {
withSQLConf(
"hoodie.datasource.read.use.new.parquet.file.format" -> "false",
"hoodie.file.group.reader.enabled" -> "false",
"spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
"spark.sql.parquet.enableVectorizedReader" -> "true") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| attributes map<string, string>,
| price double,
| ts long,
| dt string
|) using hudi
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, '2021-01-05'),
| (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, '2021-01-06'),
| (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, '2021-01-07')
""".stripMargin)
// Check the inserted records with map type attributes
checkAnswer(s"select id, name, price, ts, dt from $tableName where attributes.color = 'red'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
}
}
}

def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String, tmp: File,
expectedOperationtype: WriteOperationType,
setOptions: List[String] = List.empty) : Unit = {
@@ -544,12 +544,12 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {

test("Test alter column with complex schema") {
withRecordType()(withTempDir { tmp =>
Seq("mor").foreach { tableType =>
withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert",
"hoodie.schema.on.read.enable" -> "true",
"spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
val tableName = generateTableName
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
spark.sql(
s"""
|create table $tableName (
@@ -561,7 +561,7 @@
|) using hudi
| location '$tablePath'
| options (
| type = '$tableType',
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
@@ -628,7 +628,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
)
}
}
spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
})
}

@@ -50,6 +50,8 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

import java.net.URI

/**
@@ -121,8 +123,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -243,6 +244,13 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
implicitTypeChangeInfo
}

if (enableVectorizedReader && shouldUseInternalSchema &&
!typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
throw new IllegalArgumentException(
"Nested types with type changes(implicit or explicit) cannot be read in vectorized mode. " +
"To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
}

val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)

@@ -47,6 +47,9 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

/**
* This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
* that's not possible to customize in any other way
@@ -59,11 +62,6 @@ import org.apache.spark.util.SerializableConfiguration
*/
class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
}

def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
// Only output columnar if there is WSCG to read it.
@@ -133,9 +131,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableVectorizedReader: Boolean = supportBatch(sparkSession, resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -259,6 +255,13 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
implicitTypeChangeInfo
}

if (enableVectorizedReader && shouldUseInternalSchema &&
!typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
throw new IllegalArgumentException(
"Nested types with type changes(implicit or explicit) cannot be read in vectorized mode. " +
"To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
}

val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)

@@ -48,6 +48,9 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

/**
* This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
* that's not possible to customize in any other way
@@ -60,11 +63,6 @@ import org.apache.spark.util.SerializableConfiguration
*/
class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
}

def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
// Only output columnar if there is WSCG to read it.
@@ -134,9 +132,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
sqlConf.parquetVectorizedReaderEnabled &&
resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
val enableVectorizedReader: Boolean = supportBatch(sparkSession, resultSchema)
val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -260,6 +256,13 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
implicitTypeChangeInfo
}

if (enableVectorizedReader && shouldUseInternalSchema &&
!typeChangeInfos.values().forall(_.getLeft.isInstanceOf[AtomicType])) {
throw new IllegalArgumentException(
"Nested types with type changes(implicit or explicit) cannot be read in vectorized mode. " +
"To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
}

val hadoopAttemptContext =
new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
