Usually when I want to convert a JSON file to a CSV I will write a simple script in PHP. Lately I've been playing more with Apache Spark and wanted to try converting a 600MB JSON file to a CSV using a 3-node cluster I have set up. The JSON file itself contains a nested structure, so it took a little fiddling to get it right, but overall I'm impressed with the speed of the execution.

So I decided to take the JSON data and put it on HDFS (the Hadoop Distributed File System). My setup consists of 3 RHEL 7 boxes running Spark and Hadoop in cluster mode.

So I uploaded a JSON file containing a bunch of keyword data to my home folder (/home/tegan), then ran the following to move it into HDFS.

dzdo -s
hdfs dfs -mkdir /keywordData
hdfs dfs -put /home/tegan/keywordData.json /keywordData
# verify the folder shows up
hdfs dfs -ls /
# verify the file shows up
hdfs dfs -ls /keywordData

Ok, then I fired up the spark-shell and wrote some Scala to convert the JSON to CSV:

val json_path = "hdfs:///keywordData/keywordData.json"
val df = spark.read.json(json_path)

df.write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("keywordData.csv")

Executing that in the spark-shell caused a Java heap space error:

17/02/07 16:00:33 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space

I figured this might be an issue with such a large JSON file. Luckily, Spark's ability to distribute a job across a cluster can solve my memory problem: instead of running this on one server, I can spread the work out to my other worker nodes by compiling the Scala code into a JAR.
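As a side note, once the job is packaged up you can also give each executor more heap through the SparkConf. A minimal sketch (the 4g value is just an example, not something I tuned for this job):

// example only: bump the executor heap when building the job's SparkConf
val sparkConf = new SparkConf()
    .setAppName("Keyword Data")
    .set("spark.executor.memory", "4g")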

To set up the structure of the Scala project I did the following:

mkdir -p /home/tegan/KeywordData/src/main/scala/com/tegan/spark/keyworddata/
cd /home/tegan/KeywordData/src/main/scala/com/tegan/spark/keyworddata/

Then in the folder I created a KeywordData.scala file with the following in it:

vi /home/tegan/KeywordData/src/main/scala/com/tegan/spark/keyworddata/KeywordData.scala
package com.tegan.spark.keyworddata
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession

object KeywordData {
    def main(args: Array[String]) {

        val sparkConf = new SparkConf().setAppName("Keyword Data")
        val sc = new SparkContext(sparkConf)
        val sparkSession = SparkSession.builder.getOrCreate()
        import sparkSession.implicits._

        val json_path = "hdfs://hadoop-master:9000/keywordData/keywordData.json"
        val df = sparkSession.read.json(json_path)

        df.write
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .save("hdfs://hadoop-master:9000/keywordData/keywordData.csv")
    }
}

To compile the Scala above into a JAR I'm using SBT, which requires a project file. This is created by switching to /home/tegan/KeywordData/ and creating a KeywordData.sbt file.

cd /home/tegan/KeywordData
vi KeywordData.sbt
name := "KeywordData"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2"
)
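
One optional tweak: since the cluster already provides the Spark jars at runtime, these dependencies can be marked as "provided". It makes no difference to sbt package (which doesn't bundle dependencies anyway), but it keeps things tidy if you ever switch to building a fat JAR:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.2" % "provided"
)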

Then compile the Scala project to a JAR with:

sbt package

When you are ready to submit the job to Spark, issue:

/opt/spark/bin/spark-submit \
 --class com.tegan.spark.keyworddata.KeywordData \
 --master spark://spark-master:7077 \
 --deploy-mode=client \
 /home/tegan/KeywordData/target/scala-2.11/keyworddata_2.11-1.0.jar

It starts to run for a bit then I get this error:

Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support array

I'm guessing it is because the JSON data contains a nested structure. Here is an example of what it looks like:

{
    "keyword_data": [{
        "value": "some keyword term here",
        "type": "Keyword Phrase",
        "data_points": [{
            "name": "Sessions",
            "value": 173628
        }, {
            "name": "Users",
            "value": 158454
        }, {
            "name": "Views",
            "value": 221868
        }]
    },{
        "value": "another keyword term here",
        "type": "Keyword Phrase",
        "data_points": [{
            "name": "Sessions",
            "value": 32432
        }, {
            "name": "Users",
            "value": 2333
        }, {
            "name": "Views",
            "value": 3332111
        }]
    }]
}
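
You can confirm the nested structure from the spark-shell by printing the schema Spark infers (same HDFS path the job uses):

val df = spark.read.json("hdfs://hadoop-master:9000/keywordData/keywordData.json")
// keyword_data comes back as an array of structs, with data_points as another
// array nested inside each struct -- which is what the CSV writer rejects
df.printSchema()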

So with a few alterations, the Scala code to read these nested values into a flat data frame looks like this:

package com.tegan.spark.keyworddata
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object KeywordData {
    def main(args: Array[String]) {

        val sparkConf = new SparkConf().setAppName("Keyword Data")
        val sc = new SparkContext(sparkConf)
        val spark = SparkSession.builder.getOrCreate()
        import spark.implicits._

        val json_path = "hdfs://hadoop-master:9000/keywordData/keywordData.json"
        val df = spark.read.json(json_path)

        // explode the keyword_data array so each keyword term gets its own row
        var xp_df_1 = df.withColumn("term_flat", explode(df("keyword_data")))
        var xp_df_2 = xp_df_1.drop(xp_df_1.col("keyword_data"))

        // pull the keyword term and type out of the struct into flat columns
        var xp_df_terms = xp_df_2.withColumn("keyword", xp_df_2("term_flat.value"))
            .withColumn("keyword_type", xp_df_2("term_flat.type"))

        // explode the nested data_points array so each measure gets its own row
        var xp_df_data_points = xp_df_terms.withColumn("measure", explode(xp_df_terms("term_flat.data_points")))

        // flatten the measure struct into plain name/value columns
        var xp_df_name = xp_df_data_points.withColumn("m_name", xp_df_data_points("measure.name"))
        var xp_df_name_val = xp_df_name.withColumn("m_value", xp_df_name("measure.value"))

        // drop the remaining struct/array columns so only flat columns are left for the CSV writer
        var xp_df_final = xp_df_name_val.drop(xp_df_name_val.col("term_flat"))
        var final_df = xp_df_final.drop(xp_df_final.col("measure"))

        final_df.write
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .save("hdfs://hadoop-master:9000/keywordData/keywordData.csv")
    }
}

Recompile the new Scala code into a JAR with "sbt package", then submit it again. This time it should run to completion, placing the final results in the HDFS location called out in the write option above.
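
Note that Spark writes the result as a directory named keywordData.csv containing one part file per partition, not a single file. As a quick sanity check (a minimal sketch using Spark's built-in CSV reader), the output can be read back from the spark-shell:

// read the freshly written CSV directory back in and eyeball a few rows
val out = spark.read
    .option("header", "true")
    .csv("hdfs://hadoop-master:9000/keywordData/keywordData.csv")
out.show(5)
out.count()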

The more I play with data, building ETL pipelines, working on marketing data applications, and working hand in hand with sales data, the more value I see in developing a good understanding of tools like Apache Spark, Airbnb's Airflow, and the Elastic Stack. More to come!