Hypertable of hypertables

So I have this crazy idea, but I'm not sure if it's possible in Postgres/TimescaleDB or completely crazy, so please bear with me.

We have a pipeline that ingests time-series data and stores it in Timescale. For the most part the data ends up in a few tables and it's all manageable. All incoming data is associated with a device type, and what I would like to do is create a new hypertable every time a new device type is created. So far so easy. The problem starts with the ingestion pipeline. We use Kafka and Kafka Connect with sink connectors, and I don't want to have to create a new stream for each new device type (too many moving parts).

What I would like to do is have one ingestion table that acts as a proxy for all the other tables and, based on the value of a specific field (e.g. target_table), routes each row to the correct hypertable.

It's sort of the same concept as chunks, where data is routed to the proper chunk based on the time field.

Is something like that possible with Timescale or Postgres?


Hi @David_Sooter! The idea is very good, and I think you can achieve it with an INSTEAD OF INSERT trigger on a view: instead of inserting into the view, the trigger routes each row to the right table. Or write a simple SQL function that can do the same.

I also think Promscale works like this, dynamically adding hypertables.

The value proposition here is automatically creating a hypertable modeled on a template table that already exists.

Let’s dive into a minimal implementation of this idea:

DROP TABLE IF EXISTS model CASCADE;
DROP VIEW IF EXISTS mother CASCADE;

CREATE TABLE model (time TIMESTAMP NOT NULL, identifier TEXT, value DECIMAL);

CREATE VIEW mother AS
SELECT ''::text AS table_name,
       NULL::timestamp AS time,
       NULL::text AS identifier,
       NULL::decimal AS value;

CREATE OR REPLACE FUNCTION feed_child_table()
RETURNS trigger AS
$BODY$
BEGIN
   -- Create the child table from the template if it does not exist yet
   -- (to_regclass returns NULL for a missing relation).
   IF to_regclass(NEW.table_name) IS NULL THEN
     EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE model)', NEW.table_name);
   END IF;

   -- Route the row to the child table; format(%I) quotes the identifier safely.
   EXECUTE format(
     'INSERT INTO %I (time, identifier, value) VALUES ($1, $2, $3)',
     NEW.table_name)
   USING NEW.time, NEW.identifier, NEW.value;

   RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql;

CREATE TRIGGER feed_child_table_trigger
INSTEAD OF INSERT ON mother FOR EACH ROW
EXECUTE PROCEDURE feed_child_table();

INSERT INTO mother (table_name, time, identifier, value) VALUES
('a', '2021-08-26 10:09:00.01'::timestamp, 'id1', 10.1),
('a', '2021-08-26 10:09:00.08'::timestamp, 'id2', 10.0),
('b', '2021-08-26 10:09:00.23'::timestamp, 'id3', 10.2),
('b', '2021-08-26 10:09:00.40'::timestamp, 'id4', 10.3);

You, sir, are a genius. I made a few minor changes for my case, but that is exactly what I was thinking of. The performance is about 10 times slower on inserts (20,000 entries) compared to inserting directly into another table.

Half of the problem is the SELECTs (but I could create the table in another process), and I think the other half is that we are now doing single-row inserts. Do you know of any way to handle this in batches?

Once again, thanks a lot for the support.

Thanks, @David_Sooter! You’re the genius! I’m just following your crazy idea :nerd_face:

I have never worked with batches, but we can probably adapt the trigger to work on the whole batch instead of each row. I'll try to adapt the code and we can see if performance improves.

Also, another idea would be to catch the "table does not exist" error and create the table only when the insert fails.

I did just that with the exception handler and am able to avoid the SELECTs for the most part.

I know that an INSTEAD OF INSERT trigger only fires per row. I don't know if it could be solved with a rule instead of a trigger.


Very good! If you can share the updated example, that would be great for further evolution! I'll check if I can make it work with rules :slight_smile:

Here is what I changed. It is adapted a bit to our model's needs.

I also added converting the table to a hypertable and adding compression.

DROP TABLE IF EXISTS model CASCADE;
DROP VIEW IF EXISTS mother CASCADE;

CREATE TABLE model (
  id TEXT NOT NULL,
  timestamp TIMESTAMPTZ NOT NULL,
  measure TEXT NOT NULL,
  value TEXT,
  metadata JSONB DEFAULT '{}'
);

CREATE VIEW mother AS
SELECT ''::text AS table_name,
       ''::text AS id,
       NULL::timestamptz AS timestamp,
       ''::text AS measure,
       ''::text AS value,
       NULL::jsonb AS metadata;

CREATE OR REPLACE FUNCTION feed_child_table()
RETURNS trigger AS
$BODY$
DECLARE
  insert_data text;
BEGIN
  insert_data := format(
    'INSERT INTO %I (id, timestamp, measure, value, metadata) VALUES ($1, $2, $3, $4, $5)',
    NEW.table_name);

  BEGIN
    EXECUTE insert_data USING NEW.id, NEW.timestamp, NEW.measure, NEW.value, NEW.metadata;
  EXCEPTION WHEN undefined_table THEN
    -- Create the child table from the template and turn it into a hypertable.
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE model)', NEW.table_name);
    EXECUTE format('SELECT create_hypertable(%L, ''timestamp'')', NEW.table_name);

    -- Enable compression and add a compression policy.
    EXECUTE format(
      'ALTER TABLE %I SET (
         timescaledb.compress,
         timescaledb.compress_segmentby = ''id,measure''
       )', NEW.table_name);
    EXECUTE format('SELECT add_compression_policy(%L, INTERVAL ''7 days'')', NEW.table_name);

    -- Retry the insert now that the table exists.
    EXECUTE insert_data USING NEW.id, NEW.timestamp, NEW.measure, NEW.value, NEW.metadata;
  END;

  RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql;

CREATE TRIGGER feed_child_table_trigger
INSTEAD OF INSERT ON mother FOR EACH ROW
EXECUTE PROCEDURE feed_child_table();

INSERT INTO mother (table_name, id, timestamp, measure, value, metadata) VALUES
('w', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('a', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('a', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('a', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('r', '2', '2021-08-26 10:09:00.08'::timestamptz, 'id2', '10.0', NULL),
('b', '2', '2021-08-26 10:09:00.23'::timestamptz, 'id3', '10.2', NULL),
('b', '2', '2021-08-26 10:09:00.40'::timestamptz, 'id4', '10.3', NULL);

So, a small update: I tried it with a rule, but the performance is worse. I don't really know if there is another way to bulk-process it.

DROP FUNCTION IF EXISTS nb_redirect_inserts(mother) CASCADE;

CREATE OR REPLACE FUNCTION nb_redirect_inserts(vals mother)
RETURNS void AS
$BODY$
DECLARE
  insert_data text;
BEGIN
  insert_data := format(
    'INSERT INTO %I (id, timestamp, measure, value, metadata) VALUES ($1, $2, $3, $4, $5)',
    vals.table_name);

  BEGIN
    EXECUTE insert_data USING vals.id, vals.timestamp, vals.measure, vals.value, vals.metadata;
  EXCEPTION WHEN undefined_table THEN
    -- Create the child table from the template and turn it into a hypertable.
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I (LIKE model)', vals.table_name);
    EXECUTE format('SELECT create_hypertable(%L, ''timestamp'')', vals.table_name);

    -- Enable compression and add a compression policy.
    EXECUTE format(
      'ALTER TABLE %I SET (
         timescaledb.compress,
         timescaledb.compress_segmentby = ''id,measure''
       )', vals.table_name);
    EXECUTE format('SELECT add_compression_policy(%L, INTERVAL ''7 days'')', vals.table_name);

    -- Retry the insert now that the table exists.
    EXECUTE insert_data USING vals.id, vals.timestamp, vals.measure, vals.value, vals.metadata;
  END;
END;
$BODY$
LANGUAGE plpgsql;

CREATE OR REPLACE RULE redirect_inserts AS
ON INSERT TO mother
DO INSTEAD SELECT nb_redirect_inserts(NEW);

INSERT INTO mother (table_name, id, timestamp, measure, value, metadata) VALUES
('w', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('a', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('a', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('gg', '1', '2021-08-26 10:09:00.01'::timestamptz, 'id1', '10.1', NULL),
('r', '2', '2021-08-26 10:09:00.08'::timestamptz, 'id2', '10.0', NULL),
('b', '2', '2021-08-26 10:09:00.23'::timestamptz, 'id3', '10.2', NULL),
('b', '2', '2021-08-26 10:09:00.40'::timestamptz, 'id4', '10.3', NULL);
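One batch-oriented idea that might be worth trying (an untested sketch, assuming PostgreSQL 10+ transition tables): insert batches into a plain staging table and let an AFTER ... FOR EACH STATEMENT trigger route the whole batch, so each incoming batch becomes one INSERT ... SELECT per distinct target table instead of one insert per row. The `staging` table, `route_batch` function, and trigger name here are hypothetical, and this sketch reuses the first example's `model` columns:

```sql
-- Untested sketch: batch routing via a plain staging table and a
-- statement-level trigger with transition tables (PostgreSQL 10+).
CREATE TABLE staging (LIKE model);
ALTER TABLE staging ADD COLUMN table_name text;

CREATE OR REPLACE FUNCTION route_batch()
RETURNS trigger AS
$BODY$
DECLARE
  tgt text;
BEGIN
  -- One INSERT ... SELECT per distinct target table in the batch.
  FOR tgt IN SELECT DISTINCT table_name FROM newrows LOOP
    EXECUTE format(
      'INSERT INTO %I (time, identifier, value)
       SELECT time, identifier, value FROM newrows WHERE table_name = %L',
      tgt, tgt);
  END LOOP;
  RETURN NULL;
END;
$BODY$
LANGUAGE plpgsql;

CREATE TRIGGER route_batch_trigger
AFTER INSERT ON staging
REFERENCING NEW TABLE AS newrows
FOR EACH STATEMENT
EXECUTE PROCEDURE route_batch();
```

The sink would then write batches into `staging` instead of `mother`; something (a periodic job, or the trigger itself) would still need to empty `staging`, and the create-table-on-undefined_table logic from the examples above would have to move into the loop. I haven't measured whether this actually beats the row-level trigger.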

Maybe @fabriziomello can help here. Let's see.