How to convert struct to map type and vice versa in Spark via Scala

Hi all, I recently faced an interesting challenge: converting Spark StructType columns to MapType and vice versa in a Spark Dataset using Scala. After spending a good bit of time searching the internet, I could not find a ready-made solution for this transformation, so I started digging around, and this post describes my findings.

Input Dataset

For this article, we will assume that you have a working local Spark installation (see Installing Apache Spark) and that the input data is available on your local disk.
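
The snippets in this post also assume a SparkSession named spark, which spark-shell provides out of the box. If you are running them as a standalone application instead, here is a minimal sketch for creating the session locally (the app name is illustrative):

import org.apache.spark.sql.SparkSession

// Minimal local session; spark-shell already provides this as `spark`
val spark = SparkSession.builder()
  .appName("struct-map-conversion")
  .master("local[*]")
  .getOrCreate()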

For our input dataset, we will make use of the following employee JSON data.

[
    {
        "id": "E100",
        "employeeName": "Ravi",
        "department": {
            "departmentId": "D100",
            "departmentName": "Software Engineering"
        },
        "salary": {
            "baseSalary": 1000,
            "currency": "USD"
        }
    },
    {
        "id": "E101",
        "employeeName": "Alice",
        "department": {
            "departmentId": "D200",
            "departmentName": "Finance"
        },
        "salary": {
            "baseSalary": 1000,
            "currency": "USD"
        }
    },
    {
        "id": "E102",
        "employeeName": "Bob",
        "department": {
            "departmentId": "D300",
            "departmentName": "Sales"
        },
        "salary": {
            "baseSalary": 1000,
            "currency": "USD"
        }
    }
]

You can read the raw JSON data into a Spark dataset using the code below.

val employees = (spark.read
  .option("multiLine", "true") // Required because each record spans multiple lines
  .json("/tmp/employees.json"))

You can view the inferred schema and print the dataset rows using the code below.

// Print schema
employees.printSchema()
root
 |-- department: struct (nullable = true)
 |    |-- departmentId: string (nullable = true)
 |    |-- departmentName: string (nullable = true)
 |-- employeeName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary: struct (nullable = true)
 |    |-- baseSalary: long (nullable = true)
 |    |-- currency: string (nullable = true)
// Show dataset
employees.show(false)
+----------------------------+------------+----+-----------+
|department                  |employeeName|id  |salary     |
+----------------------------+------------+----+-----------+
|{D100, Software Engineering}|Ravi        |E100|{1000, USD}|
|{D200, Finance}             |Alice       |E101|{1000, USD}|
|{D300, Sales}               |Bob         |E102|{1000, USD}|
+----------------------------+------------+----+-----------+

StructType to MapType

In the employees dataset above, we have two StructType fields, department and salary, which we want to convert to MapType.

For our use case, we can convert them to Map[String, String] using the helper function below.

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, from_json, to_json}
import org.apache.spark.sql.types.{DataTypes, StringType}

/**
 * Converts struct columns to Map[String, String] and returns the updated dataset
 *
 * @param dataset Input dataset
 * @return Dataset with struct columns replaced by Map[String, String]
 */
def convertStructColumnsToMap(dataset: Dataset[Row]): Dataset[Row] = {
  // Create the target MapType (string keys and values)
  val mapColumnType = DataTypes.createMapType(StringType, StringType)
  dataset.schema.fields
    // Keep only the "struct" fields
    .filter(_.dataType.typeName.equals("struct"))
    .foldLeft(dataset) {
      case (intermediateDataset, field) =>
        // Serialize the struct column to a JSON string
        val columnValueAsJson = to_json(col(field.name))
        // Parse the JSON string back as a Map[String, String]
        val columnValueAsMap = from_json(columnValueAsJson, mapColumnType)
        // Replace the column in the intermediate dataset
        intermediateDataset.withColumn(field.name, columnValueAsMap)
    }
}
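
For comparison, if the struct fields are known up front, you could build the map by hand with the map function instead of round-tripping through JSON; here is a minimal sketch using the department field names from our example data:

import org.apache.spark.sql.functions.{col, lit, map}

// Build the map explicitly from known struct fields
val departmentAsMap = employees.withColumn(
  "department",
  map(
    lit("departmentId"), col("department.departmentId"),
    lit("departmentName"), col("department.departmentName")
  )
)

The to_json/from_json round trip in the helper avoids hard-coding field names, which is why it works for any struct column.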

We will invoke the above helper function to create a new employeesWithMapType dataset.

val employeesWithMapType = convertStructColumnsToMap(employees)
employeesWithMapType.printSchema()
root
 |-- department: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- employeeName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
employeesWithMapType.show(false)
+--------------------------------------------------------------+------------+----+-------------------------------------+
|department                                                    |employeeName|id  |salary                               |
+--------------------------------------------------------------+------------+----+-------------------------------------+
|{departmentId -> D100, departmentName -> Software Engineering}|Ravi        |E100|{baseSalary -> 1000, currency -> USD}|
|{departmentId -> D200, departmentName -> Finance}             |Alice       |E101|{baseSalary -> 1000, currency -> USD}|
|{departmentId -> D300, departmentName -> Sales}               |Bob         |E102|{baseSalary -> 1000, currency -> USD}|
+--------------------------------------------------------------+------------+----+-------------------------------------+
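
With the columns converted to maps, you can now look values up by key; for example, as a quick sanity check (the column aliases are mine):

import org.apache.spark.sql.functions.col

employeesWithMapType
  .select(
    col("employeeName"),
    col("department")("departmentName").as("departmentName"),
    col("salary")("baseSalary").as("baseSalary")
  )
  .show(false)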

MapType to StructType

You can achieve the inverse with the helper function below.

import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, to_json}

/**
 * Converts map columns to struct and returns the updated dataset
 *
 * @param sparkSession Spark session
 * @param dataset Input dataset
 * @return Dataset with map columns replaced by struct columns
 */
def convertMapColumnsToStruct(sparkSession: SparkSession, dataset: Dataset[Row]): Dataset[Row] = {
  dataset.schema.fields
    // Keep only the "map" fields
    .filter(_.dataType.typeName.equals("map"))
    .foldLeft(dataset) {
      case (intermediateDataset, field) =>
        // Serialize the map column to a JSON string
        val jsonColumn = to_json(col(field.name))
        // Infer the struct schema by reading the JSON strings back
        val jsonStringDataset = dataset
          .select(jsonColumn)
          .filter(jsonColumn.isNotNull) // Skip nulls to avoid corrupt records
          .as[String](Encoders.STRING)
        val jsonSchema = sparkSession.read
          .json(jsonStringDataset)
          .schema
        // Replace the column in the intermediate dataset
        intermediateDataset.withColumn(field.name, from_json(jsonColumn, jsonSchema))
    }
}

We can use the function above to create a new employeesWithStructType dataset. Note that inferring the schema with sparkSession.read.json scans the extracted JSON strings, so the conversion triggers an extra Spark job per map column.

val employeesWithStructType = convertMapColumnsToStruct(spark, employeesWithMapType)
employeesWithStructType.printSchema()
root
 |-- department: struct (nullable = true)
 |    |-- departmentId: string (nullable = true)
 |    |-- departmentName: string (nullable = true)
 |-- employeeName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- salary: struct (nullable = true)
 |    |-- baseSalary: string (nullable = true)
 |    |-- currency: string (nullable = true)
employeesWithStructType.show(false)
+----------------------------+------------+----+-----------+
|department                  |employeeName|id  |salary     |
+----------------------------+------------+----+-----------+
|{D100, Software Engineering}|Ravi        |E100|{1000, USD}|
|{D200, Finance}             |Alice       |E101|{1000, USD}|
|{D300, Sales}               |Bob         |E102|{1000, USD}|
+----------------------------+------------+----+-----------+
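
Note that the round trip is not fully lossless: because the map values were strings, the inferred schema reports baseSalary as string rather than the original long, as the printed schema above shows. If you need the original type back, you can cast the field; here is a sketch assuming Spark 3.1+ for Column.withField:

// Restore baseSalary to long inside the salary struct (requires Spark 3.1+)
val employeesWithLongSalary = employeesWithStructType.withColumn(
  "salary",
  col("salary").withField("baseSalary", col("salary.baseSalary").cast("long"))
)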
