In this post, I’ll share some Ruby code that uses RabbitMQ as a work queue for creating documents in Elasticsearch.
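First, I created a RabbitMQ base class to hold the functionality shared by the producer and the workers. On initialization, the base class retries once per second until it can open a RabbitMQ connection, channel, and queue, and then retries until Elasticsearch answers a cluster health request that waits for at least yellow status. This lets the apps start before those services are ready.
file: rabbitmq_base.rb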
require 'bunny'
require 'elasticsearch'
require 'faker'

# Shared Elasticsearch client, configured from ELASTICSEARCH_HOST.
module ES
  module_function

  def client
    @client ||= Elasticsearch::Client.new url: es_url, log: true
  end

  def es_url
    "http://#{es_host}:9200"
  end

  def es_host
    ENV.fetch('ELASTICSEARCH_HOST', 'localhost')
  end
end

class RabbitmqBase
  def initialize
    # Retry once per second until RabbitMQ accepts a connection.
    begin
      create_connection
      create_channel
      create_queue
    rescue StandardError
      sleep 1
      retry
    end

    # Retry once per second until Elasticsearch answers; the health call
    # itself waits for at least yellow status.
    begin
      try_es_connection
    rescue StandardError
      sleep 1
      retry
    end
  end

  protected

  def try_es_connection
    ES.client.cluster.health wait_for_status: 'yellow'
  end

  def create_connection
    @connection = Bunny.new(hostname: rabbitmq_host)
    @connection.start
  end

  def create_channel
    @channel = @connection.create_channel
  end

  def create_queue
    @queue = @channel.queue(queue_name)
  end

  def queue_name
    ENV.fetch('QUEUE_NAME', 'worker_queue')
  end

  def rabbitmq_host
    ENV.fetch('RABBITMQ_HOST', 'localhost')
  end
end
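The two begin/rescue/retry blocks above retry forever, once per second. If you would rather have a process give up eventually, one option is a small helper with an attempt cap. This is a minimal stdlib-only sketch; the helper name with_retries and its max_attempts and delay parameters are hypothetical, not part of the original code.

```ruby
# Hypothetical helper: run the block, retrying on StandardError up to
# max_attempts times with a fixed delay (in seconds) between attempts.
def with_retries(max_attempts: 30, delay: 1)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    raise if attempts >= max_attempts # give up and re-raise the last error
    sleep delay
    retry
  end
end

# Example: a flaky operation that succeeds on its third attempt.
result = with_retries(max_attempts: 5, delay: 0) do |attempt|
  raise 'service not ready' if attempt < 3
  "connected on attempt #{attempt}"
end
puts result # prints "connected on attempt 3"
```

Inside RabbitmqBase#initialize, each begin/rescue block could then become a call such as with_retries { try_es_connection }, trading the unbounded wait for a container that exits with the last error.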
The producer subclass publishes a fixed number of tasks (1,000 create_person messages) to the queue and then exits.
file: producer.rb
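#!/usr/bin/env ruby
require_relative 'rabbitmq_base'

class Producer < RabbitmqBase
  def start
    message = 'create_person'
    # Queue 1,000 tasks; persistent: true marks each message as persistent (delivery mode 2).
    1_000.times { @queue.publish(message, persistent: true) }
    # Close the connection cleanly before the process exits.
    @connection.close
  end
end

Producer.new.start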
The worker subclass subscribes to the queue, checks whether each message names an available worker method, and calls it. The create_person method generates a fake person and indexes it as a document in Elasticsearch.
file: worker.rb
#!/usr/bin/env ruby
require_relative 'rabbitmq_base'

class Worker < RabbitmqBase
  def start
    # block: true keeps the main thread subscribed until the process is stopped.
    @queue.subscribe(block: true) do |_delivery_info, _properties, body|
      # Only run message bodies that name a WorkerMethods method.
      if WorkerMethods.public_methods.include?(body.to_sym)
        WorkerMethods.send(body)
      else
        raise 'Worker method not found'
      end
    end
  end

  module WorkerMethods
    module_function

    # Build a fake person with Faker and index it in the "people" index.
    def create_person
      person = {
        first_name: Faker::Name.first_name,
        last_name: Faker::Name.last_name,
        email: Faker::Internet.email
      }
      ES.client.create index: 'people',
                       type: 'person',
                       body: person
      puts "Job processed by worker: #{hostname}"
    end

    # Container hostname, memoized so the shell command runs only once.
    def hostname
      @hostname ||= `hostname`.strip
    end
  end
end

Worker.new.start
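The worker's guard, WorkerMethods.public_methods.include?(body.to_sym), decides which message bodies may be invoked. One subtlety: Module#public_methods also returns public methods the module inherits from Object, such as freeze and inspect, so a message body like "freeze" would pass that check. Below is a minimal stdlib-only sketch of a tighter check that uses singleton_methods(false) to list only the methods module_function defined. The dispatch helper and the stubbed WorkerMethods module are hypothetical stand-ins that skip Faker and Elasticsearch.

```ruby
# Hypothetical stand-in for the worker's WorkerMethods module; the real
# create_person builds a Faker person and writes it to Elasticsearch.
module WorkerMethods
  module_function

  def create_person
    'person created'
  end
end

# Hypothetical dispatch helper: only methods defined directly on the
# module (here via module_function) are allowed as task names.
def dispatch(body)
  name = body.to_sym
  unless WorkerMethods.singleton_methods(false).include?(name)
    raise ArgumentError, "Worker method not found: #{body}"
  end

  WorkerMethods.public_send(name)
end

puts WorkerMethods.public_methods.include?(:freeze) # prints "true" (inherited from Object)
puts dispatch('create_person')                      # prints "person created"
begin
  dispatch('freeze')
rescue ArgumentError => e
  puts e.message # prints "Worker method not found: freeze"
end
```

Using public_send instead of send also ensures that a private method can never be reached through a queue message.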
I created a Ruby-based Dockerfile for the producer and the workers.
file: Dockerfile
FROM ruby:2.5.3-alpine3.8
RUN apk add --no-cache --update build-base
RUN echo 'gem: --no-document' > ~/.gemrc
RUN gem install bundler
ENV APP_HOME /app/
COPY Gemfile Gemfile.lock $APP_HOME
WORKDIR $APP_HOME
RUN bundle install
# producer.rb and worker.rb must be executable, since docker-compose runs them directly
COPY *.rb $APP_HOME
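I used Docker Compose to create a cluster of services, with a deploy/replicas configuration that spins up 10 worker containers to distribute the load. The classic docker-compose CLI only applies the deploy section in Swarm mode; the --compatibility flag used in the commands below converts those keys, including replicas, to their non-Swarm equivalents.
file: docker-compose.yml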
version: '3'
services:
  rabbitmq:
    image: rabbitmq:latest
    ports:
      - "5672:5672"
  app_producer:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/producer.rb
    environment:
      - ELASTICSEARCH_HOST=elasticsearch
      - RABBITMQ_HOST=rabbitmq
      - QUEUE_NAME=worker_queue
    depends_on:
      - elasticsearch
      - rabbitmq
  app_worker:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/worker.rb
    deploy:
      mode: replicated
      replicas: 10
    environment:
      - ELASTICSEARCH_HOST=elasticsearch
      - RABBITMQ_HOST=rabbitmq
      - QUEUE_NAME=worker_queue
    depends_on:
      - elasticsearch
      - rabbitmq
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.5.2
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  esdata:
    driver: local
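The replicas setting spreads the load because RabbitMQ delivers each message on a queue to only one of the consumers subscribed to it (round-robin by default), so the 10 workers share the 1,000 tasks. As an in-process analogy only, the sketch below swaps RabbitMQ for Ruby's thread-safe Queue and the 10 containers for 10 threads; it is not how Bunny or RabbitMQ work internally, and the constant names are hypothetical.

```ruby
# In-process analogy: Queue stands in for the RabbitMQ queue and threads
# stand in for the worker containers.
TASK_COUNT = 1_000
WORKER_COUNT = 10

queue = Queue.new
TASK_COUNT.times { queue << 'create_person' } # the "producer"
queue.close # no more tasks; pop returns nil once the queue is drained

processed = Hash.new(0) # worker id => number of tasks handled
lock = Mutex.new

workers = Array.new(WORKER_COUNT) do |id|
  Thread.new do
    while (task = queue.pop) # nil (closed and empty) ends the loop
      next unless task == 'create_person' # the only task type in this sketch
      lock.synchronize { processed[id] += 1 }
    end
  end
end
workers.each(&:join)

puts "total processed: #{processed.values.sum}" # prints "total processed: 1000"
puts "workers used: #{processed.size}"
```

How the tasks split across threads varies from run to run, just as the per-container counts would. Unlike this in-memory queue, RabbitMQ lives outside the processes, which is what lets the producer run in its own container and exit before the workers finish.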
Here are the commands I executed to run the apps and verify the results:
# build the images and start the containers
docker-compose build && docker-compose --compatibility up
# review docker containers
# NOTE: as expected, the producer container exited after queuing all the tasks
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8d0e832f2deb rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_10
5420e004c788 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_4
3d0d70b04310 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_5
e2bc549a2cbb rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_7
11445b7c6295 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_3
5a27d37015c4 rabbitmq_app_producer "/app/producer.rb" About a minute ago Exited (0) 49 seconds ago rabbitmq_app_producer_1
a51bcb127e76 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_2
42bfd224e65e rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_9
9307e547454b rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_1
d49337e9c5a8 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_8
ebf2e23a8736 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_6
788244fb1620 rabbitmq:latest "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp rabbitmq_rabbitmq_1
ec59a0e8f744 docker.elastic.co/elasticsearch/elasticsearch:6.5.2 "/usr/local/bin/dock…" About a minute ago Up About a minute 0.0.0.0:9200->9200/tcp, 9300/tcp elasticsearch
# query elasticsearch
curl 'http://localhost:9200/people/_search?pretty&size=1'
{
  "took" : 43,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "people",
        "_type" : "person",
        "_id" : "ZmiIjmcBxEAUMv2VhCO2",
        "_score" : 1.0,
        "_source" : {
          "first_name" : "Eric",
          "last_name" : "London",
          "email" : "eric@example.com"
        }
      }
    ]
  }
}
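Rather than reading hits.total by eye, a short stdlib-only script can pull the count out of the search response. This is a sketch: the helper names total_hits and people_search_body are hypothetical, and the parser accepts both the plain integer that Elasticsearch 6.x returns (as above) and the {"value": ...} object that Elasticsearch 7 and later return instead.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Hypothetical helper: extract hits.total from an _search response body.
# Elasticsearch 6.x returns an integer; 7.x+ returns {"value" => n, "relation" => ...}.
def total_hits(response_body)
  total = JSON.parse(response_body).fetch('hits').fetch('total')
  total.is_a?(Hash) ? total.fetch('value') : total
end

# Hypothetical helper: fetch a search response for the people index
# (requires a running cluster; size=0 skips returning documents).
def people_search_body(host = 'localhost')
  Net::HTTP.get(URI("http://#{host}:9200/people/_search?size=0"))
end

sample = '{"hits": {"total": 1000, "max_score": 1.0, "hits": []}}'
puts total_hits(sample) # prints "1000"
```

Against the running cluster, total_hits(people_search_body) should match the 1,000 tasks the producer queued once the workers have drained the queue and the index has refreshed.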
Source code on GitHub