Running Apache Spark and Hadoop locally on OSX, prototyping Scala code in Apache Zeppelin, and working with Postgresql/CSV/HDFS data

In this post, I’ll demonstrate how to run Apache Spark, Hadoop, and Scala locally (on OS X) and prototype Spark/Scala/SQL code in Apache Zeppelin. The goal of this exercise is to connect to Postgresql from Zeppelin, populate two tables with sample data, join them together, and export the results to separate CSV files (by primary key).

I installed Java JDK from Orcacle; Spark, Hadoop, Postgresql, and Scala via homebrew; and downloaded Apache Zeppelin manually.

# show java version
java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)

# show installed brew versions
brew list --versions | egrep -i "(hadoop|spark|scala|zeppelin|postgresql)"
apache-spark 2.4.4
apache-zeppelin 0.8.1
hadoop 3.2.1
postgresql 12.1
scala 2.13.1

# environment variables added to ~/.profile
export JAVA_HOME="$(/usr/libexec/java_home)"
export HADOOP_HOME=/usr/local/Cellar/hadoop/3.2.1/libexec
export HADOOP_MAPRED_HOME=/usr/local/Cellar/hadoop/3.2.1/libexec
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/usr/local/Cellar/apache-spark/2.4.4/libexec
export ZEPPELIN_HOME=/Users/eric/Documents/code/spark/zeppelin/zeppelin-0.8.2-bin-all
export PATH="$PATH:/usr/local/opt/scala/bin"

Part 1: Hadoop Setup

Ensure you can ssh to localhost without a password.

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost

Changes I made to Hadoop configuration files, located in $HADOOP_CONF_DIR

core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
  </property>
</configuration>

yarn-site.xml

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>
</configuration>

Prepare HDFS and start Hadoop services

# format HDFS filesystem
$HADOOP_HOME/bin/hdfs namenode -format

# Start NameNode daemon and DataNode daemon
$HADOOP_HOME/sbin/start-dfs.sh

# Ensure Java processes are running
jps | grep -v Jps
64099 NameNode
64420 SecondaryNameNode
64222 DataNode

# browse to NameNode
http://localhost:9870

# Start ResourceManager daemon and NodeManager daemon
$HADOOP_HOME/sbin/start-yarn.sh

# Ensure Java processes are running
jps | grep -v Jps
64993 ResourceManager
64099 NameNode
64420 SecondaryNameNode
65111 NodeManager
64222 DataNode

# browse to ResourceManager
http://localhost:8088/

Test HDFS, Hadoop, MapReduce:

# make HDFS directories to execute MadReduce jobs
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/$(whoami)

# copy some test files
$HADOOP_HOME/bin/hdfs dfs -mkdir input
$HADOOP_HOME/bin/hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml input

# run example
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar grep input output 'dfs[a-z.]+'

# inspect results
$HADOOP_HOME/bin/hdfs dfs -cat output/part-r-00000
1	dfsadmin
1	dfs.replication

Part 2: Postgresql Setup

Ensure Postgresql is running

brew services list | egrep -i "(^Name|postgresql)"
Name          Status  User Plist
postgresql    started eric /Users/eric/Library/LaunchAgents/homebrew.mxcl.postgresql.plist

Create a Postgresql database and user for Spark development

createdb spark_data
createuser spark_data
psql spark_data
> alter user spark_data with encrypted password 'spark_data';
> grant all privileges on database spark_data to spark_data;

Part 3: Apache Zeppelin Setup

I encountered an issue installing Apache Zeppelin via homebrew, so I manually downloaded the full package.

# browse to https://zeppelin.apache.org/download.html, download zeppelin-0.8.2-bin-all.tgz

# extract
tar -xzf zeppelin-0.8.2-bin-all.tgz
cd zeppelin-0.8.2-bin-all

# ensure ENV var is set to the extracted path, example:
export ZEPPELIN_HOME=/Users/eric/Documents/code/spark/zeppelin/zeppelin-0.8.2-bin-all

# start Apache Zeppelin
./bin/zeppelin-daemon.sh start

# browse to web interface:
http://localhost:8080/#/

Part 4: Scala and Spark development via Zeppelin

I added my postgresql credentials for the jdbc interpreter

Browse to:
http://localhost:8080/#/interpreter

Set:
default.driver: default.driver
default.password: spark_data
default.url: jdbc:postgresql://localhost:5432/
default.user: spark_data

I added a new Zeppelin Notebook (with default interpreter: spark/scala) and began adding paragraphs.

In my first paragraph, I included the postgresql JDBC driver jar, ex:

%spark.conf

spark.jars /Users/eric/Documents/code/spark/zeppelin/zeppelin-0.8.2-bin-all/interpreter/jdbc/postgresql-9.4-1201-jdbc41.jar

Ensure the Postgresql driver is loaded

%spark

Class.forName("org.postgresql.Driver")
res0: Class[_] = class org.postgresql.Driver

Create 2 Postgresql tables which I plan to populate and join for demonstration purposes

%jdbc

CREATE TABLE accounts(
  id serial PRIMARY KEY,
  name VARCHAR (50) UNIQUE NOT NULL
);

CREATE TABLE reports(
  id serial PRIMARY KEY,
  account_id integer NOT NULL,
  name VARCHAR (50) NOT NULL
);

Define credentials in Scala code and create an initial JDBC DataFrame connection variable for reading and writing

%spark

val jdbcUrl = "jdbc:postgresql://localhost:5432/spark_data"
val jdbcDriver = "org.postgresql.Driver"
val jdbcUser = "spark_data"
val jdbcPassword = "spark_data"

import java.util.Properties
val jdbcProps = new Properties()
jdbcProps.setProperty("driver", jdbcDriver)
jdbcProps.setProperty("user", jdbcUser)
jdbcProps.setProperty("password", jdbcPassword)

val dfReader = spark.read.format("jdbc")
  .option("driver", jdbcDriver)
  .option("url", jdbcUrl)
  .option("user", jdbcUser)
  .option("password", jdbcPassword)

Create a list of Account names as a DataFrame

%spark

val accountNamesDf = Range(0, 10)
  .map("account" + _.toString())
  .toDF("name")

// inspect:
accountsDf.show()
+--------+
|    name|
+--------+
|account0|
|account1|
|account2|
|account3|
|account4|
|account5|
|account6|
|account7|
|account8|
|account9|
+--------+

Write Accounts DataFrame to Postgresql table

%spark

accountsDf
  .write
  .mode("append")
  .jdbc(jdbcUrl, "accounts", jdbcProps)

Load Accounts (with IDs) into a new DataFrame

%spark

val accountsDf = dfReader
  .option("query", "select id, name from accounts")
  .load()

// inspect
accountsDf.show()
+---+--------+
| id|    name|
+---+--------+
|  5|account0|
|  8|account5|
|  7|account7|
| 10|account4|
|  9|account1|
|  4|account3|
|  2|account9|
|  6|account2|
|  3|account8|
|  1|account6|
+---+--------+

Collect a list of Account IDs to populate foreign keys in the the other table

%spark

val accountIds = accountsDf
  .select("id")
  .map(_.getInt(0))
  .collect()

// accountIds: Array[Int] = Array(5, 8, 7, 10, 9, 4, 2, 6, 3, 1)

Create a function to randomly select an Account ID

%spark

import scala.util.Random
val random = new Random

def randomAccountId(): Int = accountIds(random.nextInt(accountIds.length))

randomAccountId()
// res0: Int = 10

Create a list of Report names and a function to select a random name

%spark

val reportNames = Range(0, 1000)
  .map("report" + _.toString())
  .toList

def randomReportName(): String = reportNames(random.nextInt(reportNames.length))

randomReportName()
// res0: String = report167

Generate a million row DataFrame containing randomized Account IDs and Report names to populate the Reports table

%spark

val accountIdReportNameDf = sc.parallelize(
  Seq.fill(1000000){(randomAccountId,randomReportName)}
).toDF("account_id", "name")

// inspect
accountIdReportNameDf.show()
+----------+---------+
|account_id|     name|
+----------+---------+
|         8|report415|
|         9|report585|
|         4|report818|
|         8|report938|
|         8|report962|
|         1|report324|
|         8|report169|
|         3|report624|
|         8|report712|
|         1| report92|
|         6| report56|
|         8|report115|
|         6| report67|
|         2|report359|
|         1|report556|
|         7|report829|
|         6| report56|
|         2|report620|
|         6|report623|
|         9|report940|
+----------+---------+
only showing top 20 rows

Write DataFrame rows to the Postgresql Reports table

%spark

accountIdReportNameDf
  .write
  .mode("append")
  .jdbc(jdbcUrl, "reports", jdbcProps)

Load the Reports table into a DataFrame

%spark

val reportsDf = dfReader
  .option("query", "select id, account_id, name from reports")
  .load()

// inspect
reportsDf.show()
+---+----------+---------+
| id|account_id|     name|
+---+----------+---------+
|  4|         7| report47|
|  5|         6|report620|
|  6|         6|report375|
|  8|         2|report724|
| 12|         2|report350|
| 14|         9| report27|
| 15|         3|report965|
| 16|         3|report447|
| 17|         1|report817|
| 18|         1|report670|
| 20|         8|report643|
| 21|         4|report130|
| 22|         1|report832|
| 23|         3|report863|
| 24|         9|  report1|
| 13|         8|report450|
| 26|         6|report716|
| 27|         2|report808|
| 28|         3| report56|
|  9|         8|report415|
+---+----------+---------+
only showing top 20 rows

Using Spark/SQL to join the tables together

%spark

dfReader
  .option("query", "select accounts.*, reports.name as report_name from accounts join reports on reports.account_id = accounts.id")
  .load()
  .show()

+---+--------+-----------+
| id|    name|report_name|
+---+--------+-----------+
|  7|account7|   report47|
|  6|account2|  report620|
|  6|account2|  report375|
|  2|account9|  report724|
|  2|account9|  report350|
|  9|account1|   report27|
|  3|account8|  report965|
|  3|account8|  report447|
|  1|account6|  report817|
|  1|account6|  report670|
|  8|account5|  report643|
|  4|account3|  report130|
|  1|account6|  report832|
|  3|account8|  report863|
|  9|account1|    report1|
|  8|account5|  report450|
|  6|account2|  report716|
|  2|account9|  report808|
|  3|account8|   report56|
|  8|account5|  report415|
+---+--------+-----------+
only showing top 20 rows

Using Spark/Scala to join the tables together

%spark

accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .select(accountsDf("id"), accountsDf("name"), reportsDf("name").as("report_name"))
  .show()

+---+--------+-----------+
| id|    name|report_name|
+---+--------+-----------+
|  1|account6|  report817|
|  1|account6|  report670|
|  1|account6|  report832|
|  1|account6|   report52|
|  1|account6|  report832|
|  1|account6|  report291|
|  1|account6|  report866|
|  1|account6|  report191|
|  1|account6|  report715|
|  1|account6|  report174|
|  1|account6|  report220|
|  1|account6|  report488|
|  1|account6|  report133|
|  1|account6|  report534|
|  1|account6|  report166|
|  1|account6|  report625|
|  1|account6|  report475|
|  1|account6|  report249|
|  1|account6|  report252|
|  1|account6|  report708|
+---+--------+-----------+
only showing top 20 rows

Showing an aggregate count of Report records per Account

%spark

accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .groupBy(accountsDf("id"))
  .agg(count(accountsDf("id")))
  .show()

+---+---------+
| id|count(id)|
+---+---------+
|  1|   100322|
|  6|    99479|
|  3|    99708|
|  5|    99493|
|  9|   100588|
|  4|    99921|
|  8|   100353|
|  7|    99924|
| 10|   100128|
|  2|   100084|
+---+---------+

Joining and counting the records

%spark

accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .count()

// res0: Long = 1000000

For the final Spark operation, write out a CSV file to HDFS containing Report data for each Account.

%spark

val dfWriteOptions: Map[String,String] =
  Map[String,String](
    "header" -> "true",
    "quoteAll" -> "true",
    "escape" -> "\"")

accountsDf
  .join(reportsDf, accountsDf("id") <=> reportsDf("account_id"))
  .select(accountsDf("id").as("account_id"), accountsDf("name").as("account_name"), reportsDf("id").as("report_id"), reportsDf("name").as("report_name"))
  .write
  .mode("Overwrite")
  .options(dfWriteOptions)
  .partitionBy((accountsDf("id"))
  .csv("export")

Inspect CSV files in HDFS

$HADOOP_HOME/bin/hdfs dfs -ls export
Found 12 items
-rw-r--r--   1 eric supergroup          0 2020-02-01 19:56 export/_SUCCESS
-rw-r--r--   1 eric supergroup          0 2020-02-01 19:56 export/part-00000-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3589716 2020-02-01 19:56 export/part-00043-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3559231 2020-02-01 19:56 export/part-00049-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3567532 2020-02-01 19:56 export/part-00051-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3559809 2020-02-01 19:56 export/part-00066-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3599014 2020-02-01 19:56 export/part-00089-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3575171 2020-02-01 19:56 export/part-00102-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3590569 2020-02-01 19:56 export/part-00103-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3575201 2020-02-01 19:56 export/part-00107-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3682807 2020-02-01 19:56 export/part-00122-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
-rw-r--r--   1 eric supergroup    3580827 2020-02-01 19:56 export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv

Copy HDFS CSV files to local filesystem

$HADOOP_HOME/bin/hdfs dfs -copyToLocal export ./

Count records in each CSV file (the extra ten rows are the CSV headers)

wc -l export/*.csv
       0 export/part-00000-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
  100323 export/part-00043-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
   99480 export/part-00049-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
   99709 export/part-00051-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
   99494 export/part-00066-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
  100589 export/part-00089-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
   99922 export/part-00102-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
  100354 export/part-00103-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
   99925 export/part-00107-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
  100129 export/part-00122-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
  100085 export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
 1000010 total

Ensuring each CSV file was partitioned by Account

egrep -io "account\d+" export/*.csv | uniq
export/part-00043-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account6
export/part-00049-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account2
export/part-00051-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account8
export/part-00066-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account0
export/part-00089-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account1
export/part-00102-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account3
export/part-00103-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account5
export/part-00107-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account7
export/part-00122-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account4
export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv:account9

Inspecting the contents of a CSV file

head export/part-00174-02824c8d-bdfb-4129-881a-b0ce402878ad-c000.csv
"account_id","account_name","report_id","report_name"
"2","account9","8","report724"
"2","account9","12","report350"
"2","account9","27","report808"
"2","account9","46","report816"
"2","account9","1","report97"
"2","account9","39","report991"
"2","account9","41","report496"
"2","account9","72","report492"
"2","account9","61","report602"

Updated: