Publish and Subscribe to SNS Topics with AWS Lambdas in Go

User Icon By Azam Akram,   Calendar Icon January 4, 2023
aws-lambda-sns-cloudformation-v1

This article guides the reader through the steps required to publish and subscribe to SNS Topics with AWS Lambdas in Go. I will create two lambda functions (named event-producer-lambda and event-consumer-lambda), which publish and subscribe to the events on the SNS topic. I will write both lambda functions in GO language and deploy all AWS resources using the Cloudformation template.

To gain a better understanding of the topic discussed in this article, it is recommended to read my previous blog (Creating and deploying AWS Lambda function made easy in GOlang,) as it provides additional context.

Event-driven architecture is an effective method for decoupling logically separated services while facilitating communication through events when the system's state changes. An event router connects various services, triggers specific events, enables them to execute actions, and generates subsequent events to progress the system.

In the AWS landscape, Lambda functions and Simple Notification Service (SNS) provide powerful tools for building event-driven architectures. Simple Notification Service (SNS) topic acts as an event router, where different services like lambda, SQS, etc receive and transmit events to integrate with each other. By publishing and subscribing to SNS topics, you can create flexible, decoupled applications that respond to events in real-time.

First thing first — define the event structure to publish and receive,

type Event struct {
ID int `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Source string `json:"source,omitempty"`
EventTime string `json:"eventTime,omitempty"`
Payload Payload `json:"payload,omitempty"`
}

type Payload struct {
Number1 int `json:"number1,omitempty"`
Number2 int `json:"number2,omitempty"`
Answer int `json:"Num1,omitempty"`
}

event-producer-lambda function

Whenever an event publisher publishes an event on SNS topic, it triggers event-producer-lambda function. event-producer-lambda functions uses some of the incoming event fields, and adds more  —  then publishes a new event on the same topic.

In an output event (event Name: “SumRequested”), it actually asks another service to calculate the sum of two numbers, i.e. event.Payload.Number1 and event.Payload.Number2.

After publishing the SumRequested event, it waits for the SumCompleted event to get the Asnwer. When it receives a SumCompleted event, it checks if that is coming from the service named Calculator (if event.Source == “Calculator” {..}) and prints the event.Payload.Answer.

func HandleRequest(ctx context.Context, snsEvent events.SNSEvent) error {
log.Println("SNS event received: ", snsEvent)

var event model.Event
for _, record := range snsEvent.Records {
snsRecord := record.SNS
// Read sns message and unmarshal into Event object
err := json.Unmarshal([]byte(snsRecord.Message), &event)
if err != nil {
return err
}

// We have received the answer from consumer lambda,
// so conclude message exchange here
if event.Source == "Calculator" {
log.Println("Answer received: ", event.Payload.Answer)
return nil
}

outputEvent := model.Event{
ID: event.ID,
Name: "SumRequested",
Source: "Calculation Requester",
EventTime: time.Now().Format(time.RFC3339),
Payload: model.Payload{
Number1: event.Payload.Number1,
Number2: event.Payload.Number2,
},
}

log.Println("Event to publish:", outputEvent)
msgId, err := utils.PublishEvent(ctx, &outputEvent)
if err != nil {
log.Fatal(err)
}

log.Println("Event published to SNS, msgId = ", msgId)
return nil
}
return nil
}

func main() {
lambda.Start(HandleRequest)
}

event-consumer-lambda function

event-consumer-lambda function receives SumRequested event, calculates the sum of event.Payload.Number1 and event.Payload.Number2 — and include as event.Payload.Answer in outputs event, SumCompleted.

func HandleRequest(ctx context.Context, snsEvent events.SNSEvent) error {
log.Println("Context: ", ctx)
log.Println("SNS event received: ", snsEvent)

var event model.Event
for _, record := range snsEvent.Records {
snsRecord := record.SNS
err := json.Unmarshal([]byte(snsRecord.Message), &event)
if err != nil {
return err
}

outputEvent := model.Event{
ID: event.ID,
Name: "SumCompleted",
Source: "Calculator",
EventTime: time.Now().Format(time.RFC3339),
Payload: model.Payload{
Number1: event.Payload.Number1,
Number2: event.Payload.Number2,
Answer: event.Payload.Number1 + event.Payload.Number2,
},
}

msgId, err := utils.PublishEvent(ctx, &outputEvent)
if err != nil {
log.Fatal(err)
}

log.Println("Event published to SNS, msgId = ", msgId)
return nil
}

return nil
}

func main() {
lambda.Start(HandleRequest)
}

Publish message to SNS:

func PublishEvent(ctx context.Context, event *model.Event) (msgId string, err error) {
region := os.Getenv("AWS_REGION")
awsConfig := &aws.Config{
Region: &region,
}

snsSession, err := session.NewSession(awsConfig)
if err != nil {
return "", err
}
snsClient := sns.New(snsSession)

eventBytes, err := json.Marshal(event)
if nil != err {
return "", err
}
payload := string(eventBytes)

snsInput := &sns.PublishInput{
Message: aws.String(payload),
TopicArn: aws.String(os.Getenv("SNS_TOPIC_ARN")),
MessageAttributes: map[string]*sns.MessageAttributeValue{
"name": {
DataType: aws.String("String"),
StringValue: aws.String(event.Name),
},
},
}

snsMsg, err := snsClient.Publish(snsInput)
if err != nil {
return "", err
}

log.Println("Published event: ", snsMsg)

return *snsMsg.MessageId, nil
}

Cloudformation template:

{
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "A CF template to create SNS topic and two lambda functions",
"Parameters": {
"pSnsTopicName": {
"Type": "String"
},
"pLambdaCodeBucket": {
"Type": "String"
},
"pLambdaProducerCodeS3KeyPath": {
"Type": "String"
},
"pLambdaConsumerCodeS3KeyPath": {
"Type": "String"
}
},
"Resources": {
"MyDemoSNSTopic": {
"Type": "AWS::SNS::Topic",
"Properties": {
"TopicName": {
"Ref": "pSnsTopicName"
}
}
},
"lfnLambdaRole": {
"Type": "AWS::IAM::Role",
"DependsOn": [
"MyDemoSNSTopic"
],
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"Policies": [
{
"PolicyName": "lambdaCloudWatchPolicy",
"PolicyDocument": {
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
},
{
"PolicyName": "snsPublish",
"PolicyDocument": {
"Statement": [
{
"Effect": "Allow",
"Action": [
"SNS:Publish"
],
"Resource": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
]
}
}
]
}
},
"lfnEventProducer": {
"Type": "AWS::Lambda::Function",
"DependsOn": [
"lfnLambdaRole"
],
"Properties": {
"Environment": {
"Variables": {
"SNS_TOPIC_ARN": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
},
"Architectures": [
"x86_64"
],
"Runtime": "go1.x",
"Handler": "main",
"Code": {
"S3Bucket": {
"Ref": "pLambdaCodeBucket"
},
"S3Key": {
"Ref": "pLambdaProducerCodeS3KeyPath"
}
},
"Description": "This is event producer lambda function",
"FunctionName": "event-producer-lambda",
"Role": {
"Fn::GetAtt": [
"lfnLambdaRole",
"Arn"
]
},
"Timeout": "120"
}
},
"lfnEventConsumer": {
"Type": "AWS::Lambda::Function",
"DependsOn": [
"lfnLambdaRole"
],
"Properties": {
"Environment": {
"Variables": {
"SNS_TOPIC_ARN": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
},
"Architectures": [
"x86_64"
],
"Runtime": "go1.x",
"Handler": "main",
"Code": {
"S3Bucket": {
"Ref": "pLambdaCodeBucket"
},
"S3Key": {
"Ref": "pLambdaConsumerCodeS3KeyPath"
}
},
"Description": "This is event consumer lambda function",
"FunctionName": "event-consumer-lambda",
"Role": {
"Fn::GetAtt": [
"lfnLambdaRole",
"Arn"
]
},
"Timeout": "120"
}
},
"snsPermInvokeProducer": {
"Type": "AWS::Lambda::Permission",
"DependsOn": [
"lfnEventProducer"
],
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Fn::GetAtt": [
"lfnEventProducer",
"Arn"
]
},
"Principal": "sns.amazonaws.com",
"SourceArn": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
},
"snsPermInvokeConsumer": {
"Type": "AWS::Lambda::Permission",
"DependsOn": [
"lfnEventConsumer"
],
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Fn::GetAtt": [
"lfnEventConsumer",
"Arn"
]
},
"Principal": "sns.amazonaws.com",
"SourceArn": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
},
"snsSubscriptionProducer": {
"Type": "AWS::SNS::Subscription",
"DependsOn": [
"MyDemoSNSTopic",
"lfnEventProducer"
],
"Properties": {
"Endpoint": {
"Fn::GetAtt": [
"lfnEventProducer",
"Arn"
]
},
"FilterPolicy": {
"name": [
"SumCompleted"
]
},
"Protocol": "lambda",
"TopicArn": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
},
"snsSubscriptionConsumer": {
"Type": "AWS::SNS::Subscription",
"DependsOn": [
"MyDemoSNSTopic",
"lfnEventConsumer"
],
"Properties": {
"Endpoint": {
"Fn::GetAtt": [
"lfnEventConsumer",
"Arn"
]
},
"FilterPolicy": {
"name": [
"SumRequested"
]
},
"Protocol": "lambda",
"TopicArn": {
"Fn::Sub": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${pSnsTopicName}"
}
}
}
}
}

Cloudformation Template Explanation

MyDemoSNSTopic: Creates SNS topic; we pass the value of pSnsTopicName as parameter to CF template.

lfnLambdaRole: IAM role for Lambda functions to run.

Additionally, we define two more policies for this role,

  • lambdaCloudWatchPolicy to create cloudwatch logs
  • snsPublish to allow publishing messages to the SNS topic

As both event-producer-lambda and event-consumer-lambda functions will be publishing messages to SNS topic, so I assign lfnLambdaRole to both of them.

lfnEventProducer: Creates event-producer-lambda lambda function

lfnEventConsumer: Creates event-consumer-lambda lambda function

SNS_TOPIC_ARN is passed as an environment variable to both lambda functions — that will use it to publish messages on this SNS topic

snsPermInvokeProducer: Provisions SNS topic to invoke event-producer-lambda function (“Action”: “lambda:InvokeFunction”)

snsPermInvokeConsumer: Define similar provisions for event-consumer-lambda function

snsSubscriptionProducer: SNS subscription for event-producer-lambda to receive messages from the SNS topic.

FilterPolicy filters SNS messages in the SNS subscription based on message attributes, in this example event-producer-lambda is only interested in the messages with attribute name = SumCompleted

snsSubscriptionConsumer: Similar SNS subscription for event-consumer-lambda with FilterPolicy on the message attribute name = SumRequested

Build cloudformation stack

aws cloudformation deploy \
--template-file ./deploy/cf.json \
--stack-name my-demo-sns-lambda-stack \
--capabilities CAPABILITY_IAM \
--parameter-overrides \
pSnsTopicName=demo-event-sns-topic \
pLambdaCodeBucket=azam-demo-s3-bucket \
pLambdaProducerCodeS3KeyPath=demo-sns-lambda/aws-sns-lambda-producer-go.zip \
pLambdaConsumerCodeS3KeyPath=demo-sns-lambda/aws-sns-lambda-consumer-go.zip

Parameters to cloudformation template

  • pSnsTopicName: SNS topic name
  • pLambdaCodeBucket: S3 bucket name to upload lambda archive files
  • pLambdaProducerCodeS3KeyPath: event-producer-lambda archive file key in S3 bucket
  • pLambdaConsumerCodeS3KeyPath: event-consumer-lambda archive file key in S3 bucket

Stack deployed successfully

SNS subscription:

SNS topic demo-event-sns-topic is created with two Subscriptions — one for each lambda function.

Lambdas created:

Delete stack

If you are using aws free tier account then it is good practice to undeploy all the resources created (if they are no more needed).

aws cloudformation delete-stack --stack-name my-demo-sns-lambda-stack

Testing

Now everything is deployed, we are all set to test our event producer and consumer functions.

As event-producer-lambda function only receives SNS messages which have attribute name=SubCompleted (based on SNS filter policy), we publish the following message on SNS topic to trigger it,

{
"id": 1,
"name": "SumCompleted",
"payload":
{
"number1": 5,
"number2": 10
}
}

Define Message attribute as shown in image below and publish.

event-producer-lambda function receives the event and fills more JSON fields (more importantly SumRequested, which the consumer is interested in) before publishing it to the SNS topic,

{
"id": 1,
"name": "SumRequested",
"source": "Calculation Requester",
"eventTime": "2022-12-26T17:07:48Z",
"payload":
{
"number1": 5,
"number2": 10
}
}

Cloudwatch logs for event-producer-lambda

You can use better logging framework.

event-consumer-lambda function receives this SumRequested message, and calculates the sum of event.Payload.Number1 and event.Payload.Number2 and publishes the event with name SumCompleted and include sum as event.Payload.Answer.

{
"id": 1,
"name": "SumCompleted",
"source": "Calculator",
"eventTime": "2022-12-26T17:07:49Z",
"payload":
{
"number1": 5,
"number2": 10,
"answer": 15
}
}

Cloudwatch logs for event-consumer-lambda

Conclusion

AWS Lambda and Amazon SNS provide powerful tools for building event-driven architectures in the cloud. By publishing and subscribing to SNS topics, you can create flexible, decoupled applications that respond to events in real-time. AWS Lambda makes it easy to build serverless functions that can be triggered by SNS events, allowing you to process data and take action quickly and efficiently. Whether you're building a simple notification system or a complex microservices architecture, AWS Lambda and SNS can help you achieve your goals.

You can explore more AWS + GO resources here.