In this post, I’ll share Terraform code that provisions an AWS S3 bucket for file uploads, an S3 bucket notification that triggers an AWS Lambda NodeJS function to fetch the S3 object metadata and publish it to an AWS SNS topic, and an AWS SQS queue with a filtered topic subscription. This can be useful if you need S3 bucket notifications to fan out to different SQS queues based on S3 metadata or object path.
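For context, the Lambda receives an S3 event notification payload shaped roughly like the trimmed example below (illustrative values only; the real event contains many more fields):
{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "some-s3-bucket-name" },
        "object": { "key": "some-path/example.csv", "size": 1024 }
      }
    }
  ]
}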
Initial project setup
# set the NodeJS version, read by nvm from .nvmrc for local execution
echo 8.10.0 > .nvmrc
# set the Terraform version, read by tfenv
echo 0.11.8 > .terraform-version
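With nvm and tfenv installed locally (an assumption about your tooling; not part of the original setup), both tools can pick up the pinned versions directly from those files:
# install and activate the pinned NodeJS and Terraform versions
nvm install && nvm use
tfenv install && tfenv use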
I created a Terraform file to set up the S3 backend state configuration and pin the AWS provider version, file: main.tf
terraform {
  required_version = "0.11.8"

  backend "s3" {
    bucket  = ""
    key     = ""
    profile = ""
    region  = ""
  }
}

provider "aws" {
  profile                 = "${var.aws_profile}"
  region                  = "${var.aws_region}"
  shared_credentials_file = "${var.aws_credentials_file}"
  version                 = "1.37.0"
}
data "aws_caller_identity" "current" {}
Terraform file for configurable parameters, file: variables.tf
variable "aws_region" {
  type    = "string"
  default = "us-east-1"
}

variable "aws_profile" {
  type    = "string"
  default = ""
}

variable "aws_credentials_file" {
  type    = "string"
  default = "~/.aws/credentials"
}

variable "s3_bucket_name" {
  type    = "string"
  default = ""
}

variable "sns_topic_name" {
  type    = "string"
  default = ""
}

variable "sqs_queue_name" {
  type    = "string"
  default = ""
}
Terraform S3 bucket and bucket notification (to Lambda), file: s3.tf
resource "aws_s3_bucket" "s3_bucket" {
  bucket = "${var.s3_bucket_name}"
  acl    = "private"

  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }
}

resource "aws_s3_bucket_notification" "lambda_bucket_notification" {
  bucket = "${aws_s3_bucket.s3_bucket.id}"

  lambda_function {
    lambda_function_arn = "${aws_lambda_function.meta_lambda.arn}"
    events              = ["s3:ObjectCreated:*"]
    # filter_prefix = "some-path/"
    # filter_suffix = ".csv"
  }
}
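One note that isn't in the original configuration: the bucket notification only references the Lambda function ARN, so Terraform may try to create it before the aws_lambda_permission resource (defined in lambda.tf below) exists, which can make the apply fail intermittently. A common guard, sketched here as an optional addition, is an explicit depends_on inside the notification resource:
  # inside aws_s3_bucket_notification.lambda_bucket_notification (optional ordering guard)
  depends_on = ["aws_lambda_permission.lambda_allow_bucket"]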
The following file defines the Lambda function, its IAM role and policy, and the permission that allows S3 to invoke it, file: lambda.tf
resource "aws_lambda_function" "meta_lambda" {
filename = "meta_lambda.zip"
function_name = "meta_lambda"
role = " ${ aws_iam_role . meta_lambda_role . arn } "
handler = "meta_lambda.handler"
source_code_hash = " ${data . archive_file . meta_lambda_zip . output_base64sha256 } "
runtime = "nodejs8.10"
environment {
variables = {
AWS_ACCOUNT_ID = " ${data . aws_caller_identity . current . account_id } "
SNS_TOPIC_NAME = " ${ var . sns_topic_name } "
}
}
}
data "archive_file" "meta_lambda_zip" {
type = "zip"
source_file = "meta_lambda.js"
output_path = "meta_lambda.zip"
}
resource "aws_iam_role" "meta_lambda_role" {
name = "meta_lambda_role"
assume_role_policy = " ${data . aws_iam_policy_document . meta_lambda . json } "
}
data "aws_iam_policy_document" "meta_lambda" {
statement {
sid = ""
effect = "Allow"
actions = [
"sts:AssumeRole"
]
principals {
type = "Service"
identifiers = [ "lambda.amazonaws.com" ]
}
}
}
resource "aws_lambda_permission" "lambda_allow_bucket" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = " ${ aws_lambda_function . meta_lambda . function_name } "
principal = "s3.amazonaws.com"
source_arn = " ${ aws_s3_bucket . s3_bucket . arn } "
}
resource "aws_iam_role_policy_attachment" "attach_lambda_role_policy" {
role = " ${ aws_iam_role . meta_lambda_role . name } "
policy_arn = " ${ aws_iam_policy . meta_lambda_policy . arn } "
}
resource "aws_iam_policy" "meta_lambda_policy" {
name = "meta_lambda_policy"
policy = " ${data . aws_iam_policy_document . meta_lambda_policy_document . json } "
}
data "aws_iam_policy_document" "meta_lambda_policy_document" {
statement {
sid = ""
effect = "Allow"
actions = [
"logs:CreateLogGroup"
]
resources = [
"arn:aws:logs: ${ var . aws_region } : ${data . aws_caller_identity . current . account_id } :*"
]
}
statement {
sid = ""
effect = "Allow"
actions = [
"logs:CreateLogStream" ,
"logs:PutLogEvents"
]
resources = [
"arn:aws:logs: ${ var . aws_region } : ${data . aws_caller_identity . current . account_id } :log-group:/aws/lambda/ ${ aws_lambda_function . meta_lambda . function_name } :*"
]
}
statement {
sid = ""
effect = "Allow"
actions = [
"s3:GetObject" ,
"s3:ListBucket"
]
resources = [
" ${ aws_s3_bucket . s3_bucket . arn } " ,
" ${ aws_s3_bucket . s3_bucket . arn } /*"
]
}
statement {
sid = ""
effect = "Allow"
actions = [
"SNS:Publish"
]
resources = [
" ${ aws_sns_topic . sns_topic . arn } "
]
}
}
Below is the NodeJS Lambda script. It reads environment variables, defines the exported handler, receives the S3 bucket notification event, collects attributes from the S3 bucket and object path, makes an S3 HEAD request to fetch the object's user metadata, and publishes the notification record to the SNS topic, file: meta_lambda.js
'use strict';

// ENV vars
const AWS_REGION_STRING = process.env.AWS_REGION || 'us-east-1';
const AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID;
const SNS_TOPIC_NAME = process.env.SNS_TOPIC_NAME;
const SNS_TOPIC_ARN = `arn:aws:sns:${AWS_REGION_STRING}:${AWS_ACCOUNT_ID}:${SNS_TOPIC_NAME}`;

const AWS = require('aws-sdk');
AWS.config.update({
  region: AWS_REGION_STRING
});
const s3 = new AWS.S3();
const sns = new AWS.SNS();
exports.handler = (message, context, callback) => {
  return main(message).then(function (result) {
    callback(null, result);
  }).catch(function (error) {
    callback(error);
  });
};

const main = async (notification) => {
  let record = notification.Records[0];
  let pathAttributes = getS3PathAttributes(record);
  let s3MetaData = await fetchS3MetaData(record);
  let metaData = {
    ...pathAttributes,
    ...s3MetaData
  };
  let messageAttributes = getMessageAttributes(metaData);
  let sendSnsResponse = await sendSns(record, messageAttributes);
  return sendSnsResponse.MessageId;
}
const getS3PathAttributes = function (record) {
  let attributes = {};
  try {
    attributes.bucket_name = record.s3.bucket.name;
    attributes.object_key = record.s3.object.key;
  } catch (error) {
    console.log(error);
  }
  return attributes;
}

const fetchS3MetaData = async (record) => {
  try {
    let params = {
      Bucket: record.s3.bucket.name,
      Key: record.s3.object.key
    };
    let response = await s3.headObject(params).promise();
    return response.Metadata;
  } catch (error) {
    console.log(error);
    return {};
  }
}

const getMessageAttributes = function (metaData) {
  let messageAttributes = {};
  Object.entries(metaData).forEach(
    ([key, value]) => {
      messageAttributes[key] = {
        DataType: 'String',
        StringValue: value
      };
    }
  );
  return messageAttributes;
}
const sendSns = async (record, messageAttributes) => {
  let params = {
    TopicArn: SNS_TOPIC_ARN,
    Message: JSON.stringify(record),
    MessageStructure: 'string',
    MessageAttributes: messageAttributes
  };
  return await sns.publish(params).promise();
}
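To make the downstream filtering concrete, the publish params built above end up shaped roughly like this for an upload carrying a filter-by metadata key (illustrative values, and the Message body trimmed):
{
  "TopicArn": "arn:aws:sns:us-east-1:some-aws-account-id:some-sns-topic-name",
  "Message": "{\"s3\":{\"bucket\":{\"name\":\"some-s3-bucket\"},\"object\":{\"key\":\"some-path/example.csv\"}}}",
  "MessageStructure": "string",
  "MessageAttributes": {
    "bucket_name": { "DataType": "String", "StringValue": "some-s3-bucket" },
    "object_key": { "DataType": "String", "StringValue": "some-path/example.csv" },
    "filter-by": { "DataType": "String", "StringValue": "this-filter-value" }
  }
}
The SNS subscription filter policy below matches on these message attributes, not on the message body, which is why the Lambda copies the S3 user metadata into MessageAttributes.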
Terraform SNS topic and filtered SQS subscription, file: sns.tf
resource "aws_sns_topic" "sns_topic" {
name = " ${ var . sns_topic_name } "
}
resource "aws_sns_topic_subscription" "sqs_subscription" {
topic_arn = " ${ aws_sns_topic . sns_topic . arn } "
protocol = "sqs"
endpoint = " ${ aws_sqs_queue . sqs_queue . arn } "
filter_policy = << EOF
{
"filter-by": ["this-filter-value"]
}
EOF
}
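If you want to verify the filter policy without going through S3 and the Lambda, you can publish a test message with a matching attribute directly to the topic (a sketch with placeholder profile and topic ARN):
# publish a test message whose attributes match the subscription filter policy
aws --profile some-aws-profile sns publish \
  --topic-arn arn:aws:sns:us-east-1:some-aws-account-id:some-sns-topic-name \
  --message '{"test": true}' \
  --message-attributes '{"filter-by":{"DataType":"String","StringValue":"this-filter-value"}}'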
Terraform SQS queue and its IAM policy, file: sqs.tf
resource "aws_sqs_queue" "sqs_queue" {
name = " ${ var . sqs_queue_name } "
policy = " ${data . aws_iam_policy_document . sqs_queue_policy_document . json } "
}
data "aws_iam_policy_document" "sqs_queue_policy_document" {
policy_id = "arn:aws:sqs: ${ var . aws_region } : ${data . aws_caller_identity . current . account_id } : ${ var . sqs_queue_name } /SQSDefaultPolicy"
statement {
sid = "sns-to-sqs"
effect = "Allow"
principals {
type = "AWS"
identifiers = [ "*" ]
}
actions = [
"SQS:SendMessage" ,
]
resources = [
"arn:aws:sqs: ${ var . aws_region } : ${data . aws_caller_identity . current . account_id } : ${ var . sqs_queue_name } "
]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [
"arn:aws:sns: ${ var . aws_region } : ${data . aws_caller_identity . current . account_id } : ${ var . sns_topic_name } "
]
}
}
}
I put my configuration variables in secrets.auto.tfvars
aws_region = "us-east-1"
aws_profile = "some-aws-profile-name"
s3_bucket_name = "some-s3-bucket-name"
sns_topic_name = "some-sns-topic-name"
sqs_queue_name = "some-sqs-queue-name"
Here is a Bash script that passes environment variables to the Terraform backend configuration and runs terraform init, plan, and apply, file: main.sh
#!/usr/bin/env bash

: "${AWS_PROFILE:=some-aws-profile-name}"
: "${AWS_REGION:=us-east-1}"
: "${STATE_BUCKET:=some-s3-state-bucket}"
: "${STATE_KEY:=some-s3-state-path/terraform/base.tfstate}"

action="$1"

TFENV=$(which tfenv)
if [ $? -eq 0 ]; then
  $TFENV install "$(cat .terraform-version)"
  cat .terraform-version | xargs $TFENV use
fi

rm -f *.tfstate
rm -rf ./.terraform

terraform init \
  -force-copy \
  -backend=true \
  -backend-config "bucket=${STATE_BUCKET}" \
  -backend-config "key=${STATE_KEY}" \
  -backend-config "profile=${AWS_PROFILE}" \
  -backend-config "region=${AWS_REGION}"

terraform plan

if [ "$action" == "apply" ]; then
  terraform apply -auto-approve
fi
To test SQS queue delivery, I created an SQS client script. First, add the AWS SDK NPM dependency:
nvm use
npm init
npm install aws-sdk --save
Then I created the NodeJS script, file: sqs-client.js
const AWS_REGION_STRING = process.env.AWS_REGION || 'us-east-1';
const AWS_ACCOUNT_ID = process.env.AWS_ACCOUNT_ID;
const SQS_QUEUE_NAME = process.env.SQS_QUEUE_NAME;

const AWS = require('aws-sdk');
AWS.config.update({
  region: AWS_REGION_STRING
});
const sqs = new AWS.SQS();

let params = {
  QueueUrl: `https://sqs.${AWS_REGION_STRING}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME}`
};

sqs.receiveMessage(params, function (err, response) {
  if (err) {
    console.log(err, err.stack);
    return;
  }
  try {
    let messageBody = response.Messages[0].Body;
    let notification = JSON.parse(messageBody);
    let messageAttributes = notification.MessageAttributes;
    let notificationMessage = JSON.parse(notification.Message);
    console.log('messageAttributes', JSON.stringify(messageAttributes, null, 2));
    console.log('notificationMessage.s3.bucket.name', notificationMessage.s3.bucket.name);
    console.log('notificationMessage.s3.object.key', notificationMessage.s3.object.key);
  } catch (error) {
    console.log('no messages yet');
  }
});
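Note that this client only receives a message; it never deletes it, so the same message becomes visible again after the queue's visibility timeout. If you want the test client to consume messages, a minimal follow-up (an optional addition, not in the original script) is to delete the message by its receipt handle:
// inside the receiveMessage callback, after the message has been processed
sqs.deleteMessage({
  QueueUrl: params.QueueUrl,
  ReceiptHandle: response.Messages[0].ReceiptHandle
}, function (deleteErr) {
  if (deleteErr) console.log(deleteErr, deleteErr.stack);
});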
I executed the Terraform apply script, pushed a file to S3 with metadata, and ran the SQS client script to test this functionality end to end:
# terraform resources
chmod +x main.sh
./main.sh apply
# put a file on S3 with non-matching metadata
aws --profile some-aws-profile s3 cp example.csv s3://some-s3-bucket/some-path/ --metadata '{"filter-by":"wrong filter value"}'
# attempt to receive SQS messages
AWS_PROFILE=some-aws-profile AWS_REGION=us-east-1 AWS_ACCOUNT_ID=some-aws-account-id SQS_QUEUE_NAME=some-sqs-queue-name node sqs-client.js
# no results
# put a file on S3 with matching metadata
aws --profile some-aws-profile s3 cp example.csv s3://some-s3-bucket/some-path/ --metadata '{"filter-by":"this-filter-value"}'
# attempt to receive the filtered SQS message:
AWS_PROFILE=some-aws-profile AWS_REGION=us-east-1 AWS_ACCOUNT_ID=some-aws-account-id SQS_QUEUE_NAME=some-sqs-queue-name node sqs-client.js
# successful output:
messageAttributes {
  "object_key": {
    "Type": "String",
    "Value": "some-path/example.csv"
  },
  "bucket_name": {
    "Type": "String",
    "Value": "some-s3-bucket"
  },
  "filter-by": {
    "Type": "String",
    "Value": "this-filter-value"
  }
}
notificationMessage.s3.bucket.name some-s3-bucket
notificationMessage.s3.object.key some-path/example.csv
Source code on GitHub