Ruby Redis Pub/Sub Worker Queue
In this post I’ll share some Ruby code that uses Redis Pub/Sub and Redis lists to implement work queues. One list holds the pending tasks as strings, and another holds a JSON string for each completed task and the worker that completed it.
I first defined a RedisBase parent class that the workers and producer will inherit from. It contains all the Redis client methods. On initialization, it reads its settings from environment variables and creates a connection to Redis.
new file: redis_base.rb
require 'json'
require 'logger'
require 'redis'
require 'securerandom'

class RedisBase
  def initialize
    @uuid = SecureRandom.uuid
    @logger = Logger.new(STDOUT)
    @queue = ENV.fetch('WORK_QUEUE', 'work_queue')
    @processed = ENV.fetch('WORK_PROCESSED', 'work_processed')
    @channel = ENV.fetch('WORK_CHANNEL', 'work_channel')
    @redis_host = ENV.fetch('REDIS_HOST', 'localhost')
    @redis_port = ENV.fetch('REDIS_PORT', '6379').to_i
    @client = Redis.new(host: @redis_host, port: @redis_port)
  end

  protected

  def queue_list
    @client.lrange @queue, 0, -1
  end

  def processed_list
    @client.lrange @processed, 0, -1
  end

  def log(level, message)
    @logger.send(level, "#{@uuid}: #{message}")
  end

  def queue_work
    @client.rpush @queue, rand(1..5)
  end

  def publish_work
    @client.publish @channel, @queue
  end

  def next_task
    @client.lpop @queue
  end

  def complete_task(task)
    @client.rpush @processed, { worker: @uuid, task: task }.to_json
  end
end
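The list commands above give the queue its first-in, first-out behavior: RPUSH appends to the tail and LPOP removes from the head. The sketch below illustrates that with a hypothetical in-memory stand-in, so it runs without a Redis server. FakeRedis is not part of the redis gem and only mimics the three list commands used here, and the worker id 'worker-a' is made up for the example.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the Redis list commands used by RedisBase.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Append a value to the tail of the list (stored as a string, as Redis does)
  # and return the new list length.
  def rpush(key, value)
    @lists[key].push(value.to_s)
    @lists[key].length
  end

  # Remove and return the head of the list, or nil when the list is empty.
  def lpop(key)
    @lists[key].shift
  end

  # Return the elements from start to stop, inclusive; -1 means the last element.
  def lrange(key, start, stop)
    @lists[key][start..stop] || []
  end
end

client = FakeRedis.new
[3, 1, 5].each { |seconds| client.rpush('work_queue', seconds) }

# The oldest task comes out first.
task = client.lpop('work_queue')

# Record the completed task in the same JSON shape complete_task uses.
client.rpush('work_processed', { worker: 'worker-a', task: task }.to_json)

queue_after = client.lrange('work_queue', 0, -1)
processed = client.lrange('work_processed', 0, -1).map { |json| JSON.parse(json) }
```

Here the first task pushed is the first one popped, and the two remaining tasks stay in the queue in their original order.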
I defined the producer class (RedisProducer) below. In a loop, it queues work by pushing a task into the work queue, and then publishes to the pub/sub channel to inform subscribers there is new work to complete. Each task is a random number from 1 to 5, which the worker later uses as the number of seconds to sleep.
new file: producer.rb
#!/usr/bin/env ruby
require_relative 'redis_base'

class RedisProducer < RedisBase
  def start
    loop do
      queue_work
      publish_work
      sleep 0.25
    end
  end
end

RedisProducer.new.start
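One thing worth noting about this design: a published message reaches every subscriber, but LPOP hands each task to only one of them. The rough simulation below shows the effect. It uses a plain Ruby array and lambdas as assumed stand-ins for the Redis list and the subscribed workers, and the worker names are made up.

```ruby
# A plain array stands in for the Redis work queue.
queue = []

# Tasks each simulated worker managed to take from the queue.
results = Hash.new { |hash, key| hash[key] = [] }

# Each lambda plays the role of a worker's message handler:
# on every notification it tries to pop one task.
subscribers = %w[worker-a worker-b worker-c].map do |name|
  lambda do |_message|
    task = queue.shift # like LPOP: only one caller can get a given task
    results[name] << task if task
  end
end

# The producer queues one task, then notifies every subscriber.
queue.push('3')
subscribers.each { |handler| handler.call('work_queue') }
```

In this sequential simulation the first subscriber always wins. With real Redis, which worker gets the task depends on timing; the others find the queue empty and go back to waiting.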
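Next I defined the worker class (RedisWorker). On initialization, it creates a second Redis connection for pub/sub, because a connection in subscriber mode cannot run the list commands. When started, it first works through any tasks already in the queue and then subscribes to the pub/sub channel for new work tasks.
new file: worker.rb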
#!/usr/bin/env ruby
require_relative 'redis_base'

class RedisWorker < RedisBase
  def initialize
    super
    @pub_sub = Redis.new(host: @redis_host, port: @redis_port)
  end

  def start
    log :info, 'WORKER STARTED'
    resume_work
    subscribe
  end

  protected

  def resume_work
    log :info, 'WORKER RESUME: START'
    # Drain the queue: stop as soon as check_work finds no task.
    loop { break unless check_work }
    log :info, 'WORKER RESUME: END'
  end

  def check_work
    task = next_task
    return false unless task
    work(task)
    true
  end

  def subscribe
    @pub_sub.subscribe(@channel) do |on|
      on.subscribe do |channel, _subscriptions|
        log :info, "WORKER SUBSCRIBED TO: #{channel}"
      end
      on.message do |channel, message|
        check_work if channel == @channel && message == @queue
      end
    end
  end

  def work(task)
    log :info, "TASK START: #{task}"
    # TODO: do actual work here
    sleep(task.to_i)
    complete_task(task)
    log :info, "TASK END: #{task}"
  rescue StandardError => e
    # TODO: put task back in work queue
    log :error, e
  end
end

RedisWorker.new.start
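The rescue block in work leaves requeueing as a TODO. One possible approach, sketched here with plain arrays standing in for the Redis lists, is to push a failed task back onto the head of the queue. process_with_requeue and its return symbols are hypothetical names, not part of the code above. Against Redis, the unshift would presumably become an LPUSH on the work queue.

```ruby
# Hypothetical helper: takes one task, runs the block, and requeues the task on failure.
# Plain arrays stand in for the Redis lists.
def process_with_requeue(queue, processed)
  task = queue.shift     # like LPOP on the work queue
  return :empty unless task

  yield task             # the actual work; may raise
  processed.push(task)   # like RPUSH on the processed list
  :done
rescue StandardError
  queue.unshift(task)    # like LPUSH: put the failed task back at the head
  :requeued
end

queue = %w[2 4]
processed = []

# The first attempt fails, so the task goes back to the head of the queue.
first = process_with_requeue(queue, processed) { |_task| raise 'simulated failure' }

# The second attempt picks up the same task again and succeeds.
second = process_with_requeue(queue, processed) { |task| task.to_i }
```

A task that always fails would be retried forever with this approach, so a real worker would probably want a retry limit as well.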
I created a monitor script to show queued tasks and the completed tasks for each worker.
new file: monitor.rb
#!/usr/bin/env ruby
require_relative 'redis_base'

class RedisClient < RedisBase
  def start
    loop do
      puts `clear` + output
      sleep 1
    end
  end

  protected

  def output
    output = ''
    output << output_from(:queue)
    output << output_from(:processed)
  end

  def output_from(kind)
    data = send("#{kind}_grouped")
    output = "#{kind.capitalize}:\n"
    data.each { |k, v| output << "\t#{k}\t#{v}\n" }
    output << "\n"
  end

  def queue_grouped
    grouped = queue_list.each_with_object(Hash.new(0)) { |task, hsh| hsh[task] += 1 }
    Hash[grouped.sort_by { |k, _v| k }]
  end

  def processed_grouped
    grouped = processed_list.each_with_object({}) do |jsn, hsh|
      item = JSON.parse(jsn)
      worker_uuid = item['worker']
      hsh[worker_uuid] = 0 unless hsh.key?(worker_uuid)
      hsh[worker_uuid] += 1
    end
    Hash[grouped.sort_by { |k, _v| k }]
  end
end

RedisClient.new.start
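The per-worker count in processed_grouped can also be written with a default-valued hash, the same way queue_grouped does it. Here is a small sketch on made-up sample entries; the worker ids are placeholders, and the entries follow the shape complete_task writes.

```ruby
require 'json'

# Sample entries in the shape complete_task writes; the worker ids are made up.
processed_list = [
  { worker: 'worker-b', task: '2' }.to_json,
  { worker: 'worker-a', task: '5' }.to_json,
  { worker: 'worker-b', task: '1' }.to_json
]

# Hash.new(0) supplies 0 for unseen workers, so no key check is needed.
counts = processed_list.each_with_object(Hash.new(0)) do |json, totals|
  totals[JSON.parse(json)['worker']] += 1
end

# Sort by worker id so the output order is stable between refreshes.
sorted_counts = counts.sort.to_h
```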
Here is the Dockerfile used to run the worker and producer Ruby code.
new file: Dockerfile
FROM ruby:2.5.3-alpine3.8
RUN apk add --no-cache --update build-base
RUN echo 'gem: --no-document' > ~/.gemrc
RUN gem install bundler
ENV APP_HOME /app/
COPY Gemfile Gemfile.lock $APP_HOME
WORKDIR $APP_HOME
RUN bundle install
COPY *.rb $APP_HOME
I defined a Docker Compose file to start Redis, a producer, and 10 workers.
new file: docker-compose.yml
version: '3'
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/producer.rb
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis
  worker:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/worker.rb
    deploy:
      mode: replicated
      replicas: 10
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis
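I started the Docker containers with docker-compose and then executed the monitor script inside the producer container to show the results. The --compatibility flag makes docker-compose apply the deploy.replicas setting when running outside swarm mode.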
# start containers
$ docker-compose build && docker-compose --compatibility up
# view containers
$ docker ps
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                    NAMES
55bb702657fc   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_9
8bdbb033e247   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_6
256a165d1897   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_7
b5f2256d839b   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_2
9d6b6646c481   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_8
68b4fd1ac5e3   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_4
185da4e137df   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_10
a0eb6d2f82a7   redis-worker_producer   "/app/producer.rb"       3 minutes ago   Up 3 minutes                            redis-worker_producer_1
d0b18174daa6   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_1
9405439235da   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_3
9904f7ab16af   redis-worker_worker     "/app/worker.rb"         3 minutes ago   Up 3 minutes                            redis-worker_worker_5
657d2386da40   redis:latest            "docker-entrypoint.s…"   3 minutes ago   Up 3 minutes   0.0.0.0:6379->6379/tcp   redis-worker_redis_1
# start monitor script
$ docker exec -it a0eb6d2f82a7 /app/monitor.rb
# example output
Queue:
    1    19
    2    30
    3    21
    4    24
    5    25

Processed:
    079f5841-e0fa-46da-83a1-3e9ad0ed1816    61
    2795fd22-6ccb-4a4e-9489-3d7fb2d42dca    60
    430f5d00-8f84-46fb-9fbe-0a9d5ebea8a5    67
    4b70afa3-d77a-4f25-94f2-f2d3e281b578    60
    5aae5ae8-4124-4df6-976c-cf9ddafdf240    62
    7466716d-0306-48b4-bd2f-5d93778daf81    64
    795d07fd-c45a-4a0a-8903-a22329ac52ec    65
    9874b347-16cb-4f3e-87ed-6a54ae86e926    57
    adb258a5-ba05-44dd-a1a3-c6e9ebfd3c1b    58
    fe4f78bf-7dff-4302-ba67-ca3fe578d9f1    62