JRuby thread pool concurrency example to pull data from ElasticSearch
I realized that I haven’t posted much about JRuby on my blog, so I thought I’d share some JRuby code that uses Java’s java.util.concurrent classes. The “ESData” class connects to ElasticSearch, looks up a set of indexes, and uses a native Java thread pool to fetch the data from each index and write it to a separate file. The inner class “ESWorker” implements Java’s Callable interface so that it can be submitted to the pool, and the “work” method sets up the thread pool.
#!/usr/bin/env jruby
require 'open-uri'
require 'json'
require 'thread_safe'
require 'logger'
java_import java.util.concurrent.Executors
class ESData
def initialize(options={})
# defaults
@es_host = 'localhost'
@es_port = '9200'
@es_index = ''
@out_dir = './out'
@index_fetch_size = 100000
# process hash args
options.each {|name, value| instance_variable_set("@#{name}", value) }
@log = Logger.new STDOUT
# ensure out dir exists
Dir.mkdir(@out_dir) unless File.directory? @out_dir
# create a list of indexes to process
init_index_list
end
def init_index_list
@es_indices = get_index_list
raise "Nothing to do" if @es_indices.nil? || @es_indices.empty?
@es_indices_unprocessed = ThreadSafe::Array.new
@es_indices.each {|i| @es_indices_unprocessed << i}
end
# fetch index list from ElasticSearch
def get_index_list
url = "http://#{@es_host}:#{@es_port}/#{@es_index}/_stats"
# URI.open: Kernel#open no longer accepts URLs on Ruby 3.0+ compatible runtimes
response_body = URI.open(url) {|f| f.read }
json_data = JSON.parse response_body
json_data['indices'].keys.sort
end
# fetch data from ElasticSearch index
def fetch_index_data(index)
url = "http://#{@es_host}:#{@es_port}/#{index}/_search?size=#{@index_fetch_size}"
begin
response_body = URI.open(url) {|f| f.read }
json_data = JSON.parse response_body
json_data['hits']['hits']
rescue => e
@log.error "EXCEPTION: #{e}: #{e.backtrace}"
[] # return no hits so the caller can still call hits.size
end
end
# write index data to a file
def write_index_data_to_file(index, hits)
file_path = "#{@out_dir}/#{index}"
File.open(file_path, "w") do |f|
# write one JSON document per line rather than Ruby's Hash#to_s output
hits.each {|data| f.puts data.to_json }
end
end
# grabs the next unprocessed index and processes it; returns false when none are left
def process_next_index
index = @es_indices_unprocessed.pop
return false if index.nil?
@log.debug "processing index: #{index}"
hits = fetch_index_data index
@log.debug "index #{index}: hits size: #{hits.size}"
write_index_data_to_file index, hits
@log.debug "index written: #{index}"
return true
end
# inner class used to interface with Callable
class ESWorker
include java.util.concurrent.Callable
def initialize(es_data)
@es_data = es_data
end
def call
@es_data.process_next_index
end
end
# main method to define a fixed thread pool and process all indexes
def work
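executor = Executors.new_fixed_thread_pool 10
# submit exactly one task per index; each task pops its own index off the shared list
@es_indices.size.times { executor.submit( ESWorker.new(self) ) }
# stop accepting new tasks, then block until the queued ones have finished
executor.shutdown
executor.await_termination 1, java.util.concurrent.TimeUnit::HOURS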
end
end
# run it:
es_data = ESData.new es_index: 'logstash-2014.03.*'
es_data.work
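For comparison, here is a minimal sketch of the same "workers drain a shared list of indexes" pattern using only Ruby's built-in Queue and Thread classes instead of java.util.concurrent. It is not the code above: process_indices and fake_fetch are hypothetical names made up for this illustration, and fake_fetch returns canned hits rather than calling ElasticSearch, so the sketch runs without a cluster. On JRuby, Ruby threads are backed by native Java threads, so the workers can run in parallel.

```ruby
require 'json'
require 'thread'
require 'tmpdir'

# Hypothetical stand-in for fetch_index_data: returns two canned hits
# per index instead of querying ElasticSearch.
def fake_fetch(index)
  [{ '_index' => index, '_id' => '1' }, { '_index' => index, '_id' => '2' }]
end

# Drain a list of index names with a fixed number of worker threads.
# Writes one file per index into out_dir and returns a hash of
# index name => number of hits written.
def process_indices(indices, out_dir, pool_size = 4)
  queue = Queue.new
  indices.each { |i| queue << i }

  results = {}
  lock = Mutex.new

  workers = Array.new(pool_size) do
    Thread.new do
      loop do
        begin
          index = queue.pop(true) # non-blocking; raises ThreadError when empty
        rescue ThreadError
          break # nothing left to do, so this worker exits
        end

        hits = fake_fetch(index)
        File.open(File.join(out_dir, index), 'w') do |f|
          hits.each { |hit| f.puts hit.to_json }
        end

        # results is shared between threads, so guard the write
        lock.synchronize { results[index] = hits.size }
      end
    end
  end

  workers.each(&:join) # wait for every worker to finish
  results
end

if __FILE__ == $PROGRAM_NAME
  Dir.mktmpdir do |dir|
    counts = process_indices(%w[logstash-2014.03.01 logstash-2014.03.02], dir, 2)
    counts.each { |index, size| puts "#{index}: #{size} hits" }
  end
end
```

The Java executor version hands each task to a pool that manages its own threads and queue. This sketch does that bookkeeping by hand, which is more code but keeps everything in plain Ruby.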