In this blog post, I'll share the code and configuration I used to set up and demo Hadoop, Pig, and Ruby Map/Reduce via Homebrew on OSX.
Install Hadoop and Pig via Brew:
# version 2.4.1
brew install hadoop
# note: "brew install pig" did not work for me (for now), but I found a recipe that did work:
brew install https://gist.githubusercontent.com/akiatoji/6d67a7fa470e1218dc5b/raw/dff23e5668e0a3855e91a661296503dc6701266a/pig.rb
Add the following environment variables (file: ~/.bash_profile or ~/.bashrc). Note: I upgraded my OSX Java version to 1.7 using Oracle's binary.
export JAVA_HOME=`/usr/libexec/java_home -v 1.7`
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.4.1
export HADOOP_CONF_DIR=$HADOOP_HOME/libexec/etc/hadoop
export PIG_HOME=/usr/local/Cellar/pig/0.12.0
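Reload your shell config so the variables take effect in the current session, and confirm which Java Hadoop will pick up (this assumes bash; use ~/.bashrc if that's where you added the exports):
# reload shell config and verify
source ~/.bash_profile
echo $JAVA_HOME
java -version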
Revise the default Hadoop config files (directory: $HADOOP_CONF_DIR):
<!-- file: core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
<!-- file: hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- file: mapred-site.xml -->
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
<?xml version="1.0"?>
<!-- file: yarn-site.xml -->
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
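Before formatting HDFS, it's worth a quick check that $HADOOP_CONF_DIR resolves to the directory containing the files you just edited (paths assume the brew layout used above):
# confirm the edited config files are where Hadoop will look for them
ls $HADOOP_CONF_DIR/core-site.xml $HADOOP_CONF_DIR/hdfs-site.xml \
   $HADOOP_CONF_DIR/mapred-site.xml $HADOOP_CONF_DIR/yarn-site.xml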
Set up HDFS:
# format
$HADOOP_HOME/bin/hdfs namenode -format
# start services
$HADOOP_HOME/sbin/start-all.sh
# check for running java processes
$ jps
40873 ResourceManager
41009 Jps
40569 NameNode
40760 SecondaryNameNode
40964 NodeManager
# namenode web interface: http://localhost:50070
# resource manager url: http://localhost:8088
# create hdfs directories
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/Eric
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/Eric/input
Execute first example:
# copy input file to hdfs
$HADOOP_HOME/bin/hdfs dfs -put /usr/share/dict/words /user/Eric/input
# show created file(s)
$HADOOP_HOME/bin/hdfs dfs -ls /user/Eric/input
# run hadoop process
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/libexec/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar grep input output 'eric'
# show results on hdfs
$HADOOP_HOME/bin/hdfs dfs -cat output/*
559 eric
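If you'd rather inspect results on the local filesystem instead of cat-ing them from HDFS, you can also pull the output directory down (the local directory name grep_output is just an example):
# copy the job output from HDFS to the local filesystem
$HADOOP_HOME/bin/hdfs dfs -get output grep_output
cat grep_output/*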
Created a Ruby script for map/reduce functions:
#!/usr/bin/env ruby
require 'optparse'
require 'json'
require 'securerandom'

# parse command line options:
options = { action: nil }
OptionParser.new do |opts|
  opts.on("--map", "Map") do |v|
    options[:action] = :map
  end
  opts.on("--reduce", "Reduce") do |v|
    options[:action] = :reduce
  end
  opts.on("--create_json_file", "Create JSON file") do |v|
    options[:action] = :create_json_file
  end
end.parse!

# define class to map and reduce
# and also to create a json file (used later in this post)
# @see: http://www.bigfastblog.com/map-reduce-with-ruby-using-hadoop#coding-your-map-and-reduce-scripts-in-ruby
# note: this code was modeled after the above blog post
class MapReduce
  def initialize(options)
    raise "Valid action required." if options.nil? || options[:action].nil? || !respond_to?(options[:action])
    @action = options[:action]
  end

  def run
    send @action
  end

  # map: emit the first letter of each word as the key, with a count of 1
  def map
    ARGF.each do |line|
      # remove newline
      line = line.chomp
      next if line.nil? || line.empty?
      # output to STDOUT
      # <key><tab><value><newline>
      puts "#{line[0].downcase}\t1"
    end
  end

  # reduce: sum the counts per key (input arrives sorted by key)
  def reduce
    prev_key = nil
    key_total = 0
    ARGF.each do |line|
      # remove newline
      line = line.chomp
      # split key and value on tab character
      (key, value) = line.split(/\t/)
      # check for new key
      if prev_key && key != prev_key && key_total > 0
        # output total for previous key
        # <key><tab><value><newline>
        puts "#{prev_key}\t#{key_total}"
        # reset key total for new key
        prev_key = key
        key_total = 0
      elsif !prev_key
        prev_key = key
      end
      # add to count for this current key
      key_total += value.to_i
    end
    # output total for the last key
    puts "#{prev_key}\t#{key_total}" if prev_key
  end

  # create a ~1GB file of random JSON records (used in the Pig example below)
  def create_json_file
    known_fields = [:first_name, :last_name, :address, :city, :state]
    output_json_file = 'data.json'
    file = File.open(output_json_file, 'w')
    while file.size <= 1073741824
      data = {}
      # randomize key order
      known_fields.shuffle.each do |field|
        data[field] = SecureRandom.hex
      end
      file.write(data.to_json + "\n")
    end
    file.close
  end
end

MapReduce.new(options).run
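Both the pipe example and the Hadoop streaming job below invoke the script as ./map_reduce.rb, so make sure it's executable first:
# make the script executable
chmod +x map_reduce.rb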
A simple streaming test using pipes and sort (outside of Hadoop):
cat /usr/share/dict/words | ./map_reduce.rb --map | sort | ./map_reduce.rb --reduce
Execute the above script in Hadoop via the streaming jar:
# remove previous output directory
$HADOOP_HOME/bin/hdfs dfs -rmr output
# execute hadoop stream job
hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar -mapper './map_reduce.rb --map' -reducer './map_reduce.rb --reduce' -file 'map_reduce.rb' -input input -output output
# show output in hdfs
$HADOOP_HOME/bin/hdfs dfs -cat output/*
a 17096
b 11070
c 19901
d 10896
e 8736
f 6860
g 6861
h 9027
i 8799
j 1642
k 2281
l 6284
m 12616
n 6780
o 7849
p 24461
q 1152
r 9671
s 25162
t 12966
u 16387
v 3440
w 3944
x 385
y 671
Using Pig (on Hadoop with HDFS)
Create a JSON file using the above script:
# create 1GB JSON file
./map_reduce.rb --create_json_file
# review (sample) output
head -2 data.json
{ "state" :"6111d898b191429b7fbe72277828fcc1" ,"city" :"0339dbf4d8196f95484f2ca12b51a22a" ,"address" :"f47c63c997e0c000114386f10061a739" ,"first_name" :"b7b772e2af1a402fee79babd9059dd11" ,"last_name" :"c123de4de67d34fb75489e8597628b49" }
{ "first_name" :"be23ece2bbee9db19ceb8b4e33253f81" ,"last_name" :"efd3b8eab953569ef8f617f150a3237f" ,"city" :"42c09d08b29f12205aff1f3c8ac434e1" ,"address" :"7e3f490fd153628e2c3e9ef24f742174" ,"state" :"a76e5cfb6b6986303dc7d8d569e86956" }
# remove previous output directory
$HADOOP_HOME/bin/hdfs dfs -rmr output
# push file to HDFS
$HADOOP_HOME/bin/hdfs dfs -put data.json input
Use the Pig command line tool (grunt) to load the JSON file and convert it to CSV:
$ pig
grunt> json_data = LOAD 'input/data.json' USING JsonLoader('first_name:chararray, last_name:chararray, address:chararray, city:chararray, state:chararray');
grunt> STORE json_data INTO 'output/data.csv' USING PigStorage('\t', '-schema');
grunt> quit
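If you'd rather not type the statements into grunt by hand, the same two lines can be saved to a Pig script and run non-interactively; the file name convert.pig is just an example:
# write the Pig statements to a script file
cat > convert.pig <<'EOF'
json_data = LOAD 'input/data.json' USING JsonLoader('first_name:chararray, last_name:chararray, address:chararray, city:chararray, state:chararray');
STORE json_data INTO 'output/data.csv' USING PigStorage('\t', '-schema');
EOF
# run the script with pig
pig -f convert.pig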
Output the CSV data to the local filesystem:
# output CSV header row
$HADOOP_HOME/bin/hdfs dfs -cat output/data.csv/.pig_header > data.csv
# output (concat) CSV data
$HADOOP_HOME/bin/hdfs dfs -cat output/data.csv/part* >> data.csv
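A quick sanity check on the local file (your values will differ since the data is random):
# peek at the header plus the first data row, and count rows
head -2 data.csv
wc -l data.csv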
More examples to follow :)