We use Gearman at work, so naturally I’ve been pondering simple alternatives. This code is not a full replacement for Gearman, but here’s a (super) simple worker/job queue that uses Redis lists to hold the queued jobs and Redis pub/sub to notify the workers of new tasks to complete.
The main guts of the worker are defined in a simple module. file: lib/worker.rb
require 'redis'
require 'securerandom'
require 'json'

module Worker
  # make module methods class methods
  extend self

  # each worker gets a "unique" id
  WORKER_ID = SecureRandom.hex

  # module container for worker methods
  module WorkerMethods
  end

  # method used to define worker tasks
  def add(task_name, *args, &block)
    raise "Block required" unless block_given?
    WorkerMethods.define_singleton_method task_name, block
  end

  def work
    # process existing tasks in list
    while RedisWorker.got_tasks?
      RedisWorker.do_next_task
    end
    # subscribe for new work
    RedisWorker.subscribe
  end
  class RedisWorker
    # connect to redis for non-pub/sub commands
    @redis = Redis.new

    def self.do_next_task(task = nil)
      # pick a random task that has queued jobs if none was passed
      task = task_counts.reject { |_name, count| count.zero? }.keys.sample if task.nil?
      # nothing queued (another worker may have emptied the lists already)
      return if task.nil?
      # pop task from redis list
      json = @redis.lpop task
      return if json.nil?
      # parse the task
      data = JSON.parse json
      # debug output
      puts "WORKER: #{WORKER_ID} - #{data}"
      WorkerMethods.send task, data
    end

    # boolean if any tasks exist
    def self.got_tasks?
      task_counts.values.any? { |count| count > 0 }
    end

    # check redis list size for each worker task
    def self.task_counts
      WorkerMethods.singleton_methods.each_with_object({}) { |task, hsh| hsh[task] = @redis.llen task }
    end

    # use redis pub/sub to subscribe to channel for new tasks
    def self.subscribe
      @channel = defined?(CHANNEL) ? CHANNEL : 'job_server'
      # pub/sub needs its own connection
      @pubsub = Redis.new
      @pubsub.subscribe(@channel) do |on|
        on.message do |channel, msg|
          # message
          data = JSON.parse(msg)
          # process jobs this worker knows how to complete; checking
          # singleton_methods (rather than respond_to?) ignores built-in
          # Module methods such as "name" and tolerates a missing task key
          if WorkerMethods.singleton_methods.include?(data['task'].to_s.to_sym)
            do_next_task data['task']
          end
        end
      end
    end
  end
end
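The task registry is the part of the module that is easiest to try without a Redis server. Below is a minimal sketch of the same idea, with no claim to match the module line for line: `TaskRegistry`, `register`, `known_task?` and `dispatch` are hypothetical names invented for this illustration. It also shows that a name check against `singleton_methods` is stricter than `respond_to?`, which answers true for built-in Module methods too.

```ruby
# Stand-in for the WorkerMethods container (hypothetical name).
module TaskRegistry
end

# Register a task by defining a singleton method named after it.
def register(name, &block)
  raise ArgumentError, 'Block required' unless block

  TaskRegistry.define_singleton_method(name, &block)
end

# Only names registered through `register` count as tasks.
def known_task?(name)
  TaskRegistry.singleton_methods.include?(name.to_s.to_sym)
end

# Call the task if it was registered; return nil otherwise.
def dispatch(name, data)
  return nil unless known_task?(name)

  TaskRegistry.public_send(name, data)
end

register(:say_wee) { |data| "wee #{data['n']}" }

puts dispatch('say_wee', { 'n' => 1 })      # runs the registered block
puts TaskRegistry.respond_to?('name')       # true, because Module#name is built in
puts known_task?('name')                    # false, it was never registered
```

Since task names arrive over pub/sub as strings, the lookup converts them to symbols before comparing against `singleton_methods`.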
Workers can be created by requiring the worker module and calling the add method with a block for each task. file: worker_start.rb
#!/usr/bin/env ruby
CHANNEL = 'job_server'
$:.unshift File.dirname(__FILE__) + '/lib'
require 'worker'

Worker.add :say_wee do |data|
  # data
  puts "wee"
end

Worker.add :say_oh_yah do |data|
  # data
  puts "oh yah"
end

Worker.work
Jobs can be queued like this. file: add_jobs.rb
#!/usr/bin/env ruby
require 'redis'
require 'json'

CHANNEL = 'job_server'
redis = Redis.new
tasks = ['say_wee', 'say_oh_yah']

1000.times do
  task = tasks.sample
  # the payload must name the same task whose list it is pushed to
  job = { task: task }.to_json
  redis.rpush task, job
  redis.publish CHANNEL, job
end
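The job payload is plain JSON, so it can carry more than the task name. Here is a small sketch of the round trip between the queueing script and the worker. The `args` field and the `build_job` and `parse_job` helpers are assumptions made up for this example and are not part of the code above.

```ruby
require 'json'

# Build a payload the way the queueing script does.
# The 'args' field is a hypothetical addition for passing data to the task.
def build_job(task, args = {})
  { task: task, args: args }.to_json
end

# Parse a payload the way the worker does.
def parse_job(json)
  JSON.parse(json)
end

json = build_job('say_wee', { name: 'Ada' })
data = parse_job(json)

puts json
puts data['task']          # string keys after JSON.parse
puts data[:task].inspect   # nil: symbol keys do not survive the round trip
puts data['args']['name']
```

The detail worth remembering is that `JSON.parse` returns string keys by default, so a task block should read `data['name']` and not `data[:name]`.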
To try it out:
# watch redis (in a separate terminal; this command keeps running)
redis-cli monitor
# start a few workers
./worker_start.rb &
./worker_start.rb &
./worker_start.rb &
# queue some jobs
./add_jobs.rb
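With several workers running, every worker receives every pub/sub message, but each job is popped from its list only once because Redis executes LPOP atomically. The sketch below imitates that flow in memory so it can run without Redis. `FakeBroker` and `simulate` are hypothetical stand-ins written for this illustration, and delivery here is synchronous, so the first worker always wins the pop. Against a real server, whichever worker reaches LPOP first gets the job.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the two Redis features used here:
# named lists (rpush/lpop/llen) and a single pub/sub channel.
class FakeBroker
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
    @subscribers = []
  end

  def rpush(list, value)
    @lists[list].push(value)
  end

  def lpop(list)
    @lists[list].shift
  end

  def llen(list)
    @lists[list].size
  end

  def subscribe(&handler)
    @subscribers << handler
  end

  # Deliver the message to every subscriber, as pub/sub does.
  def publish(message)
    @subscribers.each { |handler| handler.call(message) }
  end
end

# Queue job_count jobs while worker_count workers listen for notifications.
def simulate(worker_count, job_count)
  broker = FakeBroker.new
  processed = Hash.new(0) # worker id => jobs completed
  empty_pops = 0

  worker_count.times do |id|
    broker.subscribe do |message|
      task = JSON.parse(message)['task']
      if broker.lpop(task)
        processed[id] += 1
      else
        empty_pops += 1 # another worker already took the job
      end
    end
  end

  job_count.times do
    job = { task: 'say_wee' }.to_json
    broker.rpush('say_wee', job)
    broker.publish(job)
  end

  { processed: processed.values.sum, empty_pops: empty_pops, left: broker.llen('say_wee') }
end

puts simulate(3, 10)
```

The empty pops are the cost of this design: each notification wakes all workers, and all but one of them find the list already empty.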
Source code on GitHub