Building IoT Pipelines for Faster Analytics With IoT Core

What Is AWS IoT Core

AWS IoT Core is Amazon’s managed IoT service that lets you easily connect and manage Internet of Things devices. This can be done using the MQTT protocol, which was specifically designed for IoT use cases to be lightweight and fault-tolerant. If MQTT isn’t your cup of tea, you also have the option to use HTTP, although this is less common for reasons beyond the scope of this post.

While IoT Core can be used solely as a message broker, allowing IoT devices to send and receive messages by publishing and subscribing to MQTT topics, you can also use its message routing functionality to select specific messages with a SQL-like rule and forward them to other services. These include Kinesis, SQS, and Apache Kafka, but most importantly for this post, AWS Lambda.

If you’ve followed the Do More With Timescale on AWS series over the past couple of months, you’ve undoubtedly seen that with a simple AWS Lambda function, we can very easily insert any kind of time-series data into a Timescale database.

Today, we’ll go over how to set up a message routing rule and set up a Lambda function as an action so that for every MQTT message, a Lambda function gets triggered.

If you’d rather watch a video on how to achieve this, click on the video below!

Writing the Action Lambda Function

Before deploying any resources to AWS, let’s write our AWS Lambda function, which AWS IoT Core will trigger as a rule action to insert MQTT messages into Timescale.

For the sake of simplicity, I’ve written the Lambda function in Python and kept the code as simple as possible. A side effect of this is that the function doesn’t have adequate type-checking, error handling, or secret management. For that reason, it is not recommended to use this function in a production environment.


Let’s start by writing the initialization portion of our function. This will be executed only once, as long as the function stays “hot.” By importing our libraries and creating a database connection in the initialization portion, we forgo having to create a new database connection for each Lambda execution. This will save valuable time (and consequently, money, since Lambda is billed by execution time).

First, import the psycopg2 library, which we’ll use to connect to our Timescale database.

import psycopg2

Then we create a variable called conn_str, which holds the connection string to our database. As mentioned previously, for the sake of simplicity, we are omitting any sensible secret management.

An easy way to improve on this would be to add your connection string as an environment variable to your Lambda function and retrieve it with the os.getenv function, or to use a dedicated secret management solution like AWS Secrets Manager.
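As a minimal sketch of the environment-variable approach, the snippet below reads the connection string during initialization. The variable name TIMESCALE_CONN_STR and the helper load_conn_str are assumptions made for illustration; use whatever name you configure on your own Lambda function.

```python
import os


def load_conn_str(env_var: str = "TIMESCALE_CONN_STR") -> str:
    """Return the connection string stored in the given environment variable.

    The default variable name is a hypothetical choice for this sketch.
    """
    conn_str = os.getenv(env_var)
    if not conn_str:
        # Fail fast during initialization instead of on the first insert.
        raise RuntimeError(f"Environment variable {env_var} is not set")
    return conn_str
```

Calling load_conn_str() in the initialization portion keeps the secret out of the source code while still resolving it only once per warm container.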

Since we will be using Lambdas, we strongly recommend adding a connection pooler to your service and using transaction mode. Add a connection pooler to your service by going to the Connection info panel of your service in the Timescale dashboard, clicking the Connection pooler tab, and then clicking Add a connection pooler. From there, choose “Transaction pool” in the Pool drop-down. The Service URL will be your connection string.

Do note that this connection string does not contain your service password, so you will need to manually add this as follows.

conn_str = "postgres://tsdbadmin:YOUR_PASSWORD@YOUR_SERVICE_HOST:5432/tsdb"

Afterward, we create a psycopg2 connection object from which we create a cursor that we will use to insert rows into our database.

conn = psycopg2.connect(conn_str)
cursor = conn.cursor()
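Because the connection lives at module level, a warm container can outlive it. One possible way to guard against that, sketched below, is to hand out the cached connection through a small helper that reconnects when needed. The helper name get_connection is hypothetical, and the connect argument is assumed to behave like psycopg2.connect; it is passed in so the sketch does not depend on psycopg2 being installed.

```python
_conn = None  # cached at module level so warm invocations reuse it


def get_connection(connect, conn_str):
    """Return a cached connection, reconnecting if it is missing or closed.

    `connect` is assumed to behave like psycopg2.connect.
    """
    global _conn
    # psycopg2 connections expose `closed`, which is 0 while the connection is open.
    if _conn is None or _conn.closed:
        _conn = connect(conn_str)
    return _conn
```

Note that this only catches connections the driver already knows are closed; a connection dropped silently by the server would still surface as an error on the next query.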

Lambda handler

Then, we move on to the main handler of our Lambda function. This will be run once for every invocation, that is, once for every MQTT message the rule forwards.

We execute a SQL insert statement into the sensor hypertable. In this case, the event consists of a single float, which could be a temperature reading from an IoT sensor or the battery percentage of an electric car.

We also use the PostgreSQL NOW() function to indicate when this event happened. In a production environment, it might be advisable to add a timestamp on the IoT sensor itself, as MQTT messages routed through AWS IoT Core can incur a small time delay.
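The timestamp suggestion above could look roughly like the following sketch. It assumes the device adds a hypothetical "timestamp" field holding Unix epoch seconds to its payload; when the field is absent, it falls back to NOW(). The helper name build_insert is likewise an illustration, not part of the original function.

```python
def build_insert(event):
    """Return (sql, params) for one sensor reading.

    Uses the device timestamp when the payload carries one, otherwise falls
    back to the database clock. The "timestamp" key (Unix epoch seconds) is
    a hypothetical field name for this sketch.
    """
    if "timestamp" in event:
        # to_timestamp() converts epoch seconds to TIMESTAMPTZ in PostgreSQL.
        sql = "INSERT INTO sensor (time, value) VALUES (to_timestamp(%s), %s);"
        return sql, [event["timestamp"], event["value"]]
    sql = "INSERT INTO sensor (time, value) VALUES (NOW(), %s);"
    return sql, [event["value"]]
```

The returned pair can be passed straight to the cursor, for example cursor.execute(*build_insert(event)).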

After executing the insert, we commit the transaction and return from the function. As mentioned at the beginning of this blog post, this simple function can benefit hugely from logging and more graceful error handling.

def handler(event, context):
    # insert the value into the sensor table
    cursor.execute("INSERT INTO sensor (time, value) VALUES (NOW(), %s);", [event["value"]])
    # commit the transaction so the row is persisted
    conn.commit()
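For readers who want a starting point for the logging and error handling mentioned earlier, here is one possible sketch. It is not the code from the repository: make_handler, the return values, and the decision to skip malformed events are all assumptions made for illustration. The conn argument is assumed to behave like a psycopg2 connection (cursor(), commit(), rollback()).

```python
import logging

logger = logging.getLogger(__name__)

INSERT_SQL = "INSERT INTO sensor (time, value) VALUES (NOW(), %s);"


def make_handler(conn):
    """Build a Lambda-style handler around an open DB-API connection."""

    def handler(event, context):
        try:
            # Reject payloads without a numeric "value" field.
            value = float(event["value"])
        except (KeyError, TypeError, ValueError):
            logger.warning("Skipping malformed event: %r", event)
            return {"inserted": 0}

        try:
            # The cursor context manager closes the cursor when the block ends.
            with conn.cursor() as cursor:
                cursor.execute(INSERT_SQL, [value])
            conn.commit()
        except Exception:
            # Roll back so the shared connection is usable on the next invocation.
            conn.rollback()
            logger.exception("Insert failed")
            raise
        return {"inserted": 1}

    return handler
```

In the initialization portion, you would then write something like handler = make_handler(conn), so the Lambda runtime still finds a module-level handler.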

You can find the full Lambda function code in this GitHub repository.

Creating the Sensor Hypertable

Before we continue, it’s important to create the sensor hypertable in our Timescale instance by executing the following SQL queries on our database.

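CREATE TABLE sensor (time TIMESTAMPTZ NOT NULL, value DOUBLE PRECISION); -- column types assumed from the handler's INSERT
SELECT create_hypertable('sensor', 'time');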

Creating the Lambda Function in AWS

Publish the function

Once we’ve successfully written the code, we can package our function along with the psycopg2 library in a Docker container and publish it to AWS ECR. You can find the full code, Dockerfile, and build script in this GitHub repository.

Create the function

Once the Docker container has finished publishing to ECR, we can create a new Lambda function using its container image URI. We will name our function timescale-insert. Make sure to select the architecture you used to build the container image. If you built it on an Apple silicon Mac (M1 or later), this will be arm64. In most other cases, you can use x86_64.

Creating the Lambda function

Because the container image already bundles the code and its dependencies, we don’t need to change any settings once the Lambda function has been created!

Create the IoT Core Message Routing Rule

IoT Core has a feature called ‘message routing,’ which allows you to (as the name suggests) route messages to different services. Throughout this section, you will learn more about how rules work.

Create a new rule by pressing the orange 'Create rule' button.

Creating a new rule in IoT Core

Give the rule an appropriate name. Since IoT Core rule names cannot include dashes, we will name it: timescale_insert.

Specifying the rule properties

Next up, we need to create a SQL statement that will be used to ‘query’ all incoming MQTT messages in IoT Core. This SQL statement consists of three parts:

  1. A SELECT clause that selects specific fields of the MQTT message payload.
  2. A FROM clause that is used to specify the MQTT topic we want to query on.
  3. A WHERE clause that is used to exclude certain MQTT messages where a condition is not met.

In our query, we will be selecting every field (*) of messages on the my-topic/thing topic. We won’t be using a WHERE clause because we want to insert every MQTT message on the topic into our Timescale database. Then, we click ‘Next.’

 Configuring the SQL statement in IoT Core

After configuring our SQL statement, we need to add an action. Select the Lambda action type, then find and select the timescale-insert function we created earlier! As you can see, there is an option to add multiple rule actions to a single IoT Core rule. This would allow you to stream your data to multiple destinations, for example, two (or more) Timescale databases or hypertables. You name it; it can be done!

Setting the rule actions in IoT Core

Then lastly, click on the orange ‘Create rule’ button.

If all goes well, your newly created rule should be active!

Checking if the IoT Core rule is active
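The console steps above can also be expressed in code. The following is a sketch, assuming you use boto3 and pass in a client created with boto3.client("iot"); the helper names build_rule_payload and create_rule are hypothetical, and function_arn stands for the ARN of your own timescale-insert function. The SQL statement mirrors the one described above: every field, from the my-topic/thing topic, with no WHERE clause.

```python
def build_rule_payload(function_arn, topic="my-topic/thing"):
    """Return a topic rule payload that forwards every message on `topic` to a Lambda function."""
    return {
        # No WHERE clause: every message on the topic matches.
        "sql": f"SELECT * FROM '{topic}'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [{"lambda": {"functionArn": function_arn}}],
    }


def create_rule(iot_client, function_arn, rule_name="timescale_insert"):
    """Create the rule using a boto3 IoT client, e.g. boto3.client("iot")."""
    iot_client.create_topic_rule(
        ruleName=rule_name,
        topicRulePayload=build_rule_payload(function_arn),
    )
```

Keep in mind that a rule created through the API, rather than the console, will likely also need a resource-based permission on the Lambda function that allows iot.amazonaws.com to invoke it.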

Testing Our Message Routing Rule

Now that we’re done writing our Lambda function and setting up our Timescale hypertable and IoT Core message rule, it’s time to put it all to the test!

To do this, I’ve repurposed a Python script written by AWS. You can find the original AWS tutorial or clone the modified Python script here. Do note that you will have to add your own certificates and ‘things’ to AWS IoT Core for this to work (which could be a blog post on its own).

In case you don’t feel like reading the Python code, these are the sequential steps taken by the script:

  1. Create an MQTT connection to the AWS IoT Core endpoint using the appropriate device certificates.
  2. Generate an array of floats.
  3. Iterate over the array:
    a) Synthesize a JSON message with a float from the array created in step 2.
    b) Publish the JSON message to the my-topic/thing MQTT topic.
    c) Sleep for one second.
  4. Gracefully disconnect.
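Steps 2 and 3 can be sketched as follows. This is not the AWS script itself: simulate_sensor is a hypothetical helper, the value range is arbitrary, and publish is assumed to be any callable taking (topic, payload), which in a real script would wrap the publish call of your MQTT client. Connecting and disconnecting (steps 1 and 4) are left to that client. The payload uses a "value" key so it matches what the Lambda handler reads.

```python
import json
import random
import time


def simulate_sensor(publish, count=10, delay=1.0, topic="my-topic/thing"):
    """Publish `count` random readings to `topic`, pausing `delay` seconds after each.

    `publish` is any callable taking (topic, payload).
    """
    # Step 2: generate an array of floats.
    values = [round(random.uniform(15.0, 30.0), 2) for _ in range(count)]
    # Step 3: iterate over the array.
    for value in values:
        payload = json.dumps({"value": value})  # 3a: synthesize the JSON message
        publish(topic, payload)                 # 3b: publish to the MQTT topic
        time.sleep(delay)                       # 3c: sleep between messages
    return values
```

For a quick dry run without any AWS setup, you can pass print as the publish callable: simulate_sensor(print, count=3, delay=0).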

We run this script for a handful of seconds, connect to our Timescale database using psql and execute a query to retrieve all the rows in the sensor table.

Query to retrieve all the rows in the sensor table

And as you can see, we’ve achieved our goal of piping MQTT data from AWS IoT Core into a Timescale database with a single AWS Lambda function!

AWS IoT Core: The End

Congratulations! You have successfully set up an AWS IoT Core message rule that streams MQTT messages originating from a simple (albeit fake) sensor into a Timescale database.

You are now primed and ready to build an endless stream of performant IoT pipelines that can accelerate your real-time IoT dashboards and analytics!

If you want to learn more about how Timescale can improve your workloads on AWS, check out this YouTube playlist filled with other AWS and Timescale-related tutorials!
