[HUDI-6910] Fix schema evolution in the filegroup reader for parquet log blocks (#12075)

* fix schema evolution for parquet data blocks

* move the schema evolution handling into the Spark parquet reader and make it match the Avro parquet reader in structure; reuse the existing schema evolution code from the main Spark parquet reader

* get the timezone config from the Spark session

---------

Co-authored-by: Jonathan Vexler <=>
jonvex authored Oct 16, 2024
1 parent 47e5ddd commit ee3d9c3
Showing 8 changed files with 185 additions and 108 deletions.
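
The net effect of the change, as a hedged Scala sketch of a caller on the log-block path (not part of the commit; `storage`, `filePath`, and `requiredSchema` are assumed inputs supplied by the surrounding file group reader): log-block parquet reads now go through getUnsafeRowIterator, which reconciles the file's schema with the requested schema inside the reader instead of assuming the two match.

// Hedged sketch, not part of the commit. `storage`, `filePath`, and
// `requiredSchema` are assumed inputs from the surrounding file group reader.
import org.apache.hudi.common.util.collection.ClosableIterator
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StoragePath}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

def readParquetLogBlock(storage: HoodieStorage,
                        filePath: StoragePath,
                        requiredSchema: StructType): ClosableIterator[InternalRow] = {
  // getUnsafeRowIterator reconciles the file's schema with requiredSchema
  // (added columns, promoted types) before handing rows back, so the caller
  // no longer needs its own UnsafeProjection.
  new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
    .asInstanceOf[HoodieSparkParquetReader]
    .getUnsafeRowIterator(requiredSchema)
    .asInstanceOf[ClosableIterator[InternalRow]]
}
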
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -46,6 +47,7 @@
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;

@@ -62,7 +64,9 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
private final StoragePath path;
private final HoodieStorage storage;
private final FileFormatUtils parquetUtils;
private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
private List<ClosableIterator> readerIterators = new ArrayList<>();
private Option<StructType> structTypeOption = Option.empty();
private Option<Schema> schemaOption = Option.empty();

public HoodieSparkParquetReader(HoodieStorage storage, StoragePath path) {
this.path = path;
@@ -90,67 +94,72 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {

@Override
public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
ClosableIterator<InternalRow> iterator = getInternalRowIterator(readerSchema, requestedSchema);
StructType structType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType);
return getRecordIterator(requestedSchema);
}

return new CloseableMappingIterator<>(iterator, data -> {
// NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
// it to [[UnsafeRow]] holding just raw bytes
UnsafeRow unsafeRow = projection.apply(data);
return unsafeCast(new HoodieSparkRecord(unsafeRow));
});
@Override
public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(Schema schema) throws IOException {
ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator(schema);
return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieSparkRecord(data)));
}

@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
Schema schema = HoodieAvroUtils.getRecordKeySchema();
ClosableIterator<InternalRow> iterator = getInternalRowIterator(schema, schema);
StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType);

ClosableIterator<UnsafeRow> iterator = getUnsafeRowIterator(schema);
return new CloseableMappingIterator<>(iterator, data -> {
// NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
// it to [[UnsafeRow]] holding just raw bytes
UnsafeRow unsafeRow = projection.apply(data);
HoodieSparkRecord record = unsafeCast(new HoodieSparkRecord(unsafeRow));
HoodieSparkRecord record = unsafeCast(new HoodieSparkRecord(data));
return record.getRecordKey();
});
}

public ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
if (requestedSchema == null) {
requestedSchema = readerSchema;
}
StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
StructType requestedStructType = HoodieInternalRowUtils.getCachedSchema(requestedSchema);
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readerStructType.json());
storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), requestedStructType.json());
public ClosableIterator<UnsafeRow> getUnsafeRowIterator(Schema requestedSchema) throws IOException {
return getUnsafeRowIterator(HoodieInternalRowUtils.getCachedSchema(requestedSchema));
}

public ClosableIterator<UnsafeRow> getUnsafeRowIterator(StructType requestedSchema) throws IOException {
SparkBasicSchemaEvolution evolution = new SparkBasicSchemaEvolution(getStructSchema(), requestedSchema, SQLConf.get().sessionLocalTimeZone());
String readSchemaJson = evolution.getRequestSchema().json();
storage.getConf().set(ParquetReadSupport.PARQUET_READ_SCHEMA, readSchemaJson);
storage.getConf().set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA(), readSchemaJson);
storage.getConf().set(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.get().getConf(SQLConf.PARQUET_BINARY_AS_STRING()).toString());
storage.getConf().set(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.get().getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP()).toString());
ParquetReader<InternalRow> reader = ParquetReader.<InternalRow>builder((ReadSupport) new ParquetReadSupport(), new Path(path.toUri()))
.withConf(storage.getConf().unwrapAs(Configuration.class))
.build();
UnsafeProjection projection = evolution.generateUnsafeProjection();
ParquetReaderIterator<InternalRow> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
CloseableMappingIterator<InternalRow, UnsafeRow> projectedIterator = new CloseableMappingIterator<>(parquetReaderIterator, projection::apply);
readerIterators.add(projectedIterator);
return projectedIterator;
}

@Override
public Schema getSchema() {
// Some types in avro are not compatible with parquet.
// Avro only supports representing Decimals as fixed byte array
// and therefore if we convert to Avro directly we'll lose logical type-info.
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
return SparkAdapterSupport$.MODULE$.sparkAdapter()
.getAvroSchemaConverters()
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING);
if (schemaOption.isEmpty()) {
// Some types in avro are not compatible with parquet.
// Avro only supports representing Decimals as fixed byte array
// and therefore if we convert to Avro directly we'll lose logical type-info.
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
structTypeOption = Option.of(structType);
schemaOption = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter()
.getAvroSchemaConverters()
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
}
return schemaOption.get();
}

protected StructType getStructSchema() {
if (structTypeOption.isEmpty()) {
getSchema();
}
return structTypeOption.get();
}

@Override
public void close() {
readerIterators.forEach(ParquetReaderIterator::close);
readerIterators.forEach(ClosableIterator::close);
}

@Override
@@ -26,7 +26,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, CloseableMappingIterator}
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader}
import org.apache.spark.sql.hudi.SparkAdapter
@@ -87,17 +87,8 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
}
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
new CloseableMappingIterator[InternalRow, UnsafeRow](
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getInternalRowIterator(dataSchema, requiredSchema),
new java.util.function.Function[InternalRow, UnsafeRow] {
override def apply(data: InternalRow): UnsafeRow = {
// NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
// it to [[UnsafeRow]] holding just raw bytes
projection.apply(data)
}
}).asInstanceOf[ClosableIterator[InternalRow]]
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(structType).asInstanceOf[ClosableIterator[InternalRow]]
} else {
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
@@ -1,34 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}

object HoodieParquetFileFormatHelper {

def buildImplicitSchemaChangeInfo(hadoopConf: Configuration,
parquetFileMetaData: FileMetaData,
requiredSchema: StructType): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()
val convert = new ParquetToSparkSchemaConverter(hadoopConf)
val fileStruct = convert.convert(parquetFileMetaData.getSchema)
buildImplicitSchemaChangeInfo(fileStruct, requiredSchema)
}

def buildImplicitSchemaChangeInfo(fileStruct: StructType,
requiredSchema: StructType): (java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]], StructType) = {
val implicitTypeChangeInfo: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]] = new java.util.HashMap()

val fileStructMap = fileStruct.fields.map(f => (f.name, f.dataType)).toMap
// if there are missing fields or if field's data type needs to be changed while reading, we handle it here.
val sparkRequestStructFields = requiredSchema.map(f => {
@@ -86,4 +97,33 @@ object HoodieParquetFileFormatHelper {
}))
case _ => fileType
}

def generateUnsafeProjection(fullSchema: Seq[Attribute],
timeZoneId: Option[String],
typeChangeInfos: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]],
requiredSchema: StructType,
partitionSchema: StructType,
schemaUtils: HoodieSchemaUtils): UnsafeProjection = {

if (typeChangeInfos.isEmpty) {
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
} else {
// find type changed.
val newSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
if (typeChangeInfos.containsKey(i)) {
StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
} else f
})
val newFullSchema = schemaUtils.toAttributes(newSchema) ++ schemaUtils.toAttributes(partitionSchema)
val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
if (typeChangeInfos.containsKey(i)) {
val srcType = typeChangeInfos.get(i).getRight
val dstType = typeChangeInfos.get(i).getLeft
val needTimeZone = Cast.needsTimeZone(srcType, dstType)
Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
}
}
}
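
For reference, a minimal Scala sketch of what this helper computes, under assumed inputs (the `id`/`price` columns, their types, and the "UTC" timezone below are illustrative, not from the commit): when the file stores a column with a narrower type than the query requests, the helper records the (requested, file) type pair at that field's index and returns a request schema that matches the file, and generateUnsafeProjection then casts the decoded values back to the requested types.

// Illustrative sketch; the schemas, column names, and "UTC" timezone are assumptions.
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.spark.sql.types._

val fileStruct     = StructType(Seq(StructField("id", StringType), StructField("price", IntegerType)))
val requiredSchema = StructType(Seq(StructField("id", StringType), StructField("price", LongType)))

val (typeChanges, requestSchema) =
  HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(fileStruct, requiredSchema)
// typeChanges is expected to map field index 1 to (LongType, IntegerType): the file stores an int,
// the caller wants a long. requestSchema keeps the file's types so parquet decoding succeeds.

val schemaUtils = sparkAdapter.getSchemaUtils
val projection = HoodieParquetFileFormatHelper.generateUnsafeProjection(
  schemaUtils.toAttributes(requiredSchema), Some("UTC"), typeChanges,
  requiredSchema, new StructType(), schemaUtils)
// projection(row) casts the int `price` read from the file up to the long the caller asked for.
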
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hudi.SparkAdapterSupport.sparkAdapter

import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType


/**
* Intended to be used just with HoodieSparkParquetReader to avoid any java/scala issues
*/
class SparkBasicSchemaEvolution(fileSchema: StructType,
requiredSchema: StructType,
sessionLocalTimeZone: String) {

val (implicitTypeChangeInfo, sparkRequestSchema) = HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(fileSchema, requiredSchema)

def getRequestSchema: StructType = {
if (implicitTypeChangeInfo.isEmpty) {
requiredSchema
} else {
sparkRequestSchema
}
}

def generateUnsafeProjection(): UnsafeProjection = {
val schemaUtils: HoodieSchemaUtils = sparkAdapter.getSchemaUtils
HoodieParquetFileFormatHelper.generateUnsafeProjection(schemaUtils.toAttributes(requiredSchema), Some(sessionLocalTimeZone),
implicitTypeChangeInfo, requiredSchema, new StructType(), schemaUtils)
}
}
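
A hedged usage sketch of how the reworked HoodieSparkParquetReader drives this class (the `fileSchema`, `requiredSchema`, and `rows` values are assumed inputs, not from the commit): read parquet with the evolved request schema, then project each decoded row back to the requested types.

// Hedged sketch; `fileSchema`, `requiredSchema`, and `rows` are assumed inputs.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

def evolveRows(fileSchema: StructType,
               requiredSchema: StructType,
               rows: Iterator[InternalRow]): Iterator[UnsafeRow] = {
  val evolution = new SparkBasicSchemaEvolution(fileSchema, requiredSchema, SQLConf.get.sessionLocalTimeZone)
  // getRequestSchema is what would be handed to the parquet read support: it keeps
  // the file's physical types wherever an implicit type change was detected.
  val readSchema = evolution.getRequestSchema
  // The projection then casts each decoded row to the types in requiredSchema.
  val projection = evolution.generateUnsafeProjection()
  rows.map(projection.apply)
}
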