In this post I’ll share some Ruby code that uses Redis Pub/Sub and Redis lists to implement work queues. One list will contain strings of the task name to complete, and another will contain a JSON string of the completed task and the worker that completed it.
I first defined a RedisBase parent class that the workers and producer with inherit from. It contains all the Redis client methods. On initialize it creates a connection to Redis from environment variables. new file: redis_base.rb
require 'json'
require 'logger'
require 'redis'
require 'securerandom'
class RedisBase
def initialize
@uuid = SecureRandom . uuid
@logger = Logger . new ( STDOUT )
@queue = ENV . fetch ( 'WORK_QUEUE' , 'work_queue' )
@processed = ENV . fetch ( 'WORK_PROCESSED' , 'work_processed' )
@channel = ENV . fetch ( 'WORK_CHANNEL' , 'work_channel' )
@redis_host = ENV . fetch ( 'REDIS_HOST' , 'localhost' )
@redis_port = ENV . fetch ( 'REDIS_PORT' , '6379' ). to_i
@client = Redis . new ( host: @redis_host , port: @redis_port )
end
protected
def queue_list
@client . lrange @queue , 0 , - 1
end
def processed_list
@client . lrange @processed , 0 , - 1
end
def log ( level , message )
@logger . send ( level , " #{ @uuid } : #{ message } " )
end
def queue_work
@client . rpush @queue , rand ( 1 .. 5 )
end
def publish_work
@client . publish @channel , @queue
end
def next_task
@client . lpop @queue
end
def complete_task ( task )
@client . rpush @processed , { worker: @uuid , task: task }. to_json
end
end
I defined the producer class (RedisProducer) below. In a loop, it queues work by pushing a task into the work queue, and then publishes to the pub/sub channel to inform subscribers there is new work to complete. new file: producer.rb
#!/usr/bin/env ruby
require_relative 'redis_base'
class RedisProducer < RedisBase
def start
loop do
queue_work
publish_work
sleep 0.25
end
end
end
RedisProducer . new . start
Next I defined the worker class (RedisWorker). On initialize, it creates a pub/sub client, checks if there is incomplete work to resume, and then subscribes to the pub/sub channel for new work tasks. new file: worker.rb
#!/usr/bin/env ruby
require_relative 'redis_base'
class RedisWorker < RedisBase
def initialize
super
@pub_sub = Redis . new ( host: @redis_host , port: @redis_port )
end
def start
log :info , 'WORKER STARTED'
resume_work
subscribe
end
protected
def resume_work
log :info , 'WORKER RESUME: START'
has_work = false
begin
result = check_work
has_work = true if result
end while has_work
log :info , 'WORKER RESUME: END'
end
def check_work
task = next_task
return false unless task
work ( task )
true
end
def subscribe
@pub_sub . subscribe ( @channel ) do | on |
on . subscribe do | channel , _subscriptions |
log :info , "WORKER SUBSCRIBED TO: #{ channel } "
end
on . message do | channel , message |
check_work if channel == @channel && message == @queue
end
end
end
def work ( task )
log :info , "TASK START: #{ task } "
# TODO: do actual work here
sleep ( task . to_i )
complete_task ( task )
log :info , "TASK END: #{ task } "
rescue StandardError => e
# TODO: put task back in work queue
log :error , e
end
end
RedisWorker . new . start
I created a monitor script to show queued tasks and the completed tasks for each worker. new file: monitor.rb
#!/usr/bin/env ruby
require_relative 'redis_base'
class RedisClient < RedisBase
def start
loop do
puts `clear` + output
sleep 1
end
end
protected
def output
output = ''
output << output_from ( :queue )
output << output_from ( :processed )
end
def output_from ( kind )
data = send ( " #{ kind } _grouped" )
output = " #{ kind . capitalize } : \n "
data . each { | k , v | output << " \t #{ k } \t #{ v } \n " }
output << " \n "
end
def queue_grouped
grouped = queue_list . each_with_object ( Hash . new ( 0 )) { | task , hsh | hsh [ task ] += 1 }
Hash [ grouped . sort_by { | k , _v | k }]
end
def processed_grouped
grouped = processed_list . each_with_object ({}) do | jsn , hsh |
item = JSON . parse ( jsn )
worker_uuid = item [ 'worker' ]
hsh [ worker_uuid ] = 0 unless hsh . key? ( worker_uuid )
hsh [ worker_uuid ] += 1
end
Hash [ grouped . sort_by { | k , _v | k }]
end
end
RedisClient . new . start
Here is a Dockerfile definition to run the workers and producer Ruby code, new file: Dockerfile
FROM ruby:2.5.3-alpine3.8
RUN apk add --no-cache --update build-base
RUN echo 'gem: --no-document' > ~/.gemrc
RUN gem install bundler
ENV APP_HOME /app/
COPY Gemfile Gemfile.lock $APP_HOME
WORKDIR $APP_HOME
RUN bundle install
COPY * .rb $APP_HOME
I defined a docker compose file to start Redis, a producer, and 10 workers. new file: docker-compose.yml
version : ' 3'
services :
redis :
image : redis:latest
ports :
- " 6379:6379"
producer :
build :
context : .
dockerfile : Dockerfile
command : /app/producer.rb
environment :
- REDIS_HOST=redis
depends_on :
- redis
worker :
build :
context : .
dockerfile : Dockerfile
command : /app/worker.rb
deploy :
mode : replicated
replicas : 10
environment :
- REDIS_HOST=redis
depends_on :
- redis
I started the docker containers via compose and then executed the monitor script inside the producer container to show the results.
# start containers
$ docker-compose build && docker-compose --compatibility up
# view containers
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
55bb702657fc redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_9
8bdbb033e247 redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_6
256a165d1897 redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_7
b5f2256d839b redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_2
9d6b6646c481 redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_8
68b4fd1ac5e3 redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_4
185da4e137df redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_10
a0eb6d2f82a7 redis-worker_producer "/app/producer.rb" 3 minutes ago Up 3 minutes redis-worker_producer_1
d0b18174daa6 redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_1
9405439235da redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_3
9904f7ab16af redis-worker_worker "/app/worker.rb" 3 minutes ago Up 3 minutes redis-worker_worker_5
657d2386da40 redis:latest "docker-entrypoint.s…" 3 minutes ago Up 3 minutes 0.0.0.0:6379->6379/tcp redis-worker_redis_1
# start monitor script
$ docker exec -it a0eb6d2f82a7 /app/monitor.rb
# example output
Queue:
1 19
2 30
3 21
4 24
5 25
Processed:
079f5841-e0fa-46da-83a1-3e9ad0ed1816 61
2795fd22-6ccb-4a4e-9489-3d7fb2d42dca 60
430f5d00-8f84-46fb-9fbe-0a9d5ebea8a5 67
4b70afa3-d77a-4f25-94f2-f2d3e281b578 60
5aae5ae8-4124-4df6-976c-cf9ddafdf240 62
7466716d-0306-48b4-bd2f-5d93778daf81 64
795d07fd-c45a-4a0a-8903-a22329ac52ec 65
9874b347-16cb-4f3e-87ed-6a54ae86e926 57
adb258a5-ba05-44dd-a1a3-c6e9ebfd3c1b 58
fe4f78bf-7dff-4302-ba67-ca3fe578d9f1 62
Source code on GitHub