Hadoop, Pig, and Ruby Map/Reduce on OSX via Homebrew

In this blog post, I’ll share some code and configuration I have used to set up and demo Hadoop, Pig, and Ruby Map/Reduce via Homebrew on OSX.

Install Hadoop and Pig via Brew:

# version 2.4.1
brew install hadoop

# note: "brew install pig" did not work for me (for now), but I found a recipe that did work:
brew install https://gist.githubusercontent.com/akiatoji/6d67a7fa470e1218dc5b/raw/dff23e5668e0a3855e91a661296503dc6701266a/pig.rb

Added environment variables to ~/.bash_profile (or ~/.bashrc). Note: I upgraded my OSX Java to version 1.7 using Oracle’s binary.

export JAVA_HOME=`/usr/libexec/java_home -v 1.7`
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.4.1
export HADOOP_CONF_DIR=$HADOOP_HOME/libexec/etc/hadoop
export PIG_HOME=/usr/local/Cellar/pig/0.12.0
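
To confirm the tools pick up these settings, a quick sanity check (in a new shell, or after re-sourcing the profile):

source ~/.bash_profile
java -version
$HADOOP_HOME/bin/hadoop version
pig -version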

Revised the default Hadoop configuration files in $HADOOP_CONF_DIR:

<!-- file: core-site.xml -->

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

<!-- file: hdfs-site.xml -->

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

<!-- file: mapred-site.xml -->

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

<!-- file: yarn-site.xml -->

<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
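
Note: a stock Hadoop 2.4.x install ships only a template for mapred-site.xml; if that file does not exist yet in your install, create it from the template first:

cp $HADOOP_CONF_DIR/mapred-site.xml.template $HADOOP_CONF_DIR/mapred-site.xml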

Set up HDFS:

# format
$HADOOP_HOME/bin/hdfs namenode -format

# start services
$HADOOP_HOME/sbin/start-all.sh

# check for running java processes
$ jps
40873 ResourceManager
41009 Jps
40569 NameNode
40760 SecondaryNameNode
40964 NodeManager

# namenode web interface: http://localhost:50070

# resource manager url: http://localhost:8088

# create hdfs directories
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/Eric
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/Eric/input
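
If you prefer, the nested directories can also be created in one step with the -p flag:

$HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/Eric/input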

Execute a first example (the grep job from the bundled examples jar):

# copy input file to hdfs
$HADOOP_HOME/bin/hdfs dfs -put /usr/share/dict/words /user/Eric/input

# show created file(s)
$HADOOP_HOME/bin/hdfs dfs -ls /user/Eric/input

# run hadoop process
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/libexec/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar grep input output 'eric'

# show results on hdfs
$HADOOP_HOME/bin/hdfs dfs -cat output/*
559     eric

Created a Ruby script for the map and reduce functions (it counts the dictionary words by their first letter):

#!/usr/bin/env ruby

require 'optparse'
require 'json'
require 'securerandom'

# parse command line options:

options = {
  action: nil
}
OptionParser.new do |opts|

  opts.on("--map", "Map") do |v|
    options[:action] = :map
  end

  opts.on("--reduce", "Reduce") do |v|
    options[:action] = :reduce
  end

  opts.on("--create_json_file", "Create JSON file") do |v|
    options[:action] = :create_json_file
  end

end.parse!

# define class to map and reduce
# and also to create a json file (used later in post)
# @see: http://www.bigfastblog.com/map-reduce-with-ruby-using-hadoop#coding-your-map-and-reduce-scripts-in-ruby
# note: this code was modeled after the above blog post

class MapReduce

  def initialize(options)
    raise "Valid action required." if options.nil? || options[:action].nil? || !respond_to?(options[:action])
    @action = options[:action]
  end

  def run
    send @action
  end

  def map

    ARGF.each do |line|

       # remove newline
       line = line.chomp
       next if line.nil? || line.empty?

       # emit the first letter of the word (downcased) as the key, with a count of 1
       # output to STDOUT: <key><tab><value><newline>
       puts "#{line[0].downcase}\t1"

    end

  end

  def reduce

    prev_key = nil
    key_total = 0

    ARGF.each do |line|

      # remove newline
      line = line.chomp

      # split key and value on tab character
      (key, value) = line.split(/\t/)

      # check for new key
      if prev_key && key != prev_key && key_total > 0

        # output total for previous key
        # <key><tab><value><newline>
        puts "#{prev_key}\t#{key_total}"

        # reset key total for new key
        prev_key = key
        key_total = 0

      elsif ! prev_key
        prev_key = key

      end

      # add to count for this current key
      key_total += value.to_i

    end

    # output the total for the final key
    puts "#{prev_key}\t#{key_total}" if prev_key

  end

  def create_json_file

    known_fields = [:first_name, :last_name, :address, :city, :state]

    output_json_file = 'data.json'

    file = File.open(output_json_file, 'w')
    # keep writing JSON lines until the file reaches ~1 GB
    while file.size <= 1_073_741_824
      data = {}
      # randomize key order
      known_fields.shuffle.each do |field|
        data[field] = SecureRandom.hex
      end
      file.write(data.to_json + "\n")
    end
    file.close

  end

end

MapReduce.new(options).run
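
Make the script executable so the pipe example and the streaming job below can invoke it directly:

chmod +x map_reduce.rb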

Simple streaming example using pipes and sort (external to Hadoop):

cat words | ./map_reduce.rb --map | sort | ./map_reduce.rb --reduce
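
To sanity-check the mapper on its own, it also helps to peek at the intermediate key/value stream (output below is illustrative):

cat /usr/share/dict/words | ./map_reduce.rb --map | head -3
a     1
a     1
a     1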

Execute the above script in Hadoop via the streaming jar:

# remove previous output directory
$HADOOP_HOME/bin/hdfs dfs -rmr output

# execute hadoop stream job
hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar -mapper './map_reduce.rb --map' -reducer './map_reduce.rb --reduce' -file 'map_reduce.rb' -input input -output output

# show output in hdfs
$HADOOP_HOME/bin/hdfs dfs -cat output/*
a     17096
b     11070
c     19901
d     10896
e     8736
f     6860
g     6861
h     9027
i     8799
j     1642
k     2281
l     6284
m     12616
n     6780
o     7849
p     24461
q     1152
r     9671
s     25162
t     12966
u     16387
v     3440
w     3944
x     385
y     671
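
While a streaming job is running, its progress can also be checked from the command line (in addition to the ResourceManager UI at http://localhost:8088):

$HADOOP_HOME/bin/mapred job -list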

Using Pig (on Hadoop with HDFS)

Create a JSON file using the above script:

# create 1GB JSON file
./map_reduce.rb --create_json_file

# review (sample) output
head -2 data.json
{"state":"6111d898b191429b7fbe72277828fcc1","city":"0339dbf4d8196f95484f2ca12b51a22a","address":"f47c63c997e0c000114386f10061a739","first_name":"b7b772e2af1a402fee79babd9059dd11","last_name":"c123de4de67d34fb75489e8597628b49"}
{"first_name":"be23ece2bbee9db19ceb8b4e33253f81","last_name":"efd3b8eab953569ef8f617f150a3237f","city":"42c09d08b29f12205aff1f3c8ac434e1","address":"7e3f490fd153628e2c3e9ef24f742174","state":"a76e5cfb6b6986303dc7d8d569e86956"}

# remove previous output directory
$HADOOP_HOME/bin/hdfs dfs -rmr output

# push file to HDFS
$HADOOP_HOME/bin/hdfs dfs -put data.json input
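
A quick check that the upload made it to HDFS:

$HADOOP_HOME/bin/hdfs dfs -ls input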

Use the Pig command-line tool (the grunt shell) to load the JSON file and convert it to CSV:

$ pig

grunt> json_data = LOAD 'input/data.json' USING JsonLoader('first_name:chararray, last_name:chararray, address:chararray, city:chararray, state:chararray');

grunt> STORE json_data INTO 'output/data.csv' USING PigStorage('\t', '-schema');

grunt> quit
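
Alternatively, the same conversion can be run non-interactively with pig -f (the script file name below is just an example; remove the output directory first if it already exists):

cat > json_to_csv.pig <<'EOF'
json_data = LOAD 'input/data.json' USING JsonLoader('first_name:chararray, last_name:chararray, address:chararray, city:chararray, state:chararray');
STORE json_data INTO 'output/data.csv' USING PigStorage('\t', '-schema');
EOF

pig -f json_to_csv.pig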

Output the CSV data to the local filesystem:

# output CSV header row
$HADOOP_HOME/bin/hdfs dfs -cat output/data.csv/.pig_header > data.csv

# output (concat) CSV data
$HADOOP_HOME/bin/hdfs dfs -cat output/data.csv/part* >> data.csv
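
A quick sanity check on the local file (sizes and contents will differ since the data is random):

head -3 data.csv
wc -l data.csv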

More examples to follow :)
