
[HUDI-7190] Fix nested columns vectorized read for spark33+ legacy formats #10265

Conversation

stream2000
Contributor

@stream2000 stream2000 commented Dec 7, 2023

Change Logs

Since Spark 3.3, Spark supports vectorized reads for nested columns. However, when both
spark.sql.parquet.enableNestedColumnVectorizedReader=true and
spark.sql.parquet.enableVectorizedReader=true are set, Hudi throws the following exception:

Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 51) (30.221.100.176 executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to org.apache.spark.sql.vectorized.ColumnarBatch
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:560)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:549)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

(The driver stacktrace repeats the same trace as above.)

We need to fix vectorized reads of nested types in Spark33LegacyHoodieParquetFileFormat, Spark34LegacyHoodieParquetFileFormat, and Spark35LegacyHoodieParquetFileFormat.

Impact

Enables vectorized reads for nested types by default when using the legacy Parquet file formats. However, for schema-on-read or implicit nested type changes, spark.sql.parquet.enableVectorizedReader=false must be set to run the query.
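For clarity, the session-level settings involved can be sketched as follows (setting names are taken from the description above; this is an illustrative fragment, not part of the patch):

```sql
-- With this fix, nested-type vectorized reads work with the legacy formats:
SET spark.sql.parquet.enableVectorizedReader=true;
SET spark.sql.parquet.enableNestedColumnVectorizedReader=true;

-- For schema-on-read or implicit nested type changes, fall back to row-based reads:
SET spark.sql.parquet.enableVectorizedReader=false;
```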

Risk level

medium

Documentation Update

NONE

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@@ -120,9 +120,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
val sqlConf = sparkSession.sessionState.conf
val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
val enableVectorizedReader: Boolean =
stream2000 (Contributor, Author) commented:
For reviewers: in Spark 3.3+, the following code is used to check whether a vectorized (batch) read can be used:

  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
    val conf = sparkSession.sessionState.conf
    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
      !WholeStageCodegenExec.isTooManyFields(conf, schema)
  }

So nested types can support vectorized reads since Spark 3.3.
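To illustrate the shape of that decision, here is a small self-contained Python model (not Spark's actual ParquetUtils code; the type classes are hypothetical stand-ins): atomic types are always batch-readable, while nested types qualify only when the nested-column vectorized reader is enabled, as in Spark 3.3+.

```python
# Simplified, self-contained model of a batch-read support check for a schema.
# Atomic, Struct, and Array are hypothetical stand-ins for Spark's data types.
from dataclasses import dataclass

@dataclass(frozen=True)
class Atomic:
    name: str  # e.g. "int", "string"

@dataclass(frozen=True)
class Struct:
    fields: tuple

@dataclass(frozen=True)
class Array:
    element: object

def batch_read_supported(dtype, nested_enabled: bool) -> bool:
    """Atomic types are always batch-readable; nested types only when the
    nested-column vectorized reader is enabled."""
    if isinstance(dtype, Atomic):
        return True
    if isinstance(dtype, Array):
        return nested_enabled and batch_read_supported(dtype.element, nested_enabled)
    if isinstance(dtype, Struct):
        return nested_enabled and all(
            batch_read_supported(f, nested_enabled) for f in dtype.fields
        )
    return False
```

With the nested flag off, any schema containing a struct or array falls back to row-based reads, which is why the legacy formats must make supportBatch and the actual reader path agree.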

stream2000 force-pushed the HUDI-7190_fix_nested_type_vectorized_read_for_spark3.3plus branch from f5233e3 to 38ad80b on December 7, 2023 at 07:55
"hoodie.datasource.read.use.new.parquet.file.format" -> "false",
"hoodie.file.group.reader.enabled" -> "false",
"spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true",
"spark.sql.parquet.enableVectorizedReader" -> "true") {
Reviewer (Contributor) commented:

will this test cover all the spark releases above 3.3.0 ?

stream2000 (Contributor, Author) commented:

This test should cover all spark versions and not throw any exceptions.

Reviewer (Contributor) commented:

Hmm, saw some Travis failures.

stream2000 (Contributor, Author) commented:

Yes, this is caused by my modification. I'm trying to fix them.

stream2000 force-pushed the HUDI-7190_fix_nested_type_vectorized_read_for_spark3.3plus branch from 88a6f69 to dbcebc5 on December 8, 2023 at 08:50
bvaradar (Contributor) left a comment:

@stream2000 : Just checking if you are still working on the tests ?

stream2000 (Contributor, Author) commented, quoting the above:
@stream2000 : Just checking if you are still working on the tests ?

Sorry for the late reply, I was busy with other stuff. Will fix the test ASAP.

stream2000 force-pushed the HUDI-7190_fix_nested_type_vectorized_read_for_spark3.3plus branch from dbcebc5 to e1423a8 on December 14, 2023 at 05:36
stream2000 force-pushed the HUDI-7190_fix_nested_type_vectorized_read_for_spark3.3plus branch from acf2f8f to 464d5bd on December 14, 2023 at 13:32
stream2000 force-pushed the HUDI-7190_fix_nested_type_vectorized_read_for_spark3.3plus branch 2 times, most recently from 939389e to c63e93d, on December 15, 2023 at 08:19
stream2000 (Contributor, Author) commented:

@hudi-bot run azure

hudi-bot commented:
CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure (re-run the last Azure build)

stream2000 (Contributor, Author) commented:

@danny0405 @yihua @xiarixiaoyao Hi, could you help review this PR?

bvaradar (Contributor) left a comment:

LGTM

@bvaradar bvaradar merged commit d0916cb into apache:master Dec 20, 2023
31 checks passed
yihua pushed a commit that referenced this pull request Feb 27, 2024
…rmats (#10265)

* [HUDI-7190] Fix legacy parquet format nested columns vectorized read for spark3.3+
* Fix nested type implicit schema evolution
* fix legacy format support batch read
* Add exception messages when vectorized read nested type with type change