Skip to content

Mastering Event-Driven Integration with Serverless Event Gateway in AWS

Building event-driven architectures is an interesting approach for decoupling different services, letting them play their role nicely in isolation

In this post, I will analyze one of the many use cases where introducing an event-driven approach can be extremely efficient.

The use case I want to focus on is the integration of an external service into an existing event-driven architecture.

What’s an external service?

Think about any application that we need to integrate into our architecture that needs to be triggered or that may provide data to our business logic. “External” does not necessarily mean that the service is not owned by the same organization but mainly that it was not designed to be part of an event-driven application and is not capable by default of listening or producing events.

Integrating an external system is a very common requirement. That said, the integration can be challenging.

In theory, the idea is simple: let the external service react to events posted in an event engine and allow the service to create new events.

In real-world scenarios, the direct interactions between an external service and an event broker can be hard or impossible.

Let’s consider the following scenarios:

  • The service we need to use belongs to a third party
  • The service belongs to our organization but the codebase is not ready to support a direct event integration or it could be too hard or not economical to make the required changes

The Serverless Event Gateway pattern defines a way to let an external service be part of an event-driven architecture receiving and posting events via HTTP.

HTTP is more likely to be a commonly supported communication strategy and can be used to support the gateway pattern channels.

There are 3 main requirements:

  • The external service must be able to receive events via HTTP
  • The event hub must be capable of forwarding events using the HTTP protocol
  • The external service must be able to execute HTTP requests to send new events to the event hub

A use case for the Serverless Event Gateway pattern

Let's analyze the following use case:

  • Our sample event-driven application ingests new orders through an API gateway
  • The orders are stored in a pending state while they wait for the shipment confirmation
  • The order shipment is done by an external carrier that needs to be informed of the newly placed order and must confirm when the shipment is done

Here is our architectural solution to cover the above use case.

A sample implementation in Terraform can be found here . Please note that the Terraform sample is not intended to be production-ready but it simply creates a live implementation of the architecture we are discussing.

How does it work and what are the benefits of the design and of the AWS tools provisioned by the above solution? Let’s follow the proposed architecture step by step.

Step 1

A client invokes the Orders API gateway to insert a new order. The API Gateway method proxies the request to a Lambda function. Here is the relevant implementation in OpenAPI.

Plain Text
paths = {
      "/orders" = {
        post = {
          x-amazon-apigateway-integration = {
            httpMethod           = "POST"
            type                 = "aws_proxy"
            uri                  = module.create-order-function.lambda_function_qualified_invoke_arn
          }
        }
      }

Steps 2/3

The API Gateway invokes the lambda function and inserts the new order data in the Orders DynamoDB table.

Plain Text
…
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ.get('DYNAMO_DB_TABLE_NAME'))

PENDING = 'pending'


@app.post("/orders")
def create_order():
    order_id = str(uuid.uuid4())
    reason = "Waiting for shipping"
    response = table.put_item(
        Item={
            'PK': order_id,
            'state': PENDING,
            'reason': reason
        }
    )
    return {
        "statusCode": 200,
        "headers": {
            "Content-Type": "application/json"
        },
        "body": json.dumps({
            "order_id ": order_id,
            "state": PENDING,
            "reason": reason
        })
    }
…

After this stage the order state is PENDING and the reason is “Waiting for shipping”.

If you are wondering what the fancy @app annotation is, please look at the Python lambda powertools library . It’s a cool framework that helps a lot in dealing with Python lambdas in AWS.

Steps 4/5

Here things start to get interesting. We said that our application is based on events. This means that once we insert a new order we need to notify any subscribed target that a new order has been created.

We could have extended the above lambda method by directly inserting an event in the Event Bus using an API call. This was not going to be a terrible choice but it would have added a new responsibility to the lambda function. If you consider this in terms of transactions you start to see the benefit of offloading the event creation from our code.

If the lambda must insert the data in the DB and create a new event it means that is our responsibility to ensure that both actions succeed. If something bad happens in the middle we must catch the error and add a retry policy to ensure that the whole “transaction” has been committed.

Relying on DynamoDB streams will remove this responsibility from our shoulders. Enabling the stream on the Orders table will generate a stream of data changes that we can consume in different ways.

A common way to consume a DynamoDB stream was to use an event source trigger for a lambda function or via a Kinesis data stream. AWS has recently released a new tool called Event Bridge Pipes that can make our life easier.

Amazon EventBridge Pipes connects sources to targets. Pipes are intended for point-to-point integrations between supported sources and targets , with support for advanced transformations and enrichment . It reduces the need for specialized knowledge and integration code when developing event-driven architectures, fostering consistency across your company’s applications.

We can create a Pipe to consume the stream, modify the data, and invoke a target without adding a new lambda function to our design.

This is pretty cool and the reasons are obvious — adding a new lambda to process the stream will require us to manage all the implications, such as retry policy, monitoring, concurrencies, etc.

At the time of writing, the Terraform resource for the event pipes are still a bit rusty. In the sample repo, I decided to use a Cloudformation stack for deploying the pipe:

Plain Text
resource "aws_cloudformation_stack" "dynamodb_stream_pipe" {
  name = "dynamodb-orders-stream"

  parameters = {
    RoleArn   = aws_iam_role.orders_pipe_role.arn
    SourceArn = aws_dynamodb_table.orders-table.stream_arn
    TargetArn = aws_cloudwatch_event_bus.orders_bus.arn
  }

  template_body = jsonencode({
    "Parameters" : {
      "SourceArn" : {
        "Type" : "String",
      },
      "TargetArn" : {
        "Type" : "String",
      },
      "RoleArn" : {
        "Type" : "String"
      }
    },
    "Resources" : {
      "OrdersPipe" : {
        "Type" : "AWS::Pipes::Pipe",
        "Properties" : {
          "Name" : "dynamodb-orders-stream",
          "RoleArn" : { "Ref" : "RoleArn" }
          "Source" : { "Ref" : "SourceArn" },
          "SourceParameters" : {
            "DynamoDBStreamParameters" : {
              "StartingPosition" : "LATEST",
              "BatchSize" : 1
            }
          }
          "Target" : { "Ref" : "TargetArn" },
          "TargetParameters" : {
            "EventBridgeEventBusParameters" : {
              "Source" : "dynamodb.orders",
              "DetailType": "dynamodb-orders-stream"
            },
            "InputTemplate" : "{\"eventName\": <$.eventName>,\"order\": {\"id\": <$.dynamodb.NewImage.PK.S>,\"state\": <$.dynamodb.NewImage.state.S> } }"
          }
        }
      }
    }
  })
}

What is happening here?

  • We create a Pipe that consumes the DynamoDB stream as the data source
  • We select an EventBridge Bus as the target
  • We add an InputTemplate that transforms each data record in the format we need to pass along to the bus

The Pipe will consume records in the following format from the DynamoDB stream:

JSON
{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb"  
            "NewImage":{
               "state":{
                  "S":"pending"
               },
               "PK":{
                  "S":"{UUID}"
               }
            }.....
      }

The data in output after the transformation looks like this:

JSON
{
"eventName": "INSERT",
"order": {
  "id":"{UUID}",
  "state": "pending"
  } 
}

Step 6

The pipe will take care of sending the event to the target Event Bus.

Step 7

The requirements say that we want to forward to the carrier rest API the events related to every new order created.

As I outlined above when discussing how to consume the DynamoDB stream, we could have created another lambda function with the responsibility to invoke the carrier rest API. For the same reasons expressed above, we want to offload this responsibility to AWS using an Event Bridge Api Destination .

Apart from dozens of AWS services and an EventBridge Event Bus, AWS allows us to select an API Destination as the target of an event rule match.

The setup is quite simple.

We first create an event connection. This resource can be reused by many API destinations and define some configurations like the authorization type.

Note that in our sample implementation, the carrier API is represented by a simple lambda function with URL invocation enabled. There is no real authorization check in place.

Plain Text
resource "aws_cloudwatch_event_connection" "carrier_connection" {
  name               = "carrier"
  authorization_type = "BASIC"

  auth_parameters {
    basic {
      username = "user"
      password = "Pass1234!"
    }
  }
}

We then create the API destination resource adding details like URL and the HTTP method that needs to match the remote API signature:

Plain Text
resource "aws_cloudwatch_event_api_destination" "carrier_api_destination" {
  name                             = "carrier-api-destination"
  invocation_endpoint              = "${module.carrier-mock-function.lambda_function_url}/orders"
  http_method                      = "POST"
  invocation_rate_limit_per_second = 20
  connection_arn                   = aws_cloudwatch_event_connection.carrier_connection.arn
}

Note the invocation_rate_limit_per_second. Without writing any code we can define the max invocation rate that EventBridge will use when it calls the same destination from many triggers. This is pretty cool, as implementing the same logic from scratch is not a trivial task.

An Event Rule is used to match the events we want to forward to the API target:

Plain Text
resource "aws_cloudwatch_event_rule" "create_orders_rule" {
  name           = "order-created"
  event_bus_name = aws_cloudwatch_event_bus.orders_bus.name

  event_pattern = jsonencode({
    source = ["dynamodb.orders"]
    detail = {
      eventName = ["INSERT"]
    }
  })
}

And finally we define the target resource:

Plain Text
resource "aws_cloudwatch_event_target" "carrie_api_destination" {
  event_bus_name = aws_cloudwatch_event_bus.orders_bus.name
  rule           = aws_cloudwatch_event_rule.create_orders_rule.name
  arn            = aws_cloudwatch_event_api_destination.carrier_api_destination.arn
  role_arn       = aws_iam_role.carrier_api_destination_role.arn
  input_transformer {
    input_template = "{\"order_id\" : \"<order_id>\", \"state\" : \"<state>\"}"
    input_paths = {
      order_id = "$.detail.order.id"
      state = "$.detail.order.state"
    }
  }
}

Again we are using an input template to modify the event object. We create on the fly the payload expected by the rest API endpoint without adding a function to achieve the same result.

Steps 8/9

When the order shipment is ready the carrier service needs to let the system know about it. We need to provide an API gateway route to ingest events and the related details.

Here we can leverage the ability of API Gateway to directly proxy to an AWS service. We add a request template to transform the payload received by the API route into the desired event object format we want to add to the event hub.

The implementation in OpenAPI looks as below:

Plain Text
...
"/confirm-shipment" = {
        post = {
          "x-amazon-apigateway-integration" = {
            httpMethod  = "POST"
            type        = "aws"
            uri         = "arn:aws:apigateway:${data.aws_region.current.name}:events:action/PutEvents"
            credentials = aws_iam_role.integration_api.arn,
            "responses" = {
              "default" = {
                "statusCode" = "200"
              }
            },
            "requestParameters" : {
              "integration.request.header.X-Amz-Target" : "'AWSEvents.PutEvents'",
              "integration.request.header.Content-Type" : "'application/x-amz-json-1.1'"
            },
            "requestTemplates" = {
              "application/json" = <<EOF
{
  "Entries":[{
    "Source":"carrier",
    "Detail":"{\"shipped_at\": $util.escapeJavaScript($input.json('$.shipped_at')),\"order_id\": $util.escapeJavaScript($input.json('$.order_id'))}",
    "DetailType": "carrier.order_shipped",
    "EventBusName": "orders"
  }]
}
EOF
...

The API endpoint receives a payload in the following format:

JSON
{
  "oder_id" : {order id},
  "shipped_at" : {ts}
}

and, before sending the payload to the EventBridge backend, the payload is transformed as:

JSON
{
  "Entries":[{
    "Source":"carrier",
    "Detail":"{\"shipped_at\": {ts},\"order_id\": \"{order_id}\"))}",
    "DetailType": "carrier.order_shipped",
    "EventBusName": "orders"
  }]
}

Steps 10/11

Finally, a second event rule matches the order_shipped event and invokes a lambda function that updates the Orders DB table.

Plain Text
resource "aws_cloudwatch_event_rule" "order_shipped_rule" {
  name           = "order-shipped"
  event_bus_name = aws_cloudwatch_event_bus.orders_bus.name

  event_pattern = jsonencode({
    source = ["carrier"]
    "detail-type" = ["carrier.order_shipped"]
  })
}


resource "aws_cloudwatch_event_target" "confirm_shipment_function" {
  event_bus_name = aws_cloudwatch_event_bus.orders_bus.name
  rule           = aws_cloudwatch_event_rule.order_shipped_rule.name
  arn            = module.confirm-shipment-function.lambda_function_arn
  input_transformer {
    input_template = "{\"order_id\" : \"<order_id>\", \"shipped_at\" : \"<shipped_at>\"}"
    input_paths = {
      order_id = "$.detail.order_id"
      shipped_at = "$.detail.shipped_at"
    }
  }
}

Let’s test our solution!

Assuming the full deployment of the terraform code we can collect the orders API URL from the terraform output:

Plain Text
api=$(terraform output -json | jq -r .orders_api_gateway_url.value)
echo "Orders Api Gateway - $api"

Orders Api Gateway - https://****.execute-api.us-east-1.amazonaws.com/

Let’s create an order:

Plain Text
order=$(curl -XPOST "$api"prod/orders -s | jq -r ".body | fromjson")
echo "New Order is \n$order"

New Order is
{
  "order_id": "ec9a9f9d-ea66-4a5d-a5a0-8fc6d39566d1",
  "state": "pending",
  "reason": "Waiting for shipping"
}

And now let’s verify that the state and reason have been updated accordingly:

Plain Text
id=$(echo "$order" | jq -r .order_id)
curl -XGET "$api"prod/order/"$id" -s | jq ".body | fromjson | ."
{
  "id": "ec9a9f9d-ea66-4a5d-a5a0-8fc6d39566d1",
  "state": "shipped",
  "reason": "Shipped at 2023-06-27T11:58:06.389133"
}

Conclusion

Event-driven architectures are becoming more common with each passing day, especially when the main cloud providers have started to make their deployment easier and more maintainable. Combined with a serverless approach we can obtain solutions that are scalable and extremely flexible at the same time.

On the AWS side, the introduction of services like EventBridge Pipe is a clear symptom that AWS is pushing to complete its offers of serverless tools. Reducing the requirements to write functions for data transformation and streaming processing is a very cool addition and I think we will see more services like this in the future.

Real-world use cases are certainly much more complex than what we described in this post. That said, the tools we used are designed to scale both in terms of performance and in terms of costs and can offer a very efficient solution to be considered.

Insight, imagination and expertly engineered solutions to accelerate and sustain progress.

Contact