Distributed Hypertable Poor Performance for Simple Bucketed Aggregation

The goal is to get the average power (watts) per minute for a set of hosts over the past week, day, hour, or 5 minutes. I’m hoping to get some help with performance issues compared to the same setup in ElasticSearch.

On the exact same data set and the exact same hardware, Timescale takes about 5x longer (~4000 ms) than ElasticSearch (~800 ms) to retrieve this information. Even when the query is limited to the last day, Timescale takes about 2.5x as long (~1100 ms) vs. ElasticSearch (~426 ms).

I have tried various combinations of the settings below. Some combinations improve performance slightly, but not enough to resolve the issue.

  • Chunk Interval: 1 hour, 1 day, 1 week
  • Replication-factor: 1, 3
  • Partition Column: series_id, NONE
  • Regular Hypertable

Regardless of the combination of settings used above, the problem seems to always center on the Custom Scan (AsyncAppend) step taking nearly all the time.

Setup:

  • Timescale: 1 Access Node, 3 data nodes
  • ElasticSearch: 4 nodes

Hardware Summary:

  • CPU: 2.5 GHz, 32 cores (AMD Epyc)
  • Memory: 256 GB DDR4
  • Disk: 3 TB SSD
  • Network: 10 Gbps

In addition, I have set up Timescale running in Docker containers (1 access, 3 data) on my MacBook Pro (Intel version), dumped the data from my test system, and loaded it into my local setup. Running it this way does improve performance (as expected, due to lower network overhead, contention, etc.), but it is still 2x slower than ElasticSearch.

Timescale Design Summary:

  • Metric data is separated from metadata (schema is set up like Promscale with some modifications)
  • Timestamps are UNIX epoch with millisecond resolution
  • Row count: 20062588
  • Replication-factor: 3
  • Compression: FALSE
  • Partition: series_id
  • Chunk Interval: 8600000
  • Timescale Version: 2.6.0
  • Postgres Version: 14.2

ElasticSearch Design Summary:

  • An index for each day
  • Metadata and metric data stored together as a single flat json doc/object
  • Replication-factor: 3
  • Version: 6.8.23

Query:
Timescale does not resolve an expression and ship the results to the data nodes, so we instead convert to an array of IDs, which can be shipped. We use a CTE for situations where we may want to group by and return other labels with the result.

with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1')))
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  array[m.series_id] && (select coalesce(array_agg(id), array[]::integer[]) from power_series_data)
  and m.ts between 1650997919334 and 1651602719334
group by time
order by 1 desc;

Plan:
All tables were vacuumed and analyzed before running.
set track_io_timing = on;
explain (analyze, buffers, settings, verbose);

                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=431047.77..431055.27 rows=200 width=16) (actual time=3823.647..3841.920 rows=10078 loops=1)
   Output: (time_bucket('60000'::bigint, ts)), avg(val)
   Group Key: (time_bucket('60000'::bigint, ts))
   Buffers: shared hit=19
   InitPlan 2 (returns $1)
     ->  Aggregate  (cost=8.68..8.69 rows=1 width=32) (actual time=0.189..0.194 rows=1 loops=1)
           Output: COALESCE(array_agg(s.id), '{}'::integer[])
           Buffers: shared hit=19
           InitPlan 1 (returns $0)
             ->  Aggregate  (cost=2.19..2.20 rows=1 width=32) (actual time=0.104..0.106 rows=1 loops=1)
                   Output: COALESCE(array_agg(l.id), '{}'::integer[])
                   Buffers: shared hit=3
                   ->  Index Only Scan using label_key_value_id_key on public.label l  (cost=0.27..2.18 rows=1 width=4) (actual time=0.057..0.097 rows=8 loops=1)
                         Output: l.key, l.value, l.id
                         Index Cond: (l.key = 'Location'::text)
                         Filter: ("substring"(l.value, '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$'::text) = ANY ('{x9000c1s0b0,x9000c1s0b1,x9000c1s1b0,x9000c1s1b1,x9000c1s2b0,x9000c1s2b1,x9000c1s4b0,x9000c1s4b1}'::text[]))
                         Rows Removed by Filter: 19
                         Heap Fetches: 0
                         Buffers: shared hit=3
           ->  Bitmap Heap Scan on public.series_telemetry_power s  (cost=2.23..6.47 rows=4 width=4) (actual time=0.155..0.181 rows=40 loops=1)
                 Output: s.id, s.metric_id, s.labels, s.delete_epoch
                 Recheck Cond: ((s.labels)::integer[] && $0)
                 Heap Blocks: exact=7
                 Buffers: shared hit=19
                 ->  Bitmap Index Scan on series_telemetry_power_labels_idx  (cost=0.00..2.23 rows=4 width=0) (actual time=0.144..0.145 rows=40 loops=1)
                       Index Cond: ((s.labels)::integer[] && $0)
                       Buffers: shared hit=12
   ->  Sort  (cost=431039.08..431040.58 rows=600 width=40) (actual time=3823.626..3830.866 rows=30234 loops=1)
         Output: (time_bucket('60000'::bigint, ts)), (PARTIAL avg(val))
         Sort Key: (time_bucket('60000'::bigint, ts)) DESC
         Sort Method: quicksort  Memory: 5020kB
         Buffers: shared hit=19
         ->  Custom Scan (AsyncAppend)  (cost=312.60..431011.40 rows=600 width=40) (actual time=3802.641..3813.805 rows=30234 loops=1)
               Output: (time_bucket('60000'::bigint, ts)), (PARTIAL avg(val))
               Buffers: shared hit=19
               ->  Append  (cost=312.60..431011.40 rows=600 width=40) (actual time=0.008..7.325 rows=30234 loops=1)
                     ->  Custom Scan (DataNodeScan)  (cost=312.60..103004.65 rows=200 width=40) (actual time=0.005..1.317 rows=10078 loops=1)
                           Output: (time_bucket('60000'::bigint, m.ts)), (PARTIAL avg(m.val))
                           Relations: Aggregate on (public.telemetry_power m)
                           Data node: dn_ld01
                           Fetcher Type: Row by row
                           Chunks: _dist_hyper_125_59475_chunk, _dist_hyper_125_59474_chunk, _dist_hyper_125_59473_chunk, _dist_hyper_125_59472_chunk, _dist_hyper_125_59471_chunk, _dist_hyper_125_59470_chunk, _dist_hyper_125_59469_chunk, _dist_hyper_125_59468_chunk, _dist_hyper_125_59467_chunk, _dist_hyper_125_59466_chunk, _dist_hyper_125_59465_chunk, _dist_hyper_125_59464_chunk, _dist_hyper_125_59463_chunk, _dist_hyper_125_59462_chunk, _dist_hyper_125_59461_chunk, _dist_hyper_125_59460_chunk, _dist_hyper_125_59459_chunk, _dist_hyper_125_59458_chunk, _dist_hyper_125_59457_chunk, _dist_hyper_125_59456_chunk, _dist_hyper_125_59455_chunk, _dist_hyper_125_59454_chunk, _dist_hyper_125_59453_chunk, _dist_hyper_125_59452_chunk, _dist_hyper_125_59451_chunk, _dist_hyper_125_59450_chunk, _dist_hyper_125_59449_chunk, _dist_hyper_125_59448_chunk, _dist_hyper_125_59447_chunk, _dist_hyper_125_59446_chunk, _dist_hyper_125_59445_chunk, _dist_hyper_125_59444_chunk, _dist_hyper_125_59443_chunk, _dist_hyper_125_59442_chunk, _dist_hyper_125_59441_chunk, _dist_hyper_125_59440_chunk, _dist_hyper_125_59439_chunk, _dist_hyper_125_59438_chunk, _dist_hyper_125_59437_chunk, _dist_hyper_125_59436_chunk, _dist_hyper_125_59435_chunk, _dist_hyper_125_59434_chunk, _dist_hyper_125_59433_chunk, _dist_hyper_125_59432_chunk, _dist_hyper_125_59431_chunk, _dist_hyper_125_59430_chunk, _dist_hyper_125_59429_chunk, _dist_hyper_125_59428_chunk, _dist_hyper_125_59427_chunk, _dist_hyper_125_59426_chunk, _dist_hyper_125_59425_chunk, _dist_hyper_125_59424_chunk, _dist_hyper_125_59423_chunk, _dist_hyper_125_59422_chunk, _dist_hyper_125_59421_chunk, _dist_hyper_125_59420_chunk, _dist_hyper_125_59419_chunk, _dist_hyper_125_59418_chunk, _dist_hyper_125_59417_chunk, _dist_hyper_125_59416_chunk, _dist_hyper_125_59415_chunk, _dist_hyper_125_59414_chunk, _dist_hyper_125_59413_chunk, _dist_hyper_125_59412_chunk, _dist_hyper_125_59411_chunk, _dist_hyper_125_59410_chunk, _dist_hyper_125_59409_chunk, 
_dist_hyper_125_59408_chunk, _dist_hyper_125_59407_chunk, _dist_hyper_125_59405_chunk, _dist_hyper_125_59404_chunk
                           Remote SQL: SELECT public.time_bucket(60000::bigint, ts), _timescaledb_internal.partialize_agg(avg(val)) FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[96682, 96681, 96680, 96679, 96678, 96677, 96676, 96675, 96674, 96673, 96672, 96671, 96670, 96669, 96668, 96667, 96666, 96665, 96664, 96663, 96662, 96661, 96660, 96659, 96658, 96657, 96656, 96655, 96654, 96653, 96652, 96651, 96650, 96649, 96648, 96647, 96646, 96645, 96644, 96643, 96642, 96641, 96640, 96639, 96638, 96637, 96636, 96635, 96634, 96633, 96632, 96631, 96630, 96629, 96628, 96627, 96626, 96625, 96624, 96623, 96622, 96621, 96620, 96619, 96618, 96617, 96616, 96615, 96614, 96612, 96611]) AND ((ARRAY[series_id] && $1::integer[])) AND ((ts >= 1650997919334::bigint)) AND ((ts <= 1651602719334::bigint)) GROUP BY 1
                     ->  Custom Scan (DataNodeScan)  (cost=425.30..157478.07 rows=200 width=40) (actual time=0.002..1.213 rows=10078 loops=1)
                           Output: (time_bucket('60000'::bigint, m_1.ts)), (PARTIAL avg(m_1.val))
                           Relations: Aggregate on (public.telemetry_power m)
                           Data node: dn_ld02
                           Fetcher Type: Row by row
                           Chunks: _dist_hyper_125_59564_chunk, _dist_hyper_125_59563_chunk, _dist_hyper_125_59562_chunk, _dist_hyper_125_59561_chunk, _dist_hyper_125_59560_chunk, _dist_hyper_125_59559_chunk, _dist_hyper_125_59558_chunk, _dist_hyper_125_59557_chunk, _dist_hyper_125_59556_chunk, _dist_hyper_125_59555_chunk, _dist_hyper_125_59554_chunk, _dist_hyper_125_59553_chunk, _dist_hyper_125_59552_chunk, _dist_hyper_125_59550_chunk, _dist_hyper_125_59544_chunk, _dist_hyper_125_59543_chunk, _dist_hyper_125_59542_chunk, _dist_hyper_125_59541_chunk, _dist_hyper_125_59540_chunk, _dist_hyper_125_59539_chunk, _dist_hyper_125_59538_chunk, _dist_hyper_125_59537_chunk, _dist_hyper_125_59536_chunk, _dist_hyper_125_59535_chunk, _dist_hyper_125_59534_chunk, _dist_hyper_125_59533_chunk, _dist_hyper_125_59532_chunk, _dist_hyper_125_59531_chunk, _dist_hyper_125_59530_chunk, _dist_hyper_125_59529_chunk, _dist_hyper_125_59528_chunk, _dist_hyper_125_59527_chunk, _dist_hyper_125_59526_chunk, _dist_hyper_125_59525_chunk, _dist_hyper_125_59524_chunk, _dist_hyper_125_59523_chunk, _dist_hyper_125_59522_chunk, _dist_hyper_125_59521_chunk, _dist_hyper_125_59520_chunk, _dist_hyper_125_59519_chunk, _dist_hyper_125_59518_chunk, _dist_hyper_125_59517_chunk, _dist_hyper_125_59516_chunk, _dist_hyper_125_59515_chunk, _dist_hyper_125_59514_chunk, _dist_hyper_125_59513_chunk, _dist_hyper_125_59512_chunk, _dist_hyper_125_59511_chunk, _dist_hyper_125_59510_chunk, _dist_hyper_125_59509_chunk, _dist_hyper_125_59508_chunk, _dist_hyper_125_59507_chunk, _dist_hyper_125_59506_chunk, _dist_hyper_125_59505_chunk, _dist_hyper_125_59504_chunk, _dist_hyper_125_59503_chunk, _dist_hyper_125_59502_chunk, _dist_hyper_125_59500_chunk, _dist_hyper_125_59499_chunk, _dist_hyper_125_59498_chunk, _dist_hyper_125_59497_chunk, _dist_hyper_125_59496_chunk, _dist_hyper_125_59495_chunk, _dist_hyper_125_59494_chunk, _dist_hyper_125_59493_chunk, _dist_hyper_125_59492_chunk, _dist_hyper_125_59491_chunk, 
_dist_hyper_125_59490_chunk, _dist_hyper_125_59489_chunk, _dist_hyper_125_59487_chunk, _dist_hyper_125_59486_chunk
                           Remote SQL: SELECT public.time_bucket(60000::bigint, ts), _timescaledb_internal.partialize_agg(avg(val)) FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[96771, 96770, 96769, 96768, 96767, 96766, 96765, 96764, 96763, 96762, 96761, 96760, 96759, 96757, 96751, 96750, 96749, 96748, 96747, 96746, 96745, 96744, 96743, 96742, 96741, 96740, 96739, 96738, 96737, 96736, 96735, 96734, 96733, 96732, 96731, 96730, 96729, 96728, 96727, 96726, 96725, 96724, 96723, 96722, 96721, 96720, 96719, 96718, 96717, 96716, 96715, 96714, 96713, 96712, 96711, 96710, 96709, 96707, 96706, 96705, 96704, 96703, 96702, 96701, 96700, 96699, 96698, 96697, 96696, 96694, 96693]) AND ((ARRAY[series_id] && $1::integer[])) AND ((ts >= 1650997919334::bigint)) AND ((ts <= 1651602719334::bigint)) GROUP BY 1
                     ->  Custom Scan (DataNodeScan)  (cost=452.27..170525.68 rows=200 width=40) (actual time=0.001..1.291 rows=10078 loops=1)
                           Output: (time_bucket('60000'::bigint, m_2.ts)), (PARTIAL avg(m_2.val))
                           Relations: Aggregate on (public.telemetry_power m)
                           Data node: dn_ld03
                           Fetcher Type: Row by row
                           Chunks: _dist_hyper_125_59673_chunk, _dist_hyper_125_59672_chunk, _dist_hyper_125_59671_chunk, _dist_hyper_125_59670_chunk, _dist_hyper_125_59669_chunk, _dist_hyper_125_59668_chunk, _dist_hyper_125_59667_chunk, _dist_hyper_125_59666_chunk, _dist_hyper_125_59665_chunk, _dist_hyper_125_59664_chunk, _dist_hyper_125_59663_chunk, _dist_hyper_125_59662_chunk, _dist_hyper_125_59661_chunk, _dist_hyper_125_59660_chunk, _dist_hyper_125_59659_chunk, _dist_hyper_125_59658_chunk, _dist_hyper_125_59654_chunk, _dist_hyper_125_59653_chunk, _dist_hyper_125_59652_chunk, _dist_hyper_125_59651_chunk, _dist_hyper_125_59650_chunk, _dist_hyper_125_59649_chunk, _dist_hyper_125_59648_chunk, _dist_hyper_125_59647_chunk, _dist_hyper_125_59646_chunk, _dist_hyper_125_59645_chunk, _dist_hyper_125_59644_chunk, _dist_hyper_125_59643_chunk, _dist_hyper_125_59642_chunk, _dist_hyper_125_59641_chunk, _dist_hyper_125_59640_chunk, _dist_hyper_125_59639_chunk, _dist_hyper_125_59638_chunk, _dist_hyper_125_59637_chunk, _dist_hyper_125_59636_chunk, _dist_hyper_125_59635_chunk, _dist_hyper_125_59634_chunk, _dist_hyper_125_59633_chunk, _dist_hyper_125_59632_chunk, _dist_hyper_125_59631_chunk, _dist_hyper_125_59630_chunk, _dist_hyper_125_59629_chunk, _dist_hyper_125_59628_chunk, _dist_hyper_125_59627_chunk, _dist_hyper_125_59626_chunk, _dist_hyper_125_59625_chunk, _dist_hyper_125_59624_chunk, _dist_hyper_125_59623_chunk, _dist_hyper_125_59622_chunk, _dist_hyper_125_59621_chunk, _dist_hyper_125_59620_chunk, _dist_hyper_125_59619_chunk, _dist_hyper_125_59615_chunk, _dist_hyper_125_59614_chunk, _dist_hyper_125_59613_chunk, _dist_hyper_125_59606_chunk, _dist_hyper_125_59602_chunk, _dist_hyper_125_59601_chunk, _dist_hyper_125_59600_chunk, _dist_hyper_125_59598_chunk, _dist_hyper_125_59597_chunk, _dist_hyper_125_59593_chunk, _dist_hyper_125_59583_chunk, _dist_hyper_125_59582_chunk, _dist_hyper_125_59581_chunk, _dist_hyper_125_59580_chunk, _dist_hyper_125_59579_chunk, 
_dist_hyper_125_59578_chunk, _dist_hyper_125_59577_chunk, _dist_hyper_125_59575_chunk, _dist_hyper_125_59574_chunk
                           Remote SQL: SELECT public.time_bucket(60000::bigint, ts), _timescaledb_internal.partialize_agg(avg(val)) FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[96880, 96879, 96878, 96877, 96876, 96875, 96874, 96873, 96872, 96871, 96870, 96869, 96868, 96867, 96866, 96865, 96861, 96860, 96859, 96858, 96857, 96856, 96855, 96854, 96853, 96852, 96851, 96850, 96849, 96848, 96847, 96846, 96845, 96844, 96843, 96842, 96841, 96840, 96839, 96838, 96837, 96836, 96835, 96834, 96833, 96832, 96831, 96830, 96829, 96828, 96827, 96826, 96822, 96821, 96820, 96813, 96809, 96808, 96807, 96805, 96804, 96800, 96790, 96789, 96788, 96787, 96786, 96785, 96784, 96782, 96781]) AND ((ARRAY[series_id] && $1::integer[])) AND ((ts >= 1650997919334::bigint)) AND ((ts <= 1651602719334::bigint)) GROUP BY 1
 Settings: effective_cache_size = '161555MB', effective_io_concurrency = '900', enable_partitionwise_aggregate = 'on', jit = 'off', max_parallel_workers = '32', max_parallel_workers_per_gather = '16', random_page_cost = '1.1', work_mem = '16000MB'
 Query Identifier: 8067911393521483881
 Planning:
   Buffers: shared hit=31575
 Planning Time: 202.581 ms
 Execution Time: 3844.555 ms
(63 rows)
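
As a sanity check on the row counts in this plan (plain arithmetic on the numbers shown above, not an additional measurement): the queried range is exactly seven days, which gives 10080 one-minute buckets, close to the 10078 groups returned. Each of the three data nodes returns one partial aggregate per bucket, which matches the 30234 rows reaching the access node.

```sql
-- Arithmetic on values copied from the query and plan above.
select
  (1651602719334 - 1650997919334) / 60000 as minute_buckets_in_range,  -- 10080
  3 * 10078                                as partial_rows_from_nodes;  -- 30234
```

That is a small result set to merge, so the roughly 3.8 s startup time on the AsyncAppend node is most likely spent waiting for the data nodes to execute their Remote SQL rather than transferring or combining rows.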

Schema:

                                  Table "public.telemetry_power"
  Column   |       Type       | Collation | Nullable | Default | Storage | Compression | Stats target | Description 
-----------+------------------+-----------+----------+---------+---------+-------------+--------------+-------------
 ts        | bigint           |           | not null |         | plain   |             |              | 
 val       | double precision |           |          |         | plain   |             |              | 
 series_id | integer          |           |          |         | plain   |             |              | 
Indexes:
    "telemetry_power_series_id_ts_idx" btree (series_id, ts DESC)
    "telemetry_power_ts_idx" btree (ts DESC)
Triggers:
    ts_insert_blocker BEFORE INSERT ON telemetry_power FOR EACH ROW EXECUTE FUNCTION _timescaledb_internal.insert_blocker()
Child tables: <snip>
Access method: heap
Options: autovacuum_vacuum_threshold=50000, autovacuum_analyze_threshold=50000

                                  Table "public.series_telemetry_power"
    Column    |    Type     | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
--------------+-------------+-----------+----------+---------+----------+-------------+--------------+-------------
 id           | integer     |           | not null |         | plain    |             |              | 
 metric_id    | integer     |           | not null |         | plain    |             |              | 
 labels       | label_array |           | not null |         | extended |             |              | 
 delete_epoch | bigint      |           |          |         | plain    |             |              | 
Partition of: series FOR VALUES IN (654)
Partition constraint: ((metric_id IS NOT NULL) AND (metric_id = 654))
Indexes:
    "series_pkey_654" PRIMARY KEY, btree (id)
    "series_telemetry_power_labels_idx" gin (labels)
    "series_labels_id_654" UNIQUE CONSTRAINT, btree (labels) INCLUDE (id)
Check constraints:
    "series_telemetry_power_labels_check" CHECK (labels[1] = 413 AND labels[1] IS NOT NULL)
    "series_telemetry_power_metric_id_check" CHECK (metric_id = 654)
Access method: heap
Options: autovacuum_vacuum_threshold=100, autovacuum_analyze_threshold=100

                                                       Table "public.label"
 Column |  Type   | Collation | Nullable |              Default              | Storage  | Compression | Stats target | Description 
--------+---------+-----------+----------+-----------------------------------+----------+-------------+--------------+-------------
 id     | integer |           | not null | nextval('label_id_seq'::regclass) | plain    |             |              | 
 key    | text    |           |          |                                   | extended |             |              | 
 value  | text    |           |          |                                   | extended |             |              | 
Indexes:
    "label_pkey" PRIMARY KEY, btree (id) INCLUDE (key, value)
    "label_key_value_id_key" UNIQUE CONSTRAINT, btree (key, value) INCLUDE (id)
Check constraints:
    "label_id_check" CHECK (id > 0)
Access method: heap

                                        Table "public.label_key_position"
     Column      |  Type   | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
-----------------+---------+-----------+----------+---------+----------+-------------+--------------+-------------
 metric_category | text    |           |          |         | extended |             |              | 
 metric_name     | text    |           |          |         | extended |             |              | 
 key             | text    |           |          |         | extended |             |              | 
 pos             | integer |           |          |         | plain    |             |              | 
Indexes:
    "label_key_position_metric_category_metric_name_key_pos_key" UNIQUE CONSTRAINT, btree (metric_category, metric_name, key) INCLUDE (pos)
Access method: heap

ElasticSearch Sample Doc:

"_index" : "telemetry-2022.04.28",
"_type" : "kafka-connect",
"_id" : "telemetry-2022.04.28+1+86136091",
"_score" : 0.0,
"_source" : {
  "TelemetrySource" : "nC",
  "MessageId" : "Telemetry.Power",
  "Timestamp" : 1651176279136,
  "Location" : "x9000c1s2b1n0",
  "ParentalContext" : "Chassis",
  "PhysicalContext" : "VoltageRegulator",
  "PhysicalSubContext" : "Input",
  "ParentalIndex" : null,
  "Index" : 0,
  "DeviceSpecificContext" : null,
  "Value" : 1043.0
}

ElasticSearch Query:

{"search_type":"query_then_fetch","ignore_unavailable":true,"index":["telemetry-2022.04.22","telemetry-2022.04.27","telemetry-2022.04.28","telemetry-2022.04.29","telemetry-2022.04.30","telemetry-2022.05.01","telemetry-2022.05.02","telemetry-2022.05.03"]}
{"size":0,"query":{"bool":{"filter":[{"range":{"Timestamp":{"gte":1650663853916,"lte":1651602719334,"format":"epoch_millis"}}},{"query_string":{"analyze_wildcard":true,"query":"MessageId = CrayTelemetry.Power  and Location = (\"x9000c1s0b0\" OR \"x9000c1s0b1\" OR \"x9000c1s1b0\" OR \"x9000c1s1b1\" OR \"x9000c1s2b0\" OR \"x9000c1s2b1\" OR \"x9000c1s4b0\" OR \"x9000c1s4b1\")"}}]}},"aggs":{"2":{"date_histogram":{"interval":"1m","field":"Timestamp","min_doc_count":1,"extended_bounds":{"min":1650663853916,"max":1651602719334},"format":"epoch_millis"},"aggs":{"3":{"avg":{"field":"Value"}}}}}}

I am just taking a quick look at this and will have to see if someone else can look a little more closely, but I’d bet that the where clause array[m.series_id] && (select coalesce(array_agg(id), array[]::integer[]) from power_series_data) means that we can’t use an index scan. You’re now filtering on an array representation of series_id rather than on series_id itself, which a btree index on series_id cannot serve, and this confuses the planner dramatically. Did you try it as m.series_id IN (SELECT id FROM power_series_data)? I think that would probably work better.

I’d also recommend marking the WITH clause as MATERIALIZED to help make sure it acts as an optimization barrier.
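
One way to check this on the distributed setup would be to run the pushed-down statement directly on one of the data nodes and see which scan it picks. The sketch below is adapted from the Remote SQL in the plan above, with the chunks_in restriction and the partialize_agg wrapper dropped for readability. It assumes the hypertable is queryable under the same name on the data node, as the Remote SQL suggests, and the series ids in the array are hypothetical placeholders, since the real ids are not shown in the thread.

```sql
-- Run on a data node (for example dn_ld01), not on the access node.
-- ARRAY[1, 2, 3] is a hypothetical stand-in for the real series ids.
explain (analyze, buffers)
select public.time_bucket(60000::bigint, ts), avg(val)
from public.telemetry_power
where array[series_id] && array[1, 2, 3]
  and ts >= 1650997919334
  and ts <= 1651602719334
group by 1;
```

If the chunk scans list the array condition under Filter rather than Index Cond, the (series_id, ts) index is not being used for it.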

I just ran an analogous query on our getting started dataset, which I’m testing with. The results were pretty dramatic:

EXPLAIN (ANALYZE, BUFFERS) 
WITH t as MATERIALIZED (SELECT symbol FROM company where name ILIKE '%ap%')
SELECT * FROM stocks_real_time 
WHERE array[symbol] && (SELECT coalesce(array_agg(symbol), array[]::text[]) FROM t) AND time >= '2022-04-04' and time < '2022-04-05';
                                                                                              QUERY PLAN                                                                                              
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1002.76..12334.91 rows=4223 width=24) (actual time=0.713..501.096 rows=39482 loops=1)
   Workers Planned: 1
   Params Evaluated: $1
   Workers Launched: 1
   Buffers: shared hit=4094
   CTE t
     ->  Seq Scan on company  (cost=0.00..2.25 rows=3 width=4) (actual time=0.009..0.062 rows=3 loops=1)
           Filter: (name ~~* '%ap%'::text)
           Rows Removed by Filter: 97
           Buffers: shared hit=1
   InitPlan 2 (returns $1)
     ->  Aggregate  (cost=0.07..0.08 rows=1 width=32) (actual time=0.070..0.070 rows=1 loops=1)
           Buffers: shared hit=1
           ->  CTE Scan on t  (cost=0.00..0.06 rows=3 width=32) (actual time=0.011..0.066 rows=3 loops=1)
                 Buffers: shared hit=1
   ->  Parallel Custom Scan (ChunkAppend) on stocks_real_time  (cost=0.43..10910.28 rows=2484 width=24) (actual time=0.184..281.026 rows=19741 loops=2)
         Buffers: shared hit=4093
         ->  Parallel Index Scan using _hyper_5_2655_chunk_stocks_real_time_time_idx on _hyper_5_2655_chunk  (cost=0.43..10910.28 rows=2484 width=24) (actual time=0.182..279.501 rows=19741 loops=2)
               Index Cond: (("time" >= '2022-04-04 00:00:00+00'::timestamp with time zone) AND ("time" < '2022-04-05 00:00:00+00'::timestamp with time zone))
               Filter: (ARRAY[symbol] && $1)
               Rows Removed by Filter: 191831
               Buffers: shared hit=4093
 Planning:
   Buffers: shared hit=20
 Planning Time: 0.446 ms
 Execution Time: 502.736 ms
(26 rows)

That was with the array approach. Here is the standard IN approach:

EXPLAIN (ANALYZE, BUFFERS) 
WITH t as MATERIALIZED  (SELECT symbol FROM company where name ILIKE '%ap%')
SELECT * FROM stocks_real_time 
WHERE symbol IN (SELECT symbol FROM t) AND time >= '2022-04-04' and time < '2022-04-05';
                                                                                      QUERY PLAN                                                                                      
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Nested Loop  (cost=153.22..11233.97 rows=6335 width=24) (actual time=3.136..25.325 rows=39482 loops=1)
   Buffers: shared hit=8769
   CTE t
     ->  Seq Scan on company  (cost=0.00..2.25 rows=3 width=4) (actual time=0.008..0.058 rows=3 loops=1)
           Filter: (name ~~* '%ap%'::text)
           Rows Removed by Filter: 97
           Buffers: shared hit=1
   ->  HashAggregate  (cost=0.07..0.10 rows=3 width=32) (actual time=0.063..0.067 rows=3 loops=1)
         Group Key: t.symbol
         Batches: 1  Memory Usage: 24kB
         Buffers: shared hit=1
         ->  CTE Scan on t  (cost=0.00..0.06 rows=3 width=32) (actual time=0.010..0.060 rows=3 loops=1)
               Buffers: shared hit=1
   ->  Bitmap Heap Scan on _hyper_5_2655_chunk  (cost=150.90..3662.66 rows=8121 width=24) (actual time=1.816..6.958 rows=13161 loops=3)
         Recheck Cond: ((symbol = t.symbol) AND ("time" >= '2022-04-04 00:00:00+00'::timestamp with time zone) AND ("time" < '2022-04-05 00:00:00+00'::timestamp with time zone))
         Heap Blocks: exact=8539
         Buffers: shared hit=8768
         ->  Bitmap Index Scan on _hyper_5_2655_chunk_ix_symbol_time  (cost=0.00..148.87 rows=8121 width=0) (actual time=1.449..1.449 rows=13161 loops=3)
               Index Cond: ((symbol = t.symbol) AND ("time" >= '2022-04-04 00:00:00+00'::timestamp with time zone) AND ("time" < '2022-04-05 00:00:00+00'::timestamp with time zone))
               Buffers: shared hit=229
 Planning:
   Buffers: shared hit=19
 Planning Time: 0.398 ms
 Execution Time: 26.812 ms
(24 rows)

And you’ll note that the second one can use the index on (symbol, time), where the first can only scan the time index and then discard the non-matching rows (see Rows Removed by Filter), so that will speed things up considerably in many cases. The speedups were different in different time frames, depending on the amount of data meeting the conditions, etc., but there was a pretty consistent speedup.
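
A third spelling that might be worth trying, which was not tested in this thread, is series_id = ANY (...). In plain PostgreSQL a column = ANY (array) condition can be used as a btree index condition, and it still compares against a single array value the way the && version does. Whether the access node ships it to the data nodes on 2.6.0 is an assumption that would need to be confirmed in the Remote SQL of an EXPLAIN (VERBOSE) plan.

```sql
-- Sketch only: the original query with the array overlap replaced by = ANY.
-- array(select ...) builds the id array and yields an empty array when the
-- CTE returns no rows, so the coalesce from the original is not needed here.
with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id = any (array(select id from power_series_data))
  and m.ts between 1650997919334 and 1651602719334
group by time
order by 1 desc;
```

If the Remote SQL shows series_id = ANY ($1) on each data node, the next thing to check would be whether the chunk scans there use telemetry_power_series_id_ts_idx.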

Let me know if that helps!

Thanks for checking this out. I had tried the IN (SELECT …) form originally, but when I did that the filter wasn’t pushed down to the data nodes, which caused even worse performance. All the rows end up getting pulled from the data nodes, and then the access node uses a hash join to filter the rows on series_id.

with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id IN (select id from power_series_data)
  and m.ts between 1650997919334 and 1651602719334
group by time
order by 1 desc;
------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=2158321.19..2159627.14 rows=200 width=16) (actual time=25733.250..26337.126 rows=10078 loops=1)
   Output: (time_bucket('60000'::bigint, m.ts)), avg(m.val)
   Group Key: (time_bucket('60000'::bigint, m.ts))
   Buffers: shared hit=19
   InitPlan 1 (returns $0)
     ->  Aggregate  (cost=2.19..2.20 rows=1 width=32) (actual time=0.129..0.132 rows=1 loops=1)
           Output: COALESCE(array_agg(l.id), '{}'::integer[])
           Buffers: shared hit=3
           ->  Index Only Scan using label_key_value_id_key on public.label l  (cost=0.27..2.18 rows=1 width=4) (actual time=0.062..0.120 rows=8 loops=1)
                 Output: l.key, l.value, l.id
                 Index Cond: (l.key = 'Location'::text)
                 Filter: ("substring"(l.value, '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$'::text) = ANY ('{x9000c1s0b0,x9000c1s0b1,x9000c1s1b0,x9000c1s1b1,x9000c1s2b0,x9000c1s2b1,x9000c1s4b0,x9000c1s4b1}'::text[]))
                 Rows Removed by Filter: 19
                 Heap Fetches: 0
                 Buffers: shared hit=3
   ->  Sort  (cost=2158318.99..2158753.31 rows=173726 width=16) (actual time=25733.177..25927.462 rows=3430161 loops=1)
         Output: (time_bucket('60000'::bigint, m.ts)), m.val
         Sort Key: (time_bucket('60000'::bigint, m.ts)) DESC
         Sort Method: quicksort  Memory: 259093kB
         Buffers: shared hit=19
         ->  Hash Join  (cost=106.52..2143199.23 rows=173726 width=16) (actual time=81.584..25037.248 rows=3430161 loops=1)
               Output: time_bucket('60000'::bigint, m.ts), m.val
               Inner Unique: true
               Hash Cond: (m.series_id = s.id)
               Buffers: shared hit=19
               ->  Custom Scan (AsyncAppend)  (cost=100.00..2095532.14 rows=17806889 width=20) (actual time=81.330..23122.649 rows=17807937 loops=1)
                     Output: m.ts, m.val, m.series_id
                     ->  Append  (cost=100.00..2095532.14 rows=17806889 width=20) (actual time=0.008..21572.715 rows=17807937 loops=1)
                           ->  Custom Scan (DataNodeScan) on public.telemetry_power m_1  (cost=100.00..479409.06 rows=4254347 width=20) (actual time=0.004..5061.227 rows=4254497 loops=1)
                                 Output: m_1.ts, m_1.val, m_1.series_id
                                 Data node: dn_ld01
                                 Fetcher Type: Row by row
                                 Chunks: _dist_hyper_125_59475_chunk, _dist_hyper_125_59474_chunk, _dist_hyper_125_59473_chunk, _dist_hyper_125_59472_chunk, _dist_hyper_125_59471_chunk, _dist_hyper_125_59470_chunk, _dist_hyper_125_59469_chunk, _dist_hyper_125_59468_chunk, _dist_hyper_125_59467_chunk, _dist_hyper_125_59466_chunk, _dist_hyper_125_59465_chunk, _dist_hyper_125_59464_chunk, _dist_hyper_125_59463_chunk, _dist_hyper_125_59462_chunk, _dist_hyper_125_59461_chunk, _dist_hyper_125_59460_chunk, _dist_hyper_125_59459_chunk, _dist_hyper_125_59458_chunk, _dist_hyper_125_59457_chunk, _dist_hyper_125_59456_chunk, _dist_hyper_125_59455_chunk, _dist_hyper_125_59454_chunk, _dist_hyper_125_59453_chunk, _dist_hyper_125_59452_chunk, _dist_hyper_125_59451_chunk, _dist_hyper_125_59450_chunk, _dist_hyper_125_59449_chunk, _dist_hyper_125_59448_chunk, _dist_hyper_125_59447_chunk, _dist_hyper_125_59446_chunk, _dist_hyper_125_59445_chunk, _dist_hyper_125_59444_chunk, _dist_hyper_125_59443_chunk, _dist_hyper_125_59442_chunk, _dist_hyper_125_59441_chunk, _dist_hyper_125_59440_chunk, _dist_hyper_125_59439_chunk, _dist_hyper_125_59438_chunk, _dist_hyper_125_59437_chunk, _dist_hyper_125_59436_chunk, _dist_hyper_125_59435_chunk, _dist_hyper_125_59434_chunk, _dist_hyper_125_59433_chunk, _dist_hyper_125_59432_chunk, _dist_hyper_125_59431_chunk, _dist_hyper_125_59430_chunk, _dist_hyper_125_59429_chunk, _dist_hyper_125_59428_chunk, _dist_hyper_125_59427_chunk, _dist_hyper_125_59426_chunk, _dist_hyper_125_59425_chunk, _dist_hyper_125_59424_chunk, _dist_hyper_125_59423_chunk, _dist_hyper_125_59422_chunk, _dist_hyper_125_59421_chunk, _dist_hyper_125_59420_chunk, _dist_hyper_125_59419_chunk, _dist_hyper_125_59418_chunk, _dist_hyper_125_59417_chunk, _dist_hyper_125_59416_chunk, _dist_hyper_125_59415_chunk, _dist_hyper_125_59414_chunk, _dist_hyper_125_59413_chunk, _dist_hyper_125_59412_chunk, _dist_hyper_125_59411_chunk, _dist_hyper_125_59410_chunk, _dist_hyper_125_59409_chunk, 
_dist_hyper_125_59408_chunk, _dist_hyper_125_59407_chunk, _dist_hyper_125_59405_chunk, _dist_hyper_125_59404_chunk
                                 Remote SQL: SELECT ts, val, series_id FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[96682, 96681, 96680, 96679, 96678, 96677, 96676, 96675, 96674, 96673, 96672, 96671, 96670, 96669, 96668, 96667, 96666, 96665, 96664, 96663, 96662, 96661, 96660, 96659, 96658, 96657, 96656, 96655, 96654, 96653, 96652, 96651, 96650, 96649, 96648, 96647, 96646, 96645, 96644, 96643, 96642, 96641, 96640, 96639, 96638, 96637, 96636, 96635, 96634, 96633, 96632, 96631, 96630, 96629, 96628, 96627, 96626, 96625, 96624, 96623, 96622, 96621, 96620, 96619, 96618, 96617, 96616, 96615, 96614, 96612, 96611]) AND ((ts >= 1650997919334::bigint)) AND ((ts <= 1651602719334::bigint)) ORDER BY public.time_bucket(60000::bigint, ts) DESC NULLS FIRST
                           ->  Custom Scan (DataNodeScan) on public.telemetry_power m_2  (cost=100.00..733142.37 rows=6506409 width=20) (actual time=0.001..6694.592 rows=6506760 loops=1)
                                 Output: m_2.ts, m_2.val, m_2.series_id
                                 Data node: dn_ld02
                                 Fetcher Type: Row by row
                                 Chunks: _dist_hyper_125_59564_chunk, _dist_hyper_125_59563_chunk, _dist_hyper_125_59562_chunk, _dist_hyper_125_59561_chunk, _dist_hyper_125_59560_chunk, _dist_hyper_125_59559_chunk, _dist_hyper_125_59558_chunk, _dist_hyper_125_59557_chunk, _dist_hyper_125_59556_chunk, _dist_hyper_125_59555_chunk, _dist_hyper_125_59554_chunk, _dist_hyper_125_59553_chunk, _dist_hyper_125_59552_chunk, _dist_hyper_125_59550_chunk, _dist_hyper_125_59544_chunk, _dist_hyper_125_59543_chunk, _dist_hyper_125_59542_chunk, _dist_hyper_125_59541_chunk, _dist_hyper_125_59540_chunk, _dist_hyper_125_59539_chunk, _dist_hyper_125_59538_chunk, _dist_hyper_125_59537_chunk, _dist_hyper_125_59536_chunk, _dist_hyper_125_59535_chunk, _dist_hyper_125_59534_chunk, _dist_hyper_125_59533_chunk, _dist_hyper_125_59532_chunk, _dist_hyper_125_59531_chunk, _dist_hyper_125_59530_chunk, _dist_hyper_125_59529_chunk, _dist_hyper_125_59528_chunk, _dist_hyper_125_59527_chunk, _dist_hyper_125_59526_chunk, _dist_hyper_125_59525_chunk, _dist_hyper_125_59524_chunk, _dist_hyper_125_59523_chunk, _dist_hyper_125_59522_chunk, _dist_hyper_125_59521_chunk, _dist_hyper_125_59520_chunk, _dist_hyper_125_59519_chunk, _dist_hyper_125_59518_chunk, _dist_hyper_125_59517_chunk, _dist_hyper_125_59516_chunk, _dist_hyper_125_59515_chunk, _dist_hyper_125_59514_chunk, _dist_hyper_125_59513_chunk, _dist_hyper_125_59512_chunk, _dist_hyper_125_59511_chunk, _dist_hyper_125_59510_chunk, _dist_hyper_125_59509_chunk, _dist_hyper_125_59508_chunk, _dist_hyper_125_59507_chunk, _dist_hyper_125_59506_chunk, _dist_hyper_125_59505_chunk, _dist_hyper_125_59504_chunk, _dist_hyper_125_59503_chunk, _dist_hyper_125_59502_chunk, _dist_hyper_125_59500_chunk, _dist_hyper_125_59499_chunk, _dist_hyper_125_59498_chunk, _dist_hyper_125_59497_chunk, _dist_hyper_125_59496_chunk, _dist_hyper_125_59495_chunk, _dist_hyper_125_59494_chunk, _dist_hyper_125_59493_chunk, _dist_hyper_125_59492_chunk, _dist_hyper_125_59491_chunk, 
_dist_hyper_125_59490_chunk, _dist_hyper_125_59489_chunk, _dist_hyper_125_59487_chunk, _dist_hyper_125_59486_chunk
                                 Remote SQL: SELECT ts, val, series_id FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[96771, 96770, 96769, 96768, 96767, 96766, 96765, 96764, 96763, 96762, 96761, 96760, 96759, 96757, 96751, 96750, 96749, 96748, 96747, 96746, 96745, 96744, 96743, 96742, 96741, 96740, 96739, 96738, 96737, 96736, 96735, 96734, 96733, 96732, 96731, 96730, 96729, 96728, 96727, 96726, 96725, 96724, 96723, 96722, 96721, 96720, 96719, 96718, 96717, 96716, 96715, 96714, 96713, 96712, 96711, 96710, 96709, 96707, 96706, 96705, 96704, 96703, 96702, 96701, 96700, 96699, 96698, 96697, 96696, 96694, 96693]) AND ((ts >= 1650997919334::bigint)) AND ((ts <= 1651602719334::bigint)) ORDER BY public.time_bucket(60000::bigint, ts) DESC NULLS FIRST
                           ->  Custom Scan (DataNodeScan) on public.telemetry_power m_3  (cost=100.00..793946.27 rows=7046133 width=20) (actual time=0.001..8396.836 rows=7046680 loops=1)
                                 Output: m_3.ts, m_3.val, m_3.series_id
                                 Data node: dn_ld03
                                 Fetcher Type: Row by row
                                 Chunks: _dist_hyper_125_59673_chunk, _dist_hyper_125_59672_chunk, _dist_hyper_125_59671_chunk, _dist_hyper_125_59670_chunk, _dist_hyper_125_59669_chunk, _dist_hyper_125_59668_chunk, _dist_hyper_125_59667_chunk, _dist_hyper_125_59666_chunk, _dist_hyper_125_59665_chunk, _dist_hyper_125_59664_chunk, _dist_hyper_125_59663_chunk, _dist_hyper_125_59662_chunk, _dist_hyper_125_59661_chunk, _dist_hyper_125_59660_chunk, _dist_hyper_125_59659_chunk, _dist_hyper_125_59658_chunk, _dist_hyper_125_59654_chunk, _dist_hyper_125_59653_chunk, _dist_hyper_125_59652_chunk, _dist_hyper_125_59651_chunk, _dist_hyper_125_59650_chunk, _dist_hyper_125_59649_chunk, _dist_hyper_125_59648_chunk, _dist_hyper_125_59647_chunk, _dist_hyper_125_59646_chunk, _dist_hyper_125_59645_chunk, _dist_hyper_125_59644_chunk, _dist_hyper_125_59643_chunk, _dist_hyper_125_59642_chunk, _dist_hyper_125_59641_chunk, _dist_hyper_125_59640_chunk, _dist_hyper_125_59639_chunk, _dist_hyper_125_59638_chunk, _dist_hyper_125_59637_chunk, _dist_hyper_125_59636_chunk, _dist_hyper_125_59635_chunk, _dist_hyper_125_59634_chunk, _dist_hyper_125_59633_chunk, _dist_hyper_125_59632_chunk, _dist_hyper_125_59631_chunk, _dist_hyper_125_59630_chunk, _dist_hyper_125_59629_chunk, _dist_hyper_125_59628_chunk, _dist_hyper_125_59627_chunk, _dist_hyper_125_59626_chunk, _dist_hyper_125_59625_chunk, _dist_hyper_125_59624_chunk, _dist_hyper_125_59623_chunk, _dist_hyper_125_59622_chunk, _dist_hyper_125_59621_chunk, _dist_hyper_125_59620_chunk, _dist_hyper_125_59619_chunk, _dist_hyper_125_59615_chunk, _dist_hyper_125_59614_chunk, _dist_hyper_125_59613_chunk, _dist_hyper_125_59606_chunk, _dist_hyper_125_59602_chunk, _dist_hyper_125_59601_chunk, _dist_hyper_125_59600_chunk, _dist_hyper_125_59598_chunk, _dist_hyper_125_59597_chunk, _dist_hyper_125_59593_chunk, _dist_hyper_125_59583_chunk, _dist_hyper_125_59582_chunk, _dist_hyper_125_59581_chunk, _dist_hyper_125_59580_chunk, _dist_hyper_125_59579_chunk, 
_dist_hyper_125_59578_chunk, _dist_hyper_125_59577_chunk, _dist_hyper_125_59575_chunk, _dist_hyper_125_59574_chunk
                                 Remote SQL: SELECT ts, val, series_id FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[96880, 96879, 96878, 96877, 96876, 96875, 96874, 96873, 96872, 96871, 96870, 96869, 96868, 96867, 96866, 96865, 96861, 96860, 96859, 96858, 96857, 96856, 96855, 96854, 96853, 96852, 96851, 96850, 96849, 96848, 96847, 96846, 96845, 96844, 96843, 96842, 96841, 96840, 96839, 96838, 96837, 96836, 96835, 96834, 96833, 96832, 96831, 96830, 96829, 96828, 96827, 96826, 96822, 96821, 96820, 96813, 96809, 96808, 96807, 96805, 96804, 96800, 96790, 96789, 96788, 96787, 96786, 96785, 96784, 96782, 96781]) AND ((ts >= 1650997919334::bigint)) AND ((ts <= 1651602719334::bigint)) ORDER BY public.time_bucket(60000::bigint, ts) DESC NULLS FIRST
               ->  Hash  (cost=6.47..6.47 rows=4 width=4) (actual time=0.224..0.226 rows=40 loops=1)
                     Output: s.id
                     Buckets: 1024  Batches: 1  Memory Usage: 10kB
                     Buffers: shared hit=19
                     ->  Bitmap Heap Scan on public.series_telemetry_power s  (cost=2.23..6.47 rows=4 width=4) (actual time=0.185..0.214 rows=40 loops=1)
                           Output: s.id
                           Recheck Cond: ((s.labels)::integer[] && $0)
                           Heap Blocks: exact=7
                           Buffers: shared hit=19
                           ->  Bitmap Index Scan on series_telemetry_power_labels_idx  (cost=0.00..2.23 rows=4 width=0) (actual time=0.174..0.174 rows=40 loops=1)
                                 Index Cond: ((s.labels)::integer[] && $0)
                                 Buffers: shared hit=12
 Settings: effective_cache_size = '161555MB', effective_io_concurrency = '900', enable_partitionwise_aggregate = 'on', jit = 'off', max_parallel_workers = '32', max_parallel_workers_per_gather = '16', random_page_cost = '1.1', work_mem = '16000MB'
 Query Identifier: -6516854844957803240
 Planning:
   Buffers: shared hit=34455
 Planning Time: 284.183 ms
 Execution Time: 26449.883 ms
(64 rows)

I did notice that if I run the Remote SQL from the original query directly on the data node, it is as you say: the planner chooses a sequential scan. If I change it to use an IN clause against a list of series_ids, it uses an index scan. The time difference for the remote SQL is significant, as expected: ~250ms vs. 1000ms.

The issue seems to be that Timescale is not able to resolve a sub-select into a set of values to push-down to the data nodes.

Interestingly, when trying this on a regular hypertable, the array version is still faster. While the sub-select is able to use an index, the array version chooses a parallel sequential scan, and since we are using most of the data from the chunks, that ends up being more effective.

Another interesting thing is that I’m unable to get the query planner to use a parallel sequential scan when querying the table directly on one of the data nodes with the remote SQL generated by the access node.

Query Regular Hypertable with SELECT

with power_series_data as materialized (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id IN (select id from power_series_data)
  and m.ts between 1650663853916 and 1651268653916
group by time
order by 1 desc;

PLAN:

                                                                                                      QUERY PLAN                                                                                                       
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=82707.16..85312.12 rows=115776 width=16) (actual time=74947.088..100027.483 rows=5756 loops=1)
   Output: (time_bucket('60000'::bigint, m_1.ts)), avg(m_1.val)
   Group Key: (time_bucket('60000'::bigint, m_1.ts))
   Buffers: shared hit=655475, temp read=14869 written=14925
   CTE power_series_data
     ->  Bitmap Heap Scan on public.series_telemetry_power s  (cost=4.43..7.55 rows=4 width=60) (actual time=0.409..0.885 rows=40 loops=1)
           Output: s.id, s.labels
           Recheck Cond: ((s.labels)::integer[] && $0)
           Heap Blocks: exact=4
           Buffers: shared hit=16
           InitPlan 1 (returns $0)
             ->  Aggregate  (cost=2.19..2.20 rows=1 width=32) (actual time=0.308..0.334 rows=1 loops=1)
                   Output: COALESCE(array_agg(l.id), '{}'::integer[])
                   Buffers: shared hit=3
                   ->  Index Only Scan using label_key_value_id_key on public.label l  (cost=0.27..2.18 rows=1 width=4) (actual time=0.068..0.228 rows=8 loops=1)
                         Output: l.key, l.value, l.id
                         Index Cond: (l.key = 'Location'::text)
                         Filter: ("substring"(l.value, '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$'::text) = ANY ('{x9000c1s0b0,x9000c1s0b1,x9000c1s1b0,x9000c1s1b1,x9000c1s2b0,x9000c1s2b1,x9000c1s4b0,x9000c1s4b1}'::text[]))
                         Rows Removed by Filter: 19
                         Heap Fetches: 0
                         Buffers: shared hit=3
           ->  Bitmap Index Scan on series_telemetry_power_labels_idx  (cost=0.00..2.23 rows=4 width=0) (actual time=0.386..0.392 rows=40 loops=1)
                 Index Cond: ((s.labels)::integer[] && $0)
                 Buffers: shared hit=12
   ->  Sort  (cost=82699.62..82989.06 rows=115776 width=16) (actual time=74945.128..87300.804 rows=1941855 loops=1)
         Output: (time_bucket('60000'::bigint, m_1.ts)), m_1.val
         Sort Key: (time_bucket('60000'::bigint, m_1.ts)) DESC
         Sort Method: external merge  Disk: 49440kB
         Buffers: shared hit=655475, temp read=14869 written=14925
         ->  Nested Loop  (cost=262.36..71801.99 rows=115776 width=16) (actual time=6.529..61589.393 rows=1941855 loops=1)
               Output: time_bucket('60000'::bigint, m_1.ts), m_1.val
               Buffers: shared hit=655475
               ->  HashAggregate  (cost=0.09..0.13 rows=4 width=4) (actual time=1.884..2.239 rows=40 loops=1)
                     Output: power_series_data.id
                     Group Key: power_series_data.id
                     Batches: 1  Memory Usage: 24kB
                     Buffers: shared hit=16
                     ->  CTE Scan on power_series_data  (cost=0.00..0.08 rows=4 width=4) (actual time=0.424..1.494 rows=40 loops=1)
                           Output: power_series_data.id, power_series_data.labels
                           Buffers: shared hit=16
               ->  Append  (cost=262.27..17575.41 rows=30269 width=20) (actual time=4.359..937.927 rows=48546 loops=40)
                     Buffers: shared hit=655459
                     ->  Bitmap Heap Scan on _timescaledb_internal._hyper_7_309_chunk m_1  (cost=262.27..8646.74 rows=15381 width=20) (actual time=4.203..168.681 rows=23515 loops=40)
                           Output: m_1.ts, m_1.val, m_1.series_id
                           Recheck Cond: ((m_1.series_id = power_series_data.id) AND (m_1.ts >= '1650663853916'::bigint) AND (m_1.ts <= '1651268653916'::bigint))
                           Heap Blocks: exact=312723
                           Buffers: shared hit=316447
                           ->  Bitmap Index Scan on _hyper_7_309_chunk_power_series_id_ts  (cost=0.00..258.42 rows=15381 width=0) (actual time=3.226..3.226 rows=23515 loops=40)
                                 Index Cond: ((m_1.series_id = power_series_data.id) AND (m_1.ts >= '1650663853916'::bigint) AND (m_1.ts <= '1651268653916'::bigint))
                                 Buffers: shared hit=3724
                     ->  Bitmap Heap Scan on _timescaledb_internal._hyper_7_307_chunk m_2  (cost=253.78..8777.33 rows=14888 width=20) (actual time=4.546..176.717 rows=25031 loops=40)
                           Output: m_2.ts, m_2.val, m_2.series_id
                           Recheck Cond: ((m_2.series_id = power_series_data.id) AND (m_2.ts >= '1650663853916'::bigint) AND (m_2.ts <= '1651268653916'::bigint))
                           Heap Blocks: exact=335060
                           Buffers: shared hit=339012
                           ->  Bitmap Index Scan on _hyper_7_307_chunk_power_series_id_ts  (cost=0.00..250.06 rows=14888 width=0) (actual time=3.533..3.533 rows=25031 loops=40)
                                 Index Cond: ((m_2.series_id = power_series_data.id) AND (m_2.ts >= '1650663853916'::bigint) AND (m_2.ts <= '1651268653916'::bigint))
                                 Buffers: shared hit=3952
 Settings: effective_cache_size = '5969MB', effective_io_concurrency = '256', enable_partitionwise_aggregate = 'on', jit = 'off', max_parallel_workers_per_gather = '4', random_page_cost = '1.1', work_mem = '2547kB'
 Planning:
   Buffers: shared hit=63
 Planning Time: 2.046 ms
 Execution Time: 100074.702 ms
(63 rows)

Query Regular Hypertable with ARRAY

with power_series_data as materialized (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  array[m.series_id] && (select coalesce(array_agg(id), array[]::integer[]) from power_series_data)
  and m.ts between 1650663853916 and 1651268653916
group by time
order by 1 desc;

PLAN:

                                                                                                      QUERY PLAN                                                                                                       
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=111578.47..125455.53 rows=85133 width=16) (actual time=9877.033..38793.776 rows=5756 loops=1)
   Output: (time_bucket('60000'::bigint, m_2.ts)), avg(m_2.val)
   Group Key: (time_bucket('60000'::bigint, m_2.ts))
   Buffers: shared hit=64857, temp read=8413 written=8468
   CTE power_series_data
     ->  Bitmap Heap Scan on public.series_telemetry_power s  (cost=4.43..7.55 rows=4 width=60) (actual time=0.428..0.798 rows=40 loops=1)
           Output: s.id, s.labels
           Recheck Cond: ((s.labels)::integer[] && $0)
           Heap Blocks: exact=4
           Buffers: shared hit=16
           InitPlan 1 (returns $0)
             ->  Aggregate  (cost=2.19..2.20 rows=1 width=32) (actual time=0.322..0.349 rows=1 loops=1)
                   Output: COALESCE(array_agg(l.id), '{}'::integer[])
                   Buffers: shared hit=3
                   ->  Index Only Scan using label_key_value_id_key on public.label l  (cost=0.27..2.18 rows=1 width=4) (actual time=0.075..0.238 rows=8 loops=1)
                         Output: l.key, l.value, l.id
                         Index Cond: (l.key = 'Location'::text)
                         Filter: ("substring"(l.value, '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$'::text) = ANY ('{x9000c1s0b0,x9000c1s0b1,x9000c1s1b0,x9000c1s1b1,x9000c1s2b0,x9000c1s2b1,x9000c1s4b0,x9000c1s4b1}'::text[]))
                         Rows Removed by Filter: 19
                         Heap Fetches: 0
                         Buffers: shared hit=3
           ->  Bitmap Index Scan on series_telemetry_power_labels_idx  (cost=0.00..2.23 rows=4 width=0) (actual time=0.400..0.406 rows=40 loops=1)
                 Index Cond: ((s.labels)::integer[] && $0)
                 Buffers: shared hit=12
   InitPlan 3 (returns $2)
     ->  Aggregate  (cost=0.09..0.10 rows=1 width=32) (actual time=1.813..1.853 rows=1 loops=1)
           Output: COALESCE(array_agg(power_series_data.id), '{}'::integer[])
           Buffers: shared hit=16
           ->  CTE Scan on power_series_data  (cost=0.00..0.08 rows=4 width=4) (actual time=0.462..1.483 rows=40 loops=1)
                 Output: power_series_data.id, power_series_data.labels
                 Buffers: shared hit=16
   ->  Gather Merge  (cost=111570.82..123665.81 rows=101015 width=16) (actual time=9874.894..26644.822 rows=1941855 loops=1)
         Output: (time_bucket('60000'::bigint, m_2.ts)), m_2.val
         Workers Planned: 4
         Params Evaluated: $2
         Workers Launched: 4
         Buffers: shared hit=64857, temp read=8413 written=8468
         ->  Sort  (cost=110570.76..110633.89 rows=25254 width=16) (actual time=9856.907..12265.904 rows=388371 loops=5)
               Output: (time_bucket('60000'::bigint, m_2.ts)), m_2.val
               Sort Key: (time_bucket('60000'::bigint, m_2.ts)) DESC
               Sort Method: external merge  Disk: 9960kB
               Buffers: shared hit=64841, temp read=8413 written=8468
               Worker 0:  actual time=9852.448..12200.259 rows=386135 loops=1
                 Sort Method: external merge  Disk: 9840kB
                 Buffers: shared hit=13313, temp read=1675 written=1686
               Worker 1:  actual time=9853.430..12294.276 rows=390005 loops=1
                 Sort Method: external merge  Disk: 9944kB
                 Buffers: shared hit=12441, temp read=1688 written=1699
               Worker 2:  actual time=9852.388..12194.623 rows=384456 loops=1
                 Sort Method: external merge  Disk: 9800kB
                 Buffers: shared hit=13293, temp read=1670 written=1681
               Worker 3:  actual time=9854.000..12318.312 rows=390651 loops=1
                 Sort Method: external merge  Disk: 9960kB
                 Buffers: shared hit=12427, temp read=1690 written=1701
               ->  Parallel Append  (cost=0.00..108724.16 rows=25254 width=16) (actual time=1.355..7384.293 rows=388371 loops=5)
                     Buffers: shared hit=64813
                     Worker 0:  actual time=1.558..7387.841 rows=386135 loops=1
                       Buffers: shared hit=13306
                     Worker 1:  actual time=1.713..7374.302 rows=390005 loops=1
                       Buffers: shared hit=12434
                     Worker 2:  actual time=1.823..7386.526 rows=384456 loops=1
                       Buffers: shared hit=13286
                     Worker 3:  actual time=1.651..7377.153 rows=390651 loops=1
                       Buffers: shared hit=12420
                     ->  Parallel Seq Scan on _timescaledb_internal._hyper_7_307_chunk m_2  (cost=0.00..56178.52 rows=13064 width=16) (actual time=0.677..1392.274 rows=200252 loops=5)
                           Output: time_bucket('60000'::bigint, m_2.ts), m_2.val
                           Filter: ((ARRAY[m_2.series_id] && $2) AND (m_2.ts >= '1650663853916'::bigint) AND (m_2.ts <= '1651268653916'::bigint))
                           Rows Removed by Filter: 844862
                           Buffers: shared hit=33520
                           Worker 0:  actual time=0.028..535.514 rows=73750 loops=1
                             Buffers: shared hit=2879
                           Worker 1:  actual time=1.697..2680.368 rows=390005 loops=1
                             Buffers: shared hit=12434
                           Worker 2:  actual time=0.034..535.094 rows=72383 loops=1
                             Buffers: shared hit=2857
                           Worker 3:  actual time=1.617..2679.189 rows=390651 loops=1
                             Buffers: shared hit=12420
                     ->  Parallel Seq Scan on _timescaledb_internal._hyper_7_309_chunk m_1  (cost=0.00..52419.37 rows=12190 width=16) (actual time=1.118..2177.974 rows=313532 loops=3)
                           Output: time_bucket('60000'::bigint, m_1.ts), m_1.val
                           Filter: ((ARRAY[m_1.series_id] && $2) AND (m_1.ts >= '1650663853916'::bigint) AND (m_1.ts <= '1651268653916'::bigint))
                           Rows Removed by Filter: 1311755
                           Buffers: shared hit=31293
                           Worker 0:  actual time=1.541..2181.907 rows=312385 loops=1
                             Buffers: shared hit=10427
                           Worker 2:  actual time=1.799..2182.108 rows=312073 loops=1
                             Buffers: shared hit=10429
 Settings: effective_cache_size = '5969MB', effective_io_concurrency = '256', enable_partitionwise_aggregate = 'on', jit = 'off', max_parallel_workers_per_gather = '4', random_page_cost = '1.1', work_mem = '2547kB'
 Planning:
   Buffers: shared hit=63
 Planning Time: 2.098 ms
 Execution Time: 38831.371 ms
(91 rows)

Three main thoughts

  • Is it possible to have Timescale execute a sub-select and push the resulting vector down to the data nodes for filtering?
  • Is it possible to get the data nodes to choose a parallel scan, can you scan an index in parallel?
  • I’m still concerned about the async append time, as this is consistently the long pole in the tent across different combinations of parameters.

Testing with a fixed IN clause cuts the execution time almost in half.
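
A sketch of what such a two-step variant could look like, using the tables from the queries above. The literal IDs in step 2 are placeholders (not real series IDs); the client or dashboard would have to substitute the result of step 1 itself:

```sql
-- Step 1 (access node): resolve the matching series IDs once.
SELECT s.id
FROM series_telemetry_power s
WHERE s.labels && (
  SELECT coalesce(array_agg(l.id), array[]::int[])
  FROM label l
  WHERE l.key = 'Location'
    AND substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$')
        IN ('x9000c1s0b0', 'x9000c1s0b1', 'x9000c1s1b0', 'x9000c1s1b1',
            'x9000c1s2b0', 'x9000c1s2b1', 'x9000c1s4b0', 'x9000c1s4b1')
);

-- Step 2: send the IDs as a literal list so the filter can be
-- shipped to the data nodes along with the time predicate.
SELECT time_bucket(60000, m.ts) AS time, avg(m.val) AS power
FROM telemetry_power m
WHERE m.series_id IN (101, 102, 103)  -- placeholder IDs from step 1
  AND m.ts BETWEEN 1650663853916 AND 1651268653916
GROUP BY 1
ORDER BY 1 DESC;
```

The cost of this approach is an extra round trip, and the ID list has to be rebuilt whenever the label filter changes.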

For smaller time ranges (last 5 min, hour, or day) the performance difference is much smaller. Getting the last week is an extreme case for us, but I’m still surprised that there is this much of a gap between ElasticSearch and Timescale.

Time range | ES (ms) | TS (ms)
Last 5 min | ~40 | ~60
Last 1 hr | ~70 | ~100
Last 1 day | ~260 | ~1100

Again, thank you for your help!

First of all, I think this would be a great thing to put into a GitHub issue. If you would do that, especially around the subselect pushdown and parallel scanning, it would be useful for improving this.

The thing I will say is that, in general, we’re currently designed more for concurrency than for the fastest queries for a single user (which is probably why you’re seeing less parallelism than you might expect), so we make different tradeoffs than some other tools. We’re also thinking about ways for users to tune that tradeoff a bit more.

The other thing I’d suggest is that we do have a feature that will make this sort of query very fast: continuous aggregates (a feature that also has some significant improvements coming soon: Continuous Aggregates finals form by fabriziomello · Pull Request #4269 · timescale/timescaledb · GitHub). You can also use our statistical aggregates, specifically stats_agg, which has a rollup function that allows you to take, say, the one-minute aggregate and compute larger buckets from it, which might be the bigger win for you. (Edit: I hadn’t noticed that you’re aggregating across series IDs here, but it would also allow you to roll up across series IDs, filtering out the ones you don’t want, at different levels of granularity depending on the view you’re looking at. For instance, you might use hourly aggregates for the weekly view and roll up based on series ID, since minutely aggregates for a week seem like overkill.)

That would be the most significant speedup you could do I think, even on the current version of continuous aggregates, and would likely get faster on the new version coming out soon.

Have you tried that out and does it make a difference for you?

Also, given the size of your data, it feels like multi-node would be significant overkill? Are you testing this for a much larger deployment, and are you sure that the performance you’re looking for actually scales with both solutions? That said, continuous aggregates are a great way to power dashboards like the one it sounds like you have, and I’d definitely recommend taking advantage of them.
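
As a rough, untested sketch of the continuous aggregate idea against the schema in this thread: the view below keeps a per-series sum and count for each minute, so that the average across any filtered set of series can still be computed correctly at query time (an average of per-series averages would weight series unevenly). The names unix_now_ms and telemetry_power_1m are hypothetical, and whether this behaves well on the distributed hypertable would need to be checked:

```sql
-- Integer-time hypertables need a "now" function before a continuous
-- aggregate can be created. unix_now_ms is a hypothetical helper name.
CREATE OR REPLACE FUNCTION unix_now_ms() RETURNS bigint
LANGUAGE sql STABLE AS
$$ SELECT (extract(epoch FROM now()) * 1000)::bigint $$;

SELECT set_integer_now_func('telemetry_power', 'unix_now_ms');

-- One row per series per minute, storing sum and count rather than avg.
CREATE MATERIALIZED VIEW telemetry_power_1m
WITH (timescaledb.continuous) AS
SELECT
  time_bucket(60000::bigint, ts) AS bucket,
  series_id,
  sum(val)   AS sum_val,
  count(val) AS n_val
FROM telemetry_power
GROUP BY time_bucket(60000::bigint, ts), series_id;

-- Dashboard query: combine the per-series partials into one average.
-- The label filter from the original query is left out for brevity.
SELECT
  c.bucket AS time,
  sum(c.sum_val) / NULLIF(sum(c.n_val), 0) AS power
FROM telemetry_power_1m c
WHERE c.series_id IN (SELECT id FROM series_telemetry_power)
  AND c.bucket BETWEEN 1650663853916 AND 1651268653916
GROUP BY c.bucket
ORDER BY 1 DESC;
```

The same pattern with a wider bucket (for example 3600000 ms) would give the hourly rollup mentioned above for the weekly view.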

Just to note, AFAIK the times of the underlying scans of the Async Append node are not shown correctly in the EXPLAIN output. So the Async Append time you’re seeing is probably due to the underlying scans taking so long, not that node doing something by itself.

It would be interesting to run this with EXPLAIN (VERBOSE) and SET timescaledb.enable_remote_explain = on so that we can see the query plans on the remote nodes.
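
Something along these lines, run in a single session on the access node. The query is a simplified stand-in for the original one (the label filter is omitted), with the time range taken from the Remote SQL in the plan above:

```sql
-- Ask the access node to include the data nodes' plans in EXPLAIN output.
SET timescaledb.enable_remote_explain = on;

EXPLAIN (VERBOSE)
SELECT time_bucket(60000, m.ts) AS time, avg(m.val) AS power
FROM telemetry_power m
WHERE m.series_id IN (SELECT id FROM series_telemetry_power)
  AND m.ts BETWEEN 1650997919334 AND 1651602719334
GROUP BY 1
ORDER BY 1 DESC;
```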

To answer your other questions:

Is it possible to have Timescale execute a sub-select and push the resulting vector down to the data nodes for filtering?

That would be a distributed join pushdown, currently we have no support for this. The moment you join a distributed table to something else, it’s all downloaded to the access node and joined there.

Is it possible to get the data nodes to choose a parallel scan, can you scan an index in parallel?

Yes, although I’m always struggling to make Postgres choose a parallel plan. You could test it by setting parallel_setup_cost and parallel_tuple_cost to 1, but IIRC the settings are not shipped from access node to data node.
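A minimal sketch of that test, assuming a session opened directly on one of the data nodes (since the settings are reportedly not forwarded from the access node). The query is a simplified form of the remote SQL, without the internal chunk and partial-aggregate functions.

```sql
-- Session-level only; other connections on the data node are unaffected.
SET parallel_setup_cost = 1;
SET parallel_tuple_cost = 1;

-- Look for Gather / Parallel nodes in the resulting plan.
EXPLAIN
SELECT time_bucket(60000, ts), avg(val)
FROM telemetry_power
WHERE ts BETWEEN 1650663853916 AND 1651268653916
GROUP BY 1;

-- Restore the defaults for the rest of the session.
RESET parallel_setup_cost;
RESET parallel_tuple_cost;
```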

I would be happy to create a GitHub issue for this. Thanks for the information about the design considerations; that is very helpful to know. I have not looked at the continuous and statistical aggregates yet, but I’ll definitely check those out. Yes, I’m testing for a much larger deployment. We will be collecting hundreds of different metrics, with our current estimates being 30-40 TB ingested per day, and we need to retain data for at least 30 days, which would put us at a bit over 1 PB. I think for that size we will have to use multi-node simply for the storage space.

That would be a distributed join pushdown, currently we have no support for this. The moment you join a distributed table to something else, it’s all downloaded to the access node and joined there.

In this case would it have to be a distributed join though? In the normal non-timescale non-distributed case where I have something like

SELECT * FROM foo WHERE foo.bar IN (SELECT id FROM baz);

Wouldn’t Postgres have to evaluate that sub-select into a vector of ids that it then uses to scan the index? Wouldn’t Timescale then be able to do the same thing and simply push that vector of ids to the data nodes (assuming that baz is a small table stored on the access node)?

This kind of feature would be really useful because in my experience this kind of setup is a common use case when creating Grafana dashboards.

Yep, I think that’s what it does, and that’s a join. The plan you described sounds like a nested loop join with the inner side being a subselect, and the outer side an index scan. Like this:

test=# explain (costs off) select count(*) from live_steps_originals_pg where user_id in (select distinct user_id from live_steps_originals_pg);
                                                               QUERY PLAN                                                               
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 Aggregate
   ->  Nested Loop
         ->  Unique
               ->  Custom Scan (SkipScan) on live_steps_originals_pg live_steps_originals_pg_1
                     ->  Index Only Scan using live_steps_originals_pg_user_id_idx on live_steps_originals_pg live_steps_originals_pg_1
                           Index Cond: (user_id > NULL::bigint)
         ->  Index Only Scan using live_steps_originals_pg_user_id_idx on live_steps_originals_pg
               Index Cond: (user_id = live_steps_originals_pg_1.user_id)

The subselect should be small in this case, so for the distributed case we could evaluate it on the access node and ship the result to the data nodes. They could then perform the join locally and use the index scan. I agree that this use case looks important.

Yep, I think that’s what it does, and that’s a join.

I mean, it doesn’t exactly evaluate it into a vector, but into a table. I doubt Postgres can transform a join into an array match operator this way…

What you describe is exactly what I was thinking about, and now that I think about it that makes complete sense that it would be a table and join since Postgres probably wants to treat most everything it works with as a table.

I tried setting timescaledb.enable_remote_explain = on and running explain. If I hard-code the series_ids and use the IN clause, then as expected the remote plans use index scans and performance is improved by ~100%. When I run explain on the array version with this setting on, I get the error shown below.

Query:

with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id IN (7852, 7863, 7869, 7895, 7896, 7898, 7901, 7911, 7918, 7922, 7924, 7928, 7936, 7969, 7974, 7984, 7995, 8114, 8121, 8153, 7991, 8010, 8021, 8027, 8100, 8133, 8084, 8091, 8109, 8137, 7943, 7972, 7982, 8044, 8054, 8111, 8006, 8019, 8082, 8119)
  and m.ts between 1650663853916 and 1651268653916
group by time
order by 1 desc;

Plan:

                                   QUERY PLAN
--------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=281393.39..281400.89 rows=200 width=16) (actual time=1878.665..2153.229 rows=5756 loops=1)
   Output: (time_bucket('60000'::bigint, ts)), avg(val)
   Group Key: (time_bucket('60000'::bigint, ts))
   Buffers: temp read=134 written=135
   ->  Sort  (cost=281393.39..281394.89 rows=600 width=40) (actual time=1878.609..1991.714 rows=17268 loops=1)
         Output: (time_bucket('60000'::bigint, ts)), (PARTIAL avg(val))
         Sort Key: (time_bucket('60000'::bigint, ts)) DESC
         Sort Method: external merge  Disk: 1072kB
         Buffers: temp read=134 written=135
         ->  Custom Scan (AsyncAppend)  (cost=1715.99..281365.71 rows=600 width=40) (actual time=1203.208..1753.411 rows=17268 loops=1)
               Output: (time_bucket('60000'::bigint, ts)), (PARTIAL avg(val))
               ->  Append  (cost=1715.99..281365.71 rows=600 width=40) (actual time=0.029..333.924 rows=17268 loops=1)
                     ->  Custom Scan (DataNodeScan)  (cost=1715.99..66294.50 rows=200 width=40) (actual time=0.011..40.047 rows=5756 loops=1)
                           Output: (time_bucket('60000'::bigint, m.ts)), (PARTIAL avg(m.val))
                           Relations: Aggregate on (public.telemetry_power m)
                           Data node: dn1
                           Fetcher Type: Row by row
                           Chunks: _dist_hyper_4_210_chunk, _dist_hyper_4_209_chunk, _dist_hyper_4_208_chunk, _dist_hyper_4_207_chunk, _dist_hyper_4_206_chunk, _dist_hyper_4_205_chunk, _dist_hyper_4_204_chunk, _dist_hyper_4_203_chunk, _dist_hyper_4_202_chunk, _dist_hyper_4_201_chunk, _dist_hyper_4_200_chunk, _dist_hyper_4_199_chunk, _dist_hyper_4_198_chunk, _dist_hyper_4_197_chunk, _dist_hyper_4_196_chunk, _dist_hyper_4_195_chunk, _dist_hyper_4_194_chunk, _dist_hyper_4_193_chunk, _dist_hyper_4_192_chunk, _dist_hyper_4_191_chunk, _dist_hyper_4_190_chunk, _dist_hyper_4_189_chunk, _dist_hyper_4_188_chunk, _dist_hyper_4_187_chunk, _dist_hyper_4_186_chunk, _dist_hyper_4_185_chunk, _dist_hyper_4_184_chunk, _dist_hyper_4_183_chunk, _dist_hyper_4_181_chunk, _dist_hyper_4_180_chunk, _dist_hyper_4_179_chunk, _dist_hyper_4_178_chunk, _dist_hyper_4_177_chunk, _dist_hyper_4_176_chunk, _dist_hyper_4_175_chunk, _dist_hyper_4_174_chunk, _dist_hyper_4_173_chunk, _dist_hyper_4_172_chunk, _dist_hyper_4_171_chunk, _dist_hyper_4_170_chunk, _dist_hyper_4_169_chunk
                           Remote SQL: SELECT public.time_bucket(60000::bigint, ts), _timescaledb_internal.partialize_agg(avg(val)) FROM public.telemetry_power WHERE _timescaledb_internal.chunks_in(public.telemetry_power.*, ARRAY[99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58]) AND ((series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[]))) AND ((ts >= 1650663853916::bigint)) AND ((ts <= 1651268653916::bigint)) GROUP BY 1
                           Remote EXPLAIN: 
                             Partial HashAggregate  (cost=45266.06..53769.84 rows=305585 width=40) (actual time=14605.818..14650.739 rows=5756 loops=1)
                               Output: (time_bucket('60000'::bigint, _dist_hyper_4_210_chunk.ts)), _timescaledb_internal.partialize_agg(PARTIAL avg(_dist_hyper_4_210_chunk.val))
                               Group Key: time_bucket('60000'::bigint, _dist_hyper_4_210_chunk.ts)
                               Planned Partitions: 32  Batches: 1  Memory Usage: 1425kB
                               Buffers: shared hit=275352
                               ->  Result  (cost=0.41..33714.94 rows=323179 width=16) (actual time=0.080..12076.899 rows=323179 loops=1)
                                     Output: time_bucket('60000'::bigint, _dist_hyper_4_210_chunk.ts), _dist_hyper_4_210_chunk.val
                                     Buffers: shared hit=275352
                                     ->  Append  (cost=0.41..29675.20 rows=323179 width=16) (actual time=0.067..7365.778 rows=323179 loops=1)
                                           Buffers: shared hit=275352
                                           ->  Index Scan using _dist_hyper_4_210_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_210_chunk  (cost=0.41..679.22 rows=7792 width=16) (actual time=0.055..59.192 rows=7792 loops=1)
                                                 Output: _dist_hyper_4_210_chunk.ts, _dist_hyper_4_210_chunk.val
                                                 Index Cond: ((_dist_hyper_4_210_chunk.series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_dist_hyper_4_210_chunk.ts >= '1650663853916'::bigint) AND (_dist_hyper_4_210_chunk.ts <= '1651268653916'::bigint))
                                                 Buffers: shared hit=6658
                                           ->  Index Scan using _dist_hyper_4_209_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_209_chunk  (cost=0.41..707.03 rows=8144 width=16) (actual time=0.074..62.185 rows=8144 loops=1)
                                                 Output: _dist_hyper_4_209_chunk.ts, _dist_hyper_4_209_chunk.val
                                                 Index Cond: ((_dist_hyper_4_209_chunk.series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_dist_hyper_4_209_chunk.ts >= '1650663853916'::bigint) AND (_dist_hyper_4_209_chunk.ts <= '1651268653916'::bigint))
                                                 Buffers: shared hit=6889
                                           ->  Index Scan using _dist_hyper_4_208_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_208_chunk  (cost=0.41..707.09 rows=8150 width=16) (actual time=0.066..62.892 rows=8150 loops=1)
                                                 Output: _dist_hyper_4_208_chunk.ts, _dist_hyper_4_208_chunk.val
                                                 Index Cond: ((_dist_hyper_4_208_chunk.series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_dist_hyper_4_208_chunk.ts >= '1650663853916'::bigint) AND (_dist_hyper_4_208_chunk.ts <= '1651268653916'::bigint))
                                                 Buffers: shared hit=6939
                                           ->  Index Scan using _dist_hyper_4_207_chunk_telemetry_power_series_ on _timescaledb_internal._dist_hyper_4_207_chunk  (cost=0.41..707.02 rows=8142 width=16) (actual time=0.080..60.485 rows=8142 loops=1)
                                                 Output: _dist_hyper_4_207_chunk.ts, _dist_hyper_4_207_chunk.val
<SNIP>

Error Running Explain:

explain (analyze, verbose, buffers, settings) with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  array[m.series_id] && (select coalesce(array_agg(id), array[]::integer[]) from power_series_data)
  and m.ts between 1650663853916 and 1651268653916
group by time
order by 1 desc;
ERROR:  [dn1]: bind message supplies 0 parameters, but prepared statement "" requires 1
Time: 2922.663 ms (00:02.923)
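Since hard-coding the ids gives index scans on the data nodes, one possible workaround until this kind of pushdown is supported is to resolve the ids in a first statement and pass them to the second as a literal. The sketch below uses psql's \gset and variable interpolation; the variable name ids and the shortened location list are only for illustration, and other clients would need their own way of carrying the array between the two statements.

```sql
-- Step 1 (psql): resolve the matching series ids once on the access node
-- and store the resulting array in the psql variable "ids".
SELECT coalesce(array_agg(s.id), array[]::integer[]) AS ids
FROM series_telemetry_power s
WHERE s.labels && (
  SELECT coalesce(array_agg(l.id), array[]::int[])
  FROM label l
  WHERE l.key = 'Location'
    AND substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$')
        IN ('x9000c1s0b0', 'x9000c1s0b1')  -- shortened list, for illustration
)
\gset

-- Step 2: psql substitutes :'ids' as a quoted literal before sending the
-- query, so the planner sees a constant array rather than a sub-select.
SELECT
  time_bucket(60000, m.ts) AS time,
  avg(m.val) AS power
FROM telemetry_power m
WHERE
  m.series_id = ANY (:'ids'::integer[])
  AND m.ts BETWEEN 1650663853916 AND 1651268653916
GROUP BY 1
ORDER BY 1 DESC;
```

The second statement has the same shape as the hard-coded IN version, whose remote SQL already shows series_id = ANY ('{...}'::integer[]).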

I tested out the parallel query stuff by taking the remote SQL from the explain plan on the access node, logging into the data node directly and executing it.

What I found was that I had to set parallel_setup_cost and parallel_tuple_cost = 1 and force_parallel_mode = on. In addition the partialize_agg function call prevents a parallel plan from being chosen. Once I replace partialize_agg with a regular avg function call a parallel plan is chosen. The call to the chunks_in function in the where clause does not cause this issue.

Unfortunately the parallel plan chosen plans only one worker for some reason. This is despite having max_worker_processes = 59, max_parallel_workers = 48 and max_parallel_workers_per_gather = 24. I also checked pg_stat_activity for backend_type = ‘parallel worker’ and there were none, although there were around 30 other connections that were writing data.

I noticed that the Postgres documentation says user-defined functions are marked PARALLEL UNSAFE by default; I’m not sure how this applies to functions implemented in C. Even so, I’m not sure how to convince Postgres that it can use these 32 cores that are lying around to scan this index.
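The PARALLEL UNSAFE default applies to any CREATE FUNCTION without an explicit PARALLEL label, whatever the implementation language, so C functions are only parallel safe if the extension declares them so. A quick way to see how the functions in the remote SQL are labeled, and to repeat the worker check, might look like this when run on a data node:

```sql
-- proparallel: 's' = safe, 'r' = restricted, 'u' = unsafe
SELECT p.oid::regprocedure AS function, p.proparallel
FROM pg_proc p
WHERE p.proname IN ('partialize_agg', 'chunks_in', 'time_bucket');

-- Parallel workers appear here only while a parallel query is running.
SELECT pid, backend_type, state
FROM pg_stat_activity
WHERE backend_type = 'parallel worker';
```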

One last quick update. I tried setting up a materialized view as a continuous aggregate using stats_agg, but the result ends up being slower than the original query. This is likely because the materialized view exists only on the access node, and therefore the work cannot be split between the data nodes. Other than not being able to distribute the continuous aggregate among the data nodes, it is a very cool feature.

I filed an enhancement request for this, not sure how viable or challenging this would be.

@troy,

Thank you for following up and taking the time to submit the enhancement. Distributed continuous aggregates are a common request and something we’ll continue to prioritize as other changes might help facilitate that. It looks like the product team already put some classification on the ticket, so it will end up in a review discussion at some point for sure.

There was a question about the continuous aggregate I created. I created a cagg bucketing time to 1 hour intervals and using stats_agg to prep val for aggregation. I then queried the cagg without using time_bucket since the cagg is already bucketed on hour intervals and then applied the average aggregation to the rollup of stats (the value column with stats_agg applied). This query runs in ~7500 ms which is ~38x longer than the distributed hypertable.

CAGG

CREATE MATERIALIZED VIEW power_hourly WITH (timescaledb.continuous) AS
SELECT
  time_bucket(3600000, ts) as bucket, series_id sid,
  stats_agg(val) as stats
FROM cray_craytelemetry_power
GROUP BY 1, 2;

Query

with power_series_data as (
  select id, labels
  from series_cray_craytelemetry_power s
  where
    s.labels && (select coalesce(array_agg(l.id), array[]::int[]) from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select 
  bucket time,
  average(rollup(stats)) Power
from power_hourly m
where
  m.sid IN (7852, 7863, 7869, 7895, 7896, 7898, 7901, 7911, 7918, 7922, 7924, 7928, 7936, 7969, 7974, 7984, 7995, 8114, 8121, 8153, 7991, 8010, 8021, 8027, 8100, 8133, 8084, 8091, 8109, 8137, 7943, 7972, 7982, 8044, 8054, 8111, 8006, 8019, 8082, 8119)
  and m.bucket between 1650585600000 and 1651795199000
group by time
order by 1 desc;
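One thing that may be worth testing with this query: in the plan below, the second Append branch (Subquery Scan on "*SELECT* 2") accounts for roughly 6068 ms of the 6132 ms total and returns 0 rows. That branch is the real-time part of the continuous aggregate, which reads the raw distributed hypertable for data past the watermark and filters it on the access node. A sketch of how to rule it in or out, assuming results limited to already-materialized buckets are acceptable for the test:

```sql
-- Serve the view from materialized data only (no real-time branch).
ALTER MATERIALIZED VIEW power_hourly
  SET (timescaledb.materialized_only = true);

-- ... rerun the query above and compare the timings ...

-- Switch real-time aggregation back on afterwards if it is needed.
ALTER MATERIALIZED VIEW power_hourly
  SET (timescaledb.materialized_only = false);
```

With materialized_only enabled, buckets newer than the last refresh are not returned, so this trades freshness for speed.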

Plan

 GroupAggregate  (cost=63220938.36..63220962.17 rows=200 width=16) (actual time=6123.788..6131.963 rows=243 loops=1)
   Output: "*SELECT* 1".bucket, average(rollup("*SELECT* 1".stats))
   Group Key: "*SELECT* 1".bucket
   Buffers: shared hit=11728
   ->  Sort  (cost=63220938.36..63220945.30 rows=2774 width=40) (actual time=6123.713..6126.245 rows=9720 loops=1)
         Output: "*SELECT* 1".bucket, "*SELECT* 1".stats
         Sort Key: "*SELECT* 1".bucket DESC
         Sort Method: quicksort  Memory: 1751kB
         Buffers: shared hit=11728
         ->  Append  (cost=3596.63..63220779.72 rows=2774 width=40) (actual time=42.050..6120.326 rows=9720 loops=1)
               Buffers: shared hit=11728
               ->  Subquery Scan on "*SELECT* 1"  (cost=3596.63..3640.23 rows=1938 width=40) (actual time=42.048..50.563 rows=9720 loops=1)
                     Output: "*SELECT* 1".bucket, "*SELECT* 1".stats
                     Buffers: shared hit=11728
                     ->  HashAggregate  (cost=3596.63..3620.85 rows=1938 width=44) (actual time=42.046..49.138 rows=9720 loops=1)
                           Output: _hyper_131_135507_chunk.bucket, _hyper_131_135507_chunk.sid, _timescaledb_internal.finalize_agg('public.stats_agg(double precision)'::text, NULL::name, NULL::name, '{{pg_catalog,float8}}'::name[], _hyper_131_135507_chunk.agg_3_3, NULL::statssummary1d)
                           Group Key: _hyper_131_135507_chunk.bucket, _hyper_131_135507_chunk.sid
                           Batches: 1  Memory Usage: 4497kB
                           Buffers: shared hit=11728
                           ->  Append  (cost=0.29..3451.25 rows=19383 width=56) (actual time=0.041..23.429 rows=19383 loops=1)
                                 Buffers: shared hit=11728
                                 ->  Index Scan using _hyper_131_135507_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135507_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.040..0.812 rows=800 loops=1)
                                       Output: _hyper_131_135507_chunk.bucket, _hyper_131_135507_chunk.sid, _hyper_131_135507_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135507_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135507_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135507_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135507_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135510_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135510_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.022..0.763 rows=800 loops=1)
                                       Output: _hyper_131_135510_chunk.bucket, _hyper_131_135510_chunk.sid, _hyper_131_135510_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135510_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135510_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135510_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135510_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135512_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135512_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.021..0.781 rows=800 loops=1)
                                       Output: _hyper_131_135512_chunk.bucket, _hyper_131_135512_chunk.sid, _hyper_131_135512_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135512_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135512_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135512_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135512_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135521_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135521_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.022..0.767 rows=800 loops=1)
                                       Output: _hyper_131_135521_chunk.bucket, _hyper_131_135521_chunk.sid, _hyper_131_135521_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135521_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135521_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135521_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135521_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135502_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135502_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.025..0.792 rows=800 loops=1)
                                       Output: _hyper_131_135502_chunk.bucket, _hyper_131_135502_chunk.sid, _hyper_131_135502_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135502_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135502_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135502_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135502_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
                                 ->  Index Scan using _hyper_131_135518_chunk__materialized_hypertable_131_sid_bucket on _timescaledb_internal._hyper_131_135518_chunk  (cost=0.29..137.58 rows=800 width=56) (actual time=0.024..0.761 rows=800 loops=1)
                                       Output: _hyper_131_135518_chunk.bucket, _hyper_131_135518_chunk.sid, _hyper_131_135518_chunk.agg_3_3
                                       Index Cond: ((_hyper_131_135518_chunk.sid = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[])) AND (_hyper_131_135518_chunk.bucket < COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint)) AND (_hyper_131_135518_chunk.bucket >= '1650585600000'::bigint) AND (_hyper_131_135518_chunk.bucket <= '1651795199000'::bigint))
                                       Buffers: shared hit=480
<SNIP>
               ->  Subquery Scan on "*SELECT* 2"  (cost=63217021.14..63217125.62 rows=836 width=40) (actual time=6068.536..6068.541 rows=0 loops=1)
                     Output: "*SELECT* 2".bucket, "*SELECT* 2".stats
                     ->  GroupAggregate  (cost=63217021.14..63217117.26 rows=836 width=44) (actual time=6068.534..6068.539 rows=0 loops=1)
                           Output: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id, stats_agg(cray_craytelemetry_power.val)
                           Group Key: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id
                           ->  Sort  (cost=63217021.14..63217042.04 rows=8358 width=20) (actual time=6068.531..6068.535 rows=0 loops=1)
                                 Output: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id, cray_craytelemetry_power.val
                                 Sort Key: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id
                                 Sort Method: quicksort  Memory: 25kB
                                 ->  Custom Scan (AsyncAppend)  (cost=100.10..63216476.66 rows=8358 width=20) (actual time=6068.513..6068.517 rows=0 loops=1)
                                       Output: (time_bucket('3600000'::bigint, cray_craytelemetry_power.ts)), cray_craytelemetry_power.series_id, cray_craytelemetry_power.val
                                       ->  Append  (cost=100.10..63216476.66 rows=8358 width=20) (actual time=5912.126..5912.129 rows=0 loops=1)
                                             ->  Custom Scan (DataNodeScan) on public.cray_craytelemetry_power cray_craytelemetry_power_1  (cost=100.10..26115185.27 rows=3418 width=20) (actual time=1336.498..1336.499 rows=0 loops=1)
                                                   Output: time_bucket('3600000'::bigint, cray_craytelemetry_power_1.ts), cray_craytelemetry_power_1.series_id, cray_craytelemetry_power_1.val
                                                   Filter: (cray_craytelemetry_power_1.ts >= COALESCE(_timescaledb_internal.cagg_watermark(131), '-9223372036854775808'::bigint))
                                                   Rows Removed by Filter: 825516
                                                   Data node: dn_ld01
                                                   Fetcher Type: Row by row
                                                   Chunks: _dist_hyper_96_48339_chunk, _dist_hyper_96_5495_chunk, _dist_hyper_96_7989_chunk, _dist_hyper_96_71122_chunk, _dist_hyper_96_56343_chunk, _dist_hyper_96_57569_chunk, _dist_hyper_96_46488_chunk, _dist_hyper_96_872_
chunk, _dist_hyper_96_69577_chunk, _dist_hyper_96_18171_chunk, _dist_hyper_96_54797_chunk, _dist_hyper_96_10144_chunk, _dist_hyper_96_9223_chunk, _dist_hyper_96_11692_chunk, _dist_hyper_96_61251_chunk, _dist_hyper_96_40003_chunk, _dist_hyper_96_38767_chunk, _dist_hyper_96
_44336_chunk, _dist_hyper_96_67416_chunk, _dist_hyper_96_75755_chunk, _dist_hyper_96_33829_chunk, _dist_hyper_96_296_chunk, _dist_hyper_96_3948_chunk, _dist_hyper_96_18795_chunk, _dist_hyper_96_72349_chunk, _dist_hyper_96_51126_chunk, _dist_hyper_96_53586_chunk, _dist_hyp
er_96_69885_chunk, _dist_hyper_96_52047_chunk, _dist_hyper_96_14472_chunk, _dist_hyper_96_68034_chunk, _dist_hyper_96_16318_chunk, _dist_hyper_96_68342_chunk, _dist_hyper_96_29575_chunk, _dist_hyper_96_34139_chunk, _dist_hyper_96_62483_chunk, _dist_hyper_96_31666_chunk, _
dist_hyper_96_2112_chunk, _dist_hyper_96_12309_chunk, _dist_hyper_96_24965_chunk, _dist_hyper_96_15708_chunk, _dist_hyper_96_36604_chunk, _dist_hyper_96_3639_chunk, _dist_hyper_96_12618_chunk, _dist_hyper_96_66799_chunk, _dist_hyper_96_32283_chunk, _dist_hyper_96_58847_ch
unk, _dist_hyper_96_15400_chunk, _dist_hyper_96_43718_chunk, _dist_hyper_96_16625_chunk, _dist_hyper_96_2729_chunk, _dist_hyper_96_45256_chunk, _dist_hyper_96_1184_chunk, _dist_hyper_96_34447_chunk, _dist_hyper_96_53895_chunk, _dist_hyper_96_29266_chunk, _dist_hyper_96_31
049_chunk, _dist_hyper_96_27428_chunk, _dist_hyper_96_74204_chunk, _dist_hyper_96_7035_chunk, _dist_hyper_96_13855_chunk, _dist_hyper_96_19104_chunk, _dist_hyper_96_37838_chunk, _dist_hyper_96_65575_chunk, _dist_hyper_96_39386_chunk, _dist_hyper_96_56947_chunk, _dist_hype
r_96_33211_chunk, _dist_hyper_96_47411_chunk, _dist_hyper_96_20640_chunk, _dist_hyper_96_28346_chunk, _dist_hyper_96_52355_chunk, _dist_hyper_96_72659_chunk, _dist_hyper_96_45873_chunk, _dist_hyper_96_23419_chunk, _dist_hyper_96_48958_chunk, _dist_hyper_96_48031_chunk, _d
ist_hyper_96_4256_chunk, _dist_hyper_96_11999_chunk, _dist_hyper_96_40313_chunk, _dist_hyper_96_7349_chunk, _dist_hyper_96_14164_chunk, _dist_hyper_96_72048_chunk, _dist_hyper_96_74540_chunk, _dist_hyper_96_59707_chunk, _dist_hyper_96_34757_chunk, _dist_hyper_96_47722_chu
nk, _dist_hyper_96_70503_chunk, _dist_hyper_96_21258_chunk, _dist_hyper_96_35676_chunk, _dist_hyper_96_4565_chunk, _dist_hyper_96_2423_chunk, _dist_hyper_96_60016_chunk, _dist_hyper_96_5805_chunk, _dist_hyper_96_42790_chunk, _dist_hyper_96_37531_chunk, _dist_hyper_96_6402
8_chunk, _dist_hyper_96_12927_chunk, _dist_hyper_96_25891_chunk, _dist_hyper_96_20947_chunk, _dist_hyper_96_38457_chunk, _dist_hyper_96_24038_chunk, _dist_hyper_96_23730_chunk, _dist_hyper_96_64337_chunk, _dist_hyper_96_76370_chunk, _dist_hyper_96_65265_chunk, _dist_hyper
_96_39693_chunk, _dist_hyper_96_49267_chunk, _dist_hyper_96_16935_chunk, _dist_hyper_96_17553_chunk, _dist_hyper_96_38149_chunk, _dist_hyper_96_66490_chunk, _dist_hyper_96_43097_chunk, _dist_hyper_96_44027_chunk, _dist_hyper_96_32903_chunk, _dist_hyper_96_27736_chunk, _di
st_hyper_96_27128_chunk, _dist_hyper_96_51744_chunk, _dist_hyper_96_1805_chunk, _dist_hyper_96_41549_chunk, _dist_hyper_96_49884_chunk, _dist_hyper_96_64647_chunk, _dist_hyper_96_66179_chunk, _dist_hyper_96_13545_chunk, _dist_hyper_96_21566_chunk, _dist_hyper_96_3038_chun
k, _dist_hyper_96_28046_chunk, _dist_hyper_96_40623_chunk, _dist_hyper_96_68653_chunk, _dist_hyper_96_19722_chunk, _dist_hyper_96_55724_chunk, _dist_hyper_96_4871_chunk, _dist_hyper_96_67725_chunk, _dist_hyper_96_26820_chunk, _dist_hyper_96_42484_chunk, _dist_hyper_96_860
5_chunk, _dist_hyper_96_17242_chunk, _dist_hyper_96_70194_chunk, _dist_hyper_96_50507_chunk, _dist_hyper_96_54203_chunk, _dist_hyper_96_6417_chunk, _dist_hyper_96_63717_chunk, _dist_hyper_96_6108_chunk, _dist_hyper_96_21876_chunk, _dist_hyper_96_64956_chunk, _dist_hyper_9
6_46794_chunk, _dist_hyper_96_50197_chunk, _dist_hyper_96_69266_chunk, _dist_hyper_96_564_chunk, _dist_hyper_96_24656_chunk, _dist_hyper_96_20031_chunk, _dist_hyper_96_49577_chunk, _dist_hyper_96_57919_chunk, _dist_hyper_96_62180_chunk, _dist_hyper_96_70813_chunk, _dist_h
yper_96_45562_chunk, _dist_hyper_96_72968_chunk, _dist_hyper_96_35372_chunk, _dist_hyper_96_62792_chunk, _dist_hyper_96_47104_chunk, _dist_hyper_96_16016_chunk, _dist_hyper_96_71739_chunk, _dist_hyper_96_42173_chunk, _dist_hyper_96_63099_chunk, _dist_hyper_96_51435_chunk,
 _dist_hyper_96_44953_chunk, _dist_hyper_96_52666_chunk, _dist_hyper_96_71430_chunk, _dist_hyper_96_5181_chunk, _dist_hyper_96_28962_chunk, _dist_hyper_96_48649_chunk, _dist_hyper_96_29885_chunk, _dist_hyper_96_37223_chunk, _dist_hyper_96_46181_chunk, _dist_hyper_96_75123
_chunk, _dist_hyper_96_8916_chunk, _dist_hyper_96_9532_chunk, _dist_hyper_96_60941_chunk, _dist_hyper_96_10765_chunk, _dist_hyper_96_26201_chunk, _dist_hyper_96_74814_chunk, _dist_hyper_96_43408_chunk, _dist_hyper_96_76064_chunk, _dist_hyper_96_30740_chunk, _dist_hyper_96
_15090_chunk, _dist_hyper_96_58538_chunk, _dist_hyper_96_39075_chunk, _dist_hyper_96_56032_chunk, _dist_hyper_96_25274_chunk, _dist_hyper_96_23112_chunk, _dist_hyper_96_73586_chunk, _dist_hyper_96_22493_chunk, _dist_hyper_96_18486_chunk, _dist_hyper_96_24346_chunk, _dist_
hyper_96_41241_chunk, _dist_hyper_96_61560_chunk, _dist_hyper_96_68957_chunk, _dist_hyper_96_33520_chunk, _dist_hyper_96_13236_chunk, _dist_hyper_96_52974_chunk, _dist_hyper_96_36296_chunk, _dist_hyper_96_9843_chunk, _dist_hyper_96_11072_chunk, _dist_hyper_96_11383_chunk,
 _dist_hyper_96_75446_chunk, _dist_hyper_96_31975_chunk, _dist_hyper_96_73276_chunk, _dist_hyper_96_60634_chunk, _dist_hyper_96_19411_chunk, _dist_hyper_96_44646_chunk, _dist_hyper_96_65871_chunk, _dist_hyper_96_56652_chunk, _dist_hyper_96_35987_chunk, _dist_hyper_96_3135
6_chunk, _dist_hyper_96_25584_chunk, _dist_hyper_96_6726_chunk, _dist_hyper_96_1495_chunk, _dist_hyper_96_8298_chunk, _dist_hyper_96_53283_chunk, _dist_hyper_96_7680_chunk, _dist_hyper_96_17862_chunk, _dist_hyper_96_22803_chunk, _dist_hyper_96_58228_chunk, _dist_hyper_96_
63410_chunk, _dist_hyper_96_59155_chunk, _dist_hyper_96_22184_chunk, _dist_hyper_96_55415_chunk, _dist_hyper_96_20339_chunk, _dist_hyper_96_60325_chunk, _dist_hyper_96_28655_chunk, _dist_hyper_96_55106_chunk, _dist_hyper_96_35066_chunk, _dist_hyper_96_50815_chunk, _dist_h
yper_96_26509_chunk, _dist_hyper_96_40930_chunk, _dist_hyper_96_41863_chunk, _dist_hyper_96_36912_chunk, _dist_hyper_96_67108_chunk, _dist_hyper_96_54512_chunk, _dist_hyper_96_32592_chunk, _dist_hyper_96_61870_chunk, _dist_hyper_96_3330_chunk, _dist_hyper_96_73894_chunk, 
_dist_hyper_96_14781_chunk, _dist_hyper_96_10454_chunk, _dist_hyper_96_57260_chunk
                                                   Remote SQL: SELECT ts, val, series_id FROM public.cray_craytelemetry_power WHERE _timescaledb_internal.chunks_in(public.cray_craytelemetry_power.*, ARRAY[79347, 5495, 7995, 120752, 93578, 94804, 77496, 872, 119207, 24402, 92032, 10168, 9247, 11716, 104662, 64807, 63571, 69140, 110827, 131611, 52423, 296, 3948, 25026, 121979, 82134, 90821, 119515, 83055, 14496, 117304, 22549, 117972, 42037, 52733, 105894, 50260, 2112, 12333, 37427, 21939, 55198, 3639, 12642, 110210, 50877, 96054, 21274, 68522, 22856, 2729, 70060, 1184, 53041, 91130, 41728, 49643, 39890, 123834, 7035, 13879, 25335, 56432, 108986, 64190, 94182, 51805, 78419, 26871, 40808, 83363, 122289, 76881, 35881, 79966, 79039, 4256, 12023, 65117, 7349, 14188, 121678, 124170, 96914, 53351, 78730, 120133, 27489, 54270, 4565, 2423, 97223, 5805, 67594, 56125, 107439, 12951, 38353, 27178, 63261, 36500, 36192, 107748, 132226, 108676, 64497, 80275, 23166, 23784, 62593, 109901, 67901, 68831, 51497, 40198, 39590, 82752, 1805, 66353, 80892, 108058, 109590, 13569, 27797, 3038, 40508, 65427, 118283, 25953, 92959, 4871, 111136, 39282, 67288, 8629, 23473, 119824, 81515, 91438, 6417, 107128, 6108, 28107, 108367, 77802, 81205, 118896, 564, 37118, 26262, 80585, 95126, 105591, 120443, 76210, 122598, 53966, 106203, 78112, 22247, 121369, 66977, 106510, 82443, 69757, 83674, 121060, 5181, 41424, 79657, 42347, 55817, 77189, 124753, 8940, 9556, 104352, 10789, 38663, 124444, 68212, 131920, 48971, 15114, 95745, 63879, 93267, 37736, 35574, 123216, 28724, 24717, 36808, 66045, 104971, 118587, 52114, 13260, 89849, 54890, 9867, 11096, 11407, 130942, 50569, 122906, 103685, 25642, 69450, 109282, 93887, 54581, 49950, 38046, 6726, 1495, 8322, 90518, 7680, 24093, 34905, 95435, 106821, 96362, 28415, 92650, 26570, 97532, 41117, 92341, 53660, 81823, 38971, 65734, 66667, 55506, 110519, 91747, 51186, 105281, 3330, 123524, 14805, 10478, 94495]) AND ((ts >= 1650585600000::bigint)) AND ((ts <= 
1651798799000::bigint)) AND ((series_id = ANY ('{7852,7863,7869,7895,7896,7898,7901,7911,7918,7922,7924,7928,7936,7969,7974,7984,7995,8114,8121,8153,7991,8010,8021,8027,8100,8133,8084,8091,8109,8137,7943,7972,7982,8044,8054,8111,8006,8019,8082,8119}'::integer[]))) AND ((public.time_bucket(3600000::bigint, ts) >= 1650585600000::bigint)) AND ((public.time_bucket(3600000::bigint, ts) <= 1651795199000::bigint))
                                             ->  Custom Scan (DataNodeScan) on public.cray_craytelemetry_power cray_craytelemetry_power_2  (cost=100.10..18888359.36 rows=2511 width=20) (actual time=3019.054..3019.054 rows=0 loops=1)
<SNIP>
 Settings: effective_cache_size = '161555MB', effective_io_concurrency = '900', enable_partitionwise_aggregate = 'on', jit = 'off', max_parallel_workers = '32', max_parallel_workers_per_gather = '16', parallel_setup_cost = '1', parallel_tuple_cost = '1', random_page_cost = '1.1', work_mem = '16000MB'
 Query Identifier: 5944616703698342313
 Planning:
   Buffers: shared hit=267193 dirtied=97
 Planning Time: 2079.112 ms
 Execution Time: 6172.272 ms
(163 rows)

To get TimescaleDB to push down the result of a subselect, you can create an immutable function like so:

CREATE OR REPLACE FUNCTION push() RETURNS int[]  LANGUAGE SQL IMMUTABLE
AS $$
  SELECT array_agg(distinct device_id) FROM metrics;
$$;

And then use it in a query like this:

SELECT * FROM metrics_dist WHERE device_id=ANY(push());

This will materialize the function result on the access node and push it down to the data nodes.

This will also use indexes when applicable:

explain verbose select * from metrics_dist where device_id=ANY(push());
                                                                                                         QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Custom Scan (AsyncAppend)  (cost=100.00..7261.53 rows=56216 width=36)
   Output: metrics_dist."time", metrics_dist.device_id, metrics_dist.v0, metrics_dist.v1, metrics_dist.v2, metrics_dist.v3
   ->  Append  (cost=100.00..7261.53 rows=56216 width=36)
         ->  Custom Scan (DataNodeScan) on public.metrics_dist metrics_dist_1  (cost=100.00..1655.86 rows=13674 width=36)
               Output: metrics_dist_1."time", metrics_dist_1.device_id, metrics_dist_1.v0, metrics_dist_1.v1, metrics_dist_1.v2, metrics_dist_1.v3
               Data node: data_node_1
               Fetcher Type: Row by row
               Chunks: _dist_hyper_7_37_chunk, _dist_hyper_7_40_chunk, _dist_hyper_7_43_chunk
               Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist WHERE _timescaledb_internal.chunks_in(public.metrics_dist.*, ARRAY[1, 2, 3]) AND ((device_id = ANY ('{1,2,3,4,5}'::integer[])))
               Remote EXPLAIN:
                 Append  (cost=0.28..843.19 rows=13674 width=36)
                   ->  Index Scan using _dist_hyper_7_37_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_37_chunk  (cost=0.28..208.51 rows=3598 width=36)
                         Output: _dist_hyper_7_37_chunk."time", _dist_hyper_7_37_chunk.device_id, _dist_hyper_7_37_chunk.v0, _dist_hyper_7_37_chunk.v1, _dist_hyper_7_37_chunk.v2, _dist_hyper_7_37_chunk.v3
                         Index Cond: (_dist_hyper_7_37_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_40_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_40_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_40_chunk."time", _dist_hyper_7_40_chunk.device_id, _dist_hyper_7_40_chunk.v0, _dist_hyper_7_40_chunk.v1, _dist_hyper_7_40_chunk.v2, _dist_hyper_7_40_chunk.v3
                         Index Cond: (_dist_hyper_7_40_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_43_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_43_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_43_chunk."time", _dist_hyper_7_43_chunk.device_id, _dist_hyper_7_43_chunk.v0, _dist_hyper_7_43_chunk.v1, _dist_hyper_7_43_chunk.v2, _dist_hyper_7_43_chunk.v3
                         Index Cond: (_dist_hyper_7_43_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))

         ->  Custom Scan (DataNodeScan) on public.metrics_dist metrics_dist_2  (cost=100.00..3668.73 rows=28868 width=36)
               Output: metrics_dist_2."time", metrics_dist_2.device_id, metrics_dist_2.v0, metrics_dist_2.v1, metrics_dist_2.v2, metrics_dist_2.v3
               Data node: data_node_2
               Fetcher Type: Row by row
               Chunks: _dist_hyper_7_38_chunk, _dist_hyper_7_41_chunk, _dist_hyper_7_44_chunk
               Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist WHERE _timescaledb_internal.chunks_in(public.metrics_dist.*, ARRAY[1, 2, 3]) AND ((device_id = ANY ('{1,2,3,4,5}'::integer[])))
               Remote EXPLAIN:
                 Append  (cost=0.29..1923.78 rows=28868 width=36)
                   ->  Index Scan using _dist_hyper_7_38_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_38_chunk  (cost=0.29..472.10 rows=7596 width=36)
                         Output: _dist_hyper_7_38_chunk."time", _dist_hyper_7_38_chunk.device_id, _dist_hyper_7_38_chunk.v0, _dist_hyper_7_38_chunk.v1, _dist_hyper_7_38_chunk.v2, _dist_hyper_7_38_chunk.v3
                         Index Cond: (_dist_hyper_7_38_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_41_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_41_chunk  (cost=0.29..653.67 rows=10636 width=36)
                         Output: _dist_hyper_7_41_chunk."time", _dist_hyper_7_41_chunk.device_id, _dist_hyper_7_41_chunk.v0, _dist_hyper_7_41_chunk.v1, _dist_hyper_7_41_chunk.v2, _dist_hyper_7_41_chunk.v3
                         Index Cond: (_dist_hyper_7_41_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_44_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_44_chunk  (cost=0.29..653.67 rows=10636 width=36)
                         Output: _dist_hyper_7_44_chunk."time", _dist_hyper_7_44_chunk.device_id, _dist_hyper_7_44_chunk.v0, _dist_hyper_7_44_chunk.v1, _dist_hyper_7_44_chunk.v2, _dist_hyper_7_44_chunk.v3
                         Index Cond: (_dist_hyper_7_44_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))

         ->  Custom Scan (DataNodeScan) on public.metrics_dist metrics_dist_3  (cost=100.00..1655.86 rows=13674 width=36)
               Output: metrics_dist_3."time", metrics_dist_3.device_id, metrics_dist_3.v0, metrics_dist_3.v1, metrics_dist_3.v2, metrics_dist_3.v3
               Data node: data_node_3
               Fetcher Type: Row by row
               Chunks: _dist_hyper_7_39_chunk, _dist_hyper_7_42_chunk, _dist_hyper_7_45_chunk
               Remote SQL: SELECT "time", device_id, v0, v1, v2, v3 FROM public.metrics_dist WHERE _timescaledb_internal.chunks_in(public.metrics_dist.*, ARRAY[1, 2, 3]) AND ((device_id = ANY ('{1,2,3,4,5}'::integer[])))
               Remote EXPLAIN:
                 Append  (cost=0.28..843.19 rows=13674 width=36)
                   ->  Index Scan using _dist_hyper_7_39_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_39_chunk  (cost=0.28..208.51 rows=3598 width=36)
                         Output: _dist_hyper_7_39_chunk."time", _dist_hyper_7_39_chunk.device_id, _dist_hyper_7_39_chunk.v0, _dist_hyper_7_39_chunk.v1, _dist_hyper_7_39_chunk.v2, _dist_hyper_7_39_chunk.v3
                         Index Cond: (_dist_hyper_7_39_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_42_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_42_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_42_chunk."time", _dist_hyper_7_42_chunk.device_id, _dist_hyper_7_42_chunk.v0, _dist_hyper_7_42_chunk.v1, _dist_hyper_7_42_chunk.v2, _dist_hyper_7_42_chunk.v3
                         Index Cond: (_dist_hyper_7_42_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))
                   ->  Index Scan using _dist_hyper_7_45_chunk_metrics_dist_device_id_time_idx on _timescaledb_internal._dist_hyper_7_45_chunk  (cost=0.28..283.16 rows=5038 width=36)
                         Output: _dist_hyper_7_45_chunk."time", _dist_hyper_7_45_chunk.device_id, _dist_hyper_7_45_chunk.v0, _dist_hyper_7_45_chunk.v1, _dist_hyper_7_45_chunk.v2, _dist_hyper_7_45_chunk.v3
                         Index Cond: (_dist_hyper_7_45_chunk.device_id = ANY ('{1,2,3,4,5}'::integer[]))

(57 rows)

@sven thanks for the suggestion! I had originally tried using ANY but couldn’t figure out how to make the Postgres type system happy, so I switched to the array-specific operator (&&). After reading your post I went back and figured out how to make it work. As you say, with the ANY formulation the data nodes are able to use index scans, and performance is ~1.5-2x better than with the && formulation. I prefer a subselect over a function, as functions seem to interfere with the query planner (even when marked immutable) and lead to poor performance. This brings the query much closer to ElasticSearch, but it still takes ~2x as long (down from ~5x).

with power_series_data as (
  select id, labels
  from series_telemetry_power s
  where
    s.labels && (select array_agg(l.id)::int[] from label l where l.key = 'Location' and substring(l.value from '^x[0-9]+c[0-9]+s[0-9]+b[0-9]+$') in ('x9000c1s0b0','x9000c1s0b1','x9000c1s1b0','x9000c1s1b1','x9000c1s2b0','x9000c1s2b1','x9000c1s4b0','x9000c1s4b1'))
)
select
  time_bucket(60000, m.ts) time,
  avg(val) Power
from telemetry_power m
where
  m.series_id = ANY((select array_agg(id) from power_series_data)::int[])
  and m.ts between 1651708800000 and 1652399999000
group by time
order by 1 desc;
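
One way to confirm that the ANY filter actually reaches the data nodes is to prefix a query of the same shape with EXPLAIN (ANALYZE, VERBOSE) and read the "Remote SQL" line of each DataNodeScan. The following is only a sketch: the table and column names come from the query above, while the two series ids in the inner WHERE clause are placeholders picked from the earlier plan output, not the real label filter.

```sql
-- Sketch only: the ids 7852 and 7863 are placeholder values for illustration.
EXPLAIN (ANALYZE, VERBOSE)
SELECT
  time_bucket(60000, m.ts) AS time,
  avg(m.val) AS power
FROM telemetry_power m
WHERE
  -- Same ANY((subselect)::int[]) shape as the query above.
  m.series_id = ANY ((SELECT array_agg(s.id)
                      FROM series_telemetry_power s
                      WHERE s.id IN (7852, 7863))::int[])
  AND m.ts BETWEEN 1651708800000 AND 1652399999000
GROUP BY 1
ORDER BY 1 DESC;
```

If the pushdown works, each Remote SQL line should contain a literal condition of the form series_id = ANY ('{...}'::integer[]), and the Remote EXPLAIN for each data node should show index scans on the chunks rather than sequential scans.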

@troy,

On the query plan using the CAGG, I noticed two things. Curious what your thoughts are.

  1. The data coming from the CAGG only takes ~50ms to query. The majority of your time is spent planning and querying the underlying distributed hypertable for recently added data.
  2. Unless I’m reading the query plan incorrectly, that append for “real-time” aggregation returned zero rows. Is that expected? Does this need to be a real-time aggregate and/or could you start with a smaller time_bucket on the CAGG (allowing you to refresh it a little more frequently) and then rollup to 1-hour buckets since you’re using one of the hyperfunction aggregations?
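
To make the second point concrete, here is a rough sketch of what a finer-grained CAGG with a rollup to hourly buckets could look like. It rests on several assumptions: the view name power_1m is made up, the hyperfunction in use is assumed to be stats_agg from timescaledb_toolkit (the thread does not say which one it is), val is assumed to be double precision, and an integer "now" function is assumed to be registered already for the bigint ts column. It has not been run against a distributed hypertable.

```sql
-- Hypothetical 1-minute continuous aggregate; power_1m is an invented name.
-- Assumes timescaledb_toolkit is installed on the access node and all data nodes.
CREATE MATERIALIZED VIEW power_1m
WITH (timescaledb.continuous) AS
SELECT
  time_bucket(60000::bigint, ts) AS bucket,
  series_id,
  stats_agg(val) AS stats          -- partial aggregate state, can be rolled up later
FROM telemetry_power
GROUP BY time_bucket(60000::bigint, ts), series_id
WITH NO DATA;

-- Turn off real-time aggregation so queries read only materialized data.
ALTER MATERIALIZED VIEW power_1m SET (timescaledb.materialized_only = true);

-- Roll the 1-minute states up to 1-hour buckets at query time.
-- The series ids and time range below are placeholders taken from the plan output.
SELECT
  time_bucket(3600000::bigint, bucket) AS hour,
  average(rollup(stats)) AS power
FROM power_1m
WHERE series_id = ANY ('{7852,7863}'::integer[])
  AND bucket BETWEEN 1650585600000 AND 1651795199000
GROUP BY 1
ORDER BY 1 DESC;
```

With materialized_only set to true, the query no longer touches the underlying distributed hypertable for recent data, which is where most of the planning and execution time went in the plan above. The trade-off is that results lag behind by however long it has been since the last refresh, so the refresh policy interval would need to match how fresh the dashboard has to be.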