JRuby: Bulk index Rails model data into Elasticsearch via Sidekiq (Redis queue)
In this post, I’ll share some code to bulk index Rails models into Elasticsearch using Sidekiq. Sidekiq uses a Redis queue to run jobs asynchronously. I used JRuby to take advantage of all my CPU cores.
I installed MySQL, Elasticsearch, and Redis via Homebrew. Install as necessary:
# install/start elasticsearch
brew install elasticsearch
elasticsearch --config=/usr/local/opt/elasticsearch/config/elasticsearch.yml
# install/start redis
brew install redis
redis-server /usr/local/etc/redis.conf
I set up a basic Rails project with a single model.
# setup RVM, if you want..
mkdir rails_elasticsearch_sidekiq
echo "jruby-1.7.16.1" > rails_elasticsearch_sidekiq/.ruby-version
echo rails_elasticsearch_sidekiq > rails_elasticsearch_sidekiq/.ruby-gemset
cd rails_elasticsearch_sidekiq
# install rails
gem install rails
rails new . -d mysql
# create/setup database
rake db:create
rake db:migrate
# added "Thing" model, with a single attribute "title"
rails g model Thing title:string
rake db:migrate
Before bulk indexing the data into Elasticsearch, I needed to populate the Thing model with data (~1 million records). In keeping with the theme of this post, I decided to use Sidekiq to populate the model.
Added Sidekiq gem, and Sinatra for stand-alone monitoring. Edited file: Gemfile, added:
gem 'sidekiq'
gem 'sinatra'
Installed new gems:
bundle install
Created a Sidekiq/Sinatra stand-alone monitor app via rake task. new file: lib/tasks/sidekiq.rake
namespace :sidekiq do
  desc "Start Sidekiq Monitor (standalone)"
  task :monitor => :environment do
    require 'sidekiq/web'
    app = Sidekiq::Web
    app.set :environment, Rails.env.to_sym
    app.set :bind, '0.0.0.0'
    app.set :port, 9494
    app.run!
  end
end
Started Sidekiq/Sinatra monitor. Accessible at: http://localhost:9494
rake sidekiq:monitor
Created a new folder for Sidekiq workers: app/workers
Added a new Sidekiq worker to create Things. new file: app/workers/thing_creator_worker.rb
require 'securerandom'
class ThingCreatorWorker
  include Sidekiq::Worker
  sidekiq_options queue: 'elasticsearch', retry: true
  def perform
    Thing.create!({title: SecureRandom.hex})
  end
end
Added a rake task to queue the jobs to create Things. Edit file: lib/tasks/sidekiq.rake
namespace :sidekiq do
  # ..snip..
  desc "Create Things via Sidekiq"
  task :create_things => :environment do
    1_000_000.times { ThingCreatorWorker.perform_async }
  end
end
Executed the rake task:
rake sidekiq:create_things
Start a Sidekiq worker. Add the "--verbose" flag to help troubleshoot issues. I started a few workers until all my CPU cores were pegged at 100%.
sidekiq --queue elasticsearch
Progress can be seen via Sidekiq/Sinatra monitor: http://localhost:9494
When complete, verify the results via the Rails console:
rails c
[2] pry(main)> Thing.all.size
(198.0ms) SELECT COUNT(*) FROM `things`
=> 1000000
At this point we’re ready to bulk index data into Elasticsearch. Add gems to Gemfile:
gem 'elasticsearch-model', git: 'https://github.com/elasticsearch/elasticsearch-rails.git'
gem 'elasticsearch-rails', git: 'https://github.com/elasticsearch/elasticsearch-rails.git'
Install the gems:
bundle install
Integrate Thing model with Elasticsearch. edit file: app/models/thing.rb
require 'elasticsearch/model'
class Thing < ActiveRecord::Base
  include Elasticsearch::Model
  def as_indexed_json(options={})
    self.as_json
  end
end
Created a Sidekiq indexer worker. new file: app/workers/thing_indexer_worker.rb
class ThingIndexerWorker
  include Sidekiq::Worker
  sidekiq_options queue: 'elasticsearch', retry: true
  # Log Elasticsearch requests only when Sidekiq's logger is at DEBUG level.
  # Named ES_LOGGER so the constant does not shadow Ruby's ::Logger inside this class.
  ES_LOGGER = Sidekiq.logger.level == ::Logger::DEBUG ? Sidekiq.logger : nil
  # One client shared by every job in this process.
  ES_CLIENT = Elasticsearch::Client.new host: '127.0.0.1:9200', logger: ES_LOGGER
  def perform(id)
    thing = Thing.find(id)
    ES_CLIENT.index index: Thing.index_name, type: Thing.index_name.singularize, id: thing.id, body: thing.as_indexed_json
  end
end
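The worker above sends one HTTP request per record. Elasticsearch also has a bulk API that accepts many index operations in a single request, and the Ruby client exposes it as `bulk`. Here is a minimal sketch of building that payload from plain hashes. `bulk_index_body` is a hypothetical helper that is not part of the code in this post, and the 'things'/'thing' names assume the index and type used by the Thing model.

```ruby
# Hypothetical helper (not from the original post): build the `body` argument
# for the Elasticsearch Ruby client's `bulk` method from an array of hashes.
def bulk_index_body(index_name, type_name, docs)
  docs.map do |doc|
    # One "index" operation per document; `data` holds the document source.
    { index: { _index: index_name, _type: type_name, _id: doc[:id], data: doc } }
  end
end

docs = [{ id: 1, title: 'abc' }, { id: 2, title: 'def' }]
body = bulk_index_body('things', 'thing', docs)

# With a running Elasticsearch node, the whole batch could be sent in one request:
#   client = Elasticsearch::Client.new host: '127.0.0.1:9200'
#   client.bulk body: body
```

A worker built this way would take a list of ids per job instead of a single id, trading smaller retry granularity for far fewer requests.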
Created another rake task to load all the Thing primary keys and queue an indexing job for each one. edit file: lib/tasks/sidekiq.rake
namespace :sidekiq do
  # ..snip..
  desc "Index Things into Elasticsearch"
  task :index_things => :environment do
    Thing.pluck(Thing.primary_key).each { |id| ThingIndexerWorker.perform_async(id) }
  end
end
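Calling `perform_async` once per id means one Redis round trip per job. Sidekiq also provides `Sidekiq::Client.push_bulk`, which enqueues many jobs of the same class in one call. The sketch below only builds the payloads, so it runs without Redis. `push_bulk_payloads` and the batch size of 1,000 are assumptions for illustration, not something I benchmarked.

```ruby
# Hypothetical helper: split ids into batches and build one payload per batch
# in the shape Sidekiq::Client.push_bulk expects ('args' is an array of
# argument arrays, one inner array per job).
def push_bulk_payloads(worker_class_name, ids, batch_size = 1_000)
  ids.each_slice(batch_size).map do |slice|
    {
      'class' => worker_class_name,
      'queue' => 'elasticsearch',
      'args'  => slice.map { |id| [id] }
    }
  end
end

payloads = push_bulk_payloads('ThingIndexerWorker', (1..2_500).to_a)

# Inside the rake task, each payload could then be enqueued with:
#   payloads.each { |payload| Sidekiq::Client.push_bulk(payload) }
```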
Execute the rake task to index the model data into Elasticsearch:
rake sidekiq:index_things
Start (or restart) the Sidekiq workers:
sidekiq --queue elasticsearch
Verify results in Elasticsearch via rails console:
[10] pry(main)> Thing.search('*').results.total
=> 1000000
Or via cURL:
curl -XGET 'http://127.0.0.1:9200/things/_search?search_type=count&pretty' -d '{"query":{"match_all":{}}}'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000000,
    "max_score" : 0.0,
    "hits" : [ ]
  }
}
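The same check can be scripted in Ruby with only the standard library. This is a minimal sketch: `total_hits` and `fetch_thing_count` are hypothetical helper names, and the URL assumes the local node and `things` index used in the cURL command.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Extract hits.total from a search response body.
def total_hits(response_json)
  JSON.parse(response_json).fetch('hits').fetch('total')
end

# Fetch the count from a running node (not called here; requires Elasticsearch).
def fetch_thing_count(base_url = 'http://127.0.0.1:9200')
  uri = URI("#{base_url}/things/_search?search_type=count")
  total_hits(Net::HTTP.get(uri))
end

# Parse a trimmed copy of the response shown above.
sample = '{"took":2,"timed_out":false,"hits":{"total":1000000,"max_score":0.0,"hits":[]}}'
puts total_hits(sample) # prints 1000000
```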
Screenshot of Sinatra/Sidekiq monitor with all CPUs pegged: