Terraforming S3 bucket notification, AWS NodeJS Lambda to fetch metadata, SNS publishing, and filtered SQS subscription policy
In this post, I'll share some Terraform code that provisions an AWS S3 bucket for file uploads, an S3 bucket notification that triggers an AWS Lambda NodeJS script to fetch the S3 object metadata and publish it to an AWS SNS topic, and an AWS SQS queue with a filtered topic subscription. This can be useful if you need S3 bucket notifications to fan out to different SQS queues based on the S3 metadata or object path.
Initial project setup
# set nodejs version, used by nvmrc for local execution
echo 8.10.0 > .nvmrc
# set terraform version, used by tfenv
echo 0.11.8 > .terraform-version
I created a Terraform file to set up the backend S3 state configuration and the AWS provider version, file: main.tf
terraform {
required_version = "0.11.8"
backend "s3" {
bucket = ""
key = ""
profile = ""
region = ""
}
}
provider "aws" {
profile = "${var.aws_profile}"
region = "${var.aws_region}"
shared_credentials_file = "${var.aws_credentials_file}"
version = "1.37.0"
}
data "aws_caller_identity" "current" {}
Terraform file for configurable parameters, file: variables.tf
variable "aws_region" {
type = "string"
default = "us-east-1"
}
variable "aws_profile" {
type = "string"
default = ""
}
variable "aws_credentials_file" {
type = "string"
default = "~/.aws/credentials"
}
variable "s3_bucket_name" {
type = "string"
default = ""
}
variable "sns_topic_name" {
type = "string"
default = ""
}
variable "sqs_queue_name" {
type = "string"
default = ""
}
Terraform S3 bucket and bucket notification (to Lambda), file: s3.tf
resource "aws_s3_bucket" "s3_bucket" {
bucket = "${var.s3_bucket_name}"
acl = "private"
server_side_encryption_configuration {
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
}
resource "aws_s3_bucket_notification" "lambda_bucket_notification" {
bucket = "${aws_s3_bucket.s3_bucket.id}"
lambda_function {
lambda_function_arn = "${aws_lambda_function.meta_lambda.arn}"
events = ["s3:ObjectCreated:*"]
# filter_prefix = "some-path/"
    # filter_suffix = ".csv"
}
}
The following file defines the Lambda function, its IAM role, policy, and permissions, file: lambda.tf
resource "aws_lambda_function" "meta_lambda" {
filename = "meta_lambda.zip"
function_name = "meta_lambda"
role = "${aws_iam_role.meta_lambda_role.arn}"
handler = "meta_lambda.handler"
source_code_hash = "${data.archive_file.meta_lambda_zip.output_base64sha256}"
runtime = "nodejs8.10"
environment {
variables = {
AWS_ACCOUNT_ID = "${data.aws_caller_identity.current.account_id}"
SNS_TOPIC_NAME = "${var.sns_topic_name}"
}
}
}
data "archive_file" "meta_lambda_zip" {
type = "zip"
source_file = "meta_lambda.js"
output_path = "meta_lambda.zip"
}
resource "aws_iam_role" "meta_lambda_role" {
name = "meta_lambda_role"
assume_role_policy = "${data.aws_iam_policy_document.meta_lambda.json}"
}
data "aws_iam_policy_document" "meta_lambda" {
statement {
sid = ""
effect = "Allow"
actions = [
"sts:AssumeRole"
]
principals {
type = "Service"
identifiers = ["lambda.amazonaws.com"]
}
}
}
resource "aws_lambda_permission" "lambda_allow_bucket" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = "${aws_lambda_function.meta_lambda.function_name}"
principal = "s3.amazonaws.com"
source_arn = "${aws_s3_bucket.s3_bucket.arn}"
}
resource "aws_iam_role_policy_attachment" "attach_lambda_role_policy" {
role = "${aws_iam_role.meta_lambda_role.name}"
policy_arn = "${aws_iam_policy.meta_lambda_policy.arn}"
}
resource "aws_iam_policy" "meta_lambda_policy" {
name = "meta_lambda_policy"
policy = "${data.aws_iam_policy_document.meta_lambda_policy_document.json}"
}
data "aws_iam_policy_document" "meta_lambda_policy_document" {
statement {
sid = ""
effect = "Allow"
actions = [
"logs:CreateLogGroup"
]
resources = [
"arn:aws:logs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:*"
]
}
statement {
sid = ""
effect = "Allow"
actions = [
"logs:CreateLogStream",
"logs:PutLogEvents"
]
resources = [
"arn:aws:logs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:log-group:/aws/lambda/${aws_lambda_function.meta_lambda.function_name}:*"
]
}
statement {
sid = ""
effect = "Allow"
actions = [
"s3:GetObject",
"s3:ListBucket"
]
resources = [
"${aws_s3_bucket.s3_bucket.arn}",
"${aws_s3_bucket.s3_bucket.arn}/*"
]
}
statement {
sid = ""
effect = "Allow"
actions = [
"SNS:Publish"
]
resources = [
"${aws_sns_topic.sns_topic.arn}"
]
}
}
Below is the NodeJS Lambda script. It pulls in environment variables, defines the exports handler, receives the S3 bucket notification event, collects attributes from the S3 bucket and object path, makes an S3 HEAD request to fetch the object metadata, and publishes the notification to the SNS topic, file: meta_lambda.js
'use strict';
// ENV vars
const AWS_REGION_STRING = process.env.AWS_REGION || 'us-east-1';
const AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID;
const SNS_TOPIC_NAME = process.env.SNS_TOPIC_NAME;
const SNS_TOPIC_ARN = `arn:aws:sns:${AWS_REGION_STRING}:${AWS_ACCOUNT_ID}:${SNS_TOPIC_NAME}`;
const AWS = require('aws-sdk');
AWS.config.update({
region: AWS_REGION_STRING
});
const s3 = new AWS.S3();
const sns = new AWS.SNS();
exports.handler = (message, context, callback) => {
return main(message).then(function (result) {
callback(null, result);
}).catch(function (error) {
callback(error);
});
};
const main = async (notification) => {
let record = notification.Records[0];
let pathAttributes = getS3PathAttributes(record);
let s3MetaData = await fetchS3MetaData(record);
let metaData = {
...pathAttributes,
...s3MetaData
};
let messageAttributes = getMessageAttributes(metaData);
let sendSnsResponse = await sendSns(record, messageAttributes);
return sendSnsResponse.MessageId;
}
const getS3PathAttributes = function (record) {
let attributes = {}
try {
attributes.bucket_name = record.s3.bucket.name;
attributes.object_key = record.s3.object.key;
} catch (error) {
console.log(error);
}
return attributes;
}
const fetchS3MetaData = async (record) => {
try {
    // object keys in S3 event notifications are URL-encoded (spaces become '+'),
    // so decode the key before using it in the HEAD request
    let params = {
      Bucket: record.s3.bucket.name,
      Key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '))
    }
let response = await s3.headObject(params).promise();
return response.Metadata;
} catch (error) {
console.log(error);
return {};
}
}
const getMessageAttributes = function (metaData) {
let messageAttributes = {};
Object.entries(metaData).forEach(
([key, value]) => {
messageAttributes[key] = {
DataType: 'String',
StringValue: value
}
}
);
return messageAttributes;
}
const sendSns = async (record, messageAttributes) => {
let params = {
TopicArn: SNS_TOPIC_ARN,
Message: JSON.stringify(record),
MessageStructure: 'string',
MessageAttributes: messageAttributes
}
return await sns.publish(params).promise();
}
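For a quick local check of the handler, here is a minimal invocation sketch (a hypothetical scratch file next to meta_lambda.js, not part of the deployed resources). It assumes valid AWS credentials plus the AWS_ACCOUNT_ID and SNS_TOPIC_NAME environment variables are exported; the bucket name and object key are placeholders:
// invoke the lambda handler locally with a stripped-down S3 notification event
const lambda = require('./meta_lambda');

const fakeEvent = {
  Records: [
    {
      s3: {
        bucket: { name: 'some-s3-bucket-name' },
        object: { key: 'some-path/example.csv' }
      }
    }
  ]
};

lambda.handler(fakeEvent, {}, (error, result) => {
  if (error) {
    console.log('handler failed', error);
    return;
  }
  // on success the handler resolves with the published SNS MessageId
  console.log('published SNS MessageId', result);
});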
Terraform SNS topic and filtered SQS subscription, file: sns.tf
resource "aws_sns_topic" "sns_topic" {
name = "${var.sns_topic_name}"
}
resource "aws_sns_topic_subscription" "sqs_subscription" {
topic_arn = "${aws_sns_topic.sns_topic.arn}"
protocol = "sqs"
endpoint = "${aws_sqs_queue.sqs_queue.arn}"
filter_policy = <<EOF
{
"filter-by": ["this-filter-value"]
}
EOF
}
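The filter_policy above is evaluated against the MessageAttributes that the Lambda builds from the S3 object metadata, so only notifications for objects whose filter-by metadata equals this-filter-value are delivered to this queue; non-matching messages are simply not delivered to the subscription. As a rough illustration (the values mirror the example upload later in this post):
// attributes published by the lambda for an object uploaded with
// --metadata '{"filter-by":"this-filter-value"}' -- these satisfy the
// subscription filter policy {"filter-by": ["this-filter-value"]}
const matchingAttributes = {
  bucket_name: { DataType: 'String', StringValue: 'some-s3-bucket-name' },
  object_key: { DataType: 'String', StringValue: 'some-path/example.csv' },
  'filter-by': { DataType: 'String', StringValue: 'this-filter-value' }
};

// an upload with --metadata '{"filter-by":"wrong filter value"}' produces a
// non-matching 'filter-by' attribute, so SNS does not deliver that message
// to this SQS subscription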
Terraform SQS queue and its IAM policy, file: sqs.tf
resource "aws_sqs_queue" "sqs_queue" {
name = "${var.sqs_queue_name}"
policy = "${data.aws_iam_policy_document.sqs_queue_policy_document.json}"
}
data "aws_iam_policy_document" "sqs_queue_policy_document" {
policy_id = "arn:aws:sqs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${var.sqs_queue_name}/SQSDefaultPolicy"
statement {
sid = "sns-to-sqs"
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
actions = [
"SQS:SendMessage",
]
resources = [
"arn:aws:sqs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${var.sqs_queue_name}"
]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [
"arn:aws:sns:${var.aws_region}:${data.aws_caller_identity.current.account_id}:${var.sns_topic_name}"
]
}
}
}
I put my configuration variables in secrets.auto.tfvars
aws_region = "us-east-1"
aws_profile = "some-aws-profile-name"
s3_bucket_name = "some-s3-bucket-name"
sns_topic_name = "some-sns-topic-name"
sqs_queue_name = "some-sqs-queue-name"
Here is a Bash script that passes environment variables to the Terraform backend configuration and executes terraform init, plan, and apply, file: main.sh
#!/usr/bin/env bash
: "${AWS_PROFILE:=some-aws-profile-name}"
: "${AWS_REGION:=us-east-1}"
: "${STATE_BUCKET:=some-s3-state-bucket}"
: "${STATE_KEY:=some-s3-state-path/terraform/base.tfstate}"
action="$1"
TFENV=$(which tfenv)
if [ $? -eq 0 ]; then
$TFENV install $(cat .terraform-version)
cat .terraform-version | xargs $TFENV use
fi
rm -f *.tfstate
rm -rf ./.terraform
terraform init \
-force-copy \
-backend=true \
-backend-config "bucket=${STATE_BUCKET}" \
-backend-config "key=${STATE_KEY}" \
-backend-config "profile=${AWS_PROFILE}" \
-backend-config "region=${AWS_REGION}"
terraform plan
if [ "$action" == "apply" ]; then
terraform apply -auto-approve
fi
To test SQS queue delivery, I created an SQS client script. First, initialize the NodeJS project and add the AWS SDK NPM dependency:
nvm use
npm init
npm install aws-sdk --save
Then I created the NodeJS script, file: sqs-client.js
const AWS_REGION_STRING = process.env.AWS_REGION || 'us-east-1';
const AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID;
const SQS_QUEUE_NAME = process.env.SQS_QUEUE_NAME;
const AWS = require('aws-sdk');
AWS.config.update({
region: AWS_REGION_STRING
});
const sqs = new AWS.SQS();
let params = {
QueueUrl: `https://sqs.${AWS_REGION_STRING}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME}`
}
sqs.receiveMessage(params, function (err, response) {
if (err) {
console.log(err, err.stack);
return;
}
try {
let messageBody = response.Messages[0].Body;
let notification = JSON.parse(messageBody);
let messageAttributes = notification.MessageAttributes;
let notificationMessage = JSON.parse(notification.Message);
console.log('messageAttributes', JSON.stringify(messageAttributes, null, 2));
console.log('notificationMessage.s3.bucket.name', notificationMessage.s3.bucket.name)
console.log('notificationMessage.s3.object.key', notificationMessage.s3.object.key);
} catch (error) {
console.log("no messages yet");
}
});
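Note that receiveMessage only reads the message; once the visibility timeout expires, SQS makes it visible again. If you want the test client to consume messages for good, a small addition along these lines (a sketch, placed in the success path of the receiveMessage callback above) deletes the message after processing:
// sketch: delete the received message so it is not redelivered after the
// visibility timeout; ReceiptHandle comes from the receiveMessage response
let deleteParams = {
  QueueUrl: params.QueueUrl,
  ReceiptHandle: response.Messages[0].ReceiptHandle
};
sqs.deleteMessage(deleteParams, function (err) {
  if (err) {
    console.log('failed to delete message', err, err.stack);
  }
});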
I executed the Terraform apply script, pushed a file to S3 with metadata, and ran the SQS client script to test this functionality end to end:
# provision the terraform resources
chmod +x main.sh
./main.sh apply
# put a file on S3 with non-matching metadata
aws --profile some-aws-profile s3 cp example.csv s3://some-s3-bucket/some-path/ --metadata '{"filter-by":"wrong filter value"}'
# attempt to receive SQS messages
AWS_PROFILE=some-aws-profile AWS_REGION=us-east-1 AWS_ACCOUNT_ID=some-aws-account-id SQS_QUEUE_NAME=some-sqs-queue-name node sqs-client.js
# no results
# put a file on S3 with matching metadata
aws --profile some-aws-profile s3 cp example.csv s3://some-s3-bucket/some-path/ --metadata '{"filter-by":"this-filter-value"}'
# attempt to receive the filtered SQS message:
AWS_PROFILE=some-aws-profile AWS_REGION=us-east-1 AWS_ACCOUNT_ID=some-aws-account-id SQS_QUEUE_NAME=some-sqs-queue-name node sqs-client.js
# successful output:
messageAttributes {
"object_key": {
"Type": "String",
"Value": "some-path/example.csv"
},
"bucket_name": {
"Type": "String",
"Value": "some-s3-bucket"
},
"filter-by": {
"Type": "String",
"Value": "this-filter-value"
}
}
notificationMessage.s3.bucket.name some-s3-bucket
notificationMessage.s3.object.key some-path/example.csv