I realized that I haven’t posted much about JRuby on my blog, so I thought I’d share a small script that uses the Java java.util.concurrent classes. The “ESData” class connects to ElasticSearch for a set of indexes and uses a native Java thread pool to fetch the data from each index and write it to a separate file. The inner class “ESWorker” implements Java’s Callable interface so that instances can be submitted to the thread pool, which the work method sets up.
#!/usr/bin/env jruby
require 'open-uri'
require 'json'
require 'thread_safe'
require 'logger'
java_import java.util.concurrent.Executors
class ESData
  def initialize(options = {})
    # defaults
    @es_host = 'localhost'
    @es_port = '9200'
    @es_index = ''
    @out_dir = './out'
    @index_fetch_size = 100000
    # process hash args
    options.each { |name, value| instance_variable_set("@#{name}", value) }
    @log = Logger.new(STDOUT)
    # ensure out dir exists
    Dir.mkdir(@out_dir) unless File.directory? @out_dir
    # create a list of indexes to process
    init_index_list
  end
  def init_index_list
    @es_indices = get_index_list
    raise "Nothing to do" if @es_indices.nil? || @es_indices.empty?
    @es_indices_unprocessed = ThreadSafe::Array.new
    @es_indices.each { |i| @es_indices_unprocessed << i }
  end
  # fetch index list from ElasticSearch
  def get_index_list
    url = "http://#{@es_host}:#{@es_port}/#{@es_index}/_stats"
    response_body = open(url) { |f| f.read }
    json_data = JSON.parse response_body
    json_data['indices'].keys.sort
  end
  # fetch data from ElasticSearch index
  def fetch_index_data(index)
    url = "http://#{@es_host}:#{@es_port}/#{index}/_search?size=#{@index_fetch_size}"
    begin
      response_body = open(url) { |f| f.read }
      json_data = JSON.parse response_body
      json_data['hits']['hits']
    rescue => e
      @log.debug "EXCEPTION: #{e}: #{e.backtrace}"
      # return an empty result so the caller can still write a file
      []
    end
  end
  # write index data to a file
  def write_index_data_to_file(index, hits)
    file_path = "#{@out_dir}/#{index}"
    File.open(file_path, "w") do |f|
      hits.each { |data| f.puts data }
    end
  end
  # grabs an index to process, processes it.
  def process_next_index
    index = @es_indices_unprocessed.pop
    return false if index.nil?
    @log.debug "processing index: #{index}"
    hits = fetch_index_data index
    @log.debug "index #{index}: hits size: #{hits.size}"
    write_index_data_to_file index, hits
    @log.debug "index wrote: #{index}"
    return true
  end
  # inner class used to interface with Callable
  class ESWorker
    include java.util.concurrent.Callable

    def initialize(es_data)
      @es_data = es_data
    end

    def call
      @es_data.process_next_index
    end
  end
  # main method to define a fixed thread pool and process all indexes
  def work
    executor = Executors.new_fixed_thread_pool(10)
    while @es_indices_unprocessed.size > 0
      executor.submit(ESWorker.new(self))
      break if executor.is_shutdown
    end
    executor.shutdown
  end
end
# run it:
es_data = ESData.new es_index: 'logstash-2014.03.*'
es_data.work
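
One thing the script above doesn’t do is look at the Future objects that executor.submit returns, since each worker writes its results straight to a file. If you want the workers’ return values instead, you can hold on to the futures and call get on each one. Here is a minimal, standalone sketch of that pattern; the SquareTask class and the pool size of 4 are made up purely for illustration and aren’t part of the script above.

#!/usr/bin/env jruby
java_import java.util.concurrent.Executors

# A Ruby class becomes a Callable by including the Java interface and
# defining a call method; whatever call returns becomes the Future's result.
class SquareTask
  include java.util.concurrent.Callable

  def initialize(n)
    @n = n
  end

  def call
    @n * @n
  end
end

executor = Executors.new_fixed_thread_pool(4)
# submit returns a java.util.concurrent.Future; get blocks until the task finishes
futures = (1..10).map { |n| executor.submit(SquareTask.new(n)) }
futures.each { |f| puts f.get }
executor.shutdown

Since get blocks, iterating over the futures in order also acts as a simple join on the whole batch before the script shuts the pool down.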