Using RabbitMQ as a Ruby work queue to populate Elasticsearch via Docker Compose
In this post I’ll share some Ruby code that uses RabbitMQ as a work queue to populate Elasticsearch documents.
First, I created a RabbitMQ base class to hold the functionality shared by the producer and the workers. On initialization, the base class waits for the RabbitMQ and Elasticsearch services to become available before continuing. file: rabbitmq_base.rb
require 'bunny'
require 'elasticsearch'
require 'faker'
# Elasticsearch connection helpers used by the producer and the workers.
module ES
  module_function

  # Host name of the Elasticsearch node. The ELASTICSEARCH_HOST environment
  # variable overrides the 'localhost' default.
  def es_host
    ENV.fetch('ELASTICSEARCH_HOST') { 'localhost' }
  end

  # HTTP URL of the Elasticsearch node on port 9200.
  def es_url
    host = es_host
    "http://#{host}:9200"
  end

  # Lazily builds the Elasticsearch client (request logging enabled) and
  # caches it for later calls.
  def client
    @client ||= Elasticsearch::Client.new(url: es_url, log: true)
  end
end
# Shared RabbitMQ/Elasticsearch bootstrap for the producer and the workers.
#
# Instantiating the class blocks until a RabbitMQ connection, channel and
# queue exist and Elasticsearch answers a cluster-health request. Every failed
# attempt is reported on stderr and retried after RETRY_INTERVAL seconds.
#
# Environment variables read:
#   RABBITMQ_HOST         broker host name (default 'localhost')
#   QUEUE_NAME            queue to declare (default 'worker_queue')
#   CONNECT_MAX_ATTEMPTS  optional cap on attempts per service; when unset the
#                         wait is unbounded
class RabbitmqBase
  # Seconds to pause between two attempts.
  RETRY_INTERVAL = 1

  # @raise [StandardError] the last connection error, but only when
  #   CONNECT_MAX_ATTEMPTS is set and has been reached
  def initialize
    wait_for('RabbitMQ') do
      # An earlier attempt may have opened a connection and then failed on the
      # channel or queue step; drop it so retries do not accumulate sockets.
      close_connection
      create_connection
      create_channel
      create_queue
    end
    wait_for('Elasticsearch') { try_es_connection }
  end

  protected

  # Runs the given block until it completes without raising.
  #
  # @param service [String] name used in the stderr message
  # @raise [StandardError] the block's last error once the attempt cap is hit
  def wait_for(service)
    limit = max_attempts
    attempts = 0
    begin
      yield
    rescue StandardError => e
      attempts += 1
      raise if limit && attempts >= limit

      warn "Waiting for #{service} (attempt #{attempts}): #{e.class}: #{e.message}"
      sleep RETRY_INTERVAL
      retry
    end
  end

  # @return [Integer, nil] attempt cap from CONNECT_MAX_ATTEMPTS, or nil when
  #   the variable is unset or empty (meaning: keep trying)
  # @raise [ArgumentError] when the variable is set to a non-integer value
  def max_attempts
    raw = ENV['CONNECT_MAX_ATTEMPTS']
    return nil if raw.nil? || raw.empty?

    Integer(raw)
  end

  # Best-effort close of a connection left over from a failed attempt.
  def close_connection
    @connection.close if @connection && @connection.open?
  rescue StandardError => e
    # The connection is being discarded anyway; report and move on.
    warn "Ignoring error while closing stale RabbitMQ connection: #{e.message}"
  ensure
    @connection = nil
  end

  # Asks Elasticsearch for cluster health, waiting for at least 'yellow'.
  def try_es_connection
    ES.client.cluster.health wait_for_status: 'yellow'
  end

  def create_connection
    @connection = Bunny.new(hostname: rabbitmq_host)
    @connection.start
  end

  def create_channel
    @channel = @connection.create_channel
  end

  # NOTE(review): the producer publishes with persistent: true, but the queue
  # is declared without durable: true, so messages presumably do not survive
  # a broker restart. Changing the flag requires deleting any existing queue
  # of the same name first — confirm before switching.
  def create_queue
    @queue = @channel.queue(queue_name)
  end

  def queue_name
    ENV.fetch('QUEUE_NAME', 'worker_queue')
  end

  def rabbitmq_host
    ENV.fetch('RABBITMQ_HOST', 'localhost')
  end
end
The producer subclass publishes a set number of tasks to complete and then exits. file: producer.rb
#!/usr/bin/env ruby
require_relative 'rabbitmq_base'
# Publishes a fixed batch of task messages to the work queue and then exits.
class Producer < RabbitmqBase
  # Task name placed in every message.
  TASK = 'create_person'
  # Number of messages published per run.
  TASK_COUNT = 1_000

  # Publishes TASK_COUNT persistent messages, then closes the connection.
  def start
    TASK_COUNT.times { @queue.publish(TASK, persistent: true) }
  ensure
    # Close explicitly rather than leaving the connection open until the
    # process exits.
    @connection.close if @connection
  end
end

Producer.new.start
The worker subclass subscribes to the queue, checks if the task matches an available worker method, and then generates a person document in Elasticsearch. file: worker.rb
#!/usr/bin/env ruby
require_relative 'rabbitmq_base'
# Consumes task names from the work queue and runs the matching task.
class Worker < RabbitmqBase
  # Subscribes to the queue and blocks the calling thread, handling each
  # delivered message body as a task name.
  #
  # NOTE(review): the subscription does not request manual acknowledgements,
  # so a task that fails part-way is presumably not redelivered — confirm
  # that this is acceptable.
  def start
    @queue.subscribe(block: true) do |_delivery_info, _properties, body|
      run_task(body)
    end
  end

  # Tasks that a queue message may trigger.
  module WorkerMethods
    # Task names accepted from the queue. Message bodies are untrusted input,
    # so only names listed here are ever dispatched; add new tasks here.
    TASKS = %w[create_person].freeze

    module_function

    # Indexes one person document with fake data into the 'people' index and
    # prints which host processed the job.
    def create_person
      person = {
        first_name: Faker::Name.first_name,
        last_name: Faker::Name.last_name,
        email: Faker::Internet.email
      }
      ES.client.create index: 'people',
                       type: 'person',
                       body: person
      puts "Job processed by worker: #{hostname}"
    end

    # Output of the `hostname` command, cached after the first call.
    def hostname
      @hostname ||= `hostname`.strip
    end
  end

  private

  # Runs the task named by a message body.
  #
  # @param name [String] message body received from the queue
  # @raise [RuntimeError] when the name is not listed in WorkerMethods::TASKS
  def run_task(name)
    raise 'Worker method not found' unless WorkerMethods::TASKS.include?(name)

    WorkerMethods.public_send(name)
  end
end

Worker.new.start
I created a Ruby-based Dockerfile that is shared by the producer and the workers. file: Dockerfile
# Image shared by the producer and the worker containers.
FROM ruby:2.5.3-alpine3.8

# build-base is presumably needed to compile native gem extensions.
# The gemrc entry skips gem documentation; Bundler is installed last.
RUN apk add --no-cache --update build-base \
 && echo 'gem: --no-document' > ~/.gemrc \
 && gem install bundler

ENV APP_HOME=/app/
WORKDIR $APP_HOME

# Install gems before copying the scripts so that editing a script does not
# invalidate the cached bundle layer.
COPY Gemfile Gemfile.lock $APP_HOME
RUN bundle install

COPY *.rb $APP_HOME
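I used Docker Compose to create a cluster of services. I added a deploy/replicas configuration to spin up 10 worker containers and distribute the load. Because docker-compose ignores the deploy section outside of swarm mode, the stack has to be started with the --compatibility flag (see the commands below). file: docker-compose.yml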
# RabbitMQ broker, one producer, ten workers and one Elasticsearch node.
version: '3'

services:
  rabbitmq:
    image: rabbitmq:latest
    ports:
      - "5672:5672"

  # Publishes the tasks and exits.
  app_producer:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/producer.rb
    environment:
      - ELASTICSEARCH_HOST=elasticsearch
      - RABBITMQ_HOST=rabbitmq
      - QUEUE_NAME=worker_queue
    depends_on:
      - elasticsearch
      - rabbitmq

  # Consumes the tasks; replicated ten times.
  app_worker:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/worker.rb
    deploy:
      mode: replicated
      replicas: 10
    environment:
      - ELASTICSEARCH_HOST=elasticsearch
      - RABBITMQ_HOST=rabbitmq
      - QUEUE_NAME=worker_queue
    depends_on:
      - elasticsearch
      - rabbitmq

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.5.2
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"

volumes:
  esdata:
    driver: local
Here are the commands I executed to run the apps and verify the results:
# build and start docker container
docker-compose build && docker-compose --compatibility up
# review docker containers
# NOTE: as expected, the producer container exited after queuing all the tasks
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8d0e832f2deb rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_10
5420e004c788 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_4
3d0d70b04310 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_5
e2bc549a2cbb rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_7
11445b7c6295 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_3
5a27d37015c4 rabbitmq_app_producer "/app/producer.rb" About a minute ago Exited (0) 49 seconds ago rabbitmq_app_producer_1
a51bcb127e76 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_2
42bfd224e65e rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_9
9307e547454b rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_1
d49337e9c5a8 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_8
ebf2e23a8736 rabbitmq_app_worker "/app/worker.rb" About a minute ago Up About a minute rabbitmq_app_worker_6
788244fb1620 rabbitmq:latest "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp rabbitmq_rabbitmq_1
ec59a0e8f744 docker.elastic.co/elasticsearch/elasticsearch:6.5.2 "/usr/local/bin/dock…" About a minute ago Up About a minute 0.0.0.0:9200->9200/tcp, 9300/tcp elasticsearch
# query elasticsearch
curl 'http://localhost:9200/people/_search?pretty&size=1'
{
"took" : 43,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1000,
"max_score" : 1.0,
"hits" : [
{
"_index" : "people",
"_type" : "person",
"_id" : "ZmiIjmcBxEAUMv2VhCO2",
"_score" : 1.0,
"_source" : {
"first_name" : "Eric",
"last_name" : "London",
"email" : "eric@example.com"
}
}
]
}
}