[SUPPORT] Read from hudi table results in crash (com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException) #12108

meatheadmike opened this issue Oct 15, 2024 · 5 comments

meatheadmike commented Oct 15, 2024

Describe the problem you faced

Reading from a Hudi MERGE_ON_READ table while a separate streaming job is upserting into it crashes with com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException during log record merging. Once the error appears, all subsequent queries against the table fail.

To Reproduce

Steps to reproduce the behavior:

  1. Spin up a Spark cluster on Kubernetes with 1 x driver @ 16 GB / 1 core and 5 x executors @ 16 GB / 1 core
  2. Submit the PySpark script shown below
  3. Spin up a separate Spark query node while the ingest is happening (1 x 16 GB / 1 core)
  4. Run a simple query against the Hudi table in a loop, e.g.: spark.sql('refresh table example-table').show(); spark.sql('select count(*) from example-table').show() (a minimal version of this loop is sketched after these steps)
  5. Wait for 4 or 5 ingest batches to complete. After that, the stack trace below appears instead of the expected record count. Note that all queries fail from this point on.
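
For reference, a minimal sketch of the reader-side loop from step 4 (the session setup and polling interval here are assumptions; the backticks are needed because the table name contains a dash):

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('hudi-query-loop').getOrCreate()

while True:
    # Refresh cached metadata, then count the rows; the count query is what
    # eventually fails with the Kryo exception described below.
    spark.sql('refresh table `example-table`')
    spark.sql('select count(*) from `example-table`').show()
    time.sleep(10)  # assumed polling interval between queries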

PySpark script:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import dbldatagen as dg
import json
import uuid
import random

spark = (
    SparkSession
    .builder
    .appName('stream-to-hudi-s3-example')
    .getOrCreate()
)

hudi_db = 'default'
hudi_table = 'example-table'
hudi_checkpoint_path = 's3a://my-checkpoint-path'
hudi_table_path = 's3a://my-table-path' 
hive_thrift_url = 'thrift://hive-metastore.default:9083'
hive_jdbc_url = 'jdbc:hive2://hive-metastore.default:10000'

schema = ArrayType(
    StructType([
        StructField("domain", StringType(), False),
        StructField("risk", StringType(), False),
        StructField("timestamp", TimestampType(), False),
    ])
)

multi_writer_id = 'datagen-writer1'
@udf(returnType=StringType())
def generate_domain(): 
    rand_domain = f"{random.randrange(10000000,20000000)}.com"
    return rand_domain

@udf(returnType=LongType())
def generate_timestamp():
    return random.randrange(1000000000,2000000000)

@udf(returnType=StringType())
def generate_risk():
    return json.dumps({"blaa":str(uuid.uuid4())})
 
ds = (
         dg.DataGenerator(spark, name="test-data-set", partitions=1)
         .withColumn("offset", "long", minValue=1, maxValue=9999999, random=True)
         .withColumn("timestamp_", "timestamp", random=True)
         .build(withStreaming=True, options={'rowsPerSecond': 10000, 'rampUpTimeSeconds':60})
         .withColumn("domain", generate_domain())
         .withColumn("timestamp", generate_timestamp())
         .withColumn("risk", generate_risk())
         .withColumnRenamed("timestamp_","kafka_timestamp")
    ) 

df = (
         ds.select(col("offset"), col("kafka_timestamp"), col("domain"), col("timestamp"), col("risk"))
         .na.drop()
     )


hudi_precombine_field = 'timestamp'
hudi_recordkey_field = 'domain'

hudi_options = {
    'hoodie.archive.async': True,
    'hoodie.clean.async.enabled': True,
    'hoodie.clean.automatic': True,
    'hoodie.clean.commits.retained': 5,
    'hoodie.clean.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.clean.fileversions.retained': '2',
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    'hoodie.clustering.async.enabled': False,
    'hoodie.clustering.async.max.commits': 2,
    'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
    'hoodie.clustering.inline': True,
    'hoodie.clustering.inline.max.commits': 2,
    'hoodie.clustering.plan.strategy.class': 'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
    'hoodie.clustering.plan.strategy.sort.columns': hudi_recordkey_field,
    'hoodie.clustering.preserve.commit.metadata': True,
    'hoodie.clustering.rollback.pending.replacecommit.on.conflict': True,
    'hoodie.clustering.updates.strategy': 'org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy',
    'hoodie.compact.inline.max.delta.commits': 2,
    'hoodie.datasource.hive_sync.db': hudi_db,
    'hoodie.datasource.hive_sync.enable':True,
    'hoodie.datasource.hive_sync.ignore_exceptions': True,
    'hoodie.datasource.hive_sync.jdbcurl':hive_jdbc_url,
    'hoodie.datasource.hive_sync.metastore.uris':hive_thrift_url,
    'hoodie.datasource.hive_sync.mode':'hms',
    'hoodie.datasource.hive_sync.partition_extractor_class':'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.hive_sync.password':'',
    'hoodie.datasource.hive_sync.recreate_table_on_error': True,
    'hoodie.datasource.hive_sync.skip_ro_suffix':True,
    'hoodie.datasource.hive_sync.table':hudi_table,
    'hoodie.datasource.hive_sync.username':'hive',
    'hoodie.datasource.meta.sync.enable': 'true',
    'hoodie.datasource.read.incr.fallback.fulltablescan.enable': True,
    'hoodie.datasource.read.use.new.parquet.file.format': True,
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': hudi_precombine_field,
    'hoodie.datasource.write.reconcile.schema':'true',
    'hoodie.datasource.write.record.merger.impls': 'org.apache.hudi.HoodieSparkRecordMerger',
    'hoodie.datasource.write.recordkey.field': hudi_recordkey_field,
    'hoodie.datasource.write.row.writer.enable': True,
    'hoodie.datasource.write.streaming.checkpoint.identifier': multi_writer_id,
    'hoodie.datasource.write.streaming.ignore.failed.batch': 'true',
    'hoodie.datasource.write.table.name': hudi_table,
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.enable.data.skipping': True,
    'hoodie.index.type': 'RECORD_INDEX',
    'hoodie.keep.min.commits':'10',
    'hoodie.logfile.data.block.format':'parquet',
    'hoodie.merge.use.record.positions': True,
    'hoodie.metadata.auto.initialize': True,
    'hoodie.metadata.enable': True,
    'hoodie.metadata.clean.async': True,
    'hoodie.metadata.index.async': False, # DO NOT SET TRUE!!! Record and column indexes will not be created!
    'hoodie.metadata.index.column.stats.columns': hudi_recordkey_field,
    'hoodie.metadata.index.column.stats.column.list': hudi_recordkey_field,
    'hoodie.metadata.index.column.stats.enable': True,
    'hoodie.metadata.record.index.enable': True,
    'hoodie.parquet.avro.write-old-list-structure':'false',
    'hoodie.parquet.compression.codec': 'snappy',
    'hoodie.record.index.use.caching':True,
    'hoodie.schema.on.read.enable': True,
    'hoodie.table.name': hudi_table,
    'hoodie.table.services.enabled': True,
    'hoodie.write.concurrency.early.conflict.detection.enable': True,
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.executor.type': 'DISRUPTOR',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
    'hoodie.write.lock.wait_time_ms': '300000',
    'hoodie.write.lock.zookeeper.base_path': '/hudi/local/table',
    'hoodie.write.lock.zookeeper.lock_key': f'{hudi_table}',
    'hoodie.write.lock.zookeeper.port': '2181',
    'hoodie.write.lock.zookeeper.url': 'zk-cs.default',
    'hoodie.write.set.null.for.missing.columns': True, 
    'checkpointLocation': hudi_checkpoint_path,
    'parquet.avro.write-old-list-structure': 'false',
    'path': hudi_table_path,
}

print(f"hudi_options={hudi_options}")

df.writeStream \
   .format("org.apache.hudi") \
   .options(**hudi_options) \
   .outputMode("append") \
   .start() 

spark.streams.awaitAnyTermination()

Expected behavior

Queries should succeed without crashing.

Environment Description

  • Hudi version : 1.0.0-beta2

  • Spark version : 3.5.2

  • Hive version : 3.1.3

  • Hadoop version : 3.3.4

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : yes

Additional context

I've been trying to narrow down the cause of this without much success. I disabled clustering, but it still occurs. Something strange seems to happen on the read path when records are merged. It doesn't seem to crash if every record key is 100% unique (see the sketch below), but of course in the real world I need to be able to upsert data...
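
For reference, the "every record key is 100% unique" variant that avoids the crash simply generates a globally unique domain instead of a value from a small random range (an illustrative sketch, not part of the original script):

import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# With this key generator every row gets a distinct record key, so the upsert
# never merges against existing records and the read does not crash.
@udf(returnType=StringType())
def generate_unique_domain():
    return f"{uuid.uuid4().hex}.com"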

Stacktrace

24/10/15 20:53:21 INFO TaskSetManager: Finished task 2.0 in stage 77.0 (TID 307) in 6029 ms on mike-test-spark-app-74d7488c9d-kzm2z (executor driver) (14/15)
24/10/15 20:53:22 INFO HoodieLogFileReader: Closing Log file reader .cbd4e0ac-3d14-4a38-9199-6800a4e25946-0_20241015205126562.log.1_6-361-2513
24/10/15 20:53:22 INFO HoodieMergedLogRecordReader: Number of log files scanned => 1
24/10/15 20:53:22 INFO HoodieMergedLogRecordReader: Number of entries in Map => 233427
24/10/15 20:53:22 INFO InternalParquetRecordReader: at row 0. reading next block
24/10/15 20:53:23 INFO InternalParquetRecordReader: block read in memory in 1023 ms. row count = 1686704
24/10/15 20:53:23 ERROR Executor: Exception in task 0.0 in stage 77.0 (TID 305)
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
right (org.apache.hudi.common.util.collection.ImmutablePair)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:103)
	at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:77)
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.remove(BitCaskDiskMap.java:245)
	at org.apache.hudi.common.util.collection.BitCaskDiskMap.remove(BitCaskDiskMap.java:67)
	at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:240)
	at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.hasNextBaseRecord(HoodieKeyBasedFileGroupRecordBuffer.java:125)
	at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.doHasNext(HoodieKeyBasedFileGroupRecordBuffer.java:135)
	at org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.hasNext(HoodieBaseFileGroupRecordBuffer.java:130)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:201)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:262)
	at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat$$anon$1.hasNext(HoodieFileGroupReaderBasedParquetFileFormat.scala:250)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.UnsupportedOperationException
	at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1067)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	... 49 more
danny0405 (Contributor) commented:

Is it because of a Kryo jar conflict?

danny0405 added the clazz-conflict (conflict of classes) label on Oct 16, 2024

meatheadmike commented Oct 16, 2024

This is what I thought initially too. So then I deployed a bone-stock Spark image (i.e., the Docker Hub image published by Apache) and used the --packages flag:

export SPARK_CONF_DIR=/opt/spark/conf
export SPARK_VERSION=3.5 # or 3.4, 3.3, 3.2
export HUDI_VERSION=1.0.0-beta2
/opt/spark/bin/pyspark \
--packages \
org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:$HUDI_VERSION,\
com.amazonaws:aws-java-sdk-bundle:1.12.770,\
org.apache.hadoop:hadoop-common:3.3.4,\
org.apache.hadoop:hadoop-client:3.3.4,\
org.apache.hadoop:hadoop-aws:3.3.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--conf spark.driver.extraJavaOptions='-Divy.cache.dir=/tmp -Divy.home=/tmp'

This should have eliminated any potential jar conflicts, no? Mind you, this was done exclusively on the reader side of things, as I assume that is where the problem is.
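
One quick way to double-check for a conflict (a sketch, assuming an active PySpark session named spark; jars pulled in via --packages end up on the driver classpath, so the classes below should resolve):

def jar_of(class_name):
    # Report which jar a suspect class is actually loaded from.
    cls = spark.sparkContext._jvm.java.lang.Class.forName(class_name)
    src = cls.getProtectionDomain().getCodeSource()
    return src.getLocation().toString() if src is not None else '<bootstrap/unknown>'

for name in ['com.esotericsoftware.kryo.Kryo', 'org.apache.avro.Schema']:
    print(name, '->', jar_of(name))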

meatheadmike commented:

UPDATE: This appears to be a regression with 1.0.0-beta2

I tried spinning up a container with Spark 3.4 to see if downgrading Spark would help. No dice. I got the same error. However, when I downgraded Hudi to 1.0.0-beta1 I was finally able to query! So there is some sort of regression here.

rangareddy commented:

Hi @meatheadmike

Thank you for reporting this issue. I can reproduce the problem with the Hudi 1.0.0-beta2 source code. However, when I tested the same code using the Hudi master branch version (i.e., 1.0.0), it worked without any issues. Could you please test with the Hudi master branch code? The issue should not arise there.


rangareddy commented Oct 21, 2024

Code:

HUDI_SPARK_JAR="./packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0-SNAPSHOT.jar"

$SPARK_HOME/bin/pyspark \
--deploy-mode client \
--jars $HUDI_SPARK_JAR \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--driver-memory 20g \
--executor-memory 20g \
--conf spark.driver.memoryOverhead=4g \
--conf spark.executor.memoryOverhead=4g \
--conf "spark.sql.hive.convertMetastoreParquet=false"

Then, inside the PySpark shell:

from pyspark.sql.functions import *
from pyspark.sql.types import *
import dbldatagen as dg
import json
import uuid
import random

hudi_db = 'default'
hudi_table = 'example-table'
hudi_table=f'file:///tmp/hudi/{hudi_table}'
hudi_checkpoint_path = f'{hudi_table}/checkpoint'
hudi_table_path = f'{hudi_table}/tablepath' 

schema = ArrayType(
    StructType([
        StructField("domain", StringType(), False),
        StructField("risk", StringType(), False),
        StructField("timestamp", TimestampType(), False),
    ])
)

multi_writer_id = 'datagen-writer1'

@udf(returnType=StringType())
def generate_domain(): 
    rand_domain = f"{random.randrange(10000000,20000000)}.com"
    return rand_domain

@udf(returnType=LongType())
def generate_timestamp():
    return random.randrange(1000000000,2000000000)

@udf(returnType=StringType())
def generate_risk():
    return json.dumps({"blaa":str(uuid.uuid4())})


ds = (
     dg.DataGenerator(spark, name="test-data-set", partitions=1)
     .withColumn("offset", "long", minValue=1, maxValue=9999999, random=True)
     .withColumn("timestamp_", "timestamp", random=True)
     .build(withStreaming=True, options={'rowsPerSecond': 10000, 'rampUpTimeSeconds':60})
     .withColumn("domain", generate_domain())
     .withColumn("timestamp", generate_timestamp())
     .withColumn("risk", generate_risk())
     .withColumnRenamed("timestamp_","kafka_timestamp")
    ) 

df = (ds.select(col("offset"), col("kafka_timestamp"), col("domain"), col("timestamp"), col("risk")).na.drop())

hudi_precombine_field = 'timestamp'
hudi_recordkey_field = 'domain'

hudi_options = {
    'hoodie.archive.async': True,
    'hoodie.clean.async.enabled': True,
    'hoodie.clean.automatic': True,
    'hoodie.cleaner.commits.retained': 10,
    'hoodie.keep.min.commits': 21,
    'hoodie.keep.max.commits': 30,
    'hoodie.clean.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.clean.fileversions.retained': '2',
    'hoodie.clean.failed.writes.policy': 'LAZY',
    'hoodie.clustering.inline': True,
    'hoodie.clustering.inline.max.commits': 2,
    'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
    'hoodie.clustering.plan.strategy.class': 'org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy',
    'hoodie.clustering.plan.strategy.sort.columns': hudi_recordkey_field,
    'hoodie.clustering.preserve.commit.metadata': True,
    'hoodie.clustering.rollback.pending.replacecommit.on.conflict': True,
    'hoodie.clustering.updates.strategy': 'org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy',
    'hoodie.compact.inline.max.delta.commits': 2,
    'hoodie.datasource.read.incr.fallback.fulltablescan.enable': True,
    'hoodie.datasource.read.use.new.parquet.file.format': True,
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': hudi_precombine_field,
    'hoodie.datasource.write.reconcile.schema':'true',
    'hoodie.datasource.write.record.merger.impls': 'org.apache.hudi.HoodieSparkRecordMerger',
    'hoodie.datasource.write.recordkey.field': hudi_recordkey_field,
    'hoodie.datasource.write.row.writer.enable': True,
    'hoodie.datasource.write.streaming.checkpoint.identifier': multi_writer_id,
    'hoodie.datasource.write.streaming.ignore.failed.batch': 'true',
    'hoodie.datasource.write.table.name': hudi_table,
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.enable.data.skipping': True,
    'hoodie.index.type': 'RECORD_INDEX',
    'hoodie.logfile.data.block.format':'parquet',
    'hoodie.merge.use.record.positions': True,
    'hoodie.metadata.auto.initialize': True,
    'hoodie.metadata.enable': True,
    'hoodie.metadata.clean.async': True,
    'hoodie.metadata.index.async': False, # DO NOT SET TRUE!!! Record and column indexes will not be created!
    'hoodie.metadata.index.column.stats.columns': hudi_recordkey_field,
    'hoodie.metadata.index.column.stats.column.list': hudi_recordkey_field,
    'hoodie.metadata.index.column.stats.enable': True,
    'hoodie.metadata.record.index.enable': True,
    'hoodie.parquet.avro.write-old-list-structure':'false',
    'hoodie.parquet.compression.codec': 'snappy',
    'hoodie.record.index.use.caching':True,
    'hoodie.schema.on.read.enable': True,
    'hoodie.table.name': hudi_table,
    'hoodie.table.services.enabled': True,
    'checkpointLocation': hudi_checkpoint_path,
    'parquet.avro.write-old-list-structure': 'false',
    'path': hudi_table_path,
}

print(f"hudi_options={hudi_options}")

df.writeStream \
   .format("org.apache.hudi") \
   .options(**hudi_options) \
   .outputMode("append") \
   .start() 

spark.streams.awaitAnyTermination()

Exception:

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
right (org.apache.hudi.common.util.collection.ImmutablePair)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:103)
    at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:77)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
    at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:67)
    at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:199)
    at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.processNextDataRecord(HoodieKeyBasedFileGroupRecordBuffer.java:88)
    at org.apache.hudi.common.table.read.HoodieKeyBasedFileGroupRecordBuffer.processDataBlock(HoodieKeyBasedFileGroupRecordBuffer.java:81)
    at org.apache.hudi.common.table.log.BaseHoodieLogRecordReader.processQueuedBlocksForInstant(BaseHoodieLogRecordReader.java:759)
    at org.apache.hudi.common.table.log.BaseHoodieLogRecordReader.scanInternalV1(BaseHoodieLogRecordReader.java:401)
    ... 32 more
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 57 more
