JRuby thread pool concurrency example to pull data from ElasticSearch

I realized that I haven’t posted much about JRuby on my blog, so I thought I’d share some code that uses the Java java.util.concurrent classes from JRuby. The “ESData” class connects to ElasticSearch for a set of indexes and uses a native Java fixed thread pool to fetch the data from each index and write it to a separate file per index. The inner class “ESWorker” implements Java’s Callable interface so its instances can be submitted to the pool, and the “work” method sets up the thread pool and hands it the workers.

#!/usr/bin/env jruby

require 'open-uri'
require 'json'
require 'thread_safe'
require 'logger'

java_import java.util.concurrent.Executors
java_import java.util.concurrent.TimeUnit

class ESData

  def initialize(options={})

    # defaults
    @es_host = 'localhost'
    @es_port = '9200'
    @es_index = ''
    @out_dir = './out'
    @index_fetch_size = 100000

    # process hash args
    options.each {|name, value| instance_variable_set("@#{name}", value) }

    @log = Logger.new STDOUT

    # ensure out dir exists
    Dir.mkdir(@out_dir) unless File.directory? @out_dir

    # create a list of indexes to process
    init_index_list

  end

  def init_index_list
    @es_indices = get_index_list
    raise "Nothing to do" if @es_indices.nil? || @es_indices.empty?

    @es_indices_unprocessed = ThreadSafe::Array.new
    @es_indices.each {|i| @es_indices_unprocessed << i}
  end

  # fetch index list from ElasticSearch
  def get_index_list
    url = "http://#{@es_host}:#{@es_port}/#{@es_index}/_stats"
    response_body = open(url) {|f| f.read }
    json_data = JSON.parse response_body
    json_data['indices'].keys.sort
  end

  # fetch data from ElasticSearch index
  def fetch_index_data(index)
    url = "http://#{@es_host}:#{@es_port}/#{index}/_search?size=#{@index_fetch_size}"
    begin
      response_body = open(url) {|f| f.read }
      json_data = JSON.parse response_body
      json_data['hits']['hits']
    rescue => e
      @log.error "EXCEPTION: #{e}: #{e.backtrace}"
      []  # return no hits so the caller can still proceed
    end
  end

  # write index data to a file
  def write_index_data_to_file(index, hits)
    file_path = "#{@out_dir}/#{index}"
    File.open(file_path, "w") do |f|
      hits.each {|data| f.puts data }
    end
  end

  # grab the next unprocessed index and process it; returns false when nothing is left
  def process_next_index
    index = @es_indices_unprocessed.pop
    return false if index.nil?
    @log.debug "processing index: #{index}"
    hits = fetch_index_data index
    @log.debug "index #{index}: hits size: #{hits.size}"
    write_index_data_to_file index, hits
    @log.debug "index wrote: #{index}"
    return true
  end

  # inner class that implements java.util.concurrent.Callable so instances can be submitted to the executor
  class ESWorker
    include java.util.concurrent.Callable

    def initialize(es_data)
      @es_data = es_data
    end

    def call
      @es_data.process_next_index
    end

  end

  # main method: define a fixed thread pool and process all indexes
  def work
    executor = Executors.new_fixed_thread_pool(10)
    # submit one worker per unprocessed index; each worker pops a single index off the list
    @es_indices_unprocessed.size.times do
      executor.submit(ESWorker.new(self))
    end
    executor.shutdown
    # block until all submitted workers have finished
    executor.await_termination(1, TimeUnit::HOURS)
  end

end

# run it:
es_data = ESData.new es_index: 'logstash-2014.03.*'
es_data.work
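
One nice thing about submitting Callable instances is that the executor hands back a java.util.concurrent.Future for each task, so you can collect each worker's return value instead of just waiting for the pool to drain. Here's a minimal sketch of an alternate version of the work method (the work_with_futures name is just for illustration, it's not part of the class above) that blocks on Future#get for every worker:

class ESData
  def work_with_futures
    executor = Executors.new_fixed_thread_pool(10)
    # submit one worker per unprocessed index; submit returns a Future for each task
    futures = @es_indices_unprocessed.size.times.map do
      executor.submit(ESWorker.new(self))
    end
    # Future#get blocks until the task finishes and returns the value from ESWorker#call
    processed = futures.count {|f| f.get == true }
    @log.debug "workers that processed an index: #{processed}"
    executor.shutdown
  end
end

Future#get also re-raises anything the worker threw, wrapped in a java.util.concurrent.ExecutionException, which makes failures easier to spot than with fire-and-forget submits.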
