Scala Spark application to join CSV files in HDFS and save to Elasticsearch

In this post I’ll share a simple Scala Spark app I used to join CSV tables in HDFS into a nested data structure and save the result to Elasticsearch. I used the elasticsearch-hadoop library's saveToEs method, which makes this integration trivial. I wrote this code on OS X and prototyped it in Apache Zeppelin. In a subsequent post I’ll share a Docker version.

My environment:

$ java -version
java version "1.8.0_121"

$ brew list --versions | egrep -i "(hadoop|spark|scala|elasticsearch|hive)"
apache-spark 2.1.1
elasticsearch@2.4 2.4.5
hadoop 2.8.0
hive 2.1.1
scala@2.10 2.10.6_1

# environment variables in my ~/.profile
export JAVA_HOME="$(/usr/libexec/java_home)"
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.8.0/libexec
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_HOME=/usr/local/Cellar/hive/2.1.1
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.1.1/libexec
export PATH="$PATH:/usr/local/opt/scala@2.10/bin"

Modifications I made to the Hadoop configuration files under $HADOOP_CONF_DIR:

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

yarn-site.xml

<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>

Start the services (on a fresh install, format HDFS first with $HADOOP_HOME/bin/hdfs namenode -format):

$ brew services start elasticsearch@2.4
$ $HADOOP_HOME/sbin/start-dfs.sh
$ $HADOOP_HOME/sbin/start-yarn.sh

$ jps | grep -v Jps
19681 NodeManager
19331 DataNode
19238 NameNode
19446 SecondaryNameNode
19581 ResourceManager
23549 Elasticsearch

I had previously generated some sample CSV data with the faker gem, using the structure below, and put the two files into HDFS:

$ head users.csv
user_id,first_name,last_name
1,Lukas,Bartoletti
2,Anjali,Leuschke
3,Lois,Hayes
4,Ervin,Zieme
5,Sabina,Hegmann
6,Tavares,Fahey
7,Violet,Zieme
8,Saul,Keebler
9,Haskell,Schaefer

$ head things.csv
thing_id,user_id,thing
1,1,Durable Plastic Shirt
2,1,Heavy Duty Concrete Shirt
3,1,Awesome Marble Lamp
4,1,Sleek Granite Pants
5,1,Synergistic Granite Bottle
6,1,Small Iron Hat
7,1,Awesome Iron Bench
8,1,Incredible Linen Coat
9,1,Awesome Bronze Chair

$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/eric
$ $HADOOP_HOME/bin/hdfs dfs -put users.csv /user/eric/
$ $HADOOP_HOME/bin/hdfs dfs -put things.csv /user/eric/

$ $HADOOP_HOME/bin/hdfs dfs -ls
-rw-r--r--   1 eric supergroup  372405902 2017-06-29 17:00 things.csv
-rw-r--r--   1 eric supergroup   21410090 2017-06-29 17:00 users.csv

At this point I was ready to prototype the code in Apache Zeppelin. In Zeppelin I manually loaded the elasticsearch-hadoop dependency via:

%dep
z.load("org.elasticsearch:elasticsearch-hadoop:6.0.0-alpha2")

My sample code: src/main/scala/CsvToElasticsearch.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.elasticsearch.spark.sql._

// TODO: datasets not working at the moment
// case class User(user_id: Int, first_name: String, last_name: String)
// case class Thing(thing_id: Int, user_id: Int, thing: String)
// case class UsersThings(user_id: Int, first_name: String, last_name: String, things: List[Thing])

object CsvToElasticsearch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CSV to Elasticsearch")

    conf.set("es.index.auto.create", "true")

    val sc = new SparkContext(conf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    import sqlc.implicits._ // brings in the $"col" column syntax used below

    val baseDir = "/user/eric"
    val usersCSVFile = baseDir + "/users.csv"
    val thingsCSVFile = baseDir + "/things.csv"

    // read the CSVs as DataFrames with inferred schemas; in Spark 2.x the
    // com.databricks.spark.csv format name resolves to the built-in CSV source
    val users = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(usersCSVFile)
      // .as[User]

    val things = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(thingsCSVFile)
      // .as[Thing]

    // join users to their things on user_id, then collapse each user's rows
    // into one record with a nested "things" array of {thing_id, thing} structs
    val users_things = users
      .join(things, "user_id")
      .groupBy("user_id")
      .agg(
        first($"first_name").as("first_name"),
        first($"last_name").as("last_name"),
        collect_list(struct("thing_id", "thing")).as("things")
      )

    // write each aggregated row to the Elasticsearch index "users_things", type "user_thing"
    users_things.saveToEs("users_things/user_thing")

    sc.stop()
  }
}
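
The TODO above notes that the typed Dataset version wasn't working for me yet. For completeness, here is a rough, untested sketch of what the typed reads could look like once the commented-out case classes are uncommented (the inferred column types have to line up with the case class fields for .as[...] to succeed):

// rough sketch only, not the version I ran
val typedUsers = sqlc.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(usersCSVFile)
  .as[User]                 // Dataset[User]

val typedThings = sqlc.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(thingsCSVFile)
  .as[Thing]                // Dataset[Thing]

// joinWith keeps both sides typed; building the nested "things" array
// would still go through groupBy/agg/collect_list as above
val typedJoined = typedUsers
  .joinWith(typedThings, typedUsers("user_id") === typedThings("user_id"))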

The contents of my build.sbt file:

name := "CSV to Elasticsearch"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.0.0-alpha2"

I built the Spark application with sbt package.

Executing the Spark application using spark-submit:

$SPARK_HOME/bin/spark-submit \
  --class "CsvToElasticsearch" \
  --packages org.elasticsearch:elasticsearch-hadoop:6.0.0-alpha2 \
  --master local[4] \
  target/scala-2.10/csv-to-elasticsearch_2.10-1.0.jar
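
This runs against Elasticsearch on localhost:9200, which is what elasticsearch-hadoop assumes by default. If the cluster lived elsewhere, the connector's es.nodes and es.port settings could be set on the SparkConf next to es.index.auto.create, along these lines (the hostname below is a placeholder, not part of my setup):

// hypothetical remote Elasticsearch cluster
conf.set("es.nodes", "es1.example.com")
conf.set("es.port", "9200")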

Once I saw the job had started writing documents, I queried Elasticsearch to verify the nested structure:

$ curl 'http://localhost:9200/users_things/user_thing/_search?size=1&sort=user_id&pretty'
{
  "took" : 37,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000000,
    "max_score" : null,
    "hits" : [ {
      "_index" : "users_things",
      "_type" : "user_thing",
      "_id" : "AV0JbhUhDqMZg0qX7Eh9",
      "_score" : null,
      "_source" : {
        "user_id" : 1,
        "first_name" : "Lukas",
        "last_name" : "Bartoletti",
        "things" : [ {
          "thing_id" : 1,
          "thing" : "Durable Plastic Shirt"
        }, {
          "thing_id" : 2,
          "thing" : "Heavy Duty Concrete Shirt"
        }, {
          "thing_id" : 3,
          "thing" : "Awesome Marble Lamp"
        }, {
          "thing_id" : 4,
          "thing" : "Sleek Granite Pants"
        }, {
          "thing_id" : 5,
          "thing" : "Synergistic Granite Bottle"
        }, {
          "thing_id" : 6,
          "thing" : "Small Iron Hat"
        }, {
          "thing_id" : 7,
          "thing" : "Awesome Iron Bench"
        }, {
          "thing_id" : 8,
          "thing" : "Incredible Linen Coat"
        }, {
          "thing_id" : 9,
          "thing" : "Awesome Bronze Chair"
        }, {
          "thing_id" : 10,
          "thing" : "Small Iron Coat"
        } ]
      },
      "sort" : [ 1 ]
    } ]
  }
}

Source code on GitHub
