In this post I’ll share a simple Scala Spark app I used to join CSV tables in HDFS into a nested data structure and save the result to Elasticsearch. I used the elasticsearch-hadoop library’s saveToEs method, which makes the integration trivial. I wrote this code on macOS and prototyped it in Apache Zeppelin. In a subsequent post I’ll share a Docker version.
My environment:
$ java -version
java version "1.8.0_121"
$ brew list --versions | egrep -i "(hadoop|spark|scala|elasticsearch|hive)"
apache-spark 2.1.1
elasticsearch@2.4 2.4.5
hadoop 2.8.0
hive 2.1.1
scala@2.10 2.10.6_1
Environment variables in my ~/.profile:
export JAVA_HOME="$(/usr/libexec/java_home)"
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.8.0/libexec
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HIVE_HOME=/usr/local/Cellar/hive/2.1.1
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.1.1/libexec
export PATH="$PATH:/usr/local/opt/scala@2.10/bin"
Modifications I made to my Hadoop configuration:
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
yarn-site.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
Start services:
$ brew services start elasticsearch@2.4
$ $HADOOP_HOME/sbin/start-dfs.sh
$ $HADOOP_HOME/sbin/start-yarn.sh
$ jps | grep -v Jps
19681 NodeManager
19331 DataNode
19238 NameNode
19446 SecondaryNameNode
19581 ResourceManager
23549 Elasticsearch
I had previously generated some sample CSV data with the following structure using the faker gem. I put these two files in HDFS:
$ head users.csv
user_id,first_name,last_name
1,Lukas,Bartoletti
2,Anjali,Leuschke
3,Lois,Hayes
4,Ervin,Zieme
5,Sabina,Hegmann
6,Tavares,Fahey
7,Violet,Zieme
8,Saul,Keebler
9,Haskell,Schaefer
$ head things.csv
thing_id,user_id,thing
1,1,Durable Plastic Shirt
2,1,Heavy Duty Concrete Shirt
3,1,Awesome Marble Lamp
4,1,Sleek Granite Pants
5,1,Synergistic Granite Bottle
6,1,Small Iron Hat
7,1,Awesome Iron Bench
8,1,Incredible Linen Coat
9,1,Awesome Bronze Chair
$ $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/eric
$ $HADOOP_HOME/bin/hdfs dfs -put users.csv /user/eric/
$ $HADOOP_HOME/bin/hdfs dfs -put things.csv /user/eric/
$ $HADOOP_HOME/bin/hdfs dfs -ls
-rw-r--r-- 1 eric supergroup 372405902 2017-06-29 17:00 things.csv
-rw-r--r-- 1 eric supergroup 21410090 2017-06-29 17:00 users.csv
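Because fs.defaultFS points at hdfs://localhost:9000, unqualified paths like /user/eric/users.csv resolve against HDFS. A quick sanity check I could run in spark-shell (a hypothetical snippet, not part of the original workflow; spark is the SparkSession predefined by the Spark 2.x shell):
// Unqualified paths resolve against fs.defaultFS, so both reads hit the same HDFS file
val short = spark.read.option("header", "true").csv("/user/eric/users.csv")
val full = spark.read.option("header", "true").csv("hdfs://localhost:9000/user/eric/users.csv")
short.count() == full.count() // should be true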
At this point I was ready to prototype the code in Apache Zeppelin. In Zeppelin I manually loaded the elasticsearch-hadoop dependency (note that a %dep paragraph must run before the Spark interpreter starts, so restart the interpreter first if it has already initialized):
%dep
z.load("org.elasticsearch:elasticsearch-hadoop:6.0.0-alpha2")
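From there, a prototyping paragraph might look something like the following (a hypothetical example, not copied from my notebook; Zeppelin’s Spark interpreter predefines sqlContext and z):
// Zeppelin paragraph: load one CSV from HDFS and render it with Zeppelin's table display
val users = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/user/eric/users.csv")
z.show(users.limit(10))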
My sample code: src/main/scala/CsvToElasticsearch.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.elasticsearch.spark.sql._

// TODO: Datasets not working at the moment
// case class User(user_id: Int, first_name: String, last_name: String)
// case class Thing(thing_id: Int, user_id: Int, thing: String)
// case class UsersThings(user_id: Int, first_name: String, last_name: String, things: List[Thing])

object CsvToElasticsearch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV to Elasticsearch")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    import sqlc.implicits._

    val baseDir = "/user/eric"
    val usersCSVFile = baseDir + "/users.csv"
    val thingsCSVFile = baseDir + "/things.csv"

    // Read both CSVs with a header row, letting Spark infer the column types
    val users = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(usersCSVFile)
      // .as[User]
    val things = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(thingsCSVFile)
      // .as[Thing]

    // Join users to their things, then collapse each user's things
    // into a nested array of {thing_id, thing} structs
    val users_things = users
      .join(things, "user_id")
      .groupBy("user_id")
      .agg(
        first($"first_name").as("first_name"), first($"last_name").as("last_name"),
        collect_list(struct("thing_id", "thing")).as("things")
      )

    // Write to the users_things index under the user_thing type;
    // es.index.auto.create lets the connector create the index on first write
    users_things.saveToEs("users_things/user_thing")
    sc.stop()
  }
}
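The collect_list(struct(...)) aggregation is what builds the nested structure: each user row ends up with an array of {thing_id, thing} structs. A hypothetical printSchema() call on users_things (not part of the original job) should show something like:
users_things.printSchema()
// root
//  |-- user_id: integer (nullable = true)
//  |-- first_name: string (nullable = true)
//  |-- last_name: string (nullable = true)
//  |-- things: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- thing_id: integer (nullable = true)
//  |    |    |-- thing: string (nullable = true)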
The contents of my build.sbt file:
name := "CSV to Elasticsearch"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "6.0.0-alpha2"
I built the Spark application via sbt package.
Executing the Spark application using spark-submit:
$SPARK_HOME/bin/spark-submit \
  --class "CsvToElasticsearch" \
  --packages org.elasticsearch:elasticsearch-hadoop:6.0.0-alpha2 \
  --master local[4] \
  target/scala-2.10/csv-to-elasticsearch_2.10-1.0.jar
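The --packages flag pulls the elasticsearch-hadoop connector from Maven Central at submit time, so it doesn’t have to be bundled into the application jar. The connector targets localhost:9200 by default; if Elasticsearch were running elsewhere, the es.nodes and es.port settings on the SparkConf are the knobs to turn (a hypothetical tweak with a made-up hostname, not needed for this local setup):
conf.set("es.nodes", "es.example.com") // hypothetical remote host; defaults to localhost
conf.set("es.port", "9200")            // default port, shown for completeness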
Once the job was running, I queried Elasticsearch to verify the nested structure:
$ curl 'http://localhost:9200/users_things/user_thing/_search?size=1&sort=user_id&pretty'
{
  "took" : 37,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000000,
    "max_score" : null,
    "hits" : [ {
      "_index" : "users_things",
      "_type" : "user_thing",
      "_id" : "AV0JbhUhDqMZg0qX7Eh9",
      "_score" : null,
      "_source" : {
        "user_id" : 1,
        "first_name" : "Lukas",
        "last_name" : "Bartoletti",
        "things" : [ {
          "thing_id" : 1,
          "thing" : "Durable Plastic Shirt"
        }, {
          "thing_id" : 2,
          "thing" : "Heavy Duty Concrete Shirt"
        }, {
          "thing_id" : 3,
          "thing" : "Awesome Marble Lamp"
        }, {
          "thing_id" : 4,
          "thing" : "Sleek Granite Pants"
        }, {
          "thing_id" : 5,
          "thing" : "Synergistic Granite Bottle"
        }, {
          "thing_id" : 6,
          "thing" : "Small Iron Hat"
        }, {
          "thing_id" : 7,
          "thing" : "Awesome Iron Bench"
        }, {
          "thing_id" : 8,
          "thing" : "Incredible Linen Coat"
        }, {
          "thing_id" : 9,
          "thing" : "Awesome Bronze Chair"
        }, {
          "thing_id" : 10,
          "thing" : "Small Iron Coat"
        } ]
      },
      "sort" : [ 1 ]
    } ]
  }
}
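As a final sanity check, the connector can also read the documents back into Spark. A hypothetical read-back using the connector’s esDF helper on the same SQLContext (not part of the original job) might look like:
import org.elasticsearch.spark.sql._

// Read the indexed documents back into a DataFrame through the connector
val fromEs = sqlc.esDF("users_things/user_thing")
fromEs.printSchema() // should show the same nested things array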
Source code on GitHub.