JRuby: Bulk index Rails model data into Elasticsearch via Sidekiq (Redis queue)

In this post, I'll share some code to bulk index Rails models into Elasticsearch using Sidekiq. Sidekiq uses a Redis queue to run jobs asynchronously. I used JRuby because it has no global interpreter lock, so Sidekiq's worker threads can run in parallel across all my CPU cores.
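The basic shape Sidekiq relies on is a queue plus a pool of worker threads that pull jobs off it. The toy sketch below shows that shape using only Ruby's built-in `Queue` and `Thread`. It is not Sidekiq's implementation (Sidekiq fetches jobs from Redis), and `run_jobs` and the worker count are hypothetical. On JRuby these threads can run on separate cores. On MRI, CPU-bound threads take turns because of the global interpreter lock.

```ruby
# Toy model of a job queue drained by a pool of threads.
# NOT Sidekiq's implementation; Sidekiq pulls jobs from Redis instead.
def run_jobs(jobs, worker_count: 4)
  queue = Queue.new
  jobs.each { |job| queue << job }

  results = Queue.new # thread-safe collection for job results
  workers = Array.new(worker_count) do
    Thread.new do
      loop do
        begin
          job = queue.pop(true) # non-blocking pop; raises ThreadError when empty
        rescue ThreadError
          break # queue drained, so this worker thread exits
        end
        results << job.call
      end
    end
  end
  workers.each(&:join) # wait for every worker to finish

  Array.new(results.size) { results.pop }
end

# Ten small "jobs", each returning the square of its number.
squares = run_jobs((1..10).map { |n| -> { n * n } }, worker_count: 3)
puts squares.sort.inspect
```

Results arrive in whatever order the threads finish, which is why the output is sorted before printing.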

I installed MySQL, Elasticsearch, and Redis via Homebrew. Install and start them as necessary, for example Elasticsearch and Redis:

# install/start elasticsearch
brew install elasticsearch
elasticsearch --config=/usr/local/opt/elasticsearch/config/elasticsearch.yml

# install/start redis
brew install redis
redis-server /usr/local/etc/redis.conf
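Before moving on, it can help to confirm that both services are listening. Here is a minimal sketch that uses only Ruby's standard `socket` library. It assumes the default ports on localhost (9200 for Elasticsearch's HTTP API, 6379 for Redis), and `port_open?` is a hypothetical helper name.

```ruby
require 'socket'

# Returns true if something accepts TCP connections on host:port, false otherwise.
def port_open?(host, port, timeout: 1)
  # With a block, Socket.tcp yields the socket, closes it, and returns the block's value.
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError
  false # connection refused, timed out, or host not resolvable
end

# Default ports assumed: Elasticsearch HTTP on 9200, Redis on 6379.
{ 'elasticsearch' => 9200, 'redis' => 6379 }.each do |name, port|
  status = port_open?('127.0.0.1', port) ? 'up' : 'DOWN'
  puts "#{name} (port #{port}): #{status}"
end
```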

I set up a basic Rails project with a single model.

# set up RVM (optional)
mkdir rails_elasticsearch_sidekiq
echo "jruby-1.7.16.1" > rails_elasticsearch_sidekiq/.ruby-version
echo rails_elasticsearch_sidekiq > rails_elasticsearch_sidekiq/.ruby-gemset
cd rails_elasticsearch_sidekiq

# install rails
gem install rails
rails new . -d mysql

# create/setup database
rake db:create
rake db:migrate

# add a "Thing" model with a single attribute, "title"
rails g model Thing title:string
rake db:migrate

Before bulk indexing the data into Elasticsearch, I needed to populate the Thing model with data (~1 million records). In keeping with the theme of this post, I used Sidekiq for that as well.

Added the Sidekiq gem, plus Sinatra for the stand-alone monitoring UI. Edited Gemfile and added:

gem 'sidekiq'
gem 'sinatra'

Installed the new gems:

bundle install

Created a stand-alone Sidekiq/Sinatra monitor app as a rake task. New file: lib/tasks/sidekiq.rake

namespace :sidekiq do

  desc "Start Sidekiq Monitor (standalone)"
  task :monitor => :environment do

    require 'sidekiq/web'
    app = Sidekiq::Web
    app.set :environment, Rails.env.to_sym
    app.set :bind, '0.0.0.0'
    app.set :port, 9494
    app.run!

  end

end

Started the Sidekiq/Sinatra monitor (accessible at http://localhost:9494):

rake sidekiq:monitor

Created a new folder for Sidekiq workers: app/workers

Added a new Sidekiq worker to create Things. New file: app/workers/thing_creator_worker.rb

require 'securerandom'

# Each job creates one Thing with a random 32-character hex title.
class ThingCreatorWorker
  include Sidekiq::Worker

  sidekiq_options queue: 'elasticsearch', retry: true

  def perform
    Thing.create!(title: SecureRandom.hex)
  end

end

Added a rake task to queue the jobs that create Things. Edited file: lib/tasks/sidekiq.rake

namespace :sidekiq do

  # ..snip..

  desc "Create Things via Sidekiq"
  task :create_things => :environment do
    1_000_000.times { ThingCreatorWorker.perform_async } # one job per Thing
  end

end
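Enqueuing one job per row puts 1,000,000 separate jobs into Redis. One alternative (not what I ran here) is to enqueue fewer jobs that each create a batch of rows. The pure-Ruby helper below only computes the batch sizes. `batch_sizes` is a hypothetical name, and using it would also require a hypothetical worker that accepts a count argument.

```ruby
# Split `total` records into job-sized batches,
# e.g. 1_000_000 rows -> 1_000 jobs of 1_000 rows each.
def batch_sizes(total, batch_size)
  raise ArgumentError, 'batch_size must be positive' unless batch_size > 0

  full, remainder = total.divmod(batch_size)
  sizes = Array.new(full, batch_size)
  sizes << remainder if remainder > 0 # last, smaller batch
  sizes
end

sizes = batch_sizes(1_000_000, 1_000)
puts "#{sizes.length} jobs, #{sizes.sum} rows"
```

In a rake task, each size could be passed as the job argument (for example `perform_async(size)`), and that hypothetical worker would call `Thing.create!` that many times. Fewer, larger jobs trade per-job overhead against coarser retries.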

Executed the rake task:

rake sidekiq:create_things

Started the Sidekiq workers (adding the "--verbose" flag helps troubleshoot issues). I started several Sidekiq processes until all my CPU cores were pegged at 100%.

sidekiq --queue elasticsearch

Progress is visible in the Sidekiq/Sinatra monitor: http://localhost:9494

When the jobs were complete, I verified the results via the Rails console:

rails c
[2] pry(main)> Thing.all.size
   (198.0ms)  SELECT COUNT(*) FROM `things`
=> 1000000

At this point we're ready to bulk index the data into Elasticsearch. Added the gems to Gemfile:

# both gems live in the elasticsearch-rails repository
gem 'elasticsearch-model', git: 'https://github.com/elasticsearch/elasticsearch-rails.git'
gem 'elasticsearch-rails', git: 'https://github.com/elasticsearch/elasticsearch-rails.git'

Installed the gems:

bundle install

Integrated the Thing model with Elasticsearch. Edited file: app/models/thing.rb

require 'elasticsearch/model'

class Thing < ActiveRecord::Base
  include Elasticsearch::Model

  # Document body sent to Elasticsearch: all attributes (id, title, created_at, updated_at).
  def as_indexed_json(options={})
    self.as_json
  end

end

Created a Sidekiq indexer worker. New file: app/workers/thing_indexer_worker.rb

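class ThingIndexerWorker
  include Sidekiq::Worker

  sidekiq_options queue: 'elasticsearch', retry: true

  # Log Elasticsearch requests only when Sidekiq runs at DEBUG level (e.g. with --verbose).
  # ::Logger is Ruby's stdlib Logger; distinct constant names avoid shadowing it inside this class.
  ES_LOGGER = Sidekiq.logger.level == ::Logger::DEBUG ? Sidekiq.logger : nil
  # One client per Sidekiq process, shared by all jobs.
  ES_CLIENT = Elasticsearch::Client.new host: '127.0.0.1:9200', logger: ES_LOGGER

  def perform(id)
    thing = Thing.find(id)
    ES_CLIENT.index index: Thing.index_name,
                    type:  Thing.document_type,
                    id:    thing.id,
                    body:  thing.as_indexed_json
  end

end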
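Each job above sends one index request per record. Elasticsearch also provides a `_bulk` endpoint. It takes newline-delimited JSON made of an action/metadata line followed by the document source, repeated for each document, and ending with a trailing newline. The sketch below builds that body using only the standard library. The `bulk_body` helper is hypothetical, and the `_type` field matches the Elasticsearch 1.x era used here (later versions removed mapping types).

```ruby
require 'json'

# Build an Elasticsearch _bulk request body (newline-delimited JSON).
# Each document contributes an action/metadata line followed by its source line.
def bulk_body(index, type, docs)
  lines = docs.flat_map do |doc|
    [
      { index: { _index: index, _type: type, _id: doc[:id] } }.to_json,
      doc.to_json
    ]
  end
  lines.join("\n") + "\n" # the bulk API requires a trailing newline
end

body = bulk_body('things', 'thing', [{ id: 1, title: 'a' }, { id: 2, title: 'b' }])
puts body
```

With elasticsearch-ruby, `Client#bulk` accepts a request body through its `body:` argument. A worker could therefore index a whole slice of IDs in one request instead of one request per record.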

Created another rake task that loads all Thing primary keys and queues one indexing job per key. Edited file: lib/tasks/sidekiq.rake

namespace :sidekiq do

  # ..snip..

  desc "Index Things into Elasticsearch"
  task :index_things => :environment do
    Thing.pluck(Thing.primary_key).each { |id| ThingIndexerWorker.perform_async(id) }
  end

end
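This task enqueues one job per primary key. If the worker were changed to accept an array of IDs (a hypothetical change, paired with Elasticsearch's `_bulk` API), the keys could be grouped into slices first. Sidekiq stores job arguments as JSON, so each slice must survive a JSON round trip. Plain integer arrays do. The `id_slices` helper below is a hypothetical sketch.

```ruby
require 'json'

# Group primary keys into fixed-size slices, one slice per (hypothetical) bulk-indexing job.
def id_slices(ids, slice_size)
  ids.each_slice(slice_size).to_a
end

slices = id_slices((1..2_500).to_a, 1_000)

# Sidekiq serializes job args as JSON; check each slice comes back unchanged.
round_tripped = slices.map { |slice| JSON.parse(JSON.generate([slice])).first }

puts slices.map(&:size).inspect
puts round_tripped == slices
```

In the rake task, this would amount to calling `each_slice(1_000)` on the plucked keys and enqueuing one job per slice.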

Executed the rake task to queue the indexing jobs for the model data:

rake sidekiq:index_things

Restarted the Sidekiq workers so they load the new indexer worker and the updated Thing model:

sidekiq --queue elasticsearch

Verified the results in Elasticsearch via the Rails console:

[10] pry(main)> Thing.search('*').results.total
=> 1000000

Or via cURL:

curl -XGET 'http://127.0.0.1:9200/things/_search?search_type=count&pretty' -d '{"query":{"match_all":{}}}'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000000,
    "max_score" : 0.0,
    "hits" : [ ]
  }
}
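To check the count from a script instead of by eye, the search response can be parsed with the standard `json` library. The sketch below parses a trimmed copy of the response above (only the fields it reads). `hits_total` is a hypothetical helper. It handles the Elasticsearch 1.x form, where `hits.total` is a plain integer, and the object form (`{"value": n, "relation": "eq"}`) returned by newer versions.

```ruby
require 'json'

# Extract hits.total from a search response body, accepting both the integer
# form (Elasticsearch 1.x, as above) and the newer {"value": n, ...} object form.
def hits_total(json)
  total = JSON.parse(json).fetch('hits').fetch('total')
  total.is_a?(Hash) ? total.fetch('value') : total
end

# Trimmed copy of the cURL response shown above.
response = <<~JSON
  { "took": 2, "timed_out": false,
    "hits": { "total": 1000000, "max_score": 0.0, "hits": [] } }
JSON

expected = 1_000_000 # number of Thing rows created earlier
actual = hits_total(response)
puts actual == expected ? "OK: #{actual} documents" : "MISMATCH: #{actual} vs #{expected}"
```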

[Screenshot: Sidekiq/Sinatra monitor during Elasticsearch indexing, with all CPUs pegged]