Terraforming S3 bucket notification, AWS NodeJS Lambda to fetch metadata, SNS publishing, and filtered SQS subscription policy

In this post, I’ll share some Terraform code which provisions an AWS S3 bucket for file uploads, an S3 bucket notification that triggers an AWS Lambda NodeJS script to fetch S3 metadata and push it to an AWS SNS topic, and an AWS SQS queue with a filtered topic subscription. This can be useful if you need S3 bucket notifications to fan out to different SQS queues based on S3 metadata or object path.
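
Conceptually, the fanout works like this: the Lambda copies S3 metadata into SNS message attributes, each queue subscribes to the topic with its own filter policy, and SNS evaluates that policy against the attributes of every message to decide which queues receive it. Here is a simplified sketch of the string-matching behavior; real SNS filter policies support more operators than exact string equality:

const matchesFilterPolicy = (filterPolicy, messageAttributes) =>
  Object.entries(filterPolicy).every(([key, allowedValues]) =>
    messageAttributes[key] !== undefined &&
    allowedValues.includes(messageAttributes[key].StringValue)
  );

// message attributes as the lambda below will publish them
const messageAttributes = {
  'filter-by': { DataType: 'String', StringValue: 'this-filter-value' }
};

console.log(matchesFilterPolicy({ 'filter-by': ['this-filter-value'] }, messageAttributes));  // true, delivered
console.log(matchesFilterPolicy({ 'filter-by': ['other-filter-value'] }, messageAttributes)); // false, dropped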

Initial project setup

# set nodejs version, used by nvm for local execution
echo 8.10.0 > .nvmrc

# set terraform version, used by tfenv
echo 0.11.8 > .terraform-version

I created a Terraform file to set up the backend S3 state configuration and pin the AWS provider version, new file: main.tf

terraform {
  required_version = "0.11.8"

  backend "s3" {
    bucket  = ""
    key     = ""
    profile = ""
    region  = ""
  }
}

provider "aws" {
  profile                 = "${var.aws_profile}"
  region                  = "${var.aws_region}"
  shared_credentials_file = "${var.aws_credentials_file}"
  version                 = "1.37.0"
}

data "aws_caller_identity" "current" {}

Terraform file for configurable parameters, variables.tf

variable "aws_region" {
  type    = "string"
  default = "us-east-1"
}

variable "aws_profile" {
  type    = "string"
  default = ""
}

variable "aws_credentials_file" {
  type    = "string"
  default = "~/.aws/credentials"
}

variable "s3_bucket_name" {
  type    = "string"
  default = ""
}

variable "sns_topic_name" {
  type    = "string"
  default = ""
}

variable "sqs_queue_name" {
  type    = "string"
  default = ""
}

Terraform S3 bucket and bucket notification (to lambda), s3.tf

resource "aws_s3_bucket" "s3_bucket" {
  bucket = "${var.s3_bucket_name}"
  acl    = "private"
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }
}

resource "aws_s3_bucket_notification" "lambda_bucket_notification" {
  bucket = "${aws_s3_bucket.s3_bucket.id}"

  lambda_function {
    lambda_function_arn = "${aws_lambda_function.meta_lambda.arn}"
    events              = ["s3:ObjectCreated:*"]
    # filter_prefix       = "some-path/"
    # filter_suffix       = "*.csv"
  }
}

The following file defines the Lambda function, its IAM role, policy, and permissions, file: lambda.tf

resource "aws_lambda_function" "meta_lambda" {
  filename         = "meta_lambda.zip"
  function_name    = "meta_lambda"
  role             = "${aws_iam_role.meta_lambda_role.arn}"
  handler          = "meta_lambda.handler"
  source_code_hash = "${data.archive_file.meta_lambda_zip.output_base64sha256}"
  runtime          = "nodejs8.10"
  environment {
    variables = {
      AWS_ACCOUNT_ID = "${data.aws_caller_identity.current.account_id}"
      SNS_TOPIC_NAME = "${var.sns_topic_name}"
    }
  }
}

data "archive_file" "meta_lambda_zip" {
  type        = "zip"
  source_file = "meta_lambda.js"
  output_path = "meta_lambda.zip"
}

resource "aws_iam_role" "meta_lambda_role" {
  name = "meta_lambda_role"
  assume_role_policy = "${data.aws_iam_policy_document.meta_lambda.json}"
}

data "aws_iam_policy_document" "meta_lambda" {
  statement {
    sid = ""
    effect = "Allow"
    actions = [
      "sts:AssumeRole"
    ]
    principals {
      type = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_lambda_permission" "lambda_allow_bucket" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = "${aws_lambda_function.meta_lambda.function_name}"
  principal     = "s3.amazonaws.com"
  source_arn    = "${aws_s3_bucket.s3_bucket.arn}"
}

resource "aws_iam_role_policy_attachment" "attach_lambda_role_policy" {
  role = "${aws_iam_role.meta_lambda_role.name}"
  policy_arn = "${aws_iam_policy.meta_lambda_policy.arn}"
}

resource "aws_iam_policy" "meta_lambda_policy" {
  name   = "meta_lambda_policy"
  policy = "${data.aws_iam_policy_document.meta_lambda_policy_document.json}"
}

data "aws_iam_policy_document" "meta_lambda_policy_document" {
  statement {
    sid = ""
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup"
    ]
    resources = [
      "arn:aws:logs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:*"
    ]
  }

  statement {
    sid = ""
    effect = "Allow"
    actions = [
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
    resources = [
      "arn:aws:logs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/${aws_lambda_function.meta_lambda.function_name}:*"
    ]
  }

  statement {
    sid = ""
    effect = "Allow"
    actions = [
      "s3:GetObject",
      "s3:ListBucket"
    ]
    resources = [
      "${aws_s3_bucket.s3_bucket.arn}",
      "${aws_s3_bucket.s3_bucket.arn}/*"
    ]
  }

  statement {
    sid = ""
    effect = "Allow"
    actions = [
      "SNS:Publish"
    ]
    resources = [
      "${aws_sns_topic.sns_topic.arn}"
    ]
  }
}

Below is the NodeJS Lambda script. It pulls environment variables, defines the exported handler, receives the S3 bucket notification event, collects attributes from the S3 object path, makes an S3 HEAD request to fetch the object metadata, and publishes the notification to the SNS topic, file: meta_lambda.js

'use strict';

// ENV vars
const AWS_REGION_STRING = process.env.AWS_REGION || 'us-east-1';
const AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID;
const SNS_TOPIC_NAME = process.env.SNS_TOPIC_NAME;
const SNS_TOPIC_ARN = `arn:aws:sns:${AWS_REGION_STRING}:${AWS_ACCOUNT_ID}:${SNS_TOPIC_NAME}`;

const AWS = require('aws-sdk');
AWS.config.update({
  region: AWS_REGION_STRING
});

const s3 = new AWS.S3();
const sns = new AWS.SNS();

exports.handler = (message, context, callback) => {
  return main(message).then(function (result) {
    callback(null, result);
  }).catch(function (error) {
    callback(error);
  });
};

const main = async (notification) => {
  let record = notification.Records[0];
  let pathAttributes = getS3PathAttributes(record);
  let s3MetaData = await fetchS3MetaData(record);
  let metaData = {
    ...pathAttributes,
    ...s3MetaData
  };
  let messageAttributes = getMessageAttributes(metaData);
  let sendSnsResponse = await sendSns(record, messageAttributes);
  return sendSnsResponse.MessageId;
}

const getS3PathAttributes = function (record) {
  let attributes = {}

  try {
    attributes.bucket_name = record.s3.bucket.name;
    attributes.object_key = record.s3.object.key;
  } catch (error) {
    console.log(error);
  }

  return attributes;
}

const fetchS3MetaData = async (record) => {
  try {
    // object keys arrive URL-encoded in S3 notifications (e.g. spaces as '+')
    let params = {
      Bucket: record.s3.bucket.name,
      Key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '))
    }
    let response = await s3.headObject(params).promise();
    return response.Metadata;
  } catch (error) {
    console.log(error);
    return {};
  }
}

const getMessageAttributes = function (metaData) {
  let messageAttributes = {};
  Object.entries(metaData).forEach(
    ([key, value]) => {
      messageAttributes[key] = {
        DataType: 'String',
        StringValue: value
      }
    }
  );
  return messageAttributes;
}

const sendSns = async (record, messageAttributes) => {
  let params = {
    TopicArn: SNS_TOPIC_ARN,
    Message: JSON.stringify(record),
    MessageStructure: 'string',
    MessageAttributes: messageAttributes
  }
  return await sns.publish(params).promise();
}
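
To sanity-check the handler before deploying, it can be invoked locally with a hand-built S3 notification event. Here is a minimal sketch, assuming the script above is saved as meta_lambda.js; the account ID, topic, bucket, and key are placeholders, and the S3/SNS calls only succeed against real resources with valid credentials, file: local-test.js

// the lambda reads these env vars at module load time, so set placeholders first
process.env.AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID || '123456789012';
process.env.SNS_TOPIC_NAME = process.env.SNS_TOPIC_NAME || 'some-sns-topic-name';

const lambda = require('./meta_lambda');

// hand-built S3 bucket notification event with placeholder bucket/key
const sampleEvent = {
  Records: [{
    s3: {
      bucket: { name: 'some-s3-bucket' },
      object: { key: 'some-path/example.csv' }
    }
  }]
};

lambda.handler(sampleEvent, {}, function (error, result) {
  if (error) {
    console.log('handler failed', error);
    return;
  }
  console.log('published SNS MessageId', result);
});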

Terraform SNS topic and filtered SQS subscription, file: sns.tf

resource "aws_sns_topic" "sns_topic" {
  name = "${var.sns_topic_name}"
}

resource "aws_sns_topic_subscription" "sqs_subscription" {
  topic_arn = "${aws_sns_topic.sns_topic.arn}"
  protocol  = "sqs"
  endpoint  = "${aws_sqs_queue.sqs_queue.arn}"

  filter_policy = <<EOF
{
  "filter-by": ["this-filter-value"]
}
EOF
}
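
SNS evaluates the filter policy against each message’s attributes: the queue only receives messages whose filter-by attribute equals this-filter-value; everything else is dropped for this subscription. The same policy can also be applied outside Terraform via the SDK’s setSubscriptionAttributes call; a minimal sketch, assuming a placeholder subscription ARN:

const AWS = require('aws-sdk');
const sns = new AWS.SNS({ region: 'us-east-1' });

sns.setSubscriptionAttributes({
  // placeholder ARN; use the ARN of the SQS subscription created above
  SubscriptionArn: 'arn:aws:sns:us-east-1:123456789012:some-sns-topic-name:some-subscription-id',
  AttributeName: 'FilterPolicy',
  AttributeValue: JSON.stringify({ 'filter-by': ['this-filter-value'] })
}, function (err) {
  if (err) console.log(err, err.stack);
});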

Terraform SQS queue and its IAM policy, file: sqs.tf

resource "aws_sqs_queue" "sqs_queue" {
  name   = "${var.sqs_queue_name}"
  policy = "${data.aws_iam_policy_document.sqs_queue_policy_document.json}"
}

data "aws_iam_policy_document" "sqs_queue_policy_document" {
  policy_id = "arn:aws:sqs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${var.sqs_queue_name}/SQSDefaultPolicy"

  statement {
    sid    = "sns-to-sqs"
    effect = "Allow"

    principals {
      type        = "AWS"
      identifiers = ["*"]
    }

    actions = [
      "SQS:SendMessage",
    ]

    resources = [
      "arn:aws:sqs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${var.sqs_queue_name}"
    ]

    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [
        "arn:aws:sns:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${var.sns_topic_name}"
      ]
    }
  }
}

I put my configuration variables in secrets.auto.tfvars

aws_region = "us-east-1"
aws_profile = "some-aws-profile-name"
s3_bucket_name = "some-s3-bucket-name"
sns_topic_name = "some-sns-topic-name"
sqs_queue_name = "some-sqs-queue-name"

Here is a Bash script that passes environment variables to the Terraform backend configuration and runs Terraform init, plan, and apply, file: main.sh

#!/usr/bin/env bash

: "${AWS_PROFILE:=some-aws-profile-name}"
: "${AWS_REGION:=us-east-1}"
: "${STATE_BUCKET:=some-s3-state-bucket}"
: "${STATE_KEY:=some-s3-state-path/terraform/base.tfstate}"

action="$1"
  TFENV=$(which tfenv)
if [ $? -eq 0 ]; then
  $TFENV install $(cat .terraform-version)
  cat .terraform-version | xargs $TFENV use
fi

rm -f *.tfstate
rm -rf ./.terraform

terraform init \
  -force-copy \
  -backend=true \
  -backend-config "bucket=${STATE_BUCKET}" \
  -backend-config "key=${STATE_KEY}" \
  -backend-config "profile=${AWS_PROFILE}" \
  -backend-config "region=${AWS_REGION}"

terraform plan

if [ "$action" == "apply" ]; then
  terraform apply -auto-approve
fi

To test SQS queue delivery, I created an SQS client script. First, add the AWS SDK NPM dependency:

nvm use
npm init
npm install aws-sdk --save

Then I created the NodeJS script, file: sqs-client.js

const AWS_REGION_STRING = process.env.AWS_REGION || 'us-east-1';
const AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID;
const SQS_QUEUE_NAME = process.env.SQS_QUEUE_NAME;

const AWS = require('aws-sdk');
AWS.config.update({
  region: AWS_REGION_STRING
});
const sqs = new AWS.SQS();

let params = {
  QueueUrl: `https://sqs.${AWS_REGION_STRING}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME}`
}

sqs.receiveMessage(params, function (err, response) {
  if (err) {
    console.log(err, err.stack);
    return;
  }

  if (!response.Messages || response.Messages.length === 0) {
    console.log('no messages yet');
    return;
  }

  let messageBody = response.Messages[0].Body;
  let notification = JSON.parse(messageBody);
  let messageAttributes = notification.MessageAttributes;
  let notificationMessage = JSON.parse(notification.Message);

  console.log('messageAttributes', JSON.stringify(messageAttributes, null, 2));
  console.log('notificationMessage.s3.bucket.name', notificationMessage.s3.bucket.name);
  console.log('notificationMessage.s3.object.key', notificationMessage.s3.object.key);
});
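
Note that sqs-client.js only peeks at the queue: because it never deletes what it receives, messages become visible again after the visibility timeout. A real consumer would delete each message after processing it; a minimal sketch, assuming a placeholder queue URL:

const AWS = require('aws-sdk');
const sqs = new AWS.SQS({ region: 'us-east-1' });

// placeholder queue URL
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/some-sqs-queue-name';

sqs.receiveMessage({ QueueUrl: queueUrl }, function (err, response) {
  if (err || !response.Messages) {
    return;
  }

  // ...process the message, then delete it so it is not redelivered
  sqs.deleteMessage({
    QueueUrl: queueUrl,
    ReceiptHandle: response.Messages[0].ReceiptHandle
  }, function (deleteErr) {
    if (deleteErr) console.log(deleteErr, deleteErr.stack);
  });
});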

I executed the Terraform apply script, pushed a file to S3 with metadata, and ran the SQS client script to E2E test the functionality:

# terraform resources
chmod +x main.sh
./main.sh apply

# put a file on S3 with non-matching metadata
aws --profile some-aws-profile s3 cp example.csv s3://some-s3-bucket/some-path/ --metadata '{"filter-by":"wrong filter value"}'

# attempt to receive SQS messages
AWS_PROFILE=some-aws-profile AWS_REGION=us-east-1 AWS_ACCOUNT_ID=some-aws-account-id SQS_QUEUE_NAME=some-sqs-queue-name node sqs-client.js
# no results

# put a file on S3 with matching metadata
aws --profile some-aws-profile s3 cp example.csv s3://some-s3-bucket/some-path/ --metadata '{"filter-by":"this-filter-value"}'

# attempt to receive the filtered SQS message:
AWS_PROFILE=some-aws-profile AWS_REGION=us-east-1 AWS_ACCOUNT_ID=some-aws-account-id SQS_QUEUE_NAME=some-sqs-queue-name node sqs-client.js

# successful output:
messageAttributes {
  "object_key": {
    "Type": "String",
    "Value": "some-path/example.csv"
  },
  "bucket_name": {
    "Type": "String",
    "Value": "some-s3-bucket"
  },
  "filter-by": {
    "Type": "String",
    "Value": "this-filter-value"
  }
}
notificationMessage.s3.bucket.name some-s3-bucket
notificationMessage.s3.object.key some-path/example.csv

Source code on GitHub
