[HUDI-7190] Fix legacy parquet format nested columns vectorized read for spark3.3+

stream2000 committed Dec 7, 2023
1 parent 0bbfc07 commit 38ad80b
Showing 4 changed files with 40 additions and 19 deletions.
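
Why the change works, in brief: each legacy format computed a local enableVectorizedReader flag that required every column in the result schema to be an AtomicType, while Spark's planner decides batching via supportBatch. On Spark 3.3+ the inherited ParquetFileFormat.supportBatch also accepts nested types when spark.sql.parquet.enableNestedColumnVectorizedReader is set, so the two checks could disagree on map/array/struct columns. Delegating the flag to supportBatch leaves a single source of truth. A rough sketch of the nested-type rule Spark 3.3+ applies, paraphrased and simplified from Spark's ParquetUtils rather than copied from it:

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

// Paraphrase of the Spark 3.3+ batch-read check: atomic columns are always
// batch-readable; nested columns are batch-readable only when
// spark.sql.parquet.enableNestedColumnVectorizedReader is on, applied recursively.
// Note: AtomicType is protected[sql], so like the Hudi classes below this
// only compiles inside an org.apache.spark.sql package.
def isBatchReadSupported(conf: SQLConf, dt: DataType): Boolean = dt match {
  case _: AtomicType => true
  case at: ArrayType =>
    conf.parquetVectorizedReaderNestedColumnEnabled &&
      isBatchReadSupported(conf, at.elementType)
  case mt: MapType =>
    conf.parquetVectorizedReaderNestedColumnEnabled &&
      isBatchReadSupported(conf, mt.keyType) &&
      isBatchReadSupported(conf, mt.valueType)
  case st: StructType =>
    conf.parquetVectorizedReaderNestedColumnEnabled &&
      st.fields.forall(f => isBatchReadSupported(conf, f.dataType))
  case _ => false
}
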
@@ -2075,6 +2075,43 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     })
   }
 
test("Test vectorized read nested columns for LegacyHoodieParquetFileFormat") {
withSQLConf(
"hoodie.datasource.read.use.new.parquet.file.format" -> "false",
"hoodie.file.group.reader.enabled" -> "false",
"spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
"spark.sql.parquet.enableVectorizedReader" -> "true") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| attributes map<string, string>,
| price double,
| ts long,
| dt string
|) using hudi
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', map('color', 'red', 'size', 'M'), 10.0, 1000, '2021-01-05'),
| (2, 'a2', map('color', 'blue', 'size', 'L'), 20.0, 2000, '2021-01-06'),
| (3, 'a3', map('color', 'green', 'size', 'S'), 30.0, 3000, '2021-01-07')
""".stripMargin)
// Check the inserted records with map type attributes
checkAnswer(s"select id, name, price, ts, dt from $tableName where attributes.color = 'red'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
}
}
}

   def ingestAndValidateDataNoPrecombine(tableType: String, tableName: String, tmp: File,
                                         expectedOperationtype: WriteOperationType,
                                         setOptions: List[String] = List.empty) : Unit = {
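
A quick interactive way to exercise the same path (a hypothetical spark-shell session; the table name h0 and its schema are assumptions, not part of this commit):

// Enable both Parquet vectorized-reader flags, then query a map column.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")

val df = spark.sql("select id, name, price from h0 where attributes.color = 'red'")
df.explain()  // a ColumnarToRow node is one sign the batch (vectorized) path is used
df.show()
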
@@ -120,9 +120,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-    val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableVectorizedReader: Boolean = supportBatch(sparkSession, resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
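
Design note: in the Spark 3.3 file only the local flag changes, so the delegated supportBatch call resolves to the inherited nested-aware ParquetFileFormat implementation. The Spark 3.4 and 3.5 variants below additionally delete their supportBatch overrides, which still enforced the atomic-types-only rule, so the same one-line delegation picks up the parent's behavior there as well.
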
@@ -59,11 +59,6 @@ import org.apache.spark.util.SerializableConfiguration
  */
 class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
-  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    val conf = sparkSession.sessionState.conf
-    conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
-  }
-
   def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
     // Only output columnar if there is WSCG to read it.
@@ -133,9 +128,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-    val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableVectorizedReader: Boolean = supportBatch(sparkSession, resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -60,11 +60,6 @@ import org.apache.spark.util.SerializableConfiguration
  */
 class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
 
-  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    val conf = sparkSession.sessionState.conf
-    conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
-  }
-
   def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
     // Only output columnar if there is WSCG to read it.
@@ -134,9 +129,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
-    val enableVectorizedReader: Boolean =
-      sqlConf.parquetVectorizedReaderEnabled &&
-        resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val enableVectorizedReader: Boolean = supportBatch(sparkSession, resultSchema)
     val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
     val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
