Archiving Data with DynamoDB Streams and TTL


Data archiving is a common task for organizations of all sizes, as it allows them to keep important information for compliance and historical purposes while freeing up storage space and improving performance. In this article, we will explore how to archive data using DynamoDB streams and Time to Live (TTL) in Amazon Web Services (AWS).

DynamoDB is a managed NoSQL database service that offers fast performance, scalability, and low latency. One of its key features is DynamoDB streams, which captures changes to data items in a DynamoDB table as a stream of events.

This is a follow-up to the article about how to aggregate data with DynamoDB streams.

Diagram: Lambda function triggered by a DynamoDB stream TTL event

Time to Live (TTL) is a DynamoDB feature that automatically deletes items from a table after a specified amount of time. By combining DynamoDB streams and TTL, you can create an efficient and automated data archiving process.

The use of DynamoDB streams and TTL ensures that only expired items are processed, reducing the cost and overhead of the archiving process.

Creating the DynamoDB Table

We're going to expand the InvoiceTransactions table from the previous article.

Here is what the table looks like before any change:

Partition Key | Sort Key       | Attribute 1 | Attribute 2
InvoiceNumber | TransactionId  | Amount      | InvoiceDate
1212121       | Client1_trans1 | $100        | 06062016
1212121       | Client1_trans2 | $500        | 06062016
1212122       | Client2_trans1 | $200        | 06062016
1212121       | Client2_trans1 | $500        | 06062016

What is TTL?

As a reminder, Time to Live (TTL) is a DynamoDB feature that automatically deletes items from a table once their expiration time has passed.

In our case, we'll use the TTL attribute to indicate when an item is ready for archival.

Enable TTL on the Table

To enable TTL, we specify the attribute that holds the expiration timestamp for each item. The attribute must be of type number and its value must be a Unix timestamp in seconds.
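As an illustration of that format, here is a minimal sketch of how an application could write an item with an Expiration value, using the AWS SDK v3 Document Client inside an async context (the 90-day retention period and the item values are purely illustrative):

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";

const documentClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Illustrative retention period: keep the item for 90 days
const ninetyDaysInSeconds = 90 * 24 * 60 * 60;
// TTL expects a Unix timestamp in seconds, stored as a Number attribute
const expiration = Math.floor(Date.now() / 1000) + ninetyDaysInSeconds;

await documentClient.send(
  new PutCommand({
    TableName: "InvoiceTransactions",
    Item: {
      InvoiceNumber: "1212121",
      TransactionId: "Client1_trans1",
      Amount: 100,
      InvoiceDate: "06062016",
      Expiration: expiration,
    },
  })
);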

Here is the CDK code to add the Expiration TTL attribute:

const invoiceTransactionsTable = new dynamodb.Table(
  this,
  "InvoiceTransactions",
  {
    partitionKey: {
      name: "InvoiceNumber",
      type: dynamodb.AttributeType.STRING,
    },
    sortKey: {
      name: "TransactionId",
      type: dynamodb.AttributeType.STRING,
    },
    // Emit both the new and old item images to the DynamoDB stream
    stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    // The attribute that holds the Unix expiration timestamp (in seconds)
    timeToLiveAttribute: "Expiration",
    billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    removalPolicy: cdk.RemovalPolicy.DESTROY,
  }
);

We also enable DynamoDB streams on the table by requesting both the new and old item images (NEW_AND_OLD_IMAGES), so the deleted item's attributes are available in the stream.

Kinesis Data Firehose and S3 Bucket

To store the archived data in S3, we'll use Kinesis Data Firehose to batch and compress the records before writing them to the bucket.

Here is the CDK code to create a Kinesis Data Firehose delivery stream that writes data into an S3 bucket:

const firehoseRole = new iam.Role(this, "firehoseRole", {
  assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});

const archiveBucket = new s3.Bucket(
  this,
  "InvoiceTransactionsArchiveBucket",
  {
    removalPolicy: cdk.RemovalPolicy.DESTROY,
    autoDeleteObjects: true,
  }
);

archiveBucket.grantWrite(firehoseRole);

const firehoseStreamToS3 = new kinesisfirehose.CfnDeliveryStream(
  this,
  "FirehoseStreamToS3",
  {
    deliveryStreamName: "TTL-Archive",
    deliveryStreamType: "DirectPut",
    s3DestinationConfiguration: {
      bucketArn: archiveBucket.bucketArn,
      // Buffer up to 1 MB of data or 60 seconds of records before writing a batch to S3
      bufferingHints: {
        sizeInMBs: 1,
        intervalInSeconds: 60,
      },
      // Compress batches with GZIP to reduce storage costs
      compressionFormat: "GZIP",
      encryptionConfiguration: {
        noEncryptionConfig: "NoEncryption",
      },
      // Archived objects are written under the raw/ prefix
      prefix: "raw/",
      roleArn: firehoseRole.roleArn,
    },
  }
);

// Ensures our role is created before we try to create a Kinesis Firehose
firehoseStreamToS3.node.addDependency(firehoseRole);

Lambda to Read from DynamoDB Stream and Write to Firehose

The Lambda function processes the stream: it reads the stream events and extracts the expired items. The expired items are then pushed to the Firehose stream and eventually stored in the S3 bucket for long-term storage.
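To make the code below easier to follow, here is roughly what a stream record for a TTL-expired item looks like (a simplified sketch with illustrative values; real records carry additional metadata):

// Simplified shape of a DynamoDB stream record produced when TTL deletes an item
const sampleRecord = {
  eventName: "REMOVE",
  // The userIdentity block identifies deletions performed by the TTL service
  userIdentity: {
    type: "Service",
    principalId: "dynamodb.amazonaws.com",
  },
  dynamodb: {
    Keys: {
      InvoiceNumber: { S: "1212121" },
      TransactionId: { S: "Client1_trans1" },
    },
    // OldImage holds the full item as it was before deletion
    OldImage: {
      InvoiceNumber: { S: "1212121" },
      TransactionId: { S: "Client1_trans1" },
      Amount: { N: "100" },
      InvoiceDate: { S: "06062016" },
      Expiration: { N: "1672531200" },
    },
    // There is no NewImage for REMOVE events
  },
};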

Lambda Code

We'll cover how the lambda is triggered by the DynamoDB stream later and focus on the lambda code itself in this section:

import { DynamoDBStreamHandler } from "aws-lambda";
import { AttributeValue } from "@aws-sdk/client-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import {
  FirehoseClient,
  PutRecordBatchCommand,
} from "@aws-sdk/client-firehose";

const { DELIVERY_STREAM_NAME } = process.env;

const firehoseClient = new FirehoseClient({});

export const handler: DynamoDBStreamHandler = async (event) => {
  // Keep only records that carry an OldImage (deleted or modified items)
  const recordsToStream = event.Records.filter(
    (record) => record.dynamodb?.OldImage
  ).map((record) => {
    // Convert the DynamoDB attribute map back into a plain JavaScript object
    const oldImage = unmarshall(
      record.dynamodb!.OldImage as { [key: string]: AttributeValue }
    );
    return {
      Data: Buffer.from(JSON.stringify(oldImage)),
    };
  });

  if (recordsToStream.length > 0) {
    await firehoseClient.send(
      new PutRecordBatchCommand({
        DeliveryStreamName: DELIVERY_STREAM_NAME,
        Records: recordsToStream,
      })
    );
  } else {
    console.log("There are no records to send to firehose");
  }
};

There are a few points worth highlighting in this code:

  1. The Firehose stream name is passed as the DELIVERY_STREAM_NAME environment variable.
  2. The lambda uses the PutRecordBatchCommand to push records to the Firehose stream. It needs a specific permission for that, granted below.
  3. We only want to process records removed from the table. The lambda only processes DynamoDB stream records that have an OldImage property, which avoids processing records inserted into the table (those only have a NewImage property). But what about records modified in the table, which have both OldImage and NewImage properties? We'll deal with those when configuring the trigger.

Lambda CDK

Here is how the lambda is created via CDK:

const archiveRecords = new nodejs.NodejsFunction(this, `ArchiveRecords`, {
  entry: join(__dirname, "..", "functions", "archive-records.ts"),
  handler: "handler",
  logRetention: logs.RetentionDays.ONE_DAY,
  environment: {
    DELIVERY_STREAM_NAME: firehoseStreamToS3.deliveryStreamName!,
  },
});

invoiceTransactionsTable.grantStreamRead(archiveRecords);
archiveRecords.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ["firehose:PutRecordBatch"],
    resources: [firehoseStreamToS3.attrArn],
  })
);

The lambda is created with the previously created Firehose stream name as an environment variable.

It's granted only the permissions required to accomplish its job, following the least-privilege principle:

  • Reading data from the DynamoDB stream
  • PutRecordBatch on the Firehose

Triggering the Lambda

To trigger the lambda, we add a DynamoEventSource:

archiveRecords.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST
  })
);

The problem with this code is that the lambda will be triggered every time there is a change on the table: whether an item is inserted, removed, or modified, the lambda runs.

How can we trigger the lambda only if items are removed?

Filtering the DynamoDB Stream to REMOVE Events Only

To trigger the lambda only when a specific event happens, you can use the filters property.

Here is the code:

archiveRecords.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual("REMOVE"),
      }),
    ],
  })
);

With this code in place, the lambda will only be triggered when an item is removed from the table.

Filtering the DynamoDB Stream to TTL Events Only

A last optimization is to trigger the lambda only when items are removed as a result of TTL expiration, rather than by a regular delete.

Since the TTL process is executed by the DynamoDB service itself, we can add another filter on the userIdentity of the event:

archiveRecords.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST,
    retryAttempts: 3,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual("REMOVE"),
        userIdentity: {
          type: lambda.FilterRule.isEqual("Service"),
          principalId: lambda.FilterRule.isEqual("dynamodb.amazonaws.com"),
        },
      }),
    ],
  })
);
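Under the hood, this CDK code configures a filter pattern on the Lambda event source mapping. If you wire this up outside the CDK (console or raw CloudFormation), the equivalent pattern should look roughly like the sketch below, shown as a TypeScript object for illustration (the mapping itself expects it serialized as a JSON string):

// Approximate filter pattern generated by the CDK code above: only REMOVE
// events performed by the DynamoDB TTL service pass the filter.
const filterPattern = {
  eventName: ["REMOVE"],
  userIdentity: {
    type: ["Service"],
    principalId: ["dynamodb.amazonaws.com"],
  },
};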

Testing the System

To test the system, we can add items to the DynamoDB table using the AWS CLI, like so:

# Current Unix time in seconds, so the item expires right away
EXP=$(date +%s)
aws dynamodb put-item \
    --table-name InvoiceTransactions \
    --item '{
      "InvoiceNumber": { "S": "FGHI" },
      "TransactionId": { "S": "456" },
      "Amount": { "N": "100" },
      "InvoiceDate": { "S": "06062016" },
      "Expiration": {"N": "'$EXP'"}
  }'

Notice that we set the Expiration attribute of the item to the current time, so it expires right away.

After a while (TTL deletions are asynchronous, so there can be a delay between expiration and actual deletion), the archive bucket will contain objects corresponding to the deleted items.
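To inspect the result, you can browse the bucket in the console, or use a small script like the minimal sketch below. The bucket name is hypothetical (use the name of the bucket created by the stack), and it assumes a recent AWS SDK v3 that exposes the transformToByteArray helper, running in an async context:

import {
  S3Client,
  ListObjectsV2Command,
  GetObjectCommand,
} from "@aws-sdk/client-s3";
import { gunzipSync } from "zlib";

const s3Client = new S3Client({});
// Hypothetical bucket name: replace with the bucket created by the stack
const bucketName = "invoicetransactionsarchivebucket-example";

// List the archived objects written by Firehose under the raw/ prefix
const { Contents } = await s3Client.send(
  new ListObjectsV2Command({ Bucket: bucketName, Prefix: "raw/" })
);

if (Contents && Contents.length > 0) {
  const { Body } = await s3Client.send(
    new GetObjectCommand({ Bucket: bucketName, Key: Contents[0].Key! })
  );
  // Firehose wrote GZIP-compressed batches, so decompress before printing
  const compressed = Buffer.from(await Body!.transformToByteArray());
  console.log(gunzipSync(compressed).toString("utf-8"));
}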

Conclusion

By leveraging the powerful combination of DynamoDB streams and TTL, you can create a simple and efficient data archiving solution in AWS.

The process is easy to set up and requires minimal maintenance, making it a cost-effective and scalable solution for organizations of all sizes.

Source code available on GitHub.