In this post, I’ll demonstrate how to run Apache Spark, Hadoop, and Scala locally (on OS X) and prototype Spark/Scala/SQL code in Apache Zeppelin. The goal of this exercise is to connect to Postgresql from Zeppelin, populate two tables with sample data, join them together, and export the results to separate CSV files (by primary key).
I installed the Java JDK from Oracle; Spark, Hadoop, Postgresql, and Scala via homebrew; and downloaded Apache Zeppelin manually.
# show java version
java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
# show installed brew versions
brew list --versions | egrep -i "(hadoop|spark|scala|zeppelin|postgresql)"
apache-spark 2.4.4
apache-zeppelin 0.8.1
hadoop 3.2.1
postgresql 12.1
scala 2.13.1
# environment variables added to ~/.profile
export JAVA_HOME="$(/usr/libexec/java_home)"
export HADOOP_HOME=/usr/local/Cellar/hadoop/3.2.1/libexec
export HADOOP_MAPRED_HOME=/usr/local/Cellar/hadoop/3.2.1/libexec
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.4.4/libexec
export ZEPPELIN_HOME=/Users/eric/Documents/code/spark/zeppelin/zeppelin-0.8.2-bin-all
export PATH="$PATH:/usr/local/opt/scala/bin"
Part 1: Hadoop Setup
Ensure you can ssh to localhost without a password.
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost
Here are the changes I made to the Hadoop configuration files located in $HADOOP_CONF_DIR:
core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>
Prepare HDFS and start Hadoop services
# format HDFS filesystem
$HADOOP_HOME/bin/hdfs namenode -format
# Start NameNode daemon and DataNode daemon
$HADOOP_HOME/sbin/start-dfs.sh
# Ensure Java processes are running
jps | grep -v Jps
64099 NameNode
64420 SecondaryNameNode
64222 DataNode
# browse to NameNode
http://localhost:9870
# Start ResourceManager daemon and NodeManager daemon
$HADOOP_HOME/sbin/start-yarn.sh
# Ensure Java processes are running
jps | grep -v Jps
64993 ResourceManager
64099 NameNode
64420 SecondaryNameNode
65111 NodeManager
64222 DataNode
# browse to ResourceManager
http://localhost:8088/
Test HDFS, Hadoop, MapReduce:
# make HDFS directories to execute MapReduce jobs
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/$(whoami)
# copy some test files
$HADOOP_HOME/bin/hdfs dfs -mkdir input
$HADOOP_HOME/bin/hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml input
# run example
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar grep input output 'dfs[a-z.]+'
# inspect results
$HADOOP_HOME/bin/hdfs dfs -cat output/part-r-00000
1 dfsadmin
1 dfs.replication
Part 2: Postgresql Setup
Ensure Postgresql is running
brew services list | egrep -i "(^Name|postgresql)"
Name Status User Plist
postgresql started eric /Users/eric/Library/LaunchAgents/homebrew.mxcl.postgresql.plist
Create a Postgresql database and user for Spark development
createdb spark_data
createuser spark_data
psql spark_data
> alter user spark_data with encrypted password 'spark_data';
> grant all privileges on database spark_data to spark_data;
Part 3: Apache Zeppelin Setup
I encountered an issue installing Apache Zeppelin via homebrew, so I manually downloaded the full package.
# browse to https://zeppelin.apache.org/download.html, download zeppelin-0.8.2-bin-all.tgz
# extract
tar -xzf zeppelin-0.8.2-bin-all.tgz
cd zeppelin-0.8.2-bin-all
# ensure ENV var is set to the extracted path, example:
export ZEPPELIN_HOME=/Users/eric/Documents/code/spark/zeppelin/zeppelin-0.8.2-bin-all
# start Apache Zeppelin
./bin/zeppelin-daemon.sh start
# browse to web interface:
http://localhost:8080/#/
Part 4: Scala and Spark development via Zeppelin
I added my Postgresql credentials to the jdbc interpreter.
Browse to:
http://localhost:8080/#/interpreter
Set:
default.driver: org.postgresql.Driver
default.password: spark_data
default.url: jdbc:postgresql://localhost:5432/
default.user: spark_data
I added a new Zeppelin Notebook (with default interpreter: spark/scala) and began adding paragraphs.
In my first paragraph, I included the Postgresql JDBC driver jar, e.g.:
%spark.conf
spark.jars /Users/eric/Documents/code/spark/zeppelin/zeppelin-0.8.2-bin-all/interpreter/jdbc/postgresql-9.4-1201-jdbc41.jar
Ensure the Postgresql driver is loaded
%spark
Class.forName("org.postgresql.Driver")
res0: Class[_] = class org.postgresql.Driver
Create two Postgresql tables, which I plan to populate and join for demonstration purposes:
%jdbc
CREATE TABLE accounts (
  id serial PRIMARY KEY,
  name VARCHAR(50) UNIQUE NOT NULL
);
CREATE TABLE reports (
  id serial PRIMARY KEY,
  account_id integer NOT NULL,
  name VARCHAR(50) NOT NULL
);
Define credentials in Scala, build the JDBC connection properties used for writes, and create a reusable DataFrameReader for reads.
%spark
val jdbcUrl = "jdbc:postgresql://localhost:5432/spark_data"
val jdbcDriver = "org.postgresql.Driver"
val jdbcUser = "spark_data"
val jdbcPassword = "spark_data"

import java.util.Properties
val jdbcProps = new Properties()
jdbcProps.setProperty("driver", jdbcDriver)
jdbcProps.setProperty("user", jdbcUser)
jdbcProps.setProperty("password", jdbcPassword)

val dfReader = spark.read.format("jdbc")
  .option("driver", jdbcDriver)
  .option("url", jdbcUrl)
  .option("user", jdbcUser)
  .option("password", jdbcPassword)
Create a list of Account names as a DataFrame
%spark
val accountNamesDf = Range(0, 10)
  .map("account" + _.toString())
  .toDF("name")
// inspect:
accountNamesDf.show()
+--------+
|    name|
+--------+
|account0|
|account1|
|account2|
|account3|
|account4|
|account5|
|account6|
|account7|
|account8|
|account9|
+--------+
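A note on toDF: converting a local collection to a DataFrame relies on the SparkSession's implicit encoders, which are already in scope in this notebook. Outside Zeppelin (for example in spark-shell or a compiled job), a minimal sketch would import them explicitly:
// outside Zeppelin, bring the implicit conversions into scope yourself;
// here `spark` is the active SparkSession
import spark.implicits._

val accountNamesDf = Range(0, 10)
  .map("account" + _.toString())
  .toDF("name")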
Write the Account names DataFrame to the Postgresql accounts table
%spark
accountNamesDf
  .write
  .mode("append")
  .jdbc(jdbcUrl, "accounts", jdbcProps)
Load Accounts (with IDs) into a new DataFrame
%spark
val accountsDf = dfReader
  .option("query", "select id, name from accounts")
  .load()
// inspect
accountsDf.show()
+---+--------+
| id|    name|
+---+--------+
|  5|account0|
|  8|account5|
|  7|account7|
| 10|account4|
|  9|account1|
|  4|account3|
|  2|account9|
|  6|account2|
|  3|account8|
|  1|account6|
+---+--------+
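A table this small loads fine in a single query, but for larger tables the JDBC source can split the read across partitions. A minimal sketch, assuming the id values span roughly 1 to 10 as shown above (partitionColumn, lowerBound, upperBound, and numPartitions are standard Spark JDBC options; accountsParallelDf is just an illustrative name):
%spark
// hypothetical parallel read: Spark issues one query per partition,
// slicing the id range into 4 pieces
val accountsParallelDf = dfReader
  .option("dbtable", "accounts")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "10")
  .option("numPartitions", "4")
  .load()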
Collect a list of Account IDs to populate foreign keys in the other table
%spark
val accountIds = accountsDf
  .select("id")
  .map(_.getInt(0))
  .collect()
// accountIds: Array[Int] = Array(5, 8, 7, 10, 9, 4, 2, 6, 3, 1)
Create a function to randomly select an Account ID
%spark
import scala.util.Random
val random = new Random
def randomAccountId(): Int = accountIds(random.nextInt(accountIds.length))
randomAccountId()
// res0: Int = 10
Create a list of Report names and a function to select a random name
%spark
val reportNames = Range(0, 1000)
  .map("report" + _.toString())
  .toList
def randomReportName(): String = reportNames(random.nextInt(reportNames.length))
randomReportName()
// res0: String = report167
Generate a million-row DataFrame of randomized Account IDs and Report names to populate the Reports table
%spark
val accountIdReportNameDf = sc.parallelize(
  Seq.fill(1000000){ (randomAccountId(), randomReportName()) }
).toDF("account_id", "name")
// inspect
accountIdReportNameDf.show()
+----------+---------+
|account_id|     name|
+----------+---------+
|         8|report415|
|         9|report585|
|         4|report818|
|         8|report938|
|         8|report962|
|         1|report324|
|         8|report169|
|         3|report624|
|         8|report712|
|         1| report92|
|         6| report56|
|         8|report115|
|         6| report67|
|         2|report359|
|         1|report556|
|         7|report829|
|         6| report56|
|         2|report620|
|         6|report623|
|         9|report940|
+----------+---------+
only showing top 20 rows
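Building a million-element Seq on the driver and parallelizing it is fine at this scale, but the same shape of data can be generated directly on the executors with Spark SQL functions. A minimal sketch, assuming the account ids happen to be 1 through 10 as in the output above (distributedReportsDf is just an illustrative name):
%spark
import org.apache.spark.sql.functions._

// hypothetical distributed generation: one row per value of spark.range,
// with a random account_id in 1..10 and a random name in report0..report999
val distributedReportsDf = spark.range(1000000L)
  .select(
    (floor(rand() * 10) + 1).cast("int").as("account_id"),
    concat(lit("report"), floor(rand() * 1000).cast("int").cast("string")).as("name"))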
Write DataFrame rows to the Postgresql Reports table
%spark
accountIdReportNameDf
  .write
  .mode("append")
  .jdbc(jdbcUrl, "reports", jdbcProps)
Load the Reports table into a DataFrame
%spark
val reportsDf = dfReader
  .option("query", "select id, account_id, name from reports")
  .load()
// inspect
reportsDf.show()
+---+----------+---------+
| id|account_id|     name|
+---+----------+---------+
|  4|         7| report47|
|  5|         6|report620|
|  6|         6|report375|
|  8|         2|report724|
| 12|         2|report350|
| 14|         9| report27|
| 15|         3|report965|
| 16|         3|report447|
| 17|         1|report817|
| 18|         1|report670|
| 20|         8|report643|
| 21|         4|report130|
| 22|         1|report832|
| 23|         3|report863|
| 24|         9|  report1|
| 13|         8|report450|
| 26|         6|report716|
| 27|         2|report808|
| 28|         3| report56|
|  9|         8|report415|
+---+----------+---------+
only showing top 20 rows
Using Spark/SQL to join the tables together
%spark
dfReader
  .option("query", "select accounts.*, reports.name as report_name from accounts join reports on reports.account_id = accounts.id")
  .load()
  .show()
+---+--------+-----------+
| id|    name|report_name|
+---+--------+-----------+
|  7|account7|   report47|
|  6|account2|  report620|
|  6|account2|  report375|
|  2|account9|  report724|
|  2|account9|  report350|
|  9|account1|   report27|
|  3|account8|  report965|
|  3|account8|  report447|
|  1|account6|  report817|
|  1|account6|  report670|
|  8|account5|  report643|
|  4|account3|  report130|
|  1|account6|  report832|
|  3|account8|  report863|
|  9|account1|    report1|
|  8|account5|  report450|
|  6|account2|  report716|
|  2|account9|  report808|
|  3|account8|   report56|
|  8|account5|  report415|
+---+--------+-----------+
only showing top 20 rows
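Because the join above is passed through the JDBC "query" option, Postgresql executes it and Spark only reads the result. To have Spark itself run the SQL over the DataFrames already loaded, one option is to register temporary views (the view names below are arbitrary):
%spark
// register the loaded DataFrames as temporary views and join them in Spark SQL
accountsDf.createOrReplaceTempView("accounts_v")
reportsDf.createOrReplaceTempView("reports_v")

spark.sql("""
  SELECT a.id, a.name, r.name AS report_name
  FROM accounts_v a
  JOIN reports_v r ON r.account_id = a.id
""").show()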
Using Spark/Scala to join the tables together
%spark
accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .select(accountsDf("id"), accountsDf("name"), reportsDf("name").as("report_name"))
  .show()
+---+--------+-----------+
| id|    name|report_name|
+---+--------+-----------+
|  1|account6|  report817|
|  1|account6|  report670|
|  1|account6|  report832|
|  1|account6|   report52|
|  1|account6|  report832|
|  1|account6|  report291|
|  1|account6|  report866|
|  1|account6|  report191|
|  1|account6|  report715|
|  1|account6|  report174|
|  1|account6|  report220|
|  1|account6|  report488|
|  1|account6|  report133|
|  1|account6|  report534|
|  1|account6|  report166|
|  1|account6|  report625|
|  1|account6|  report475|
|  1|account6|  report249|
|  1|account6|  report252|
|  1|account6|  report708|
+---+--------+-----------+
only showing top 20 rows
Showing an aggregate count of Report records per Account
%spark
accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .groupBy(accountsDf("id"))
  .agg(count(accountsDf("id")))
  .show()
+---+---------+
| id|count(id)|
+---+---------+
|  1|   100322|
|  6|    99479|
|  3|    99708|
|  5|    99493|
|  9|   100588|
|  4|    99921|
|  8|   100353|
|  7|    99924|
| 10|   100128|
|  2|   100084|
+---+---------+
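The same aggregate reads a little better with the account name included, a named count column, and a deterministic ordering; a minimal sketch (the explicit import may be redundant in Zeppelin, where count is already in scope):
%spark
import org.apache.spark.sql.functions.count

accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .groupBy(accountsDf("id"), accountsDf("name"))
  .agg(count(reportsDf("id")).as("report_count"))
  .orderBy(accountsDf("id"))
  .show()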
Joining and counting the records
%spark
accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .count()
// res0: Long = 1000000
For the final Spark operation, write out a separate CSV file to HDFS containing the Report data for each Account.
%spark
val dfWriteOptions: Map[String, String] =
  Map[String, String](
    "header" -> "true",
    "quoteAll" -> "true",
    "escape" -> "\"")

accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .repartition(accountsDf("id"))
  .select(accountsDf("id").as("account_id"), accountsDf("name").as("account_name"), reportsDf("id").as("report_id"), reportsDf("name").as("report_name"))
  .write
  .mode("overwrite")
  .options(dfWriteOptions)
  .csv("export")
Inspect CSV files in HDFS
$HADOOP_HOME/bin/hdfs dfs -ls export
Found 12 items
-rw-r--r-- 1 eric supergroup 0 2020-02-01 19:56 export/_SUCCESS
-rw-r--r-- 1 eric supergroup 0 2020-02-01 19:56 export/part-00000-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3589716 2020-02-01 19:56 export/part-00043-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3559231 2020-02-01 19:56 export/part-00049-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3567532 2020-02-01 19:56 export/part-00051-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3559809 2020-02-01 19:56 export/part-00066-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3599014 2020-02-01 19:56 export/part-00089-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3575171 2020-02-01 19:56 export/part-00102-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3590569 2020-02-01 19:56 export/part-00103-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3575201 2020-02-01 19:56 export/part-00107-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3682807 2020-02-01 19:56 export/part-00122-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r-- 1 eric supergroup 3580827 2020-02-01 19:56 export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
Copy HDFS CSV files to local filesystem
$HADOOP_HOME/bin/hdfs dfs -copyToLocal export ./
Count records in each CSV file (the extra ten rows are the CSV headers)
wc -l export/*.csv
0 export/part-00000-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
100323 export/part-00043-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
99480 export/part-00049-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
99709 export/part-00051-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
99494 export/part-00066-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
100589 export/part-00089-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
99922 export/part-00102-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
100354 export/part-00103-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
99925 export/part-00107-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
100129 export/part-00122-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
100085 export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
1000010 total
Ensuring each CSV file was partitioned by Account
egrep -io "account\d+" export/*.csv | uniq
export/part-00043-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account6
export/part-00049-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account2
export/part-00051-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account8
export/part-00066-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account0
export/part-00089-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account1
export/part-00102-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account3
export/part-00103-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account5
export/part-00107-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account7
export/part-00122-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account4
export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account9
Inspecting the contents of a CSV file
head export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
"account_id" ,"account_name" ,"report_id" ,"report_name"
"2" ,"account9" ,"8" ,"report724"
"2" ,"account9" ,"12" ,"report350"
"2" ,"account9" ,"27" ,"report808"
"2" ,"account9" ,"46" ,"report816"
"2" ,"account9" ,"1" ,"report97"
"2" ,"account9" ,"39" ,"report991"
"2" ,"account9" ,"41" ,"report496"
"2" ,"account9" ,"72" ,"report492"
"2" ,"account9" ,"61" ,"report602"