Send newly inserted rows in a materialised view (continuous aggregates) to SQS

I have TimescaleDB installed on an EC2 instance. I have set up a table and a materialised view for continuous aggregates, and I’ve created a policy which refreshes the aggregates periodically.
I have a use case where I want to send the refreshed rows in the materialised view to an SQS queue. For a table, we can just create a trigger on insert and listen to it: whenever a row is inserted into that table, the listener captures it and we can apply our logic to send that data to SQS.
But the aggregates are stored in a materialised view, for which we cannot create triggers. What should the approach be in this case?
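
For reference, the trigger-based approach for a plain table could look roughly like the sketch below. The table name `sensor_data`, the function name `notify_new_row`, and the channel name `new_rows` are placeholders chosen for illustration. A separate listener process (not shown) would run `LISTEN new_rows` and forward each payload to SQS.

```sql
-- Hypothetical names throughout: sensor_data, notify_new_row, new_rows.
CREATE OR REPLACE FUNCTION notify_new_row() RETURNS trigger AS $$
BEGIN
    -- Publish the inserted row as JSON on the 'new_rows' channel.
    -- In the default configuration a NOTIFY payload must be shorter
    -- than 8000 bytes, so wide rows may need to send only a key instead.
    PERFORM pg_notify('new_rows', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fire once per inserted row (EXECUTE FUNCTION needs PostgreSQL 11+).
CREATE TRIGGER sensor_data_notify
    AFTER INSERT ON sensor_data
    FOR EACH ROW
    EXECUTE FUNCTION notify_new_row();
```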

I’m working on a tool which may solve your issue, timescaledb-event-streamer [1]. While the name sounds super official, it is more a side project of mine right now. May eventually become official (don’t know yet).

The tool supports CDC (change data capture) events for hypertables and continuous aggregates, meaning it’ll use logical replication to capture and replicate inserts, updates, deletes, …

While it doesn’t have a sink for AWS SQS right now, it has one for AWS Kinesis, and it shouldn’t be too complicated to quickly create an SQS target sink.

The tool is somewhat experimental still, but I’m happy to help getting it working. Maybe you can explain a bit more on the use case?

PS: it seems like you’re aware that you only get the insert events on a scheduled materialization run, but I wanted to mention it anyways, since some people were surprised by the discrepancy between querying a realtime continuous aggregate and when events are generated.
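
To make the timing concrete, here is a rough sketch of a continuous aggregate with a refresh policy. The hypertable `sensor_data`, its columns (`ts`, `device_id`, `value`), and the view name `sensor_data_hourly` are made up for illustration. With a policy like this, rows are only written to the materialization when the scheduled job runs (here every 30 minutes), so that is when insert events would show up, even though a realtime query against the view may already include newer data.

```sql
-- Hypothetical hypertable and column names; adjust to your schema.
CREATE MATERIALIZED VIEW sensor_data_hourly
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 hour', ts) AS bucket,
       device_id,
       avg(value) AS avg_value
FROM sensor_data
GROUP BY bucket, device_id;

-- Refresh the window between 3 hours ago and 1 hour ago,
-- re-evaluated by a background job every 30 minutes.
SELECT add_continuous_aggregate_policy('sensor_data_hourly',
    start_offset      => INTERVAL '3 hours',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '30 minutes');
```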

[1] GitHub - noctarius/timescaledb-event-streamer: timescaledb-event-streamer is a command line program to create a stream of CDC (Change Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.

I’d like to use your tool.
I’ve installed TimescaleDB on an AWS EC2 instance and insert data from AWS Lambda. I have created a hypertable and a continuous aggregate policy on it. I want to send the newly inserted rows to SQS whenever the aggregates are refreshed. In my case, Kinesis would also work.
If possible, can you provide an SOP or steps to make it work with my current application? I’m working on an Ubuntu system.

Hey @Nikhil_Agrawal!

Sorry for the late answer, but I was at a festival over the weekend. Anyway, I tried my hand at a quick-and-dirty SQS sink (which wasn’t too complicated). That means you should be able to stream events into AWS SQS without any extra work now. It isn’t tested though, since I’ve never used SQS before. It also isn’t documented yet (just like the AWS Kinesis sink).

I also set up release management yesterday, meaning there are pre-built executables now. You can find them here: [1]
If you prefer Docker images, they’re also available here: [2]

After downloading, you need to create a configuration. There are two full-blown examples for either TOML [3] or YAML [4] (whichever you prefer). Both files contain the necessary elements for AWS SQS. The properties should be fairly self-explanatory.

For all the other properties, see the repository’s README [5].

Before kicking off the tool, you need to “install” one function into the database which is used to prevent the tool from requiring full superuser permissions (not a fan of high permissions) [6], create a user for the replication, and configure PostgreSQL to provide logical replication support (wal_level=logical - [7]). Afterwards, you can just start the tool with a simple command (sorry, no service descriptor yet) by pointing it to the configuration you want.

CREATE ROLE repl_user LOGIN REPLICATION ENCRYPTED PASSWORD '<<password>>';
GRANT <<hypertable_owner>> TO repl_user;

$ timescaledb-event-streamer -config=./config.toml
$ timescaledb-event-streamer -config=./config.yml
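
The `wal_level` requirement can be checked and changed from a superuser session roughly as follows. This is only a sketch for a self-managed PostgreSQL instance (such as one running on EC2). The new value only takes effect after a server restart.

```sql
-- Show the current setting; logical replication needs 'logical'.
SHOW wal_level;

-- Persist the new value in postgresql.auto.conf (superuser required).
ALTER SYSTEM SET wal_level = 'logical';

-- After restarting PostgreSQL, verify that the change was picked up.
SHOW wal_level;
```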

I hope that helps :slight_smile:

[1] Releases · noctarius/timescaledb-event-streamer · GitHub
[2] Package timescaledb-event-streamer · GitHub
[3] timescaledb-event-streamer/config.example.toml at main · noctarius/timescaledb-event-streamer · GitHub
[4] timescaledb-event-streamer/config.example.yml at main · noctarius/timescaledb-event-streamer · GitHub
[5] GitHub - noctarius/timescaledb-event-streamer: timescaledb-event-streamer is a command line program to create a stream of CDC (Change Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.
[6] https://raw.githubusercontent.com/noctarius/timescaledb-event-streamer/main/create_timescaledb_catalog_publication.sql
[7] PostgreSQL: Documentation: 15: 20.5. Write Ahead Log

Thanks for the detailed explanation. I think it will make my work super easy. Thanks a lot for the help. I will try running it on my system and let you know!!

By the way, there is also a new one-line downloader: GitHub - noctarius/timescaledb-event-streamer: timescaledb-event-streamer is a command line program to create a stream of CDC (Change Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.

In this configuration (GitHub - noctarius/timescaledb-event-streamer: timescaledb-event-streamer is a command line program to create a stream of CDC (Change Data Capture) TimescaleDB Hypertable events from a PostgreSQL installation running the TimescaleDB extension.), I can see the TimescaleDB configuration for hypertables, but not for continuous aggregates. Can you give an example of a continuous aggregate config?

Just give it the name of the continuous aggregate. It’s automatically recognized and resolved to the underlying hypertable.

Hi, I’ve followed all the steps you mentioned. Now when I run ./timescaledb-event-streamer -config=config.yml I get this error:

timescaledb-event-streamer version 0.0.3 (git revision 13b67e91e7a69425631bd428ebb97395e89d6fcd; branch v0.0.3)
Loading configuration file: config.yml
[2023/06/29T16:11:45.854] [INFO] [Replicator] Discovered System Information:
[2023/06/29T16:11:45.854] [INFO] [Replicator] * PostgreSQL version 14.8
[2023/06/29T16:11:45.854] [INFO] [Replicator] * TimescaleDB version 2.10.3
[2023/06/29T16:11:45.854] [INFO] [Replicator] * PostgreSQL System Identity 7197010462710543712
[2023/06/29T16:11:45.854] [INFO] [Replicator] * PostgreSQL Timeline 1
[2023/06/29T16:11:45.854] [INFO] [Replicator] * PostgreSQL Database d3newschema
[2023/06/29T16:11:46.418] [INFO] [SystemCatalog] Selected hypertables for replication:
[2023/06/29T16:11:46.418] [INFO] [SystemCatalog] * "public"."variable_numerical" (type: Hypertable)
[2023/06/29T16:11:46.419] [INFO] [FileStateStorage] Starting FileStateStorage at /tmp/statestorage.dat
[2023/06/29T16:11:46.419] [INFO] [FileStateStorage] Loading FileStateStorage at /tmp/statestorage.dat
[2023/06/29T16:11:46.554] [INFO] [ReplicationConnection] SystemId: 7197010462710543712, Timeline: 1, XLogPos: 37AE/C55E6E38, Database: d3newschema
Publication missing but wasn't asked to create it either

Can you please help me figure out where I might’ve gone wrong?