Ruby Redis Pub/Sub Worker Queue

In this post I’ll share some Ruby code that uses Redis Pub/Sub and Redis lists to implement a work queue. One list holds the tasks waiting to be completed, each stored as a string, and another holds one JSON string per completed task, recording the task and the worker that completed it.

I first defined a RedisBase parent class that the workers and producer will inherit from. It contains all the Redis client methods. On initialization it creates a connection to Redis, reading the host, port, and list and channel names from environment variables. new file: redis_base.rb

require 'json'
require 'logger'
require 'redis'
require 'securerandom'

class RedisBase
  def initialize
    @uuid = SecureRandom.uuid
    @logger = Logger.new(STDOUT)

    @queue = ENV.fetch('WORK_QUEUE', 'work_queue')
    @processed = ENV.fetch('WORK_PROCESSED', 'work_processed')
    @channel = ENV.fetch('WORK_CHANNEL', 'work_channel')

    @redis_host = ENV.fetch('REDIS_HOST', 'localhost')
    @redis_port = ENV.fetch('REDIS_PORT', '6379').to_i

    @client = Redis.new(host: @redis_host, port: @redis_port)
  end

  protected

  def queue_list
    @client.lrange @queue, 0, -1
  end

  def processed_list
    @client.lrange @processed, 0, -1
  end

  def log(level, message)
    @logger.send(level, "#{@uuid}: #{message}")
  end

  def queue_work
    @client.rpush @queue, rand(1..5)
  end

  def publish_work
    @client.publish @channel, @queue
  end

  def next_task
    @client.lpop @queue
  end

  def complete_task(task)
    @client.rpush @processed, { worker: @uuid, task: task }.to_json
  end
end
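
To make the list semantics concrete without a running Redis server, here is a minimal sketch that swaps the real client for a hypothetical in-memory FakeListStore. That class is an assumption made purely for illustration and only mimics RPUSH, LPOP and LRANGE, but it shows that tasks come off the queue oldest first and what a completed-task entry looks like. The worker name worker-a is a placeholder for the UUID.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the few Redis list commands used above.
# It is NOT part of the redis gem; it only mimics RPUSH, LPOP and LRANGE.
class FakeListStore
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Append to the tail, like RPUSH; returns the new list length.
  def rpush(key, value)
    @lists[key].push(value.to_s).length
  end

  # Remove and return the head, like LPOP; nil when the list is empty.
  def lpop(key)
    @lists[key].shift
  end

  # Return the elements from start to stop (negative indexes count from the end).
  def lrange(key, start, stop)
    @lists[key][start..stop] || []
  end
end

store = FakeListStore.new
[3, 1, 5].each { |seconds| store.rpush('work_queue', seconds) }

task = store.lpop('work_queue') # the oldest task comes out first
record = { worker: 'worker-a', task: task }.to_json
store.rpush('work_processed', record)

remaining = store.lrange('work_queue', 0, -1)
processed = store.lrange('work_processed', 0, -1).map { |json| JSON.parse(json) }
```

Redis hands list elements back as strings, which is why the stand-in converts values with to_s and why the worker calls to_i on a task before using it as a number.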

I defined the producer class (RedisProducer) below. In a loop, it queues work by pushing a task into the work queue, and then publishes to the pub/sub channel to inform subscribers there is new work to complete. new file: producer.rb

#!/usr/bin/env ruby

require_relative 'redis_base'

class RedisProducer < RedisBase
  def start
    loop do
      queue_work
      publish_work
      sleep 0.25
    end
  end
end

RedisProducer.new.start
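
One consequence of this design is worth spelling out: Pub/Sub delivers each published message to every subscriber, so a single publish wakes all the workers, while LPOP is atomic and hands a given task to only one of them. The following plain Ruby simulation (no Redis involved; the array and the worker names are hypothetical stand-ins) sketches that behaviour. In a real deployment the worker that wins is whichever one reaches Redis first, not necessarily the first in a list.

```ruby
# Plain Ruby simulation, no Redis involved. The array stands in for the work
# queue list and each hash for one subscribed worker (both hypothetical).
queue = ['4']
workers = %w[worker-a worker-b worker-c].map { |name| { name: name, claimed: [] } }

# One publish reaches every subscriber, so every worker reacts to it...
workers.each do |worker|
  task = queue.shift # ...but only the first pop returns the task, like LPOP
  worker[:claimed] << task if task
end

claims = workers.map { |worker| [worker[:name], worker[:claimed]] }.to_h
```

The workers that lose the race simply get nil back and go on waiting for the next message.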

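Next I defined the worker class (RedisWorker). On initialization it creates a second Redis connection dedicated to pub/sub, because a connection that is in subscribed mode cannot also issue regular commands such as LPOP. When started, it first works through any tasks already waiting in the queue and then subscribes to the pub/sub channel for new work tasks. new file: worker.rb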

#!/usr/bin/env ruby

require_relative 'redis_base'

class RedisWorker < RedisBase
  def initialize
    super
    @pub_sub = Redis.new(host: @redis_host, port: @redis_port)
  end

  def start
    log :info, 'WORKER STARTED'
    resume_work
    subscribe
  end

  protected

  def resume_work
    log :info, 'WORKER RESUME: START'
    # Keep pulling tasks until the queue is empty (check_work returns false).
    loop { break unless check_work }
    log :info, 'WORKER RESUME: END'
  end

  def check_work
    task = next_task
    return false unless task

    work(task)
    true
  end

  def subscribe
    @pub_sub.subscribe(@channel) do |on|
      on.subscribe do |channel, _subscriptions|
        log :info, "WORKER SUBSCRIBED TO: #{channel}"
      end

      on.message do |channel, message|
        check_work if channel == @channel && message == @queue
      end
    end
  end

  def work(task)
    log :info, "TASK START: #{task}"

    # TODO: do actual work here
    sleep(task.to_i)

    complete_task(task)
    log :info, "TASK END: #{task}"
  rescue StandardError => e
    # TODO: put task back in work queue
    log :error, e
  end
end

RedisWorker.new.start
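
The worker's rescue block leaves re-queueing a failed task as a TODO. Here is one possible sketch of that step, not the author's implementation. It uses plain arrays in place of the Redis lists, and work_with_requeue is a hypothetical helper name. With the real client, the two push calls would correspond to complete_task(task) and @client.rpush @queue, task.

```ruby
# Sketch only: the arrays stand in for the Redis lists, and
# work_with_requeue is a hypothetical name, not part of the original worker.
def work_with_requeue(queue, processed, task)
  yield task                 # the actual work; may raise
  processed.push(task)       # with Redis: complete_task(task)
  true
rescue StandardError => e
  queue.push(task)           # with Redis: @client.rpush @queue, task
  warn "requeued #{task}: #{e.message}"
  false
end

queue = []
processed = []

ok = work_with_requeue(queue, processed, '2') { |task| task.to_i }
failed = work_with_requeue(queue, processed, 'boom') { |_task| raise 'worker crashed' }
```

Two caveats apply to this approach. A task that always fails would be retried forever unless a retry limit is added, and a worker process that dies outright never reaches the rescue clause, so its task is still lost. Redis documents a reliable-queue pattern based on RPOPLPUSH for that second case.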

I created a monitor script to show queued tasks and the completed tasks for each worker. new file: monitor.rb

#!/usr/bin/env ruby

require_relative 'redis_base'

class RedisClient < RedisBase
  def start
    loop do
      puts `clear` + output
      sleep 1
    end
  end

  protected

  def output
    output = ''
    output << output_from(:queue)
    output << output_from(:processed)
  end

  def output_from(kind)
    data = send("#{kind}_grouped")
    output = "#{kind.capitalize}:\n"
    data.each { |k, v| output << "\t#{k}\t#{v}\n" }
    output << "\n"
  end

  def queue_grouped
    grouped = queue_list.each_with_object(Hash.new(0)) { |task, hsh| hsh[task] += 1 }
    Hash[grouped.sort_by { |k, _v| k }]
  end

  def processed_grouped
    grouped = processed_list.each_with_object({}) do |jsn, hsh|
      item = JSON.parse(jsn)
      worker_uuid = item['worker']
      hsh[worker_uuid] = 0 unless hsh.key?(worker_uuid)
      hsh[worker_uuid] += 1
    end
    Hash[grouped.sort_by { |k, _v| k }]
  end
end

RedisClient.new.start
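
The per-worker counting in processed_grouped can be tried on its own with sample data. This sketch assumes entries in the same JSON shape that complete_task writes. The worker IDs are made-up placeholders rather than real UUIDs, and Hash.new(0) replaces the explicit key check, as queue_grouped already does.

```ruby
require 'json'

# Sample entries in the same shape complete_task writes; the worker IDs
# here are made-up placeholders rather than real UUIDs.
processed_list = [
  { worker: 'worker-b', task: '3' },
  { worker: 'worker-a', task: '1' },
  { worker: 'worker-b', task: '5' }
].map(&:to_json)

# Count entries per worker; Hash.new(0) makes missing keys start at zero.
counts = processed_list.each_with_object(Hash.new(0)) do |json, totals|
  totals[JSON.parse(json)['worker']] += 1
end

# Sort by worker ID so the monitor output keeps a stable order.
sorted = counts.sort.to_h
```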

Here is a Dockerfile to run the worker and producer Ruby code. new file: Dockerfile

FROM ruby:2.5.3-alpine3.8

RUN apk add --no-cache --update build-base

RUN echo 'gem: --no-document' > ~/.gemrc
RUN gem install bundler

ENV APP_HOME /app/
COPY Gemfile Gemfile.lock $APP_HOME
WORKDIR $APP_HOME
RUN bundle install
COPY *.rb $APP_HOME

I defined a docker compose file to start Redis, a producer, and 10 workers. new file: docker-compose.yml

version: '3'
services:

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

  producer:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/producer.rb
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis

  worker:
    build:
      context: .
      dockerfile: Dockerfile
    command: /app/worker.rb
    deploy:
      mode: replicated
      replicas: 10
    environment:
      - REDIS_HOST=redis
    depends_on:
      - redis

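I started the Docker containers via Compose and then executed the monitor script inside the producer container to show the results. The --compatibility flag tells docker-compose to translate the deploy settings, including replicas, into their non-swarm equivalents, which is what starts the 10 workers.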

# start containers
$ docker-compose build && docker-compose --compatibility up

# view containers
$ docker ps
CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                    NAMES
55bb702657fc        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_9
8bdbb033e247        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_6
256a165d1897        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_7
b5f2256d839b        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_2
9d6b6646c481        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_8
68b4fd1ac5e3        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_4
185da4e137df        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_10
a0eb6d2f82a7        redis-worker_producer   "/app/producer.rb"       3 minutes ago       Up 3 minutes                                 redis-worker_producer_1
d0b18174daa6        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_1
9405439235da        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_3
9904f7ab16af        redis-worker_worker     "/app/worker.rb"         3 minutes ago       Up 3 minutes                                 redis-worker_worker_5
657d2386da40        redis:latest            "docker-entrypoint.s…"   3 minutes ago       Up 3 minutes        0.0.0.0:6379->6379/tcp   redis-worker_redis_1

# start monitor script
$ docker exec -it a0eb6d2f82a7 /app/monitor.rb

# example output
Queue:
	1	19
	2	30
	3	21
	4	24
	5	25

Processed:
	079f5841-e0fa-46da-83a1-3e9ad0ed1816	61
	2795fd22-6ccb-4a4e-9489-3d7fb2d42dca	60
	430f5d00-8f84-46fb-9fbe-0a9d5ebea8a5	67
	4b70afa3-d77a-4f25-94f2-f2d3e281b578	60
	5aae5ae8-4124-4df6-976c-cf9ddafdf240	62
	7466716d-0306-48b4-bd2f-5d93778daf81	64
	795d07fd-c45a-4a0a-8903-a22329ac52ec	65
	9874b347-16cb-4f3e-87ed-6a54ae86e926	57
	adb258a5-ba05-44dd-a1a3-c6e9ebfd3c1b	58
	fe4f78bf-7dff-4302-ba67-ca3fe578d9f1	62

Source code on GitHub
