Hadoop, Pig, Ruby, Map/Reduce, on OSX via Homebrew
In this blog post, I’ll share the code and configuration I used to set up and demo Hadoop, Pig, and Ruby Map/Reduce via Homebrew on OSX.
Install Hadoop and Pig via Brew:
# version 2.4.1
brew install hadoop
# note: "brew install pig" did not work for me (for now), but I found a recipe that did work:
brew install https://gist.githubusercontent.com/akiatoji/6d67a7fa470e1218dc5b/raw/dff23e5668e0a3855e91a661296503dc6701266a/pig.rb
Add environment variables to ~/.bash_profile or ~/.bashrc. Note: I upgraded my OSX Java version to 1.7 using Oracle’s installer.
export JAVA_HOME=`/usr/libexec/java_home -v 1.7`
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.4.1
export HADOOP_CONF_DIR=$HADOOP_HOME/libexec/etc/hadoop
export PIG_HOME=/usr/local/Cellar/pig/0.12.0
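A quick sanity check that the new variables are picked up (the exact version strings will vary with your install):
# reload the profile and verify the toolchain
source ~/.bash_profile
echo $JAVA_HOME
# expect a 1.7.x version string
$JAVA_HOME/bin/java -version
# expect Hadoop 2.4.1
$HADOOP_HOME/bin/hadoop version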
Revise the default Hadoop configuration files in $HADOOP_CONF_DIR:
<!-- file: core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
<!-- file: hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
<!-- file: mapred-site.xml -->
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
<!-- file: yarn-site.xml -->
<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
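The four files above belong in $HADOOP_CONF_DIR. A quick way to confirm they are in place (note: mapred-site.xml is not created by default; it can usually be copied from the template that ships with the Apache tarball):
# confirm the config files exist
ls $HADOOP_CONF_DIR/*-site.xml
# if mapred-site.xml is missing, create it from the bundled template
cp $HADOOP_CONF_DIR/mapred-site.xml.template $HADOOP_CONF_DIR/mapred-site.xml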
Set up HDFS:
# format
$HADOOP_HOME/bin/hdfs namenode -format
# start services (note: start-all.sh is deprecated in Hadoop 2.x; start-dfs.sh plus start-yarn.sh is the current equivalent)
$HADOOP_HOME/sbin/start-all.sh
# check for running java processes
$ jps
40873 ResourceManager
41009 Jps
40569 NameNode
40760 SecondaryNameNode
40964 NodeManager
# namenode web interface: http://localhost:50070
# resource manager url: http://localhost:8088
# create hdfs directories
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/Eric
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/Eric/input
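The same nested directories can also be created in one step with -p (supported by hdfs dfs -mkdir in Hadoop 2.x):
# equivalent one-liner; -p creates parent directories as needed
$HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/Eric/input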
Execute first example:
# copy input file to hdfs
$HADOOP_HOME/bin/hdfs dfs -put /usr/share/dict/words /user/Eric/input
# show created file(s)
$HADOOP_HOME/bin/hdfs dfs -ls /user/Eric/input
# run hadoop process
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/libexec/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar grep input output 'eric'
# show results on hdfs
$HADOOP_HOME/bin/hdfs dfs -cat output/*
559 eric
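To pull the results down to the local filesystem instead of cat-ing them from HDFS, -getmerge concatenates the part files into a single local file (output.txt below is just an example filename):
# merge the HDFS part files into one local file
$HADOOP_HOME/bin/hdfs dfs -getmerge output output.txt
cat output.txt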
I created a Ruby script (map_reduce.rb) for the map and reduce functions:
#!/usr/bin/env ruby

require 'optparse'
require 'json'
require 'securerandom'

# parse command line options:
options = {
  action: nil
}
OptionParser.new do |opts|
  opts.on("--map", "Map") do |v|
    options[:action] = :map
  end
  opts.on("--reduce", "Reduce") do |v|
    options[:action] = :reduce
  end
  opts.on("--create_json_file", "Create JSON file") do |v|
    options[:action] = :create_json_file
  end
end.parse!

# define class to map and reduce
# and also to create a json file (used later in this post)
# @see: http://www.bigfastblog.com/map-reduce-with-ruby-using-hadoop#coding-your-map-and-reduce-scripts-in-ruby
# note: this code was modeled after the above blog post
class MapReduce
  def initialize(options)
    raise "Valid action required." if options.nil? || options[:action].nil? || !respond_to?(options[:action])
    @action = options[:action]
  end

  def run
    send @action
  end

  # emit "<first letter><tab>1" for every input line on STDIN
  def map
    ARGF.each do |line|
      # remove newline
      line = line.chomp
      next if line.nil? || line.empty?
      # output to STDOUT
      # <key><tab><value><newline>
      puts "#{line[0].downcase}\t1"
    end
  end

  # sum the values for each key (input arrives sorted by key)
  def reduce
    prev_key = nil
    key_total = 0
    ARGF.each do |line|
      # remove newline
      line = line.chomp
      # split key and value on tab character
      (key, value) = line.split(/\t/)
      # check for new key
      if prev_key && key != prev_key && key_total > 0
        # output total for previous key
        # <key><tab><value><newline>
        puts "#{prev_key}\t#{key_total}"
        # reset key total for new key
        prev_key = key
        key_total = 0
      elsif !prev_key
        prev_key = key
      end
      # add to count for the current key
      key_total += value.to_i
    end
    # output total for the final key
    puts "#{prev_key}\t#{key_total}"
  end

  # write ~1GB of random JSON records, one object per line
  def create_json_file
    known_fields = [:first_name, :last_name, :address, :city, :state]
    output_json_file = 'data.json'
    file = File.open(output_json_file, 'w')
    while file.size <= 1073741824
      data = {}
      # randomize key order
      known_fields.shuffle.each do |field|
        data[field] = SecureRandom.hex
      end
      file.write(data.to_json + "\n")
    end
    file.close
  end
end

MapReduce.new(options).run
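Save the script as map_reduce.rb and make it executable so it can be invoked directly:
chmod +x map_reduce.rb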
Simple streaming example using pipes and sort, run outside of Hadoop (the sort step stands in for the shuffle/sort that Hadoop performs between map and reduce):
cat /usr/share/dict/words | ./map_reduce.rb --map | sort | ./map_reduce.rb --reduce
Execute the above script in Hadoop as a streaming job:
# remove previous output directory
$HADOOP_HOME/bin/hdfs dfs -rm -r output
# execute hadoop stream job
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar -mapper './map_reduce.rb --map' -reducer './map_reduce.rb --reduce' -file 'map_reduce.rb' -input input -output output
# show output in hdfs
$HADOOP_HOME/bin/hdfs dfs -cat output/*
a 17096
b 11070
c 19901
d 10896
e 8736
f 6860
g 6861
h 9027
i 8799
j 1642
k 2281
l 6284
m 12616
n 6780
o 7849
p 24461
q 1152
r 9671
s 25162
t 12966
u 16387
v 3440
w 3944
x 385
y 671
Using Pig (on Hadoop with HDFS)
Create a JSON file using the above script:
# create 1GB JSON file
./map_reduce.rb --create_json_file
# review (sample) output
head -2 data.json
{"state":"6111d898b191429b7fbe72277828fcc1","city":"0339dbf4d8196f95484f2ca12b51a22a","address":"f47c63c997e0c000114386f10061a739","first_name":"b7b772e2af1a402fee79babd9059dd11","last_name":"c123de4de67d34fb75489e8597628b49"}
{"first_name":"be23ece2bbee9db19ceb8b4e33253f81","last_name":"efd3b8eab953569ef8f617f150a3237f","city":"42c09d08b29f12205aff1f3c8ac434e1","address":"7e3f490fd153628e2c3e9ef24f742174","state":"a76e5cfb6b6986303dc7d8d569e86956"}
# remove previous output directory
$HADOOP_HOME/bin/hdfs dfs -rm -r output
# push file to HDFS
$HADOOP_HOME/bin/hdfs dfs -put data.json input
Use the Pig command-line tool (the grunt shell) to load the JSON file and convert it to CSV:
$ pig
grunt> json_data = LOAD 'input/data.json' USING JsonLoader('first_name:chararray, last_name:chararray, address:chararray, city:chararray, state:chararray');
grunt> STORE json_data INTO 'output/data.csv' USING PigStorage('\t', '-schema');
grunt> quit
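The same statements can also be run non-interactively by saving them to a script file and handing it to pig with -f (json_to_csv.pig below is just an example filename):
# write the statements to a Pig script and run it as a batch job
cat > json_to_csv.pig <<'EOF'
json_data = LOAD 'input/data.json' USING JsonLoader('first_name:chararray, last_name:chararray, address:chararray, city:chararray, state:chararray');
STORE json_data INTO 'output/data.csv' USING PigStorage('\t', '-schema');
EOF
pig -f json_to_csv.pig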
Output CSV data to local filesystem:
# output CSV header row
$HADOOP_HOME/bin/hdfs dfs -cat output/data.csv/.pig_header > data.csv
# output (concat) CSV data
$HADOOP_HOME/bin/hdfs dfs -cat output/data.csv/part* >> data.csv
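A quick spot check on the local file (your values will differ from run to run, since the data is randomly generated):
# inspect the merged CSV
head -3 data.csv
wc -l data.csv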
More examples to follow :)