Using NiFi to convert CSV to JSON and store in Elasticsearch
In this post I’ll share a NiFi workflow that takes in CSV files, converts them to JSON, and stores them in different Elasticsearch indexes based on the file schema. I wrote a JRuby script for the ExecuteScript processor that uses the header row of the CSV file as the JSON schema, and the filename to determine which index/type to use for each Elasticsearch document.
For this post I used Homebrew on OS X, but you could use Docker, install from source, etc.
Example install:
My planned NiFi workflow:
Get a list of CSV files from a local directory
ExecuteScript processor to convert them to JSON, use the header row as the JSON schema, and set index/type attribute on each flow file
SplitJson processor to convert JSON array of objects to individual JSON records
PutElasticsearch processor to send to Elasticsearch, routed to index/type based on flow file attribute
Screenshot of workflow:
1. GetFile
Input directory: /nifi/in/
File filter: [^\.].*\.csv$
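To see what this file filter accepts, here is a small Ruby sketch. It assumes the pattern has to match the whole filename, which is why it is wrapped in \A and \z here; the helper name picked_up? is made up for illustration.

```ruby
# File filter copied from the GetFile configuration above.
PATTERN = '[^\.].*\.csv$'

# Wrapping in \A...\z forces a whole-name match. That this mirrors how
# NiFi applies the filter is an assumption.
FILE_FILTER = Regexp.new('\A(?:' + PATTERN + ')\z')

# Hypothetical helper: true when a file with this name passes the filter.
def picked_up?(filename)
  FILE_FILTER.match?(filename)
end

%w[people.csv .people.csv people.csv.bak people.json].each do |name|
  puts "#{name}: #{picked_up?(name)}"
end
```

The leading [^\.] skips hidden files (names starting with a dot), and the trailing \.csv$ skips anything that does not end in .csv.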
2. ExecuteScript
Script engine: ruby
Script file: /nifi/scripts/csv_to_json.rb
Contents of /nifi/scripts/csv_to_json.rb:
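The core of the conversion can be sketched in plain Ruby as follows. This is a simplified illustration, not the full script: the NiFi wiring (reading the flow file content through the session and setting the index_type attribute on the flow file) is left out, and the function names and the rule for turning a filename into an index/type name are assumptions.

```ruby
require 'csv'
require 'json'

# Parse CSV text and return a JSON array of objects, using the header
# row as the keys of every object. All values stay strings.
def csv_to_json(csv_text)
  rows = CSV.parse(csv_text, headers: true).map(&:to_h)
  JSON.generate(rows)
end

# Hypothetical naming rule: use the filename without its extension,
# lowercased, as the index/type (Elasticsearch index names must be
# lowercase).
def index_type_for(filename)
  File.basename(filename, '.*').downcase
end

csv = "name,city\nAnn,Concord\nBob,Nashua\n"
puts csv_to_json(csv)
puts index_type_for('/nifi/in/People.csv')
```

Inside ExecuteScript, the JSON string would be written back as the flow file content and the result of index_type_for stored in the index_type attribute that PutElasticsearch reads later.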
3. SplitJson
Converts JSON array of objects to individual objects
JsonPath Expression: $.*
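The effect of this step is roughly what the following Ruby sketch does: take the JSON array produced by the script and emit one JSON document per element. The function name split_json is made up for illustration; in the workflow itself the SplitJson processor does this and creates one flow file per record.

```ruby
require 'json'

# Split a JSON array of objects into one JSON document per element,
# similar to what the JsonPath expression $.* selects on an array.
def split_json(json_array_text)
  JSON.parse(json_array_text).map { |record| JSON.generate(record) }
end

docs = split_json('[{"name":"Ann"},{"name":"Bob"}]')
docs.each { |doc| puts doc }
```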
4. PutElasticsearch
Cluster name: elasticsearch
ElasticSearch Hosts: localhost:9300
Identifier Attribute: uuid
Index: ${index_type}
Type: ${index_type}
Note: the cluster name must match the cluster.name setting in the Elasticsearch server configuration, e.g. cluster.name: elasticsearch in elasticsearch.yml.
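One way to check which cluster name a running server reports is to look at the cluster_name field in the JSON returned by its root HTTP endpoint. The sketch below only does the comparison; fetching the response (for instance from localhost:9200, assuming the default HTTP port) is left out, and the helper name is made up for illustration.

```ruby
require 'json'

# Compare the cluster_name reported by Elasticsearch with the value
# configured in the PutElasticsearch processor.
def cluster_name_matches?(root_response_json, expected_name)
  JSON.parse(root_response_json)['cluster_name'] == expected_name
end

# Sample response body, trimmed to the one field used here.
sample = '{"cluster_name":"elasticsearch"}'
puts cluster_name_matches?(sample, 'elasticsearch')
```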