Days of trial and errors (well, mostly errors) with compression

’m having a challenge that makes me question if Timescale is a right fit for this:
The DB is roughly 2 TB, a few hundred million rows. It is recording financial trade data.

We are recording trade activity on roughly 140 symbols.

Data arrives in a live stream, and it gets written in a table.

We regularly receive authoritative data that should overwrite the live stream data. It always arrives by chunk of 24h, and it always arrives in order, day by day. It never arrives earlier than 24h after a day has passed, but sometimes it arrives later.
So we may receive day 1, 2, 3 then nothing for 3 days, then days 4, 5 then nothing for a day, then day 6, etc.
When these blocks of 24h of data arrive, we erase the live data for that day and insert the authoritative data.
It can never arrive earlier than 24h after a day has passed, and never later than 120h.

The crucial part here is that we receive this authoritative data PER symbol, so symbol A may be at day 5 and symbol B may be up-to-date only until day 3 at this point in time, etc.

So, this part works… But when we tried to add compression and things started to fail.

The chunks have a duration of 1 day and this is the center of the problem because all symbols share the same chunk for a day.

Data for all symbols arrive at different time, so compression can only apply up to the timestamp where all symbols have been received.

I implemented a remove/add of the compression policy to update the time up to which data can be compressed, but delete operations (on live data not covered by the compression yet) fail.

I tried to implement backfilling, but the script on GitHub has errors.

Likewise, I tried partitioning by symbol, with 140 partitions (one per symbol). My expectation was to have 1 chunk / day / symbol and each could run their own compression, but it doesn’t look like it is working that way.

The delete operations, which are always ahead of the last time that should be compressed, do fail.

I’m running out of ideas…

So, in short, I need to replace live data with authoritative data on a regular basis, by slices of 24h at once. And these slices arrive per symbol (we have 140) and share the same chunks (size of 1 day).

  • I tried to keep compression limited to chunks where all the symbols have been updated through policy updates, and it failed as the delete operation to remove the live data for the day to be written fails.
  • I tried to make 1 chunk / symbol / day with partitioning, so they each symbol can have its own compression going without caring about the other chunks, but the delete operation fails too.
1 Like

Hi @thomasd3 ,

Welcome to the community!

Could you share some more info so I can dig deeper into what’s wrong:

  • What’s the compression policy on the hypertable?
  • What’s the error when you try to delete?
  • Would it be a solution (maybe not an ideal one?) to only compress data after you know for sure there won’t be any more changes (eg. 7 days)?

:wave: @thomasd3,

I’ll just add a little context to @Attila’s questions.

It sounds like you were trying to remove the compression policy so that you could push it out to 7 days, giving you the time you needed to update the data accordingly before the compression policy kicks in and prevents deletes and updates.

I think the missing piece is that when you remove/enable a compression policy (through ALTER TABLE...) that doesn’t decompress the data that’s already compressed. You didn’t mention decompressing the data, so my guess is that even though you changed the policy, you still had chunks in the 1-6 day range that are compressed.

If that’s the case, you can ensure that the current policy doesn’t compress until after 7 days have passed (or whatever the proper interval is), and then run something like:

SELECT decompress_chunk(i,true) FROM show_chunks('hypertable_name', newer_than=>now()-'7 days'::interval) as i;

This should decompress any chunks that have a range_end value newer than 7 days.

If that doesn’t work, your other answers will help us track down the issue for you I’m sure!

re: space partitioning
TimescaleDB uses a hashing function to determine space partitioning. This means that the resulting hash of the partitioned values will create fewer (often many fewer) partitions than there are IDs. There are a number of reasons for doing it this way, but primarily it’s to balance spreading out the data with not creating (potentially) millions of chunks per chunk_time_interval since we don’t know upfront how many identifiers there are in a dataset (and it can change over time).

In a single-node TimescaleDB instance, space partitioning rarely improves the performance of ingest or queries and isn’t recommended.

I think once you get past the compression situation, things will start to look up pretty quickly.

HTH!

@ryanbooz,

Let me cover how the system works with some code:

First, the table itself:

-- create table
CREATE TABLE IF NOT EXISTS exchange.{tableName}
(
    ticker      VARCHAR(16)  NOT NULL,
    ts          TIMESTAMP    NOT NULL,
    price       FLOAT4       NOT NULL,
    quantity    FLOAT4       NOT NULL,
    direction   BOOL         NOT NULL
);

 -- create hypertable
SELECT create_hypertable('exchange.{tableName}', 'ts', create_default_indexes => false, chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE);

 -- add compression
ALTER TABLE exchange.{tableName} SET (autovacuum_enabled = on);
ALTER TABLE exchange.{tableName} SET (timescaledb.compress, timescaledb.compress_segmentby = 'ticker', timescaledb.compress_orderby= 'ts desc');

Let’s go over the original case, without compression.

I get live data for all symbols, store a few seconds of it, and then send it to the db. It is written using a COPY command for speed.

At some point, I receive 24h (midnight to midnight) of data for a SINGLE symbol.

I delete the live data for the corresponding 24h:

DELETE FROM exchange.{tableName} 
WHERE ticker = '{ticker}' AND ts >= '{fromTime.Format}' AND ts < '{toTime.Format}'

And then I do a COPY operation to insert the whole 24h for that symbol (called ‘ticker’ here) in the database.

The main reason I use DELETE is that COPY will not overwrite existing data. In the past I experimented with creating a temp table, doing the COPY in it and then write it with ON CONFLICT to replace existing data, but it turns out that DELETE + COPY was just much simpler.

So, this works very well.

Now, the compression:

Chunks are set to 24h since it matches the batch of data received.

I have a table that keeps track of the last write operations for these 24h blocks:

CREATE TABLE IF NOT EXISTS exchange.{tableName}
(
     ticker     VARCHAR(16)  NOT NULL,
     last_ts    TIMESTAMP    NOT NULL,
     status     VARCHAR      NOT NULL,
     UNIQUE (ticker)
);

In theory, anything older than min(last_ts) on this table has been written in its final form.

Each time I finalize writing a full day, I run this:

-- remove current compression policy
SELECT remove_compression_policy(hypertable => 'exchange.{tableName}', if_exists => TRUE);

-- add new compression policy
SELECT add_compression_policy(hypertable => 'exchange.{tableName}', compress_after => current_date - min(last_ts) + INTERVAL '1 days')
FROM exchange.capture_tracker;

-- alter job to start now
WITH id AS (SELECT job_id FROM timescaledb_information.jobs WHERE proc_name='policy_compression' AND hypertable_name = '{tableName}')
SELECT alter_job(job_id => (SELECT job_id FROM id), schedule_interval => INTERVAL '15 minutes', next_start => now());

This should delete the current policy, recreate one where I put compress_after set to the oldest write from the table above, to the safe zone, and then it starts the compression job immediately.

When I run this, at some point I get errors that tell me I’m trying to DELETE a compressed chunk.

That’s one of the tests.

I’ve also tried to add partitions (based on the symbol (‘ticker’ in the code here)) to create 1 chunk / day / symbol, I tried to put int32.max as the number of partitions, thinking that it should be enough to allow the hashing function to have one partition / symbol, and let the compression policy run on its own, but this has also failed with the exact same error.

Thanks for all of the detail @thomasd3.

I honestly still think the issue here is that you’re not decompressing the chunks before trying to DELETE rows from them. Again, remove_compression_policy() doesn’t decompress any chunks, it only removes the automated policy that would compress chunks in the future.

So, if you set the policy yesterday to compress_after => '2 days', for instance, all chunks older than two days ago would be compressed.

Then today, if you removed that policy and set a new one that said compress_after => '4 days', you’d still have one or two chunks that are compressed which you’re probably going to try and delete data from.

Instead, if your space can handle it, I think we’d suggest this process instead:

  • only partition based on time. Complicating the partitioning to try and get 140 chunk a day will probably impact your queries in other ways that are undesirable and overcomplicates some of this
  • Remove all compression policies
  • decompress all chunks newer_than=>'6 days' (sample query shown in my previous response)
  • add a new compression policy that just compresses any chunk after 6 days and let that run normally.

If you really do need the data to be compressed (for space reasons) while you wait for the finalized form to come, then you’re process above will have to deal with decompression too so that any data you’re trying to COPY is inserted into compressed chunks.

Does that make any more sense and seem like a plausable cause?