Archiving Data with DynamoDB Streams and TTL
Data archiving is a common task for organizations of all sizes, as it allows them to keep important information for compliance and historical purposes while freeing up storage space and improving performance. In this article, we will explore how to archive data using DynamoDB streams and Time to Live (TTL) in Amazon Web Services (AWS).
DynamoDB is a managed NoSQL database service that offers fast performance, scalability, and low latency. One of its key features is DynamoDB streams, which captures changes to the items in a DynamoDB table as a stream of events.
This is a follow-up to the article about how to aggregate data with DynamoDB streams.
Time to Live (TTL) is a DynamoDB feature that automatically deletes items from a table once a per-item expiration timestamp has passed. By combining DynamoDB streams and TTL, you can create an efficient and automated data archiving process.
The use of DynamoDB streams and TTL ensures that only expired items are processed, reducing the cost and overhead of the archiving process.
Creating the DynamoDB Table
We're going to expand the InvoiceTransactions table from the previous article.
Here is what the table looks like before any change:
InvoiceNumber (Partition Key) | TransactionId (Sort Key) | Amount | InvoiceDate |
---|---|---|---|
1212121 | Client1_trans1 | $100 | 06062016 |
1212121 | Client1_trans2 | $500 | 06062016 |
1212122 | Client2_trans1 | $200 | 06062016 |
1212121 | Client2_trans1 | $500 | 06062016 |
What is TTL?
As mentioned above, TTL automatically deletes items from a table once their expiration timestamp has passed.
In our case, we'll use the TTL attribute to indicate when an item is ready for archival.
Enable TTL on the Table
To enable TTL, we specify the attribute that holds the expiration timestamp for each item. The attribute must be of type number and its value must be a Unix timestamp in seconds.
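For example, to have items become eligible for archival 30 days after they are written, the expiration value could be computed like this (an illustrative snippet; the 30-day window and variable names are just an example):

const thirtyDaysInSeconds = 30 * 24 * 60 * 60;
// DynamoDB TTL expects seconds since the epoch, not milliseconds
const expiration = Math.floor(Date.now() / 1000) + thirtyDaysInSeconds;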
Here is the CDK code to add the Expiration TTL attribute:
const invoiceTransactionsTable = new dynamodb.Table(
  this,
  "InvoiceTransactions",
  {
    partitionKey: {
      name: "InvoiceNumber",
      type: dynamodb.AttributeType.STRING,
    },
    sortKey: {
      name: "TransactionId",
      type: dynamodb.AttributeType.STRING,
    },
    stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    timeToLiveAttribute: "Expiration",
    billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    removalPolicy: cdk.RemovalPolicy.DESTROY,
  }
);
We also enable DynamoDB streams on the table by requesting the NEW_AND_OLD_IMAGES view type, so each stream record carries both the previous and the new version of the item.
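To make the stream payload concrete, here is a simplified, hand-written example of what a REMOVE record for one of our invoice items could look like (attribute values are in the DynamoDB wire format; a real record contains additional metadata):

const sampleRemoveRecord = {
  eventName: "REMOVE",
  dynamodb: {
    Keys: {
      InvoiceNumber: { S: "1212121" },
      TransactionId: { S: "Client1_trans1" },
    },
    // OldImage is present because the table streams NEW_AND_OLD_IMAGES;
    // a deletion has no NewImage.
    OldImage: {
      InvoiceNumber: { S: "1212121" },
      TransactionId: { S: "Client1_trans1" },
      Amount: { N: "100" },
      InvoiceDate: { S: "06062016" },
      Expiration: { N: "1672531200" },
    },
  },
};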
Kinesis Data Firehose and S3 Bucket
When the time comes to store the data to be archived in S3, we'll use Kinesis Data Firehose to batch and compress the records.
Here is the CDK code to create a Kinesis Firehose delivery stream that writes data into an S3 bucket:
const firehoseRole = new iam.Role(this, "firehoseRole", {
  assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
});

const archiveBucket = new s3.Bucket(
  this,
  "InvoiceTransactionsArchiveBucket",
  {
    removalPolicy: cdk.RemovalPolicy.DESTROY,
    autoDeleteObjects: true,
  }
);

archiveBucket.grantWrite(firehoseRole);

const firehoseStreamToS3 = new kinesisfirehose.CfnDeliveryStream(
  this,
  "FirehoseStreamToS3",
  {
    deliveryStreamName: "TTL-Archive",
    deliveryStreamType: "DirectPut",
    s3DestinationConfiguration: {
      bucketArn: archiveBucket.bucketArn,
      bufferingHints: {
        sizeInMBs: 1,
        intervalInSeconds: 60,
      },
      compressionFormat: "GZIP",
      encryptionConfiguration: {
        noEncryptionConfig: "NoEncryption",
      },
      prefix: "raw/",
      roleArn: firehoseRole.roleArn,
    },
  }
);
// Ensures our role is created before we try to create a Kinesis Firehose
firehoseStreamToS3.node.addDependency(firehoseRole);
Lambda to Read from DynamoDB Stream and Write to Firehose
The Lambda function processes the stream: it reads the stream events and extracts the expired items. The expired items can then be pushed to the Firehose stream and eventually stored in the S3 bucket for long-term storage.
Lambda Code
We'll cover how the lambda is triggered by the DynamoDB stream later and focus on the lambda code itself in this section:
import { DynamoDBStreamHandler } from "aws-lambda";
import { AttributeValue } from "@aws-sdk/client-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import {
  FirehoseClient,
  PutRecordBatchCommand,
} from "@aws-sdk/client-firehose";

const { DELIVERY_STREAM_NAME } = process.env;

const firehoseClient = new FirehoseClient({});

export const handler: DynamoDBStreamHandler = async (event) => {
  const recordsToStream = event.Records.filter(
    (record) => record.dynamodb?.OldImage
  ).map((record) => {
    const oldImage = unmarshall(
      record.dynamodb!.OldImage as { [key: string]: AttributeValue }
    );
    return {
      Data: Buffer.from(JSON.stringify(oldImage)),
    };
  });

  if (recordsToStream.length > 0) {
    await firehoseClient.send(
      new PutRecordBatchCommand({
        DeliveryStreamName: DELIVERY_STREAM_NAME,
        Records: recordsToStream,
      })
    );
  } else {
    console.log("There are no records to send to firehose");
  }
};
There are a few points worth highlighting in this code:
- The Firehose stream name is passed to the lambda as the DELIVERY_STREAM_NAME environment variable.
- The lambda uses the PutRecordBatchCommand command to push records to the Firehose stream. It's going to need specific permissions for that.
- We only want to process records removed from the table, so the lambda only processes DynamoDB stream records with an OldImage property. This avoids processing records that are inserted into the table (they have no OldImage property, just a NewImage property). But what about records modified in the table, which have both OldImage and NewImage properties?
Lambda CDK
Here is how the lambda is created via CDK:
const archiveRecords = new nodejs.NodejsFunction(this, "ArchiveRecords", {
  entry: join(__dirname, "..", "functions", "archive-records.ts"),
  handler: "handler",
  logRetention: logs.RetentionDays.ONE_DAY,
  environment: {
    DELIVERY_STREAM_NAME: firehoseStreamToS3.deliveryStreamName!,
  },
});

invoiceTransactionsTable.grantStreamRead(archiveRecords);

archiveRecords.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ["firehose:PutRecordBatch"],
    resources: [firehoseStreamToS3.attrArn],
  })
);
The lambda is created with the previously created Firehose stream name as an environment variable.
It's granted only the permissions required to accomplish its job, following the least privilege principle:
- Reading data from the DynamoDB stream
- PutRecordBatch on the Firehose
Triggering the Lambda
To trigger the lambda, we add a DynamoEventSource:
archiveRecords.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST,
  })
);
The problem with this code is that the lambda will be triggered every time there is a change on the table: whether an item is inserted, removed, or modified, the lambda will be invoked.
How can we trigger the lambda only if items are removed?
Filtering the DynamoDB Stream to REMOVE Events Only
To trigger the lambda only when a specific event happens, you can use the filters property.
Here is the code:
archiveRecords.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual("REMOVE"),
      }),
    ],
  })
);
With this code in place, the lambda will only be triggered when an item is removed from the table.
Filtering the DynamoDB Stream to TTL Events Only
One last optimization is to trigger the lambda only when items are removed as a result of TTL, and not when they are deleted manually.
Since the TTL process is executed by the DynamoDB service, we can add another filter on the userIdentity of the stream record:
archiveRecords.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST,
    retryAttempts: 3,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual("REMOVE"),
        userIdentity: {
          type: lambda.FilterRule.isEqual("Service"),
          principalId: lambda.FilterRule.isEqual("dynamodb.amazonaws.com"),
        },
      }),
    ],
  })
);
Testing the System
To test the system, we can add an item to the DynamoDB table using the AWS CLI, like so:
# Current time as a Unix timestamp in seconds (BSD/macOS date syntax)
let EXP=`date -j -f "%a %b %d %T %Z %Y" "\`date\`" "+%s"`

aws dynamodb put-item \
  --table-name InvoiceTransactions \
  --item '{
    "InvoiceNumber": { "S": "FGHI" },
    "TransactionId": { "S": "456" },
    "Amount": { "N": "100" },
    "InvoiceDate": { "S": "06062016" },
    "Expiration": {"N": "'$EXP'"}
  }'
Notice that we set the Expiration attribute to the current time, so the item is expired right away and the deletion process happens as soon as possible.
After a while (TTL deletions are not immediate, but usually happen within a few days of expiration), the archive bucket will contain objects corresponding to the deleted items.
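You can check the bucket from the console, or list its contents with a small script like this one (a minimal sketch using the AWS SDK for JavaScript v3; ARCHIVE_BUCKET_NAME is a placeholder for the name of the bucket created by the stack). Keep in mind the objects are GZIP-compressed, so they need to be decompressed before you can read the archived items.

import { S3Client, ListObjectsV2Command } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

const listArchivedObjects = async () => {
  const { Contents } = await s3.send(
    new ListObjectsV2Command({
      Bucket: process.env.ARCHIVE_BUCKET_NAME,
      Prefix: "raw/", // the prefix configured on the Firehose destination
    })
  );
  console.log(Contents?.map((object) => object.Key));
};

listArchivedObjects();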
Conclusion
In conclusion, by leveraging the powerful combination of DynamoDB streams and TTL, you can create a simple and efficient data archiving solution in AWS.
The process is easy to set up and requires minimal maintenance, making it a cost-effective and scalable solution for organizations of all sizes.
The source code is available on GitHub.