Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to enable metadata comparison #164

Merged
merged 4 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ trait DataFrameComparer extends DatasetComparer {
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true,
truncate: Int = 500
): Unit = {
assertSmallDatasetEquality(
Expand All @@ -22,6 +23,7 @@ trait DataFrameComparer extends DatasetComparer {
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder,
ignoreMetadata,
truncate
)
}
Expand All @@ -35,15 +37,17 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
assertLargeDatasetEquality(
actualDF,
expectedDF,
ignoreNullable = ignoreNullable,
ignoreColumnNames = ignoreColumnNames,
orderedComparison = orderedComparison,
ignoreColumnOrder = ignoreColumnOrder
ignoreColumnOrder = ignoreColumnOrder,
ignoreMetadata = ignoreMetadata
)
}

Expand All @@ -57,7 +61,8 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
assertSmallDatasetEquality[Row](
actualDF,
Expand All @@ -66,6 +71,7 @@ trait DataFrameComparer extends DatasetComparer {
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder,
ignoreMetadata,
equals = RowComparer.areRowsEqual(_, _, precision)
)
}
Expand All @@ -80,7 +86,8 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
assertLargeDatasetEquality[Row](
actualDF,
Expand All @@ -89,7 +96,8 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable,
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder
ignoreColumnOrder,
ignoreMetadata
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true,
truncate: Int = 500,
equals: (T, T) => Boolean = (o1: T, o2: T) => o1.equals(o2)
): Unit = {
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
val actual = if (ignoreColumnOrder) orderColumns(actualDS, expectedDS) else actualDS
assertSmallDatasetContentEquality(actual, expectedDS, orderedComparison, truncate, equals)
}
Expand Down Expand Up @@ -98,10 +99,11 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
// first check if the schemas are equal
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
val actual = if (ignoreColumnOrder) orderColumns(actualDS, expectedDS) else actualDS
assertLargeDatasetContentEquality(actual, expectedDS, equals, orderedComparison)
}
Expand Down Expand Up @@ -157,7 +159,8 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
val e = (r1: Row, r2: Row) => {
r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, precision)
Expand All @@ -169,7 +172,8 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreNullable,
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder
ignoreColumnOrder,
ignoreMetadata
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ object SchemaComparer {
expectedDS: Dataset[T],
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
ignoreColumnOrder: Boolean = true
ignoreColumnOrder: Boolean = true,
ignoreMetadata: Boolean = true
): Unit = {
require((ignoreColumnNames, ignoreColumnOrder) != (true, true), "Cannot set both ignoreColumnNames and ignoreColumnOrder to true.")
if (!SchemaComparer.equals(actualDS.schema, expectedDS.schema, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)) {
if (!SchemaComparer.equals(actualDS.schema, expectedDS.schema, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)) {
throw DatasetSchemaMismatch(
"Diffs\n" + betterSchemaMismatchMessage(actualDS, expectedDS)
)
Expand All @@ -35,7 +36,8 @@ object SchemaComparer {
s2: StructType,
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
ignoreColumnOrder: Boolean = true
ignoreColumnOrder: Boolean = true,
ignoreMetadata: Boolean = true
): Boolean = {
if (s1.length != s2.length) {
false
Expand All @@ -44,24 +46,32 @@ object SchemaComparer {
false
} else {
val zipStruct = if (ignoreColumnOrder) s1.sortBy(_.name) zip s2.sortBy(_.name) else s1 zip s2
zipStruct.forall { t =>
(t._1.nullable == t._2.nullable || ignoreNullable) &&
(t._1.name == t._2.name || ignoreColumnNames) &&
equals(t._1.dataType, t._2.dataType, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
zipStruct.forall { case (f1, f2) =>
(f1.nullable == f2.nullable || ignoreNullable) &&
(f1.name == f2.name || ignoreColumnNames) &&
(f1.metadata == f2.metadata || ignoreMetadata) &&
equals(f1.dataType, f2.dataType, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
}
}
}
}

/**
 * Compares two Spark [[DataType]]s for equality, optionally relaxing the
 * comparison of nullability, field names, field order, and field metadata.
 *
 * Recurses into nested [[StructType]], [[ArrayType]], and [[MapType]]
 * elements so that every ignore flag applies at every level of the schema,
 * not just at the top level.
 *
 * @param dt1 first data type
 * @param dt2 second data type
 * @param ignoreNullable when true, differences in nullability are ignored
 * @param ignoreColumnNames when true, differences in field names are ignored
 * @param ignoreColumnOrder when true, struct fields are compared after sorting by name
 * @param ignoreMetadata when true, differences in field metadata are ignored
 * @return true when the two data types are equal under the given flags
 */
def equals(
    dt1: DataType,
    dt2: DataType,
    ignoreNullable: Boolean,
    ignoreColumnNames: Boolean,
    ignoreColumnOrder: Boolean,
    ignoreMetadata: Boolean
): Boolean = {
  (dt1, dt2) match {
    case (st1: StructType, st2: StructType) =>
      // Bug fix: forward ignoreMetadata into the nested-struct comparison.
      // Previously the recursive call omitted it, so nested struct fields were
      // always compared with the default (ignoreMetadata = true) regardless of
      // what the caller requested.
      equals(st1, st2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
    case (ArrayType(vdt1, _), ArrayType(vdt2, _)) =>
      // Array element types are compared recursively; containsNull is handled
      // by the ignoreNullable-aware field comparison one level up.
      equals(vdt1, vdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
    case (MapType(kdt1, vdt1, _), MapType(kdt2, vdt2, _)) =>
      // Both key and value types must match under the same flags.
      equals(kdt1, kdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata) &&
        equals(vdt1, vdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
    case _ =>
      // Leaf types (IntegerType, StringType, ...) use plain structural equality.
      dt1 == dt2
  }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.github.mrpowers.spark.fast.tests

import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType}
import org.apache.spark.sql.types.{DoubleType, IntegerType, MetadataBuilder, LongType, StringType}
import SparkSessionExt._
import com.github.mrpowers.spark.fast.tests.SchemaComparer.DatasetSchemaMismatch
import com.github.mrpowers.spark.fast.tests.StringExt.StringOps
import org.apache.spark.sql.functions.col
import org.scalatest.freespec.AnyFreeSpec

class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with SparkSessionTestWrapper {
Expand Down Expand Up @@ -366,6 +367,56 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar
assert(expectedColourGroup.contains(Seq("word", "StringType", "StructField(long,LongType,true,{})")))
assert(actualColourGroup.contains(Seq("float", "DoubleType", "MISSING")))
}

// assertLargeDataFrameEquality defaults to ignoreMetadata = true, so two
// DataFrames whose single column differs only in its metadata ("small int"
// vs "small number") must compare as equal without throwing.
"can perform Dataset comparisons and ignore metadata" in {
  val sourceDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small int").build()))

  val expectedDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small number").build()))

  // No exception expected: metadata differences are ignored by default.
  assertLargeDataFrameEquality(sourceDF, expectedDF)
}

// With ignoreMetadata = false, the same metadata-only difference must be
// detected and surfaced as a DatasetSchemaMismatch.
"can perform Dataset comparisons and compare metadata" in {
  val sourceDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small int").build()))

  val expectedDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small number").build()))

  // The data is identical; only the column metadata differs, so this must
  // fail at the schema-comparison step.
  intercept[DatasetSchemaMismatch] {
    assertLargeDataFrameEquality(sourceDF, expectedDF, ignoreMetadata = false)
  }
}
}

"assertApproximateDataFrameEquality" - {
Expand Down Expand Up @@ -534,6 +585,56 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar

assertApproximateDataFrameEquality(ds1, ds2, precision = 0.0000001, orderedComparison = false)
}

// assertApproximateDataFrameEquality defaults to ignoreMetadata = true, so a
// metadata-only difference between otherwise-identical columns is tolerated.
"can perform Dataset comparisons and ignore metadata" in {
  val sourceDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small int").build()))

  val expectedDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small number").build()))

  // No exception expected: metadata differences are ignored by default.
  assertApproximateDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001)
}

// With ignoreMetadata = false the approximate comparison must still reject a
// metadata-only difference with a DatasetSchemaMismatch.
"can perform Dataset comparisons and compare metadata" in {
  val sourceDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small int").build()))

  val expectedDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small number").build()))

  // The data is identical; only the column metadata differs, so this must
  // fail at the schema-comparison step.
  intercept[DatasetSchemaMismatch] {
    assertApproximateDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001, ignoreMetadata = false)
  }
}
}

"assertApproximateSmallDataFrameEquality" - {
Expand Down Expand Up @@ -702,5 +803,55 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar

assertApproximateSmallDataFrameEquality(ds1, ds2, precision = 0.0000001, orderedComparison = false)
}

// assertApproximateSmallDataFrameEquality defaults to ignoreMetadata = true,
// so a metadata-only difference between otherwise-identical columns passes.
"can perform Dataset comparisons and ignore metadata" in {
  val sourceDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small int").build()))

  val expectedDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small number").build()))

  // No exception expected: metadata differences are ignored by default.
  assertApproximateSmallDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001)
}

// With ignoreMetadata = false the small-DataFrame approximate comparison must
// reject a metadata-only difference with a DatasetSchemaMismatch.
"can perform Dataset comparisons and compare metadata" in {
  val sourceDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small int").build()))

  val expectedDF = spark
    .createDF(
      List(
        1,
        5
      ),
      List(("number", IntegerType, true))
    )
    .withColumn("number", col("number").as("number", new MetadataBuilder().putString("description", "small number").build()))

  // The data is identical; only the column metadata differs, so this must
  // fail at the schema-comparison step.
  intercept[DatasetSchemaMismatch] {
    assertApproximateSmallDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001, ignoreMetadata = false)
  }
}
}
}
Loading