In this post I'll share an example Dockerfile to execute a Hadoop Streaming Scala job. I'll start with some basic Scala scripts for the map and reduce functions.
File: mapper.sh
#!/bin/sh
exec /usr/local/share/scala/bin/scala "$0" "$@"
!#

object Mapper {
  // read stdin line by line; emit a "<letter>\t1" pair for each character,
  // stopping at EOF or an empty line
  def processLine: Unit = {
    val line = scala.io.StdIn.readLine
    if (line == null || line.isEmpty) return
    for (c <- line) println(c.toLower + "\t1")
    processLine
  }
  def main(args: Array[String]) {
    processLine
  }
}
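As a quick sanity check, the mapper can be run directly on a line of input (illustrative input shown):

$ echo "Ab" | ./mapper.sh
a	1
b	1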
File: reducer.sh
#!/bin/sh
exec /usr/local/share/scala/bin/scala "$0" "$@"
!#

object Reducer {
  // accumulate counts for runs of identical letters (input arrives sorted)
  def processLine(previousLetter: String = "", letterCount: Int = 0): Unit = {
    val line = scala.io.StdIn.readLine
    if (line == null || line.isEmpty) {
      // end of input: flush the last letter's count
      println(previousLetter + "\t" + letterCount)
      return
    }
    val data = line.split("\t")
    if (previousLetter == "") {
      processLine(data(0), data(1).toInt)
    } else if (data(0) != previousLetter) {
      // letter changed: emit the previous letter's total
      println(previousLetter + "\t" + letterCount)
      processLine(data(0), data(1).toInt)
    } else {
      processLine(data(0), data(1).toInt + letterCount)
    }
  }
  def main(args: Array[String]) {
    processLine()
  }
}
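Similarly, the reducer can be fed hand-sorted mapper output to verify the counting logic (illustrative input shown):

$ printf 'a\t1\na\t1\nb\t1\n' | ./reducer.sh
a	2
b	1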
These Hadoop Streaming map/reduce scripts can be tested outside Hadoop via:
cat words | ./mapper.sh | sort | ./reducer.sh
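For example, with a hypothetical two-line words file the pipeline yields per-letter totals. The sort step stands in for Hadoop's shuffle/sort phase, since the reducer assumes identical letters arrive grouped together:

$ printf 'Ad\nda\n' > words
$ cat words | ./mapper.sh | sort | ./reducer.sh
a	2
d	2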
Next I created a new Dockerfile based on sequenceiq/hadoop-docker, which uses CentOS as the Linux distribution. The Dockerfile updates packages, overwrites the yarn-site.xml file to bypass virtual memory limitations, copies scripts into the Docker container, downloads/installs Scala, and sets a few environment variables. Contents:
FROM sequenceiq/hadoop-docker

# update packages
RUN yum update -y
RUN yum install wget -y

# overwrite yarn-site.xml to disable the yarn.nodemanager.vmem-check-enabled check
COPY yarn-site.xml /usr/local/hadoop/etc/hadoop/yarn-site.xml
COPY . /root

# download/install scala
RUN /root/install_scala.sh

# set scala env vars
ENV SCALA_HOME=/usr/local/share/scala
ENV PATH="$PATH:$SCALA_HOME/bin"
The yarn-site.xml modifications to bypass virtual memory limits:
<configuration>
  <!-- snip -->
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    <description>Whether virtual memory limits will be enforced for containers</description>
  </property>
</configuration>
I provided a simple script (install_scala.sh) to download and install Scala on the Docker container. NOTE: the sequenceiq/hadoop-docker image is built on Java 7, so I did not use the latest stable Scala (2.12), which requires Java 8.
#!/bin/sh
if [ ! -f '/root/scala-2.11.8.tgz' ]; then
  cd /root
  wget http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
fi
if [ ! -d '/root/scala-2.11.8' ]; then
  cd /root
  tar -xzf scala-2.11.8.tgz
fi
cp -r /root/scala-2.11.8 /usr/local/share/scala
And last, I created the main script, main.sh, to execute inside the container and run the Hadoop Streaming job:
#!/bin/sh
/etc/bootstrap.sh
cd $HADOOP_PREFIX

# prepare hdfs files
bin/hdfs dfsadmin -safemode leave
bin/hdfs dfs -mkdir /scala_in
bin/hdfs dfs -put /root/words /scala_in
# remove any previous (empty) output directory; use "-rm -r" if it has contents
bin/hdfs dfs -rmdir /scala_out

# execute hadoop streaming job
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
  -mapper "/root/mapper.sh" \
  -reducer "/root/reducer.sh" \
  -input "/scala_in" \
  -output "/scala_out"

# print hdfs output
bin/hdfs dfs -cat /scala_out/part-00000
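Note the streaming jar's version is tied to the Hadoop release baked into the base image (2.7.0 here). If the base image changes, you can locate the jar with a glob rather than hard-coding the version (a small sanity check, not part of the script above):

ls $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-*.jar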
Container usage:
docker build -t ericlondon/hadoop .
docker run -it ericlondon/hadoop /root/main.sh
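For debugging, the base image's bootstrap script also accepts a -bash argument to drop into an interactive shell instead of running the job directly (per the sequenceiq/hadoop-docker documentation):

docker run -it ericlondon/hadoop /etc/bootstrap.sh -bash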
Example output:
Starting sshd: [ OK ]
Starting namenodes on [554aac76a2db]
554aac76a2db: starting namenode, logging to /usr/local/hadoop/logs/hadoop-root-namenode-554aac76a2db.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-root-datanode-554aac76a2db.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-root-secondarynamenode-554aac76a2db.out
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-554aac76a2db.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-root-nodemanager-554aac76a2db.out
Safe mode is OFF
rmdir: `/scala_out': No such file or directory
packageJobJar: [/tmp/hadoop-unjar3563391432684918317/] [] /tmp/streamjob16776878846147015.jar tmpDir=null
16/12/01 13:09:57 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/12/01 13:09:57 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/12/01 13:09:58 INFO mapred.FileInputFormat: Total input paths to process : 1
16/12/01 13:09:58 INFO mapreduce.JobSubmitter: number of splits:2
16/12/01 13:09:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1480615773226_0001
16/12/01 13:09:59 INFO impl.YarnClientImpl: Submitted application application_1480615773226_0001
16/12/01 13:09:59 INFO mapreduce.Job: The url to track the job: http://554aac76a2db:8088/proxy/application_1480615773226_0001/
16/12/01 13:09:59 INFO mapreduce.Job: Running job: job_1480615773226_0001
16/12/01 13:10:13 INFO mapreduce.Job: Job job_1480615773226_0001 running in uber mode : false
16/12/01 13:10:13 INFO mapreduce.Job: map 0% reduce 0%
16/12/01 13:10:35 INFO mapreduce.Job: map 7% reduce 0%
16/12/01 13:10:47 INFO mapreduce.Job: map 21% reduce 0%
16/12/01 13:10:50 INFO mapreduce.Job: map 42% reduce 0%
16/12/01 13:10:53 INFO mapreduce.Job: map 54% reduce 0%
16/12/01 13:10:56 INFO mapreduce.Job: map 83% reduce 0%
16/12/01 13:10:58 INFO mapreduce.Job: map 100% reduce 0%
16/12/01 13:11:11 INFO mapreduce.Job: map 100% reduce 67%
16/12/01 13:11:20 INFO mapreduce.Job: map 100% reduce 100%
16/12/01 13:11:20 INFO mapreduce.Job: Job job_1480615773226_0001 completed successfully
16/12/01 13:11:20 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=13543344
FILE: Number of bytes written=27437579
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2496017
HDFS: Number of bytes written=218
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=83815
Total time spent by all reduces in occupied slots (ms)=21077
Total time spent by all map tasks (ms)=83815
Total time spent by all reduce tasks (ms)=21077
Total vcore-seconds taken by all map tasks=83815
Total vcore-seconds taken by all reduce tasks=21077
Total megabyte-seconds taken by all map tasks=85826560
Total megabyte-seconds taken by all reduce tasks=21582848
Map-Reduce Framework
Map input records=235886
Map output records=2257223
Map output bytes=9028892
Map output materialized bytes=13543350
Input split bytes=182
Combine input records=0
Combine output records=0
Reduce input groups=27
Reduce shuffle bytes=13543350
Reduce input records=2257223
Reduce output records=27
Spilled Records=4514446
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=378
CPU time spent (ms)=35970
Physical memory (bytes) snapshot=568856576
Virtual memory (bytes) snapshot=2088333312
Total committed heap usage (bytes)=381288448
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2495835
File Output Format Counters
Bytes Written=218
16/12/01 13:11:20 INFO streaming.StreamJob: Output directory: /scala_out
- 2
a 199554
b 40433
c 103440
d 68191
e 235331
f 24165
g 47094
h 64356
i 201032
j 3167
k 16158
l 130463
m 70680
n 158743
o 170692
p 78163
q 3734
r 160985
s 139542
t 152831
u 87353
v 20177
w 13864
x 6932
y 51681
z 8460
Source code on GitHub