Export data from PostgreSQL as JSON and import it into Elasticsearch

In this post I’ll demo some code that exports data from PostgreSQL as JSON and imports it into Elasticsearch using elasticsearch-dump. I chose Node.js for the simple data manipulation scripts.

Step 1: Setup

# set node version via nvm
echo v8.11.1 > .nvmrc
nvm use

# init npm
npm init

# add packages
npm install --save elasticdump
npm install --save faker
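
As a quick sanity check that the packages installed correctly, faker can be exercised from a throwaway script (illustrative only; the real scripts follow below):

// sanity-check.js (illustrative): confirm the faker package loads and
// generates randomized values like those used in the scripts below
const faker = require('faker');

console.log(faker.internet.email());
console.log(faker.name.firstName());
console.log(faker.commerce.productName());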

Step 2: Database setup

For demo purposes I decided to use a table schema with an array-type column. I used a table previously created via a Rails migration:

class CreateUsers < ActiveRecord::Migration[5.2]
  def change
    create_table :users do |t|
      t.string :email
      t.string :first_name
      t.string :last_name
      t.string :products, array: true

      t.timestamps
    end

    add_index :users, :products, using: 'gin'
  end
end

To create this table outside of Rails, I could have executed the following SQL (note that it assumes the users_id_seq sequence already exists; declaring id as bigserial instead would create one automatically):

-- Table: public.users

-- DROP TABLE public.users;

CREATE TABLE public.users
(
    id bigint NOT NULL DEFAULT nextval('users_id_seq'::regclass),
    email character varying COLLATE pg_catalog."default",
    first_name character varying COLLATE pg_catalog."default",
    last_name character varying COLLATE pg_catalog."default",
    products character varying[] COLLATE pg_catalog."default",
    created_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL,
    CONSTRAINT users_pkey PRIMARY KEY (id)
)
WITH (
    OIDS = FALSE
)
TABLESPACE pg_default;

ALTER TABLE public.users
    OWNER to eric;

-- Index: index_users_on_products

-- DROP INDEX public.index_users_on_products;

CREATE INDEX index_users_on_products
    ON public.users USING gin
    (products COLLATE pg_catalog."default")
    TABLESPACE pg_default;

Step 3: Create PostgreSQL import data

Next I executed a Node script (create-csv.js) to create a CSV file with a million records, using the faker npm package to generate randomized data.

const fs = require('fs');
const stream = fs.createWriteStream("input.csv");
const faker = require('faker');

let lineCount = 1000000
let currentDate = new Date().toISOString().split('T')[0]

stream.once('open', (fd) => {

  for(var i = 0; i < lineCount; i++) {

    // Build a PostgreSQL array literal, e.g. {"Product A","Product B","Product C"},
    // wrapped in quotes so COPY reads it as a single CSV field.
    // NOTE: simple string concatenation does not escape CSV properly, just for demo/development purposes.
    let productString =
      '"{' +
      [faker.commerce.productName(), faker.commerce.productName(), faker.commerce.productName()].map(p => '"' + p + '"').join(",")
      + '}"'

    let data = [
      faker.internet.email(),
      faker.name.firstName(),
      faker.name.lastName(),
      productString,
      currentDate,
      currentDate
    ]

    let line = data.join(",") + "\n"
    stream.write(line)
  }

  stream.end();
});

Executed via:

# run script
node create-csv.js

# review results
head -1 input.csv
Eric.London@example.com,Eric,London,"{"Tasty Frozen Bacon","Small Granite Chips","Awesome Metal Shirt"}",2018-05-12,2018-05-12
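
As the note in the script says, this naive concatenation would produce invalid CSV if faker ever returned a value containing a comma or a double quote. A minimal escaping helper (hypothetical, not used in the script above) could look like this:

// Hypothetical helper: quote a value and double any embedded quotes, per the
// CSV convention that PostgreSQL's COPY ... CSV understands.
function csvEscape(value) {
  return '"' + String(value).replace(/"/g, '""') + '"';
}

// e.g. csvEscape('{"Tasty, Frozen Bacon"}') => '"{""Tasty, Frozen Bacon""}"'

For the products column, the helper would be applied to the raw array literal (the {"..."} string) rather than hand-wrapping it in quotes as above.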

Step 4: Import CSV data into PostgreSQL

I imported this data into PostgreSQL via:

# import
psql -d pg-export-es-import_development -c "COPY users(email,first_name,last_name,products,created_at,updated_at) from '/some/path/to/input.csv' CSV"

# check row count
psql -d pg-export-es-import_development -c "select count(*) from users"
#  count
#---------
# 1000000
#(1 row)

# review results
psql -d pg-export-es-import_development -c "select * from users limit 1"
# id |          email           | first_name | last_name |                              products                              |     created_at      |     updated_at
#----+--------------------------+------------+-----------+--------------------------------------------------------------------+---------------------+---------------------
#  1 | Eric.London@example.com  | Eric       | London    | {"Tasty Frozen Bacon","Small Granite Chips","Awesome Metal Shirt"} | 2018-05-12 00:00:00 | 2018-05-12 00:00:00
#(1 row)

Step 5: Export data from PostgreSQL as JSON

At this point, I was ready to complete the actual goal of this post: exporting data from PostgreSQL as JSON. PostgreSQL provides an aggregate function called json_agg which formats a result set as JSON. Elasticdump expects one JSON object per line, while json_agg() returns the entire result set as a single JSON array of objects, so I used sed to clean up the output.

# export from postgresql as json
# sed 1: remove opening JSON array bracket
# sed 2: remove empty space from beginning of each line
# sed 3: remove empty space from ending of each line
# sed 4: remove trailing comma from end of each line
# sed 5: remove closing JSON array bracket
psql -t -A -d pg-export-es-import_development -c "select json_agg(u) from ( select id, email, first_name, last_name, products from users ) u" | sed 's/^\[//' | sed 's/^[[:blank:]]//' | sed 's/[[:blank:]]$//' | sed 's/,$//' | sed 's/]$//' > export.json

# check results
head -1 export.json
{"id":1,"email":"Eric.London@example.com","first_name":"Eric","last_name":"London","products":["Tasty Frozen Bacon","Small Granite Chips","Awesome Metal Shirt"]}

# check line count of file
wc -l export.json
 1000000 export.json
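
As an aside, the json_agg + sed approach could be skipped entirely by pulling rows out with a database driver and writing newline-delimited JSON directly. A rough sketch using the pg package (which is not part of this demo's package.json, so treat it as an assumption):

// export-json.js (sketch, assumes the `pg` package): write one JSON object per
// line straight from the query results, with no sed post-processing needed.
const fs = require('fs');
const { Client } = require('pg');

const client = new Client({ database: 'pg-export-es-import_development' });
const out = fs.createWriteStream('export.json');

client.connect()
  .then(() => client.query(
    // cast id to int because the pg driver returns bigint columns as strings
    'select id::int as id, email, first_name, last_name, products from users'
  ))
  .then((result) => {
    // NOTE: this buffers every row in memory; for very large tables a cursor or
    // pg-query-stream would be a better fit.
    result.rows.forEach((row) => out.write(JSON.stringify(row) + '\n'));
    out.end();
    return client.end();
  })
  .catch((err) => {
    console.error(err);
    return client.end();
  });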

Step 6: Format JSON data per Elasticdump

Elasticdump expects each line to be an Elasticsearch document wrapper with _index, _type, _id, and _source fields, so I executed another script (format-json.js) to reformat the JSON data.

const fs = require('fs')
const readline = require('readline')

let inputFile = fs.createReadStream('export.json')
let lineReader = readline.createInterface({
  input: inputFile
});

let writeStream = fs.createWriteStream("formatted.json", {flags: 'a'})

lineReader.on('line', function (line) {
  let jsonData = JSON.parse(line)

  // wrap each row in the document envelope elasticdump expects
  let obj = {
    _index: 'people',
    _type: 'person',
    _id: jsonData.id,
    _source: jsonData
  }

  let newLine = JSON.stringify(obj) + "\n"
  writeStream.write(newLine)
});

// close the output file once all input lines have been processed
lineReader.on('close', function () {
  writeStream.end()
});

Executed:

# run script
node format-json.js

# check results
head -1 formatted.json
{"_index":"people","_type":"person","_id":1,"_source":{"id":1,"email":"Eric.London@example.com","first_name":"Eric","last_name":"London","products":["Tasty Frozen Bacon","Small Granite Chips","Awesome Metal Shirt"]}}

Step 7: Import JSON data into Elasticsearch

# import
node_modules/elasticdump/bin/elasticdump --input=formatted.json --output=http://localhost:9200/

# check index mapping:
curl 'http://localhost:9200/people/_mapping?pretty'
{
  "people" : {
    "mappings" : {
      "person" : {
        "properties" : {
          "email" : {
            "type" : "string"
          },
          "first_name" : {
            "type" : "string"
          },
          "id" : {
            "type" : "long"
          },
          "last_name" : {
            "type" : "string"
          },
          "products" : {
            "type" : "string"
          }
        }
      }
    }
  }
}

# check sample data
curl 'http://localhost:9200/people/_search?size=1&pretty'
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000000,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "people",
      "_type" : "person",
      "_id" : "1517",
      "_score" : 1.0,
      "_source" : {
        "id" : 1,
        "email" : "Eric.London@example.com",
        "first_name" : "Eric",
        "last_name" : "London",
        "products" : [ "Tasty Wooden Chips", "Sleek Steel Pizza", "Licensed Concrete Chair" ]
      }
    } ]
  }
}
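
To query the imported documents from Node, something like this could work (it assumes the legacy elasticsearch npm client, which is not installed above, and relies on products being an analyzed string field as shown in the mapping):

// query sketch, assuming the legacy `elasticsearch` npm client
const elasticsearch = require('elasticsearch');

const client = new elasticsearch.Client({ host: 'localhost:9200' });

client.search({
  index: 'people',
  type: 'person',
  size: 3,
  body: {
    query: {
      // products is analyzed, so a single term like "bacon" matches documents
      // whose product names contain that word
      match: { products: 'bacon' }
    }
  }
}).then((response) => {
  response.hits.hits.forEach((hit) => {
    console.log(hit._id, hit._source.email, hit._source.products);
  });
}).catch((err) => console.error(err));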

Nice.
