Docker Hadoop Streaming Map Reduce Scala Job

In this post I’ll share an example Dockerfile for running a Hadoop Streaming job written in Scala. I’ll start with two basic Scala scripts for the map and reduce functions. Together they count how often each character occurs in the input.

File: mapper.sh

#!/bin/sh
exec /usr/local/share/scala/bin/scala "$0" "$@"
!#

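object Mapper {

  // Emit "<lowercased char>\t1" for every character read from stdin.
  // Blank lines produce no output; reading stops only at end of input.
  @annotation.tailrec
  def processLine(): Unit = {
    val line = scala.io.StdIn.readLine()
    if (line != null) {
      for (c <- line) println(c.toLower + "\t1")
      processLine()
    }
  }

  def main(args: Array[String]): Unit = {
    processLine()
  }
}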

File: reducer.sh

#!/bin/sh
exec /usr/local/share/scala/bin/scala "$0" "$@"
!#

object Reducer {

  // Input arrives sorted by key, so equal letters are adjacent: keep a running
  // count and emit it whenever the letter changes or the input ends.
  @annotation.tailrec
  def processLine(previousLetter: String = "", letterCount: Int = 0): Unit = {
    val line = scala.io.StdIn.readLine()
    if (line == null || line.isEmpty) {
      // End of input: flush the last letter, if any was read.
      if (previousLetter.nonEmpty) println(previousLetter + "\t" + letterCount)
    } else {
      val data = line.split("\t")
      if (previousLetter.isEmpty) {
        processLine(data(0), data(1).toInt)
      } else if (data(0) != previousLetter) {
        println(previousLetter + "\t" + letterCount)
        processLine(data(0), data(1).toInt)
      } else {
        processLine(data(0), data(1).toInt + letterCount)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    processLine()
  }
}

These Hadoop Streaming map/reduce scripts can be tested outside Hadoop by piping a file through them, with sort standing in for the shuffle phase:

cat words | ./mapper.sh | sort | ./reducer.sh
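
To cross-check the Scala scripts, the same character count can be sketched with awk. This is only an illustration, assuming ASCII input without tab characters; the function names map_chars and reduce_counts and the two-word sample are hypothetical and not part of the original project.

```shell
#!/bin/sh

# Emit "<lowercased char>\t1" for every character of every input line.
map_chars() {
  awk '{ n = length($0); for (i = 1; i <= n; i++) print tolower(substr($0, i, 1)) "\t1" }'
}

# Sum the counts of adjacent equal keys; the input must be sorted by key.
reduce_counts() {
  awk -F'\t' '
    NR > 1 && $1 != prev { print prev "\t" sum; sum = 0 }
    { prev = $1; sum += $2 }
    END { if (NR > 0) print prev "\t" sum }
  '
}

# Hypothetical two-word sample standing in for the "words" file.
printf 'Ab\nba\n' | map_chars | LC_ALL=C sort | reduce_counts
```

Swapping one stage at a time, for example `cat words | ./mapper.sh | sort | reduce_counts`, can help narrow down which script is responsible when the two pipelines disagree.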

Next I created a new Dockerfile based on sequenceiq/hadoop-docker, which uses CentOS as the Linux distribution. The Dockerfile updates packages, overwrites the yarn-site.xml file to bypass virtual memory limitations, copies the scripts into the image, downloads and installs Scala, and sets a few environment variables. Contents:

FROM sequenceiq/hadoop-docker

# update packages
RUN yum update -y
RUN yum install wget -y

# per yarn.nodemanager.vmem-check-enabled
COPY yarn-site.xml /usr/local/hadoop/etc/hadoop/yarn-site.xml

COPY . /root

# download/install scala
RUN /root/install_scala.sh

# set scala env vars
ENV SCALA_HOME=/usr/local/share/scala
ENV PATH="$PATH:$SCALA_HOME/bin"

The yarn-site.xml modifications to bypass virtual memory limits:

<configuration>

  <!-- snip -->

  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>Whether virtual memory limits will be enforced for containers</description>
  </property>

</configuration>

I provided a simple script (install_scala.sh) to download and install Scala in the Docker image. NOTE: the sequenceiq/hadoop-docker container is built on Java 7, so I did not use the latest stable Scala (2.12), which requires Java 8.

#!/bin/sh

if [ ! -f '/root/scala-2.11.8.tgz' ]; then
  cd /root
  wget http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
fi

if [ ! -d '/root/scala-2.11.8' ]; then
  cd /root
  tar -xzf scala-2.11.8.tgz
fi

cp -r /root/scala-2.11.8 /usr/local/share/scala
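
The Java constraint mentioned above could also be checked in a script rather than assumed. The following is a rough sketch, not part of the original install script: scala_series_for_java is a hypothetical helper that only distinguishes the two Scala series named in this post, and it assumes a plain numeric version string such as "1.7.0_71" or "11.0.2".

```shell
#!/bin/sh

# Map a Java version string to one of the two Scala series discussed here.
# Legacy strings look like "1.7.0_71" (major 7); newer ones like "11.0.2".
scala_series_for_java() {
  version="$1"
  case "$version" in
    1.*) major="${version#1.}"; major="${major%%.*}" ;;
    *)   major="${version%%.*}" ;;
  esac
  # Per the note above, Scala 2.12 needs Java 8 or newer.
  if [ "$major" -ge 8 ]; then echo "2.12"; else echo "2.11"; fi
}

scala_series_for_java "1.7.0_71"
```

Inside the container, the version string could be taken from the quoted part of the first line that `java -version` writes to stderr, for example with `java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}'`.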

Finally, I created the main script, main.sh, which runs inside the container and executes the Hadoop Streaming job:

#!/bin/sh

/etc/bootstrap.sh

cd $HADOOP_PREFIX

# prepare hdfs files:
bin/hdfs dfsadmin -safemode leave
bin/hdfs dfs -mkdir /scala_in
bin/hdfs dfs -put /root/words /scala_in
bin/hdfs dfs -rmdir /scala_out

# execute hadoop streaming job
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
  -mapper "/root/mapper.sh" \
  -reducer "/root/reducer.sh" \
  -input "/scala_in" \
  -output "/scala_out"

# hdfs output
bin/hdfs dfs -cat /scala_out/part-00000
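
The HDFS preparation steps work on a fresh container, but they are not repeatable: a second run in the same container would hit an existing /scala_in, and -rmdir only removes empty directories, so a populated /scala_out would remain and the streaming job would refuse to overwrite it. A minimal sketch of a repeatable variant is shown here. The prepare_hdfs function and the HDFS_CMD override are hypothetical names introduced for illustration; the -mkdir -p, -put -f, and -rm -r -f options are standard Hadoop FS shell flags.

```shell
#!/bin/sh

# HDFS_CMD is a hypothetical override hook; main.sh calls bin/hdfs directly.
HDFS_CMD="${HDFS_CMD:-bin/hdfs}"

# Reset the job directories so the script can run more than once
# in the same container.
prepare_hdfs() {
  input_file="$1"
  in_dir="$2"
  out_dir="$3"
  "$HDFS_CMD" dfs -mkdir -p "$in_dir" &&          # no error if it already exists
  "$HDFS_CMD" dfs -put -f "$input_file" "$in_dir" && # overwrite a previous upload
  "$HDFS_CMD" dfs -rm -r -f "$out_dir"            # remove old output, if any
}

# Usage inside main.sh, after "cd $HADOOP_PREFIX":
#   prepare_hdfs /root/words /scala_in /scala_out
```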

Container usage:

docker build -t ericlondon/hadoop .
docker run -it ericlondon/hadoop /root/main.sh

Example output:

Starting sshd:                                             [  OK  ]
Starting namenodes on [554aac76a2db]
554aac76a2db: starting namenode, logging to /usr/local/hadoop/logs/hadoop-root-namenode-554aac76a2db.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-root-datanode-554aac76a2db.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-root-secondarynamenode-554aac76a2db.out
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-554aac76a2db.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-root-nodemanager-554aac76a2db.out
Safe mode is OFF
rmdir: `/scala_out': No such file or directory
packageJobJar: [/tmp/hadoop-unjar3563391432684918317/] [] /tmp/streamjob16776878846147015.jar tmpDir=null
16/12/01 13:09:57 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/12/01 13:09:57 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/12/01 13:09:58 INFO mapred.FileInputFormat: Total input paths to process : 1
16/12/01 13:09:58 INFO mapreduce.JobSubmitter: number of splits:2
16/12/01 13:09:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1480615773226_0001
16/12/01 13:09:59 INFO impl.YarnClientImpl: Submitted application application_1480615773226_0001
16/12/01 13:09:59 INFO mapreduce.Job: The url to track the job: http://554aac76a2db:8088/proxy/application_1480615773226_0001/
16/12/01 13:09:59 INFO mapreduce.Job: Running job: job_1480615773226_0001
16/12/01 13:10:13 INFO mapreduce.Job: Job job_1480615773226_0001 running in uber mode : false
16/12/01 13:10:13 INFO mapreduce.Job:  map 0% reduce 0%
16/12/01 13:10:35 INFO mapreduce.Job:  map 7% reduce 0%
16/12/01 13:10:47 INFO mapreduce.Job:  map 21% reduce 0%
16/12/01 13:10:50 INFO mapreduce.Job:  map 42% reduce 0%
16/12/01 13:10:53 INFO mapreduce.Job:  map 54% reduce 0%
16/12/01 13:10:56 INFO mapreduce.Job:  map 83% reduce 0%
16/12/01 13:10:58 INFO mapreduce.Job:  map 100% reduce 0%
16/12/01 13:11:11 INFO mapreduce.Job:  map 100% reduce 67%
16/12/01 13:11:20 INFO mapreduce.Job:  map 100% reduce 100%
16/12/01 13:11:20 INFO mapreduce.Job: Job job_1480615773226_0001 completed successfully
16/12/01 13:11:20 INFO mapreduce.Job: Counters: 49
  File System Counters
    FILE: Number of bytes read=13543344
    FILE: Number of bytes written=27437579
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=2496017
    HDFS: Number of bytes written=218
    HDFS: Number of read operations=9
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
  Job Counters
    Launched map tasks=2
    Launched reduce tasks=1
    Data-local map tasks=2
    Total time spent by all maps in occupied slots (ms)=83815
    Total time spent by all reduces in occupied slots (ms)=21077
    Total time spent by all map tasks (ms)=83815
    Total time spent by all reduce tasks (ms)=21077
    Total vcore-seconds taken by all map tasks=83815
    Total vcore-seconds taken by all reduce tasks=21077
    Total megabyte-seconds taken by all map tasks=85826560
    Total megabyte-seconds taken by all reduce tasks=21582848
  Map-Reduce Framework
    Map input records=235886
    Map output records=2257223
    Map output bytes=9028892
    Map output materialized bytes=13543350
    Input split bytes=182
    Combine input records=0
    Combine output records=0
    Reduce input groups=27
    Reduce shuffle bytes=13543350
    Reduce input records=2257223
    Reduce output records=27
    Spilled Records=4514446
    Shuffled Maps =2
    Failed Shuffles=0
    Merged Map outputs=2
    GC time elapsed (ms)=378
    CPU time spent (ms)=35970
    Physical memory (bytes) snapshot=568856576
    Virtual memory (bytes) snapshot=2088333312
    Total committed heap usage (bytes)=381288448
  Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
  File Input Format Counters
    Bytes Read=2495835
  File Output Format Counters
    Bytes Written=218
16/12/01 13:11:20 INFO streaming.StreamJob: Output directory: /scala_out
- 2
a 199554
b 40433
c 103440
d 68191
e 235331
f 24165
g 47094
h 64356
i 201032
j 3167
k 16158
l 130463
m 70680
n 158743
o 170692
p 78163
q 3734
r 160985
s 139542
t 152831
u 87353
v 20177
w 13864
x 6932
y 51681
z 8460
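
One quick sanity check on this output: the mapper emits one record per character, so the per-character counts should add up to the "Map output records" counter. Summing the 27 counts above gives 2257223, which matches both "Map output records" and "Reduce input records". A small sketch of that check follows, assuming tab-separated key/count lines as written by the reducer; sum_counts is a hypothetical helper name and the three-line sample is made up.

```shell
#!/bin/sh

# Add up the second tab-separated column of "key<TAB>count" lines.
sum_counts() {
  awk -F'\t' '{ total += $2 } END { print total + 0 }'
}

# Hypothetical sample in the same shape as part-00000.
printf 'a\t3\nb\t1\nc\t2\n' | sum_counts
```

Against the real job output this would be run as `bin/hdfs dfs -cat /scala_out/part-00000 | sum_counts`.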

Source code on GitHub
