Skip to content

Commit

Permalink
Improved CSV and JSON readers
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulbhatia023 committed Jan 30, 2023
1 parent b3a47ea commit 0d145ac
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package com.clairvoyant.restonomer.core.converter
import com.clairvoyant.restonomer.spark.utils.reader.CSVTextToDataFrameReader
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Converts a CSV-formatted REST response body into a Spark `DataFrame`.
 *
 * The diff hunk interleaved the pre- and post-commit lines (both the old
 * no-arg class header and the new parameterized one, plus two `read` call
 * forms); this is the reconstructed post-commit definition.
 *
 * @param containsHeader whether the first CSV line is a header row
 *                       (forwarded to the underlying reader; defaults to true)
 */
class CSVResponseToDataFrameConverter(
    containsHeader: Boolean = true
) extends ResponseToDataFrameConverter {

  /**
   * Builds a `DataFrame` from the raw response body chunks.
   *
   * Each body chunk may itself contain multiple newline-separated CSV rows,
   * so chunks are split on "\n" before being handed to the reader.
   *
   * @param restonomerResponseBody the raw response body, one string per chunk
   * @param sparkSession           the active Spark session
   * @return a `DataFrame` parsed from the CSV text
   */
  def convertResponseToDataFrame(
      restonomerResponseBody: Seq[String]
  )(implicit sparkSession: SparkSession): DataFrame =
    new CSVTextToDataFrameReader(
      sparkSession = sparkSession,
      containsHeader = containsHeader
    ).read(text = restonomerResponseBody.flatMap(_.split("\n")))

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ class JSONResponseToDataFrameConverter(dataColumnName: Option[String] = None) ex
)(implicit sparkSession: SparkSession): DataFrame = {
val responseDF =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = restonomerResponseBody
).read
sparkSession = sparkSession
).read(restonomerResponseBody)

dataColumnName
.map { dataColumn =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ class CastColumnsTransformationSpec extends CoreSpec {

val restonomerResponseDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = Seq(
sparkSession = sparkSession
).read(text =
Seq(
"""
|{
| "col_A": 5,
Expand All @@ -19,7 +20,7 @@ class CastColumnsTransformationSpec extends CoreSpec {
|}
|""".stripMargin
)
).read
)

"transform() - with columnDataTypeMapper" should "cast columns as specified in the mapper" in {
restonomerResponseDF.schema.fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ class CastNestedColumnTransformationSpec extends CoreSpec with DataFrameMatchers

val restonomerResponseDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = Seq(
sparkSession = sparkSession
).read(text =
Seq(
"""
|{
| "col_A": "val_A",
Expand All @@ -21,7 +22,7 @@ class CastNestedColumnTransformationSpec extends CoreSpec with DataFrameMatchers
|}
|""".stripMargin
)
).read
)

"transform() - with valid column name and ddl" should "cast the nested column" in {
restonomerResponseDF.schema.fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ class ConvertColumnCaseTransformationSpec extends CoreSpec with DataFrameMatcher

val restonomerResponseDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = Seq(
sparkSession = sparkSession
).read(text =
Seq(
"""
|{
| "col_a": "1",
| "COL_B": "2"
|}""".stripMargin
)
).read
)

"transform() - with valid column name and case type" should "transform the column case" in {
val restonomerTransformation = ChangeColumnCase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ class ConvertColumnToJsonTransformationSpec extends CoreSpec with DataFrameMatch

val restonomerResponseDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = Seq(
sparkSession = sparkSession
).read(text =
Seq(
"""
|{
| "col_A": "1",
Expand All @@ -23,7 +24,7 @@ class ConvertColumnToJsonTransformationSpec extends CoreSpec with DataFrameMatch
| ]
|}""".stripMargin
)
).read
)

"transform() - with valid column name" should "transform the column to json" in {
val restonomerTransformation = ConvertColumnToJson(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ class ExplodeColumnTransformationSpec extends CoreSpec with DataFrameMatchers {

val restonomerResponseDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = Seq(
sparkSession = sparkSession
).read(text =
Seq(
"""
|{
| "col_A": "val_A",
| "col_B": ["val_1", "val_2", "val_3"]
|}
|""".stripMargin
)
).read
)

"transform() - with valid column name" should "explodeColumn the column into multiple rows" in {
val restonomerTransformation = ExplodeColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class FlattenSchemaTransformationSpec extends CoreSpec with DataFrameMatchers {

val restonomerResponseDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
sparkSession = sparkSession
).read(
text = Seq(
"""
|{
Expand All @@ -20,7 +21,7 @@ class FlattenSchemaTransformationSpec extends CoreSpec with DataFrameMatchers {
|}
|""".stripMargin
)
).read
)

"transform()" should "flatten the response dataframe" in {
val restonomerTransformation = FlattenSchema()
Expand All @@ -29,16 +30,17 @@ class FlattenSchemaTransformationSpec extends CoreSpec with DataFrameMatchers {

val expectedRestonomerResponseTransformedDF: DataFrame =
new JSONTextToDataFrameReader(
sparkSession = sparkSession,
text = Seq(
sparkSession = sparkSession
).read(text =
Seq(
"""
|{
| "rewardApprovedMonthPeriod_from": "2021-09",
| "rewardApprovedMonthPeriod_to": "2021-10"
|}
|""".stripMargin
)
).read
)

actualRestonomerResponseTransformedDF should matchExpectedDataFrame(expectedRestonomerResponseTransformedDF)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Reads CSV text lines into a Spark `DataFrame`.
 *
 * The diff hunk interleaved the pre- and post-commit lines (the removed
 * `val text: String` field, the old zero-arg `read`, and the old
 * hard-coded header/sep options); this is the reconstructed post-commit
 * definition. Note the explicit `sep` option was dropped because comma is
 * Spark's default CSV separator.
 *
 * @param sparkSession   the active Spark session
 * @param containsHeader whether the first line of the CSV text is a header
 *                       row (defaults to true)
 */
class CSVTextToDataFrameReader(
    override val sparkSession: SparkSession,
    containsHeader: Boolean = true
) extends DataFrameReader {

  // Brings in the `toDS()` conversion for Seq[String] -> Dataset[String].
  import sparkSession.implicits._

  /**
   * Parses the given lines as CSV.
   *
   * @param text one CSV row per element (header row first when
   *             `containsHeader` is true)
   * @return the parsed `DataFrame`
   */
  override def read(text: Seq[String]): DataFrame =
    sparkSession.read
      .option("header", containsHeader.toString)
      .csv(text.toDS())

}
Original file line number Diff line number Diff line change
/**
 * Common contract for readers that turn raw text into a Spark `DataFrame`.
 *
 * The diff hunk showed both the old `def read: DataFrame` and the new
 * parameterized signature; this is the reconstructed post-commit trait.
 */
trait DataFrameReader {

  /** The Spark session used to construct the `DataFrame`. */
  val sparkSession: SparkSession

  /**
   * Parses the given text lines into a `DataFrame`.
   *
   * @param text the raw input, one logical record/line per element
   * @return the parsed `DataFrame`
   */
  def read(text: Seq[String]): DataFrame

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package com.clairvoyant.restonomer.spark.utils.reader
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Reads JSON text into a Spark `DataFrame`.
 *
 * The diff hunk interleaved the pre- and post-commit lines (the removed
 * `val text: Seq[String]` constructor field and the old zero-arg `read`);
 * this is the reconstructed post-commit definition, where the text is now
 * passed to `read` instead of the constructor.
 *
 * @param sparkSession the active Spark session
 */
class JSONTextToDataFrameReader(
    override val sparkSession: SparkSession
) extends DataFrameReader {

  // Brings in the `toDS()` conversion for Seq[String] -> Dataset[String].
  import sparkSession.implicits._

  /**
   * Parses the given strings as JSON documents.
   *
   * @param text one JSON document (possibly multi-line) per element
   * @return the parsed `DataFrame`
   */
  override def read(text: Seq[String]): DataFrame = sparkSession.read.json(text.toDS())

}

0 comments on commit 0d145ac

Please sign in to comment.