Scala Spark application to join CSV files in HDFS and save to Elasticsearch
In this post I’ll share a simple Scala Spark app I used to join CSV tables in HDFS into a nested data structure and save the result to Elasticsearch. I used the saveToEs method from the elasticsearch-hadoop library, which makes this integration trivial. I wrote this code on OS X and prototyped it in Apache Zeppelin. In a subsequent post I’ll share a Docker version.
My environment:
$ java -version
java version "1.8.0_121"
$ brew list --versions | egrep -i "(hadoop|spark|scala|elasticsearch|hive)"
apache-spark 2.1.1
elasticsearch@2.4 2.4.5
hadoop 2.8.0
hive 2.1.1
scala@2.10 2.10.6_1
~/.profile
export JAVA_HOME="$(/usr/libexec/java_home)"
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.8.0/libexec
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_HOME=/usr/local/Cellar/hive/2.1.1
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.1.1/libexec
export PATH="$PATH:/usr/local/opt/scala@2.10/bin"
Modifications I made to my Hadoop configuration:
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
yarn-site.xml
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
Start services (on a fresh install, format the NameNode once with hdfs namenode -format before starting HDFS):
$ brew services start elasticsearch@2.4
$ $HADOOP_HOME/sbin/start-dfs.sh
$ $HADOOP_HOME/sbin/start-yarn.sh
$ jps | grep -v Jps
19681 NodeManager
19331 DataNode
19238 NameNode
19446 SecondaryNameNode
19581 ResourceManager
23549 Elasticsearch
I previously generated some sample CSV data with the following structure using the faker gem. I put these two files in HDFS:
$ head users.csv
user_id,first_name,last_name
1,Lukas,Bartoletti
2,Anjali,Leuschke
3,Lois,Hayes
4,Ervin,Zieme
5,Sabina,Hegmann
6,Tavares,Fahey
7,Violet,Zieme
8,Saul,Keebler
9,Haskell,Schaefer
$ head things.csv
thing_id,user_id,thing
1,1,Durable Plastic Shirt
2,1,Heavy Duty Concrete Shirt
3,1,Awesome Marble Lamp
4,1,Sleek Granite Pants
5,1,Synergistic Granite Bottle
6,1,Small Iron Hat
7,1,Awesome Iron Bench
8,1,Incredible Linen Coat
9,1,Awesome Bronze Chair
$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/eric
$ $HADOOP_HOME/bin/hdfs dfs -put users.csv /user/eric/
$ $HADOOP_HOME/bin/hdfs dfs -put things.csv /user/eric/
$ $HADOOP_HOME/bin/hdfs dfs -ls
-rw-r--r-- 1 eric supergroup 372405902 2017-06-29 17:00 things.csv
-rw-r--r-- 1 eric supergroup 21410090 2017-06-29 17:00 users.csv
At this point I was ready to prototype the code in Apache Zeppelin, where I manually loaded the elasticsearch-hadoop dependency via:
%dep
z.load("org.elasticsearch:elasticsearch-hadoop:6.0.0-alpha2")
My sample code: src/main/scala/CsvToElasticsearch.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.elasticsearch.spark.sql._ // brings saveToEs into scope on DataFrames

// TODO: Datasets not working at the moment
// case class User(user_id: Int, first_name: String, last_name: String)
// case class Thing(thing_id: Int, user_id: Int, thing: String)
// case class UsersThings(user_id: Int, first_name: String, last_name: String, things: List[Thing])

object CsvToElasticsearch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV to Elasticsearch")
    // Let the connector create the target index if it doesn't exist yet.
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    import sqlc.implicits._

    val baseDir = "/user/eric"
    val usersCSVFile = baseDir + "/users.csv"
    val thingsCSVFile = baseDir + "/things.csv"

    // Read both CSV files from HDFS, taking column names from the header
    // row and letting Spark infer the column types.
    val users = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(usersCSVFile)
      // .as[User]
    val things = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(thingsCSVFile)
      // .as[Thing]

    // Join on user_id, then collapse each user's things into an array of
    // {thing_id, thing} structs so each user becomes one nested document.
    val users_things = users
      .join(things, "user_id")
      .groupBy("user_id")
      .agg(
        first($"first_name").as("first_name"), first($"last_name").as("last_name"),
        collect_list(struct("thing_id", "thing")).as("things")
      )

    // Index each row as a JSON document under users_things/user_thing.
    users_things.saveToEs("users_things/user_thing")
    sc.stop()
  }
}
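One note on the configuration: the connector defaults to an Elasticsearch node at localhost:9200, which is why no address appears in the code above. Pointing at a remote cluster is a matter of setting the standard elasticsearch-hadoop keys on the same SparkConf (a minimal sketch; the values shown are just the documented defaults):
// In main(), alongside es.index.auto.create:
conf.set("es.nodes", "localhost") // comma-separated list of Elasticsearch hosts
conf.set("es.port", "9200")       // HTTP port used to talk to each node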
The contents of my build.sbt file:
name := "CSV to Elasticsearch"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.0.0-alpha2"
I built the Spark application via sbt package.
Executing the Spark application using spark-submit:
$SPARK_HOME/bin/spark-submit \
--class "CsvToElasticsearch" \
--packages org.elasticsearch:elasticsearch-hadoop:6.0.0-alpha2 \
--master local[4] \
target/scala-2.10/csv-to-elasticsearch_2.10-1.0.jar
Once I saw the job had started, I queried Elasticsearch to verify the nested structure:
$ curl 'http://localhost:9200/users_things/user_thing/_search?size=1&sort=user_id&pretty'
{
"took" : 37,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 1000000,
"max_score" : null,
"hits" : [ {
"_index" : "users_things",
"_type" : "user_thing",
"_id" : "AV0JbhUhDqMZg0qX7Eh9",
"_score" : null,
"_source" : {
"user_id" : 1,
"first_name" : "Lukas",
"last_name" : "Bartoletti",
"things" : [ {
"thing_id" : 1,
"thing" : "Durable Plastic Shirt"
}, {
"thing_id" : 2,
"thing" : "Heavy Duty Concrete Shirt"
}, {
"thing_id" : 3,
"thing" : "Awesome Marble Lamp"
}, {
"thing_id" : 4,
"thing" : "Sleek Granite Pants"
}, {
"thing_id" : 5,
"thing" : "Synergistic Granite Bottle"
}, {
"thing_id" : 6,
"thing" : "Small Iron Hat"
}, {
"thing_id" : 7,
"thing" : "Awesome Iron Bench"
}, {
"thing_id" : 8,
"thing" : "Incredible Linen Coat"
}, {
"thing_id" : 9,
"thing" : "Awesome Bronze Chair"
}, {
"thing_id" : 10,
"thing" : "Small Iron Coat"
} ]
},
"sort" : [ 1 ]
} ]
}
}
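The round trip can also be checked from Spark itself: the same org.elasticsearch.spark.sql._ import adds an esDF reader to SQLContext, which loads an index back as a DataFrame (a sketch, assuming the sqlc and imports from the application above):
// Read the index back and spot-check the reconstructed nested field.
val readBack = sqlc.esDF("users_things/user_thing")
readBack.printSchema()             // things should show as an array of structs
readBack.show(1, truncate = false)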