Distributed Hypertable Poor Performance for Simple Bucketed Aggregation

Three main thoughts

  • Is it possible to have Timescale execute a sub-select and push the resulting vector down to the data nodes for filtering?
  • Is it possible to get the data nodes to choose a parallel scan, can you scan an index in parallel?
  • I’m still concerned about the async append time, as this is consistently the long pole in the tent across different combinations of parameters.

Testing with a fixed IN clause cuts the execution time almost in half.

For smaller time ranges (last 5 minutes, hour, or day) the performance difference is much smaller. Getting the last week is an extreme case for us, but I’m still surprised that there is this much of a gap between ElasticSearch and Timescale.

Time range | ES (ms) | TS (ms)
Last 5 min | ~40ms | ~60ms
Last 1 hr | ~70ms | ~100ms
Last 1 day | ~260ms | ~1100ms

Again, thank you for your help!

First of all, I think this would be a great thing to put into a GitHub issue. If you could file one, especially covering the subselect pushdown and parallel scanning, that would help us improve this.

The thing I will say is that, in general, we’re currently designed more for concurrency than for the fastest queries for a single user, which is probably why you’re seeing less parallelism than you might expect. We make different tradeoffs around that than some other tools do, though we’re also thinking about ways for users to tune that tradeoff a bit more.

The other thing I’d suggest is a feature that will make this sort of query very fast: continuous aggregates. We also have some significant improvements coming for that feature soon (see "Continuous Aggregates finals form" by fabriziomello, Pull Request #4269 in timescale/timescaledb on GitHub). You can also use our statistical aggregates, specifically stats_agg, which has a rollup function that lets you take, say, the one-minute aggregate and compute larger buckets from it. That might be the bigger win for you.

Edit: I hadn’t noticed that you’re doing aggregates across series ids here, but rollup would also let you roll up across series ids, filtering out the ones you don’t want, at different levels of granularity depending on the view you’re looking at. For instance, you might use hourly aggregates for the weekly view and roll up based on series id, since minutely aggregates for a week seem like overkill.

That would be the most significant speedup you could do I think, even on the current version of continuous aggregates, and would likely get faster on the new version coming out soon.

Have you tried that out and does it make a difference for you?

Also, given the size of your data, it feels like multi-node would be significant overkill. Are you testing this for a much larger deployment, and are you sure that the performance you’re looking for actually scales with both solutions? Continuous aggregates are a great way to power dashboards like the ones it sounds like you have, though, and I’d definitely recommend taking advantage of them.

Just to note, AFAIK the times of the underlying scans of the Async Append node are not shown correctly in the EXPLAIN output. So the Async Append time you’re seeing is probably due to the underlying scans taking so long, not that node doing something by itself.

It would be interesting to run this with EXPLAIN (VERBOSE) and SET timescaledb.enable_remote_explain = on so that we can see the query plans on the remote nodes.
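For reference, a minimal sketch of what that could look like when run on the access node. The table and column names come from the query in this thread; the shortened series_id list is only for illustration.

```sql
-- Run on the access node. With this setting on, each DataNodeScan in the
-- plan also carries the plan produced on the corresponding data node.
SET timescaledb.enable_remote_explain = on;

EXPLAIN (ANALYZE, VERBOSE, BUFFERS)
SELECT
  time_bucket(60000, m.ts) AS time,
  avg(m.val) AS power
FROM telemetry_power m
WHERE m.series_id IN (7852, 7863, 7869)   -- shortened list, illustration only
  AND m.ts BETWEEN 1650663853916 AND 1651268653916
GROUP BY 1
ORDER BY 1 DESC;
```

The per-node plans then show up under a "Remote EXPLAIN:" entry inside each DataNodeScan.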

To answer your other questions:

Is it possible to have Timescale execute a sub-select and push the resulting vector down to the data nodes for filtering?

That would be a distributed join pushdown; currently we have no support for this. The moment you join a distributed table to something else, it’s all downloaded to the access node and joined there.

Is it possible to get the data nodes to choose a parallel scan, can you scan an index in parallel?

Yes, although I’m always struggling to make Postgres choose a parallel plan. You could test it by setting parallel_setup_cost and parallel_tuple_cost to 1, but IIRC the settings are not shipped from access node to data node.
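If the settings are indeed not forwarded from the access node, one way to try them might be to set them per database on each data node, so that new sessions (including those opened by the access node) pick them up. This is only a sketch: the database name `telemetry` is hypothetical, and connections that are already open keep their old values until they reconnect.

```sql
-- Run on each data node, not on the access node.
-- "telemetry" is a hypothetical database name; substitute your own.
ALTER DATABASE telemetry SET parallel_setup_cost = 1;
ALTER DATABASE telemetry SET parallel_tuple_cost = 1;

-- From a fresh connection to that database, confirm the values took effect.
SHOW parallel_setup_cost;
SHOW parallel_tuple_cost;

-- Revert once testing is done.
ALTER DATABASE telemetry RESET parallel_setup_cost;
ALTER DATABASE telemetry RESET parallel_tuple_cost;
```

These values are far below the PostgreSQL defaults, so they are probably best treated as a test-only configuration rather than something to leave in place.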

I would be happy to create a GitHub issue for this. Thanks for the information about the design considerations; that is very helpful to know. I have not looked at the continuous and statistical aggregates yet, but I’ll definitely check those out. Yes, I’m testing for a much larger deployment. We will be collecting hundreds of different metrics, with our current estimates being 30-40 TB ingested per day, and we need to retain data for at least 30 days, which would put us at a bit over 1 PB. I think for that size we will have to use multi-node simply for the storage space.

That would be a distributed join pushdown; currently we have no support for this. The moment you join a distributed table to something else, it’s all downloaded to the access node and joined there.

In this case would it have to be a distributed join though? In the normal non-timescale non-distributed case where I have something like

SELECT * FROM foo WHERE foo.bar IN (SELECT id FROM baz);

Wouldn’t Postgres have to evaluate that sub-select into a vector of IDs that it then uses to scan the index? Wouldn’t Timescale then be able to do the same thing and simply push that vector of IDs to the data nodes (assuming that baz is a small table stored on the access node)?

This kind of feature would be really useful because in my experience this kind of setup is a common use case when creating Grafana dashboards.

Yep, I think that’s what it does, and that’s a join. The plan you described sounds like a nested loop join with the inner side being a subselect, and the outer side an index scan. Like this:

test=# explain (costs off) select count(*) from live_steps_originals_pg where user_id in (select distinct user_id from live_steps_originals_pg);
                                                               QUERY PLAN                                                               
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 Aggregate
   ->  Nested Loop
         ->  Unique
               ->  Custom Scan (SkipScan) on live_steps_originals_pg live_steps_originals_pg_1
                     ->  Index Only Scan using live_steps_originals_pg_user_id_idx on live_steps_originals_pg live_steps_originals_pg_1
                           Index Cond: (user_id > NULL::bigint)
         ->  Index Only Scan using live_steps_originals_pg_user_id_idx on live_steps_originals_pg
               Index Cond: (user_id = live_steps_originals_pg_1.user_id)

The subselect should be small in this case, so for the distributed case we could evaluate it on the access node and ship the result to the data nodes. Then they could perform the join locally and use the index scan. I agree that this use case looks important.
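Until something like that exists, the same effect can be approximated by hand from the client: evaluate the small sub-select first, then send its result as a literal array, which is the form that gets shipped to the data nodes when the IDs are hard coded. A rough sketch using psql and the foo/baz example from above, assuming baz.id is an integer column:

```sql
-- psql only: \gset stores the columns of the single result row in psql
-- variables named after the column aliases (here: ids).
SELECT coalesce(array_agg(id), array[]::int[]) AS ids FROM baz \gset

-- :'ids' expands to a quoted array literal such as '{1,2,3}', so the second
-- query contains only constants and no sub-select.
SELECT * FROM foo WHERE foo.bar = ANY (:'ids'::int[]);
```

Other clients would do the same thing with two round trips and a bound or interpolated array. For a Grafana dashboard this probably means resolving the ID list in a dashboard variable first.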

Yep, I think that’s what it does, and that’s a join.

I mean, it doesn’t exactly evaluate it into a vector, but into a table. I doubt Postgres can transform a join into array match operator this way…

What you describe is exactly what I was thinking about. Now that I think about it, it makes complete sense that it would be a table and a join, since Postgres probably wants to treat almost everything it works with as a table.

I tried setting timescaledb.enable_remote_explain = on and running explain. If I hard code the series_ids and use the IN clause, then, as expected, the remote plans use index scans and performance is improved by ~100%. When I run explain on the array version with this setting on, I get the error shown below.

Query:

with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id IN (7852, 7863, 7869, 7895, 7896, 7898, 7901, 7911, 7918, 7922, 7924, 7928, 7936, 7969, 7974, 7984, 7995, 8114, 8121, 8153, 7991, 8010, 8021, 8027, 8100, 8133, 8084, 8091, 8109, 8137, 7943, 7972, 7982, 8044, 8054, 8111, 8006, 8019, 8082, 8119)
  and m.ts between 1650663853916 and 1651268653916
group by time
order by 1 desc;

Plan:

                                                          QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=281393.39..281400.89 rows=200 width=16) (actual time=1878.665..2153.229 rows=5756 loops=1)
   Output: (time_bucket('60000'::bigint, ts)), avg(val)
   Group Key: (time_bucket('60000'::bigint, ts))
   Buffers: temp read=134 written=135
   ->  Sort  (cost=281393.39..281394.89 rows=600 width=40) (actual time=1878.609..1991.714 rows=17268 loops=1)
         Output: (time_bucket('60000'::bigint, ts)), (PARTIAL avg(val))
         Sort Key: (time_bucket('60000'::bigint, ts)) DESC
         Sort Method: external merge  Disk: 1072kB
         Buffers: temp read=134 written=135
         ->  Custom Scan (AsyncAppend)  (cost=1715.99..281365.71 rows=600 width=40) (actual time=1203.208..1753.411 rows=17268 loops=1)
               Output: (time_bucket('60000'::bigint, ts)), (PARTIAL avg(val))
               ->  Append  (cost=1715.99..281365.71 rows=600 width=40) (actual time=0.029..333.924 rows=17268 loops=1)
                     ->  Custom Scan (DataNodeScan)  (cost=1715.99..66294.50 rows=200 width=40) (actual time=0.011..40.047 rows=5756 loops=1)
                           Output: (time_bucket('60000'::bigint, m.ts)), (PARTIAL avg(m.val))
                           Relations: Aggregate on (public.telemetry_power m)
                           Data node: dn1
                           Fetcher Type: Row by row
                           Chunks: _dist_hyper_4_210_chunk, _dist_hyper_4_209_chunk, _dist_hyper_4_208_chunk, _dist_hyper_4_207_chunk, _dist_hyper_4_206_chunk, _dist_hyper_4_205_chunk, _dist_hyper_4_204_chunk, _dist_hyper_4_203_chunk, _dist_hyper_4_202_chunk, _dist_hyper_4_201_chunk, _dist_hyper_4_200_chunk, _dist_hyper_4_199_chunk, _dist_hyper_4_198_chunk, _dist_hyper_4_197_chunk, _dist_hyper_4_196_chunk, _dist_hyper_4_195_chunk, _dist_hyper_4_194_chunk, _dist_hyper_4_193_chunk, _dist_hyper_4_192_chunk, _dist_hyper_4_191_chunk, _dist_hyper_4_190_chunk, _dist_hyper_4_189_chunk, _dist_hyper_4_188_chunk, _dist_hyper_4_187_chunk, _dist_hyper_4_186_chunk, _dist_hyper_4_185_chunk, _dist_hyper_4_184_chunk, _dist_hyper_4_183_chunk, _dist_hyper_4_181_chunk, _dist_hyper_4_180_chunk, _dist_hyper_4_179_chunk, _dist_hyper_4_178_chunk, _dist_hyper_4_177_chunk, _dist_hyper_4_176_chunk, _dist_hyper_4_175_chunk, _dist_hyper_4_174_chunk, _dist_hyper_4_173_chunk, _dist_hyper_4_172_chunk, _dist_hyper_4_171_chunk, _dist_hyper_4_170_chunk, _dist_hyper_4_169_chunk
                           Remote SQL: SELECT public.time_bucket(60000::bigint, ts), _timescaledb_internal.partialize_agg(avg(val)) FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58]) AND ((series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[]))) AND ((ts >= 1650663853916::bigint)) AND ((ts <= 1651268653916::bigint)) GROUP BY 1
                           Remote EXPLAIN: 
                             Partial HashAggregate  (cost=45266.06..53769.84 rows=305585 width=40) (actual time=14605.818..14650.739 rows=5756 loops=1)
                               Output: (time_bucket('60000'::bigint, _dist_hyper_4_210_chunk.ts)), _timescaledb_internal.partialize_agg(PARTIAL avg(_dist_hyper_4_210_chunk.val))
                               Group Key: time_bucket('60000'::bigint, _dist_hyper_4_210_chunk.ts)
                               Planned Partitions: 32  Batches: 1  Memory Usage: 1425kB
                               Buffers: shared hit=275352
                               ->  Result  (cost=0.41..33714.94 rows=323179 width=16) (actual time=0.080..12076.899 rows=323179 loops=1)
                                     Output: time_bucket('60000'::bigint, _dist_hyper_4_210_chunk.ts), _dist_hyper_4_210_chunk.val
                                     Buffers: shared hit=275352
                                     ->  Append  (cost=0.41..29675.20 rows=323179 width=16) (actual time=0.067..7365.778 rows=323179 loops=1)
                                           Buffers: shared hit=275352
                                           ->  Index Scan using _dist_hyper_4_210_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_210_chunk  (cost=0.41..679.22 rows=7792 width=16) (actual time=0.055..59.192 rows=7792 loops=1)
                                                 Output: _dist_hyper_4_210_chunk.ts, _dist_hyper_4_210_chunk.val
                                                 Index Cond: ((_dist_hyper_4_210_chunk.series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_dist_hyper_4_210_chunk.ts >= '1650663853916'::bigint) AND (_dist_hyper_4_210_chunk.ts <= '1651268653916'::bigint))
                                                 Buffers: shared hit=6658
                                           ->  Index Scan using _dist_hyper_4_209_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_209_chunk  (cost=0.41..707.03 rows=8144 width=16) (actual time=0.074..62.185 rows=8144 loops=1)
                                                 Output: _dist_hyper_4_209_chunk.ts, _dist_hyper_4_209_chunk.val
                                                 Index Cond: ((_dist_hyper_4_209_chunk.series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_dist_hyper_4_209_chunk.ts >= '1650663853916'::bigint) AND (_dist_hyper_4_209_chunk.ts <= '1651268653916'::bigint))
                                                 Buffers: shared hit=6889
                                           ->  Index Scan using _dist_hyper_4_208_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_208_chunk  (cost=0.41..707.09 rows=8150 width=16) (actual time=0.066..62.892 rows=8150 loops=1)
                                                 Output: _dist_hyper_4_208_chunk.ts, _dist_hyper_4_208_chunk.val
                                                 Index Cond: ((_dist_hyper_4_208_chunk.series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_dist_hyper_4_208_chunk.ts >= '1650663853916'::bigint) AND (_dist_hyper_4_208_chunk.ts <= '1651268653916'::bigint))
                                                 Buffers: shared hit=6939
                                           ->  Index Scan using _dist_hyper_4_207_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_207_chunk  (cost=0.41..707.02 rows=8142 width=16) (actual time=0.080..60.485 rows=8142 loops=1)
                                                 Output: _dist_hyper_4_207_chunk.ts, _dist_hyper_4_207_chunk.val
<SNIP>

Error Running Explain:

explain (analyze, verbose, buffers, settings) with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  array[m.series_id] && (select coalesce(array_agg(id), array[]::integer[]) from power_series_data)
  and m.ts between 1650663853916 and 1651268653916
group by time
order by 1 desc;
ERROR:  [dn1]: bind message supplies 0 parameters, but prepared statement "" requires 1
Time: 2922.663 ms (00:02.923)

I tested out the parallel query stuff by taking the remote SQL from the explain plan on the access node, logging into the data node directly and executing it.

What I found was that I had to set parallel_setup_cost and parallel_tuple_cost = 1 and force_parallel_mode = on. In addition the partialize_agg function call prevents a parallel plan from being chosen. Once I replace partialize_agg with a regular avg function call a parallel plan is chosen. The call to the chunks_in function in the where clause does not cause this issue.
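Roughly, the experiment looked like the sketch below, run in a session opened directly on the data node. The statement is the Remote SQL from the access-node plan with partialize_agg(avg(val)) replaced by avg(val); the chunk ID and series_id lists are shortened here for readability.

```sql
-- Session-level settings on the data node.
SET parallel_setup_cost = 1;
SET parallel_tuple_cost = 1;
SET force_parallel_mode = on;   -- this setting is named debug_parallel_query from PostgreSQL 16 on

-- Remote SQL with the partialize_agg wrapper removed. The chunks_in call
-- stays in place, since it did not prevent a parallel plan.
EXPLAIN
SELECT public.time_bucket(60000::bigint, ts), avg(val)
FROM public.telemetry_power
WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[99, 98, 97])
  AND series_id = ANY ('{7852,7863,7869}'::integer[])
  AND ts >= 1650663853916::bigint
  AND ts <= 1651268653916::bigint
GROUP BY 1;
```

A Gather or Gather Merge node in the output indicates that a parallel plan was chosen.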

Unfortunately, the chosen parallel plan uses only one worker for some reason. This is despite having max_worker_processes = 59, max_parallel_workers = 48 and max_parallel_workers_per_gather = 24. I also checked pg_stat_activity for backend_type = 'parallel worker' and there were none, although there were around 30 other connections that were writing data.

I noticed that the Postgres documentation says user-defined functions are marked PARALLEL UNSAFE by default; I’m not sure how this applies to functions implemented in C. Even so, I’m still not sure how to convince Postgres that it can use these 32 cores that are lying around to scan this index.
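As far as I understand the Postgres documentation, the PARALLEL UNSAFE default applies to anything created with CREATE FUNCTION regardless of implementation language, so a C function is unsafe unless the extension declares it otherwise. The catalog shows how each function is actually labeled. A small sketch, using the function names that appear in the Remote SQL above:

```sql
-- proparallel: s = safe, r = restricted, u = unsafe
SELECT n.nspname, p.proname, p.proparallel
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
WHERE p.proname IN ('partialize_agg', 'chunks_in', 'time_bucket')
ORDER BY n.nspname, p.proname;

-- While a parallel query is running, its workers show up here.
SELECT pid, backend_type, state
FROM pg_stat_activity
WHERE backend_type = 'parallel worker';
```

EXPLAIN (ANALYZE) also reports "Workers Planned" and "Workers Launched" on the Gather node, which helps tell apart a planner that asked for few workers from an executor that could not start them.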

One last quick update. I tried setting up a materialized view as a continuous aggregate using stats_agg, but the result ends up being slower than the original query. This is likely because the materialized view exists only on the access node, and therefore the work cannot be split between the data nodes. Other than not being able to distribute the continuous aggregate among the data nodes, it is a very cool feature.

I filed an enhancement request for this, not sure how viable or challenging this would be.

@troy,

Thank you for following up and taking the time to submit the enhancement. Distributed continuous aggregates are a common request and something we’ll continue to prioritize as other changes might help facilitate that. It looks like the product team already put some classification on the ticket, so it will end up in a review discussion at some point for sure.

There was a question about the continuous aggregate I created. I created a cagg bucketing time to 1 hour intervals and using stats_agg to prep val for aggregation. I then queried the cagg without using time_bucket since the cagg is already bucketed on hour intervals and then applied the average aggregation to the rollup of stats (the value column with stats_agg applied). This query runs in ~7500 ms which is ~38x longer than the distributed hypertable.

CAGG

CREATE MATERIALIZED VIEW power_hourly WITH (timescaledb.continuous) AS
SELECT
  time_bucket(3600000, ts) as bucket, series_id sid,
  stats_agg(val) as stats
FROM cray_craytelemetry_power
GROUP BY 1, 2;
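One thing that may be worth checking with a view defined like this: in the plan below, the "*SELECT* 2" branch scans the raw distributed hypertable for rows newer than the cagg watermark and accounts for about 6068 ms while returning 0 rows. That branch looks like real-time aggregation, which was enabled by default in TimescaleDB versions of that period. As a sketch, and assuming it is acceptable for the dashboard to miss data newer than the last refresh, it can be switched off per view:

```sql
-- Serve only materialized data; skips the union with the raw hypertable.
ALTER MATERIALIZED VIEW power_hourly SET (timescaledb.materialized_only = true);

-- Re-run the dashboard query against power_hourly here and compare timings.

-- Switch real-time aggregation back on if fresh data is required.
ALTER MATERIALIZED VIEW power_hourly SET (timescaledb.materialized_only = false);
```

Whether this closes the gap to the distributed hypertable query is something only a re-run of the query can show.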

Query

with power_series_data as (
  select id, labels
  from series_cray_craytelemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  bucket time,
  average(rollup(stats)) Power
from power_hourly m
where
  m.sid IN (7852, 7863, 7869, 7895, 7896, 7898, 7901, 7911, 7918, 7922, 7924, 7928, 7936, 7969, 7974, 7984, 7995, 8114, 8121, 8153, 7991, 8010, 8021, 8027, 8100, 8133, 8084, 8091, 8109, 8137, 7943, 7972, 7982, 8044, 8054, 8111, 8006, 8019, 8082, 8119)
  and m.bucket between 1650585600000 and 1651795199000
group by time
order by 1 desc;

Plan

 GroupAggregate  (cost=63220938.36..63220962.17 rows=200 width=16) (actual time=6123.788..6131.963 rows=243 loops=1)
   Output: "*SELECT* 1".bucket, average(rollup("*SELECT* 1".stats))
   Group Key: "*SELECT* 1".bucket
   Buffers: shared hit=11728
   ->  Sort  (cost=63220938.36..63220945.30 rows=2774 width=40) (actual time=6123.713..6126.245 rows=9720 loops=1)
         Output: "*SELECT* 1".bucket, "*SELECT* 1".stats
         Sort Key: "*SELECT* 1".bucket DESC
         Sort Method: quicksort  Memory: 1751kB
         Buffers: shared hit=11728
         ->  Append  (cost=3596.63..63220779.72 rows=2774 width=40) (actual time=42.050..6120.326 rows=9720 loops=1)
               Buffers: shared hit=11728
               ->  Subquery Scan on "*SELECT* 1"  (cost=3596.63..3640.23 rows=1938 width=40) (actual time=42.048..50.563 rows=9720 loops=1)
                     Output: "*SELECT* 1".bucket, "*SELECT* 1".stats
                     Buffers: shared hit=11728
                     ->  HashAggregate  (cost=3596.63..3620.85 rows=1938 width=44) (actual time=42.046..49.138 rows=9720 loops=1)
                           Output: _hyper_131_135507_chunk.bucket, _hyper_131_135507_chunk.sid, _timescaledb_internal.finalize_agg('public.stats_agg(double precision)'::text, NULL::name, NULL::name, '{{pg_catalog,float8}}'::name[], _hyper_131_135507_chunk.agg_3_3, NULL::statssummary1d)
                           Group Key: _hyper_131_135507_chunk.bucket, _hyper_131_135507_chunk.sid
                           Batches: 1  Memory Usage: 4497kB
                           Buffers: shared hit=11728
                           ->  Append  (cost=0.29..3451.25 rows=19383 width=56) (actual time=0.041..23.429 rows=19383 loops=1)
                                 Buffers: shared hit=11728
                                 ->  Index Scan using _hyper_131_135507_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135507_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.040..0.812 rows=800 loops=1)
                                       Output: _hyper_131_135507_chunk.bucket, _hyper_131_135507_chunk.sid, _hyper_131_135507_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135507_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135507_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135507_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135507_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135510_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135510_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.022..0.763 rows=800 loops=1)
                                       Output: _hyper_131_135510_chunk.bucket, _hyper_131_135510_chunk.sid, _hyper_131_135510_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135510_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135510_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135510_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135510_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135512_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135512_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.021..0.781 rows=800 loops=1)
                                       Output: _hyper_131_135512_chunk.bucket, _hyper_131_135512_chunk.sid, _hyper_131_135512_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135512_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135512_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135512_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135512_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135521_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135521_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.022..0.767 rows=800 loops=1)
                                       Output: _hyper_131_135521_chunk.bucket, _hyper_131_135521_chunk.sid, _hyper_131_135521_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135521_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135521_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135521_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135521_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135502_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135502_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.025..0.792 rows=800 loops=1)
                                       Output: _hyper_131_135502_chunk.bucket, _hyper_131_135502_chunk.sid, _hyper_131_135502_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135502_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135502_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135502_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135502_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135518_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135518_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.024..0.761 rows=800 loops=1)
                                       Output: _hyper_131_135518_chunk.bucket, _hyper_131_135518_chunk.sid, _hyper_131_135518_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135518_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135518_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135518_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135518_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
<SNIP>
               ->  Subquery Scan on "*SELECT* 2"  (cost=63217021.14..63217125.62 rows=836 width=40) (actual time=6068.536..6068.541 rows=0 loops=1)
                     Output: "*SELECT* 2".bucket, "*SELECT* 2".stats
                     ->  GroupAggregate  (cost=63217021.14..63217117.26 rows=836 width=44) (actual time=6068.534..6068.539 rows=0 loops=1)
                           Output: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id, stats_agg(cray_craytelemetry_power.val)
                           Group Key: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id
                           ->  Sort  (cost=63217021.14..63217042.04 rows=8358 width=20) (actual time=6068.531..6068.535 rows=0 loops=1)
                                 Output: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id, cray_craytelemetry_power.val
                                 Sort Key: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id
                                 Sort Method: quicksort  Memory: 25kB
                                 ->  Custom Scan (AsyncAppend)  (cost=100.10..63216476.66 rows=8358 width=20) (actual time=6068.513..6068.517 rows=0 loops=1)
                                       Output: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id, cray_craytelemetry_power.val
                                       ->  Append  (cost=100.10..63216476.66 rows=8358 width=20) (actual time=5912.126..5912.129 rows=0 loops=1)
                                             ->  Custom Scan (DataNodeScan) on public.cray_craytelemetry_power cray_craytelemetry_power_1  (cost=100.10..26115185.27 rows=3418 width=20) (actual time=1336.498..1336.499 rows=0 loops=1)
                                                   Output: time_bucket('3600000'::bigint, cray_craytelemetry_power_1.ts), cray_craytelemetry_power_1.series_id, cray_craytelemetry_power_1.val
                                                   Filter: (cray_craytelemetry_power_1.ts >= COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint))
                                                   Rows Removed by Filter: 825516
                                                   Data node: dn_ld01
                                                   Fetcher Type: Row by row
                                                   Chunks: _dist_hyper_96_48339_chunk, _dist_hyper_96_5495_chunk, _dist_hyper_96_7989_chunk, _dist_hyper_96_71122_chunk, _dist_hyper_96_56343_chunk, _dist_hyper_96_57569_chunk, _dist_hyper_96_46488_chunk, _dist_hyper_96_872_
chunk, _dist_hyper_96_69577_chunk, _dist_hyper_96_18171_chunk, _dist_hyper_96_54797_chunk, _dist_hyper_96_10144_chunk, _dist_hyper_96_9223_chunk, _dist_hyper_96_11692_chunk, _dist_hyper_96_61251_chunk, _dist_hyper_96_40003_chunk, _dist_hyper_96_38767_chunk, _dist_hyper_96
_44336_chunk, _dist_hyper_96_67416_chunk, _dist_hyper_96_75755_chunk, _dist_hyper_96_33829_chunk, _dist_hyper_96_296_chunk, _dist_hyper_96_3948_chunk, _dist_hyper_96_18795_chunk, _dist_hyper_96_72349_chunk, _dist_hyper_96_51126_chunk, _dist_hyper_96_53586_chunk, _dist_hyp
er_96_69885_chunk, _dist_hyper_96_52047_chunk, _dist_hyper_96_14472_chunk, _dist_hyper_96_68034_chunk, _dist_hyper_96_16318_chunk, _dist_hyper_96_68342_chunk, _dist_hyper_96_29575_chunk, _dist_hyper_96_34139_chunk, _dist_hyper_96_62483_chunk, _dist_hyper_96_31666_chunk, _
dist_hyper_96_2112_chunk, _dist_hyper_96_12309_chunk, _dist_hyper_96_24965_chunk, _dist_hyper_96_15708_chunk, _dist_hyper_96_36604_chunk, _dist_hyper_96_3639_chunk, _dist_hyper_96_12618_chunk, _dist_hyper_96_66799_chunk, _dist_hyper_96_32283_chunk, _dist_hyper_96_58847_ch
unk, _dist_hyper_96_15400_chunk, _dist_hyper_96_43718_chunk, _dist_hyper_96_16625_chunk, _dist_hyper_96_2729_chunk, _dist_hyper_96_45256_chunk, _dist_hyper_96_1184_chunk, _dist_hyper_96_34447_chunk, _dist_hyper_96_53895_chunk, _dist_hyper_96_29266_chunk, _dist_hyper_96_31
049_chunk, _dist_hyper_96_27428_chunk, _dist_hyper_96_74204_chunk, _dist_hyper_96_7035_chunk, _dist_hyper_96_13855_chunk, _dist_hyper_96_19104_chunk, _dist_hyper_96_37838_chunk, _dist_hyper_96_65575_chunk, _dist_hyper_96_39386_chunk, _dist_hyper_96_56947_chunk, _dist_hype
r_96_33211_chunk, _dist_hyper_96_47411_chunk, _dist_hyper_96_20640_chunk, _dist_hyper_96_28346_chunk, _dist_hyper_96_52355_chunk, _dist_hyper_96_72659_chunk, _dist_hyper_96_45873_chunk, _dist_hyper_96_23419_chunk, _dist_hyper_96_48958_chunk, _dist_hyper_96_48031_chunk, _d
ist_hyper_96_4256_chunk, _dist_hyper_96_11999_chunk, _dist_hyper_96_40313_chunk, _dist_hyper_96_7349_chunk, _dist_hyper_96_14164_chunk, _dist_hyper_96_72048_chunk, _dist_hyper_96_74540_chunk, _dist_hyper_96_59707_chunk, _dist_hyper_96_34757_chunk, _dist_hyper_96_47722_chu
nk, _dist_hyper_96_70503_chunk, _dist_hyper_96_21258_chunk, _dist_hyper_96_35676_chunk, _dist_hyper_96_4565_chunk, _dist_hyper_96_2423_chunk, _dist_hyper_96_60016_chunk, _dist_hyper_96_5805_chunk, _dist_hyper_96_42790_chunk, _dist_hyper_96_37531_chunk, _dist_hyper_96_6402
8_chunk, _dist_hyper_96_12927_chunk, _dist_hyper_96_25891_chunk, _dist_hyper_96_20947_chunk, _dist_hyper_96_38457_chunk, _dist_hyper_96_24038_chunk, _dist_hyper_96_23730_chunk, _dist_hyper_96_64337_chunk, _dist_hyper_96_76370_chunk, _dist_hyper_96_65265_chunk, _dist_hyper
_96_39693_chunk, _dist_hyper_96_49267_chunk, _dist_hyper_96_16935_chunk, _dist_hyper_96_17553_chunk, _dist_hyper_96_38149_chunk, _dist_hyper_96_66490_chunk, _dist_hyper_96_43097_chunk, _dist_hyper_96_44027_chunk, _dist_hyper_96_32903_chunk, _dist_hyper_96_27736_chunk, _di
st_hyper_96_27128_chunk, _dist_hyper_96_51744_chunk, _dist_hyper_96_1805_chunk, _dist_hyper_96_41549_chunk, _dist_hyper_96_49884_chunk, _dist_hyper_96_64647_chunk, _dist_hyper_96_66179_chunk, _dist_hyper_96_13545_chunk, _dist_hyper_96_21566_chunk, _dist_hyper_96_3038_chun
k, _dist_hyper_96_28046_chunk, _dist_hyper_96_40623_chunk, _dist_hyper_96_68653_chunk, _dist_hyper_96_19722_chunk, _dist_hyper_96_55724_chunk, _dist_hyper_96_4871_chunk, _dist_hyper_96_67725_chunk, _dist_hyper_96_26820_chunk, _dist_hyper_96_42484_chunk, _dist_hyper_96_860
5_chunk, _dist_hyper_96_17242_chunk, _dist_hyper_96_70194_chunk, _dist_hyper_96_50507_chunk, _dist_hyper_96_54203_chunk, _dist_hyper_96_6417_chunk, _dist_hyper_96_63717_chunk, _dist_hyper_96_6108_chunk, _dist_hyper_96_21876_chunk, _dist_hyper_96_64956_chunk, _dist_hyper_9
6_46794_chunk, _dist_hyper_96_50197_chunk, _dist_hyper_96_69266_chunk, _dist_hyper_96_564_chunk, _dist_hyper_96_24656_chunk, _dist_hyper_96_20031_chunk, _dist_hyper_96_49577_chunk, _dist_hyper_96_57919_chunk, _dist_hyper_96_62180_chunk, _dist_hyper_96_70813_chunk, _dist_h
yper_96_45562_chunk, _dist_hyper_96_72968_chunk, _dist_hyper_96_35372_chunk, _dist_hyper_96_62792_chunk, _dist_hyper_96_47104_chunk, _dist_hyper_96_16016_chunk, _dist_hyper_96_71739_chunk, _dist_hyper_96_42173_chunk, _dist_hyper_96_63099_chunk, _dist_hyper_96_51435_chunk,
 _dist_hyper_96_44953_chunk, _dist_hyper_96_52666_chunk, _dist_hyper_96_71430_chunk, _dist_hyper_96_5181_chunk, _dist_hyper_96_28962_chunk, _dist_hyper_96_48649_chunk, _dist_hyper_96_29885_chunk, _dist_hyper_96_37223_chunk, _dist_hyper_96_46181_chunk, _dist_hyper_96_75123
_chunk, _dist_hyper_96_8916_chunk, _dist_hyper_96_9532_chunk, _dist_hyper_96_60941_chunk, _dist_hyper_96_10765_chunk, _dist_hyper_96_26201_chunk, _dist_hyper_96_74814_chunk, _dist_hyper_96_43408_chunk, _dist_hyper_96_76064_chunk, _dist_hyper_96_30740_chunk, _dist_hyper_96
_15090_chunk, _dist_hyper_96_58538_chunk, _dist_hyper_96_39075_chunk, _dist_hyper_96_56032_chunk, _dist_hyper_96_25274_chunk, _dist_hyper_96_23112_chunk, _dist_hyper_96_73586_chunk, _dist_hyper_96_22493_chunk, _dist_hyper_96_18486_chunk, _dist_hyper_96_24346_chunk, _dist_
hyper_96_41241_chunk, _dist_hyper_96_61560_chunk, _dist_hyper_96_68957_chunk, _dist_hyper_96_33520_chunk, _dist_hyper_96_13236_chunk, _dist_hyper_96_52974_chunk, _dist_hyper_96_36296_chunk, _dist_hyper_96_9843_chunk, _dist_hyper_96_11072_chunk, _dist_hyper_96_11383_chunk,
 _dist_hyper_96_75446_chunk, _dist_hyper_96_31975_chunk, _dist_hyper_96_73276_chunk, _dist_hyper_96_60634_chunk, _dist_hyper_96_19411_chunk, _dist_hyper_96_44646_chunk, _dist_hyper_96_65871_chunk, _dist_hyper_96_56652_chunk, _dist_hyper_96_35987_chunk, _dist_hyper_96_3135
6_chunk, _dist_hyper_96_25584_chunk, _dist_hyper_96_6726_chunk, _dist_hyper_96_1495_chunk, _dist_hyper_96_8298_chunk, _dist_hyper_96_53283_chunk, _dist_hyper_96_7680_chunk, _dist_hyper_96_17862_chunk, _dist_hyper_96_22803_chunk, _dist_hyper_96_58228_chunk, _dist_hyper_96_
63410_chunk, _dist_hyper_96_59155_chunk, _dist_hyper_96_22184_chunk, _dist_hyper_96_55415_chunk, _dist_hyper_96_20339_chunk, _dist_hyper_96_60325_chunk, _dist_hyper_96_28655_chunk, _dist_hyper_96_55106_chunk, _dist_hyper_96_35066_chunk, _dist_hyper_96_50815_chunk, _dist_h
yper_96_26509_chunk, _dist_hyper_96_40930_chunk, _dist_hyper_96_41863_chunk, _dist_hyper_96_36912_chunk, _dist_hyper_96_67108_chunk, _dist_hyper_96_54512_chunk, _dist_hyper_96_32592_chunk, _dist_hyper_96_61870_chunk, _dist_hyper_96_3330_chunk, _dist_hyper_96_73894_chunk, 
_dist_hyper_96_14781_chunk, _dist_hyper_96_10454_chunk, _dist_hyper_96_57260_chunk
                                                   Remote SQL: SELECT ts, val, series_id FROM public.cray_craytelemetry_power WHERE _timescaledb_internal.chunks_in(public.cray_craytelemetry_power.*, ARRAY[79347, 5495, 7995, 120752, 93578, 94804, 77496, 872, 119207, 24402, 92032, 10168, 9247, 11716, 104662, 64807, 63571, 69140, 110827, 131611, 52423, 296, 3948, 25026, 121979, 82134, 90821, 119515, 83055, 14496, 117304, 22549, 117972, 42037, 52733, 105894, 50260, 2112, 12333, 37427, 21939, 55198, 3639, 12642, 110210, 50877, 96054, 21274, 68522, 22856, 2729, 70060, 1184, 53041, 91130, 41728, 49643, 39890, 123834, 7035, 13879, 25335, 56432, 108986, 64190, 94182, 51805, 78419, 26871, 40808, 83363, 122289, 76881, 35881, 79966, 79039, 4256, 12023, 65117, 7349, 14188, 121678, 124170, 96914, 53351, 78730, 120133, 27489, 54270, 4565, 2423, 97223, 5805, 67594, 56125, 107439, 12951, 38353, 27178, 63261, 36500, 36192, 107748, 132226, 108676, 64497, 80275, 23166, 23784, 62593, 109901, 67901, 68831, 51497, 40198, 39590, 82752, 1805, 66353, 80892, 108058, 109590, 13569, 27797, 3038, 40508, 65427, 118283, 25953, 92959, 4871, 111136, 39282, 67288, 8629, 23473, 119824, 81515, 91438, 6417, 107128, 6108, 28107, 108367, 77802, 81205, 118896, 564, 37118, 26262, 80585, 95126, 105591, 120443, 76210, 122598, 53966, 106203, 78112, 22247, 121369, 66977, 106510, 82443, 69757, 83674, 121060, 5181, 41424, 79657, 42347, 55817, 77189, 124753, 8940, 9556, 104352, 10789, 38663, 124444, 68212, 131920, 48971, 15114, 95745, 63879, 93267, 37736, 35574, 123216, 28724, 24717, 36808, 66045, 104971, 118587, 52114, 13260, 89849, 54890, 9867, 11096, 11407, 130942, 50569, 122906, 103685, 25642, 69450, 109282, 93887, 54581, 49950, 38046, 6726, 1495, 8322, 90518, 7680, 24093, 34905, 95435, 106821, 96362, 28415, 92650, 26570, 97532, 41117, 92341, 53660, 81823, 38971, 65734, 66667, 55506, 110519, 91747, 51186, 105281, 3330, 123524, 14805, 10478, 94495]) AND ((ts >= 1650585600000::bigint)) AND ((ts <= 
1651798799000::bigint)) AND ((series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[]))) AND ((public.time_bucket(3600000::bigint, ts) >= 1650585600000::bigint)) AND ((public.time_bucket(3600000::bigint, ts) <= 1651795199000::bigint))
                                             ->  Custom Scan (DataNodeScan) on public.cray_craytelemetry_power cray_craytelemetry_power_2  (cost=100.10..18888359.36 rows=2511 width=20) (actual time=3019.054..3019.054 rows=0 loops=1)
<SNIP>
 Settings: effective_cache_size = '161555MB', effective_io_concurrency = '900', enable_partitionwise_aggregate = 'on', jit = 'off', max_parallel_workers = '32', max_parallel_workers_per_gather = '16', parallel_setup_cost = '1', parallel_tuple_cost = '1', random_page_cost = '1.1', work_mem = '16000MB'
 Query Identifier: 5944616703698342313
 Planning:
   Buffers: shared hit=267193 dirtied=97
 Planning Time: 2079.112 ms
 Execution Time: 6172.272 ms
(163 rows)

To get TimescaleDB to push down the result of a subselect, you can create an immutable function like so:

CREATE OR REPLACE FUNCTION push() RETURNS int[] LANGUAGE SQL IMMUTABLE
AS $$
  SELECT array_agg(distinct device_id) FROM metrics;
$$;

And then use it in a query like this:

SELECT * FROM metrics_dist WHERE device_id=ANY(push());

This will materialize the function result on the access node at plan time and push the resulting array down to the data nodes as a constant.

This will also use indexes when applicable:

explain verbose select * from metrics_dist where device_id=ANY(t());
                                                                                                         QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (AsyncAppend)  (cost=100.00..7261.53 rows=56216 width=36)
   Output: metrics_dist."time", metrics_dist.device_id, metrics_dist.v0, metrics_dist.v1, metrics_dist.v2, metrics_dist.v3
   ->  Append  (cost=100.00..7261.53 rows=56216 width=36)
         ->  Custom Scan (DataNodeScan) on public.metrics_dist metrics_dist_1  (cost=100.00..1655.86 rows=13674 width=36)
               Output: metrics_dist_1."time", metrics_dist_1.device_id, metrics_dist_1.v0, metrics_dist_1.v1, metrics_dist_1.v2, metrics_dist_1.v3
               Data node: data_node_1
               Fetcher Type: Row by row
               Chunks: _dist_hyper_7_37_chunk, _dist_hyper_7_40_chunk, _dist_hyper_7_43_chunk
               Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist WHERE _timescaledb_internal.chunks_in(public.metrics_dist.*, ARRAY[1, 2, 3]) AND ((device_id = ANY ('{1,2,3,4,5}'::integer[])))
               Remote EXPLAIN:
                 Append  (cost=0.28..843.19 rows=13674 width=36)
                   ->  Index Scan using _dist_hyper_7_37_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_37_chunk  (cost=0.28..208.51 rows=3598 width=36)
                         Output: _dist_hyper_7_37_chunk."time", _dist_hyper_7_37_chunk.device_id, _dist_hyper_7_37_chunk.v0, _dist_hyper_7_37_chunk.v1, _dist_hyper_7_37_chunk.v2, _dist_hyper_7_37_chunk.v3
                         Index Cond: (_dist_hyper_7_37_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_40_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_40_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_40_chunk."time", _dist_hyper_7_40_chunk.device_id, _dist_hyper_7_40_chunk.v0, _dist_hyper_7_40_chunk.v1, _dist_hyper_7_40_chunk.v2, _dist_hyper_7_40_chunk.v3
                         Index Cond: (_dist_hyper_7_40_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_43_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_43_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_43_chunk."time", _dist_hyper_7_43_chunk.device_id, _dist_hyper_7_43_chunk.v0, _dist_hyper_7_43_chunk.v1, _dist_hyper_7_43_chunk.v2, _dist_hyper_7_43_chunk.v3
                         Index Cond: (_dist_hyper_7_43_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))

         ->  Custom Scan (DataNodeScan) on public.metrics_dist metrics_dist_2  (cost=100.00..3668.73 rows=28868 width=36)
               Output: metrics_dist_2."time", metrics_dist_2.device_id, metrics_dist_2.v0, metrics_dist_2.v1, metrics_dist_2.v2, metrics_dist_2.v3
               Data node: data_node_2
               Fetcher Type: Row by row
               Chunks: _dist_hyper_7_38_chunk, _dist_hyper_7_41_chunk, _dist_hyper_7_44_chunk
               Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist WHERE _timescaledb_internal.chunks_in(public.metrics_dist.*, ARRAY[1, 2, 3]) AND ((device_id = ANY ('{1,2,3,4,5}'::integer[])))
               Remote EXPLAIN:
                 Append  (cost=0.29..1923.78 rows=28868 width=36)
                   ->  Index Scan using _dist_hyper_7_38_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_38_chunk  (cost=0.29..472.10 rows=7596 width=36)
                         Output: _dist_hyper_7_38_chunk."time", _dist_hyper_7_38_chunk.device_id, _dist_hyper_7_38_chunk.v0, _dist_hyper_7_38_chunk.v1, _dist_hyper_7_38_chunk.v2, _dist_hyper_7_38_chunk.v3
                         Index Cond: (_dist_hyper_7_38_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_41_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_41_chunk  (cost=0.29..653.67 rows=10636 width=36)
                         Output: _dist_hyper_7_41_chunk."time", _dist_hyper_7_41_chunk.device_id, _dist_hyper_7_41_chunk.v0, _dist_hyper_7_41_chunk.v1, _dist_hyper_7_41_chunk.v2, _dist_hyper_7_41_chunk.v3
                         Index Cond: (_dist_hyper_7_41_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_44_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_44_chunk  (cost=0.29..653.67 rows=10636 width=36)
                         Output: _dist_hyper_7_44_chunk."time", _dist_hyper_7_44_chunk.device_id, _dist_hyper_7_44_chunk.v0, _dist_hyper_7_44_chunk.v1, _dist_hyper_7_44_chunk.v2, _dist_hyper_7_44_chunk.v3
                         Index Cond: (_dist_hyper_7_44_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))

         ->  Custom Scan (DataNodeScan) on public.metrics_dist metrics_dist_3  (cost=100.00..1655.86 rows=13674 width=36)
               Output: metrics_dist_3."time", metrics_dist_3.device_id, metrics_dist_3.v0, metrics_dist_3.v1, metrics_dist_3.v2, metrics_dist_3.v3
               Data node: data_node_3
               Fetcher Type: Row by row
               Chunks: _dist_hyper_7_39_chunk, _dist_hyper_7_42_chunk, _dist_hyper_7_45_chunk
               Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist WHERE _timescaledb_internal.chunks_in(public.metrics_dist.*, ARRAY[1, 2, 3]) AND ((device_id = ANY ('{1,2,3,4,5}'::integer[])))
               Remote EXPLAIN:
                 Append  (cost=0.28..843.19 rows=13674 width=36)
                   ->  Index Scan using _dist_hyper_7_39_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_39_chunk  (cost=0.28..208.51 rows=3598 width=36)
                         Output: _dist_hyper_7_39_chunk."time", _dist_hyper_7_39_chunk.device_id, _dist_hyper_7_39_chunk.v0, _dist_hyper_7_39_chunk.v1, _dist_hyper_7_39_chunk.v2, _dist_hyper_7_39_chunk.v3
                         Index Cond: (_dist_hyper_7_39_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_42_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_42_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_42_chunk."time", _dist_hyper_7_42_chunk.device_id, _dist_hyper_7_42_chunk.v0, _dist_hyper_7_42_chunk.v1, _dist_hyper_7_42_chunk.v2, _dist_hyper_7_42_chunk.v3
                         Index Cond: (_dist_hyper_7_42_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_45_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_45_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_45_chunk."time", _dist_hyper_7_45_chunk.device_id, _dist_hyper_7_45_chunk.v0, _dist_hyper_7_45_chunk.v1, _dist_hyper_7_45_chunk.v2, _dist_hyper_7_45_chunk.v3
                         Index Cond: (_dist_hyper_7_45_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))

(57 rows)

@sven thanks for the suggestion! I had tried using ANY originally but couldn’t figure out how to make the Postgres type system happy, so I had switched to the array-specific operator (&&). After reading your post I went back and figured out how to make this work. As you say, with the ANY formulation the data nodes are able to use index scans, and performance is ~1.5-2x better than with the && formulation. I prefer a subselect over a function, as functions seem to mess with the query planner (even when marked immutable) and lead to poor performance. This brings the query much closer to ElasticSearch, but it still takes ~2x as long (down from ~5x).

with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select array_agg(l.id)::int[] from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id = ANY((select array_agg(id) from power_series_data)::int[])
  and m.ts between 1651708800000 and 1652399999000
group by time
order by 1 desc;

@troy,

On the query plan using the CAGG, I noticed two things. Curious what your thoughts are.

  1. The data coming from the CAGG only takes ~50ms to query. The majority of your time is spent planning and querying the underlying distributed hypertable for recently added data.
  2. Unless I’m reading the query plan incorrectly, that append for “real-time” aggregation returned zero rows. Is that expected? Does this need to be a real-time aggregate and/or could you start with a smaller time_bucket on the CAGG (allowing you to refresh it a little more frequently) and then rollup to 1-hour buckets since you’re using one of the hyperfunction aggregations?

Thanks @ryanbooz, this is my first time using CAGGs and I didn’t realize they would query the underlying table. Yes, 0 rows should be returned because the time frame specified by the query does not cover the current time (or the future), and therefore no new rows are coming into the underlying table for this time range.

I’m assuming the Filter mentioned in the query plan is being run on the access node?

(cray_craytelemetry_power_1.ts >= COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint))

As suggested, I created a new CAGG with 1 minute time buckets and disabled real-time aggregation. With this, querying at minute granularity (i.e. no further bucketing) is faster than querying the distributed hypertable directly with a 1 minute time bucket. Your comment also prompted me to go through the query plans again, and I noticed that the biggest time saving the CAGG (without real time) has over the direct query is in query planning. The CAGG takes ~300ms to plan while the direct query takes ~2000ms. Execution times differ much less, with the CAGG typically being ~200ms faster.

This, combined with the ANY suggestion from Sven, consistently brings the query time down to 2x or slightly less than 2x of the time needed for ElasticSearch (1700ms vs. 768ms).
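For readers following along, the setup described above might look roughly like the sketch below. The view name power_1m and the column alias stats are hypothetical, the table name telemetry_power is taken from the query earlier in the thread, and the sketch assumes the timescaledb_toolkit extension is installed and that an integer-now function is already registered on the hypertable (needed for continuous aggregates on integer time columns).

```sql
-- Sketch only: power_1m and stats are hypothetical names.
-- ts is a bigint in milliseconds, so 60000 = 1 minute.
CREATE MATERIALIZED VIEW power_1m
WITH (timescaledb.continuous, timescaledb.materialized_only = true) AS
SELECT
  time_bucket(60000, ts) AS bucket,
  series_id,
  stats_agg(val) AS stats          -- partial aggregate that can be rolled up later
FROM telemetry_power
GROUP BY time_bucket(60000, ts), series_id
WITH NO DATA;

-- Real-time aggregation can also be switched off on an existing CAGG.
ALTER MATERIALIZED VIEW power_1m SET (timescaledb.materialized_only = true);

-- Re-bucket the 1 minute summaries to 1 hour at query time and
-- combine them across series with rollup().
SELECT
  time_bucket(3600000, bucket) AS time,
  average(rollup(stats)) AS power
FROM power_1m
WHERE series_id = ANY ('{7852,7863,7869}'::int[])   -- example ids from the plan above
  AND bucket BETWEEN 1651708800000 AND 1652399999000
GROUP BY 1
ORDER BY 1 DESC;
```

With materialized_only = true the view no longer appends the scan of the underlying distributed hypertable, which is the part that returned zero rows in the plan above.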

Yes, that FILTER is being run on the access node because CAGGs currently can’t be distributed. They can use distributed hypertables, but they cannot be distributed ATM.

A few additional things on CAGGs that may be helpful:

  • TimescaleDB 2.7 was just released for install yesterday. If you’re up for testing it, you could recreate this CAGG using the new finalized version (which is now the default in 2.7). This will remove one more step that the current CAGGs have to take to finalize the data returned by each row.

  • Additionally, this thread has gotten a bit long and so I might have missed it, but the default chunk_time_interval of your CAGG (materialized hypertable) is 10x that of whatever the chunk_time_interval is of the underlying hypertable where the raw data resides. That interval can be changed at any time, but it will only impact newly created chunks. So, it might make sense to decrease that interval on the materialized hypertable if you have a known query pattern. For instance, if you didn’t change your default chunk_time_interval on the underlying hypertable, then it is 7 days. That means the materialized hypertable has an interval of 70 days per chunk. If you’re only querying the last week or two, maybe it makes sense to lower the interval so that you have less data per chunk. While I don’t often give the advice to decrease the chunk_time_interval (if there’s memory, usually density of data is beneficial especially once compression gets into the mix), it’s an option to consider/play with.

  • And, finally, TimescaleDB compression on older raw data could improve the actual query performance (and compression is supported on distributed hypertables). I realize the schema is somewhat like the Promscale schema, so I’m not sure how feasible that is for what you’re querying. There have even been some further improvements in TimescaleDB 2.7 that speed up the querying of compressed data further in some cases. This doesn’t specifically relate to CAGGs, although in 2.7 it would likely increase the speed of CAGG generation and maybe querying some data (particularly because you are using a query form that “pushes down” the IDs “manually”).

Anyway, the improvements in a few of these features in 2.7 are :fire: , so definitely worth a look if that’s an option for you.
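As a rough illustration of the chunk_time_interval suggestion above, the sketch below looks up the materialized hypertable behind a CAGG and lowers its interval. The view name power_1h and the hypertable name _materialized_hypertable_131 are placeholders (use whatever the first query returns), and the one-day interval is just an example value in milliseconds.

```sql
-- Find the materialized hypertable that backs a continuous aggregate.
-- 'power_1h' is a hypothetical view name.
SELECT materialization_hypertable_schema,
       materialization_hypertable_name
FROM timescaledb_information.continuous_aggregates
WHERE view_name = 'power_1h';

-- Lower the interval on that hypertable. The name below is a placeholder;
-- substitute the result of the query above. For an integer time column the
-- interval is given in the column's own units (milliseconds here).
SELECT set_chunk_time_interval(
  '_timescaledb_internal._materialized_hypertable_131',
  86400000::bigint   -- one day in ms, an example value only
);
```

As noted above, the new interval only applies to chunks created after the change. Existing chunks keep their original size.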

@ryanbooz @sven Just picking this up again after being pulled into other work for a bit there.

I tested with a build of the main branch containing the change by @akuzm to allow partialize_agg to be used in parallel queries. I still needed to set parallel_setup_cost and parallel_tuple_cost to 0 to get the query planner to choose a parallel plan. Once I got the query planner to choose a parallel plan, the query time was nearly as fast as Elastic.
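A minimal sketch of how those two settings can be tried out is below. It assumes the settings need to be in effect on the data nodes, since that is where the chunk scans run. Whether a session-level SET on the access node is enough was not established in this thread, so the ALTER SYSTEM variant is shown as something to run on each data node.

```sql
-- PostgreSQL defaults are parallel_setup_cost = 1000 and parallel_tuple_cost = 0.1.
-- Session-level experiment: lower both, then inspect the plan.
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;

EXPLAIN (ANALYZE, VERBOSE)
SELECT time_bucket(60000, ts) AS time, avg(val) AS power
FROM telemetry_power
WHERE ts BETWEEN 1651708800000 AND 1652399999000
GROUP BY 1
ORDER BY 1 DESC;

-- To persist the values on a node (assumption: run this on each data node),
-- write them to the server configuration and reload.
ALTER SYSTEM SET parallel_setup_cost = 0;
ALTER SYSTEM SET parallel_tuple_cost = 0;
SELECT pg_reload_conf();
```

Setting both costs to 0 tells the planner that starting workers and passing tuples between them is free, so it is best treated as a diagnostic rather than a tuned value.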

In addition, I tried compressing the table, which makes the query as fast as or even slightly faster than Elastic. My plan is to compress data older than 24 hours.
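A compression setup along those lines could be sketched as follows. The segmentby and orderby choices are assumptions based on the query pattern in this thread (filter on series_id, range on ts), not settings confirmed here, and the policy assumes an integer-now function is already registered on the hypertable because ts is a bigint in milliseconds.

```sql
-- Enable compression on the distributed hypertable.
-- Segmenting by series_id is an assumption that matches the
-- "series_id = ANY (...)" filter used in the queries above.
ALTER TABLE telemetry_power SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'series_id',
  timescaledb.compress_orderby = 'ts DESC'
);

-- Compress chunks older than 24 hours. For an integer time column,
-- compress_after is expressed in the column's units (86400000 ms = 24 h).
SELECT add_compression_policy(
  'telemetry_power',
  compress_after => BIGINT '86400000'
);
```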

The distributed hypertable has a chunk_time_interval of 1 hour with data coming in at ~1 Hz. I tried creating the CAGG the same way as mentioned above: rolling the data up to 1 hour intervals and using stats_agg. I noticed an improvement in performance, but it is still not nearly as good as querying the compressed distributed hypertable. I’m going to continue to experiment with different CAGG configurations.

Any suggestions on setting parallel_setup_cost and parallel_tuple_cost? The only advice I can find is to set them to 0 to try to force parallel plans.

Once the next release comes out with the partialize_agg change, I will consider this issue resolved.

Thanks so much for all your help!!!

It’s been a great learning experience for us too @troy. And it’s good to know that changes in 2.7+ have had the meaningful impact we were anticipating! Keep us updated!