Real-time Data Aggregation using DynamoDB Streams and AWS Lambda
DynamoDB streams can be used to aggregate data by using an AWS Lambda function to process the stream records and perform the necessary calculations.
In this tutorial, we will follow the scenario outlined in this article.
Imagine a table of invoices, with each invoice consisting of multiple transactions with different amounts. Whenever a new transaction is added to an invoice, we want to recalculate the invoice's total amount.
Here is how it's going to work:
- Create an InvoiceTransactions table to store the invoices and their transactions.
- Make sure to enable DynamoDB Streams on the InvoiceTransactions table.
- Create a new InvoiceTotal table to store the total amount for each invoice.
- Whenever a new transaction is added to an invoice, it will be recorded by the DynamoDB stream.
- A lambda will be triggered by the stream and update the InvoiceTotal table with the new total amount.
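The flow above can be sketched without any AWS services as a running total kept per invoice. This is only an in-memory model: the `Transaction` type and the `applyTransaction` helper are hypothetical names used for illustration, the sample values are made up, and a `Map` stands in for the InvoiceTotal table.

```typescript
// In-memory model of the aggregation flow (no AWS calls involved).
// `Transaction` and `applyTransaction` are illustrative names, not part of any SDK.
interface Transaction {
  invoiceNumber: string;
  transactionId: string;
  amount: number;
}

// Stands in for the InvoiceTotal table: invoice number -> running total.
type InvoiceTotals = Map<string, number>;

// Adds one transaction's amount to its invoice total, starting from 0
// when the invoice has no total yet.
function applyTransaction(totals: InvoiceTotals, tx: Transaction): InvoiceTotals {
  const current = totals.get(tx.invoiceNumber) ?? 0;
  totals.set(tx.invoiceNumber, current + tx.amount);
  return totals;
}

// Each new transaction plays the role of one stream record.
const totals: InvoiceTotals = new Map();
const incoming: Transaction[] = [
  { invoiceNumber: "INV-1", transactionId: "t1", amount: 100 },
  { invoiceNumber: "INV-1", transactionId: "t2", amount: 250 },
  { invoiceNumber: "INV-2", transactionId: "t1", amount: 40 },
];
for (const tx of incoming) {
  applyTransaction(totals, tx);
}
console.log(totals.get("INV-1")); // 350
console.log(totals.get("INV-2")); // 40
```

In the real system the loop is replaced by the stream invoking the lambda, and the `Map` update becomes a write to the InvoiceTotal table.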
Creating the system
Create the InvoiceTransactions table
An invoice is made up of multiple transactions, and each transaction has an amount and a date.
The partition key for this table is InvoiceNumber and the sort key is TransactionId.
Here is the CDK code to create the InvoiceTransactions table:
const invoiceTransactionsTable = new dynamodb.Table(
  this,
  "InvoiceTransactions",
  {
    partitionKey: {
      name: "InvoiceNumber",
      type: dynamodb.AttributeType.STRING,
    },
    sortKey: {
      name: "TransactionId",
      type: dynamodb.AttributeType.STRING,
    },
    // Enable the stream; each record carries the item before and after the change.
    stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
    billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    removalPolicy: cdk.RemovalPolicy.DESTROY,
  }
);
When filled up, the InvoiceTransactions table will look like this:
InvoiceNumber (partition key) | TransactionId (sort key) | Amount | InvoiceDate |
---|---|---|---|
1212121 | Client1_trans1 | $100 | 06062016 |
1212121 | Client1_trans2 | $500 | 06062016 |
1212122 | Client2_trans1 | $200 | 06062016 |
1212121 | Client2_trans1 | $500 | 06062016 |
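To illustrate how the composite key identifies an item, here is a small in-memory sketch. The names `TransactionItem`, `primaryKey`, and `putItem` are hypothetical, the sample values are made up, and a `Map` only approximates the behaviour of a put, where writing an item whose key already exists replaces the stored item.

```typescript
// Sketch of how a composite primary key identifies an item.
// `TransactionItem`, `primaryKey`, and `putItem` are hypothetical names.
interface TransactionItem {
  InvoiceNumber: string; // partition key
  TransactionId: string; // sort key
  Amount: number;
  InvoiceDate: string;
}

// An item is identified by the pair (partition key, sort key).
function primaryKey(item: TransactionItem): string {
  return JSON.stringify([item.InvoiceNumber, item.TransactionId]);
}

// A Map keyed by the composite key mimics a put: writing an item whose
// key already exists replaces the stored item instead of adding a new one.
function putItem(table: Map<string, TransactionItem>, item: TransactionItem): void {
  table.set(primaryKey(item), item);
}

const table = new Map<string, TransactionItem>();
putItem(table, { InvoiceNumber: "A", TransactionId: "t1", Amount: 100, InvoiceDate: "06062016" });
putItem(table, { InvoiceNumber: "A", TransactionId: "t2", Amount: 500, InvoiceDate: "06062016" });
putItem(table, { InvoiceNumber: "B", TransactionId: "t1", Amount: 200, InvoiceDate: "06062016" });
// Same key as the first item, so it replaces that item.
putItem(table, { InvoiceNumber: "A", TransactionId: "t1", Amount: 150, InvoiceDate: "06062016" });

console.log(table.size); // 3
```

The same TransactionId can therefore appear under different invoice numbers, since only the combination of both keys has to be unique.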
Create the InvoiceTotal table
The InvoiceTotal table will store the aggregated total amount for each invoice.
Here is the CDK code for the InvoiceTotal table:
const invoiceTotalTable = new dynamodb.Table(this, "InvoiceTotal", {
  partitionKey: {
    name: "InvoiceNumber",
    type: dynamodb.AttributeType.STRING,
  },
  billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
});
When filled up, the InvoiceTotal table will look like this:
InvoiceNumber | Total | UpdateDate |
---|---|---|
1212121 | 600 | 06062016 |
Create the lambda to calculate the total amount
Now let's create the lambda that'll update the total amount for each invoice:
const updateInvoiceTotal = new nodejs.NodejsFunction(
  this,
  `UpdateInvoiceTotal`,
  {
    entry: join(__dirname, "..", "functions", "update-invoice-total.ts"),
    handler: "handler",
    logRetention: logs.RetentionDays.ONE_DAY,
    environment: {
      INVOICE_TOTAL_TABLE: invoiceTotalTable.tableName,
    },
  }
);
invoiceTransactionsTable.grantStreamRead(updateInvoiceTotal);
invoiceTotalTable.grantWriteData(updateInvoiceTotal);
Since the lambda will write to the InvoiceTotal table, we pass the table name as an environment variable.
We also need to give the lambda permission to write to this table, which is what grantWriteData does.
For the lambda to read from the stream, we also need to set the correct permissions on the InvoiceTransactions table with the grantStreamRead method.
The grantStreamRead method allows the following stream read actions:
- dynamodb:DescribeStream
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:ListStreams
The following code will make sure that the lambda is triggered by the DynamoDB stream:
updateInvoiceTotal.addEventSource(
  new lambdaEventSources.DynamoEventSource(invoiceTransactionsTable, {
    startingPosition: lambda.StartingPosition.LATEST,
  })
);
Here is the code for the lambda itself:
import { DynamoDBStreamHandler } from "aws-lambda";
import {
  AttributeValue,
  DynamoDBClient,
  ReturnValue,
} from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, UpdateCommand } from "@aws-sdk/lib-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

// Create the clients once, outside the handler, so they are reused across invocations.
const client = new DynamoDBClient({});
const ddbDocClient = DynamoDBDocumentClient.from(client);
const { INVOICE_TOTAL_TABLE } = process.env;

export const handler: DynamoDBStreamHandler = async (event) => {
  for (const record of event.Records) {
    // Only records that carry a NewImage are processed.
    if (record.dynamodb && record.dynamodb.NewImage) {
      // Convert the DynamoDB attribute-value format into a plain object.
      const newImage = unmarshall(
        record.dynamodb.NewImage as { [key: string]: AttributeValue }
      );
      await ddbDocClient.send(
        new UpdateCommand({
          TableName: INVOICE_TOTAL_TABLE,
          Key: { InvoiceNumber: newImage["InvoiceNumber"] },
          // Start from :initial (0) when the invoice has no total yet, then add the amount.
          UpdateExpression: `SET #Total = if_not_exists(#Total, :initial) + :num, #UpdateDate = :date`,
          ExpressionAttributeNames: {
            "#Total": "Total",
            "#UpdateDate": "UpdateDate",
          },
          ExpressionAttributeValues: {
            ":num": newImage["Amount"],
            ":initial": 0,
            ":date": newImage["InvoiceDate"],
          },
          ReturnValues: ReturnValue.UPDATED_NEW,
        })
      );
    }
  }
};
We're using the TypeScript types from @types/aws-lambda to help us with type-safety :)
Since the lambda is triggered by a DynamoDB stream, we define the handler function as a DynamoDBStreamHandler, so we can expect an event of type DynamoDBStreamEvent.
We loop through each record, and if there is a NewImage on the stream record, we update the InvoiceTotal table with the new amount.
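Note that a NewImage is present on both INSERT and MODIFY records, so the handler above would add the full amount again if an existing transaction were edited. One possible refinement, sketched here under the assumption that the stream uses NEW_AND_OLD_IMAGES (as configured earlier) and that the images have already been unmarshalled, is to compute a signed delta per record. The `StreamChange` type and the `amountDelta` function are hypothetical and deliberately independent of the aws-lambda types.

```typescript
// Hypothetical, simplified view of one stream record after unmarshalling.
type EventName = "INSERT" | "MODIFY" | "REMOVE";

interface StreamChange {
  eventName: EventName;
  oldImage?: { Amount: number };
  newImage?: { Amount: number };
}

// Returns how much the invoice total should change for this record:
//   INSERT -> +new amount, MODIFY -> new - old, REMOVE -> -old amount.
function amountDelta(change: StreamChange): number {
  const oldAmount = change.oldImage?.Amount ?? 0;
  const newAmount = change.newImage?.Amount ?? 0;
  switch (change.eventName) {
    case "INSERT":
      return newAmount;
    case "MODIFY":
      return newAmount - oldAmount;
    case "REMOVE":
      return -oldAmount;
  }
}

const changes: StreamChange[] = [
  { eventName: "INSERT", newImage: { Amount: 100 } },
  { eventName: "INSERT", newImage: { Amount: 300 } },
  { eventName: "MODIFY", oldImage: { Amount: 300 }, newImage: { Amount: 250 } },
  { eventName: "REMOVE", oldImage: { Amount: 100 } },
];
const total = changes.reduce((sum, change) => sum + amountDelta(change), 0);
console.log(total); // 250
```

In the handler, the computed delta could be passed as `:num` in place of the raw amount, leaving the update expression itself unchanged.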
We unmarshall the DynamoDB record first so that we can work with plain JavaScript objects.
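To make the difference concrete, the sketch below shows the attribute-value shape a stream record carries next to the plain object the handler works with. `simpleUnmarshall` is a hypothetical, simplified stand-in that only handles string (S) and number (N) attributes; the real `unmarshall` from @aws-sdk/util-dynamodb covers the other DynamoDB types as well.

```typescript
// Simplified stand-in for unmarshalling, limited to S and N attributes.
// `SimpleAttributeValue` and `simpleUnmarshall` are illustrative names only.
type SimpleAttributeValue = { S: string } | { N: string };

function simpleUnmarshall(
  image: Record<string, SimpleAttributeValue>
): Record<string, string | number> {
  const result: Record<string, string | number> = {};
  for (const [name, value] of Object.entries(image)) {
    // DynamoDB sends numbers as strings, so N values are converted here.
    result[name] = "N" in value ? Number(value.N) : value.S;
  }
  return result;
}

// Shape of a NewImage as it arrives on the stream record.
const newImage: Record<string, SimpleAttributeValue> = {
  InvoiceNumber: { S: "ABC" },
  TransactionId: { S: "123" },
  Amount: { N: "100" },
  InvoiceDate: { S: "06062016" },
};

const item = simpleUnmarshall(newImage);
console.log(item.Amount, typeof item.Amount); // 100 number
console.log(item.InvoiceNumber); // ABC
```

Working with the plain object lets the handler read `Amount` as a number directly instead of parsing `{ N: "100" }` by hand.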
Testing the system
Let's test the scenario by adding some items to the InvoiceTransactions table:
aws dynamodb put-item \
  --table-name InvoiceTransactions \
  --item '{
    "InvoiceNumber": { "S": "ABC" },
    "TransactionId": { "S": "123" },
    "Amount": { "N": "100" },
    "InvoiceDate": { "S": "06062016" }
  }'
aws dynamodb put-item \
  --table-name InvoiceTransactions \
  --item '{
    "InvoiceNumber": { "S": "ABC" },
    "TransactionId": { "S": "456" },
    "Amount": { "N": "300" },
    "InvoiceDate": { "S": "06062016" }
  }'
We're adding two transactions ($100 and $300) for the same invoice.
After adding the two items, the InvoiceTransactions table contains the two transactions:
InvoiceTransactions:
InvoiceNumber | TransactionId | Amount | InvoiceDate |
---|---|---|---|
ABC | 123 | $100 | 06062016 |
ABC | 456 | $300 | 06062016 |
As expected, the InvoiceTotal table contains the total amount of $400 ($100 + $300) for the invoice:
InvoiceNumber | Total | UpdateDate |
---|---|---|
ABC | 400 | 06062016 |
Conclusion
DynamoDB Streams and AWS Lambda are a powerful combination for aggregating data in real time. By using a DynamoDB stream to capture data changes and a Lambda function to process them, developers can compute and store aggregate data as items are written.
Source code available on GitHub