Schedule backfill

I’m working with compression, and have started looking at backfill.

I want to schedule the backfill with decompress_backfill and add_job, but something seems off.

I add the job with this command:
SELECT add_job(‘decompress_backfill’,‘1 days’,’{“staging_table”:“backfill_event_data”, “destination_hypertable”:“event_data”}’, NOW());

But when I then try to run the job, I get this:
ERROR: function public.decompress_backfill(integer, jsonb) does not exist
SQL state: 42883

I can’t get my head around it.

Any insight would be much appreciated.

1 Like

Hi @benneharli ,

Can you share how you run the job and what the decompress_backfill function looks like?

Hi,

Usually I’d just run it as this in a query: CALL decompress_backfill(staging_table=>‘backfill_event_data’, destination_hypertable=>‘event_data’);

When adding the job, I do it like this: SELECT add_job(‘decompress_backfill’,‘x days’,’{“staging_table”:“backfill_event_data”, “destination_hypertable”:“event_data”}’, NOW());

When I try to run the job, I do it like this where is the job number/index: CALL run_job(n);

Below is the function documentation.

NOTICE: CREATE OR REPLACE PROCEDURE public.decompress_backfill(staging_table regclass, destination_hypertable regclass, on_conflict_action text DEFAULT ‘NOTHING’::text, delete_from_staging boolean DEFAULT true, compression_job_push_interval interval DEFAULT ‘1 day’::interval, on_conflict_update_columns text[] DEFAULT ‘{}’::text[], skip_empty_ranges boolean DEFAULT false)
LANGUAGE plpgsql
AS procedure
DECLARE
source text := staging_table::text; – Forms a properly quoted table name from our regclass
dest text := destination_hypertable::text;

dest_nspname name;
dest_relname name;

hypertable_row _timescaledb_catalog.hypertable;
dimension_row _timescaledb_catalog.dimension;
dimension_slice_row _timescaledb_catalog.dimension_slice;

min_time_internal bigint;
max_time_internal bigint;

unformatted_move_stmt text ;
on_conflict_clause text := '';
r_start text := NULL;
r_end text := NULL;
r_end_prev text := NULL;
affected bigint;
old_compression_job_time timestamptz;
chunks_decompressed bool;
current_slice_has_rows boolean := true;

BEGIN
SELECT (get_schema_and_table_name(destination_hypertable)).* INTO STRICT dest_nspname, dest_relname;
–This should throw an error if we can’t cast the staging table’s type into the hypertable’s type, which means the inserts won’t work.
EXECUTE FORMAT('SELECT row(h.)::%1$s FROM %2$s AS h LIMIT 1’, source, dest);
– Make sure our source table has been analyzed so our selects are better later
EXECUTE FORMAT(‘ANALYZE %s’, source);
–Get our hypertable
SELECT h.
INTO STRICT hypertable_row FROM _timescaledb_catalog.hypertable h
WHERE table_name = dest_relname AND schema_name = dest_nspname ;

--And our time dimension, which is always the first dimension
SELECT d.* INTO STRICT dimension_row FROM _timescaledb_catalog.dimension d WHERE hypertable_id = hypertable_row.id ORDER BY id LIMIT 1 ;

-- Push the compression job out for some period of time so we don't end up compressing a decompressed chunk 
-- Don't disable completely because at least then if we fail and fail to move it back things won't get completely weird
SELECT move_compression_job(hypertable_row.id, hypertable_row.schema_name, hypertable_row.table_name, now() + compression_job_push_interval) INTO old_compression_job_time;
--Get the min and max times in timescale internal format from the source table, this will tell us which chunks we need to decompress
EXECUTE FORMAT($$SELECT _timescaledb_internal.time_to_internal(min(%1$I)) , 
    _timescaledb_internal.time_to_internal(max(%1$I)) 
    FROM %2$s $$, dimension_row.column_name, source)
    INTO STRICT min_time_internal, max_time_internal;

--Set up our move statement to be used with the right formatting in each of the loop executions
-- Note that the table names and literal time values are properly formatted outside and so are 
-- passed in as raw strings. We cannot re-format as they will then have extra quotes.
IF delete_from_staging THEN 
    unformatted_move_stmt = $$  
        WITH to_insert AS (DELETE 
        FROM %1$s --source table
        WHERE %2$I >= %3$s -- time column >= range start
        AND %2$I < %4$s -- time column < range end
        RETURNING * )
        INSERT INTO %5$s 
        SELECT * FROM to_insert
        %6$s -- ON CONFLICT CLAUSE if it exists
        $$;
ELSE
    unformatted_move_stmt = $$  
        WITH to_insert AS (SELECT *
        FROM %1$s --source table
        WHERE %2$I >= %3$s -- time column >= range start
        AND %2$I < %4$s) -- time column < range end)
        INSERT INTO %5$s 
        SELECT * FROM to_insert
        %6$s -- ON CONFLICT CLAUSE if it exists
        $$;
END IF;
IF UPPER(on_conflict_action) = 'NOTHING' THEN
    on_conflict_clause = 'ON CONFLICT DO NOTHING';
ELSEIF UPPER(on_conflict_action) = 'UPDATE' THEN
    SELECT 'ON CONFLICT DO UPDATE SET ' || STRING_AGG(FORMAT('%1$I = EXCLUDED.%1$I', on_conflict_update_column), ', ')
    FROM UNNEST(on_conflict_update_columns) AS on_conflict_update_column INTO on_conflict_clause;
END IF;
--Loop through the dimension slices that that are impacted
FOR dimension_slice_row IN 
    SELECT ds.* 
    FROM _timescaledb_catalog.dimension_slice ds 
    WHERE dimension_id = dimension_row.id
    -- find the dimension slices that overlap with the data in our staging table 
    -- the range_ends are non inclusive, the range_starts are inclusive
    AND max_time_internal >= ds.range_start AND min_time_internal < ds.range_end
    ORDER BY ds.range_end
LOOP
    --Set the previous r_end, so that we can insert from the previous (or the min) to
    --the start, this will catch any rows that are in the source table for which we
    --haven't yet made a chunk in the dest hypertable. 
    r_end_prev = COALESCE(r_end, _timescaledb_internal.time_literal_sql(min_time_internal, dimension_row.column_type));
    -- now actually move rows
    r_start = _timescaledb_internal.time_literal_sql(dimension_slice_row.range_start, dimension_row.column_type);
    r_end = _timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimension_row.column_type);
    
    -- catch any stray rows that fall into a chunk that doesn't exist yet by expanding
    -- our range to the lower of r_end_prev and r_start, there is a case where r_start
    -- can be lower, which is if r_end_prev was actually the minimum in the in the
    -- source table.  We won't compress the new chunks that are created, the
    -- compression job will pick those up when we re-activate it.
    r_start =LEAST(r_end_prev, r_start);
    -- check if the current slice contains data that needs to be moved
    IF skip_empty_ranges THEN
        EXECUTE FORMAT(
            'SELECT count(*) > 0 FROM %1$s WHERE %2$s >= %3$s AND %2$s < %4$s LIMIT 1',
            source, dimension_row.column_name, r_start, r_end)
        INTO current_slice_has_rows;
    END IF;
    -- skip if there is nothing to move and the flag is set
    CONTINUE WHEN skip_empty_ranges AND NOT current_slice_has_rows;
    -- decompress the chunks in the dimension slice, committing transactions after each decompress
    CALL decompress_dimension_slice(dimension_slice_row, chunks_decompressed);
    EXECUTE FORMAT(unformatted_move_stmt
        , source 
        , dimension_row.column_name
        , r_start 
        , r_end
        , dest 
        , on_conflict_clause
        );
    GET DIAGNOSTICS affected = ROW_COUNT;
    RAISE NOTICE '% rows moved in range % to %', affected, r_start, r_end ;
    COMMIT;
    -- recompress the chunks in the dimension slice, committing transactions after each recompress
    IF chunks_decompressed THEN
        CALL compress_dimension_slice(dimension_slice_row);
    END IF;
END LOOP;
-- catch any stray rows that fall into new chunks that need to be created between our
-- final chunk and the max in the source table, We won't compress the new chunks that are
-- created, the job will pick those up when we re-activate it.
r_start = COALESCE(r_end, _timescaledb_internal.time_literal_sql(min_time_internal, dimension_row.column_type)); --if there were no rows inserted into a chunk, r_end wouldn't be defined.
r_end = _timescaledb_internal.time_literal_sql(max_time_internal+1, dimension_row.column_type); -- add one here, so that we can still use < rather than <= (our internal representation is a bigint)
EXECUTE FORMAT(unformatted_move_stmt
    , source 
    , dimension_row.column_name
    , r_start
    , r_end
    , dest 
    , on_conflict_clause
    );
GET DIAGNOSTICS affected = ROW_COUNT;
RAISE NOTICE '% rows moved in range % to %', affected, r_start, r_end ;
COMMIT;

–Move our job back to where it was
SELECT move_compression_job(hypertable_row.id, hypertable_row.schema_name, hypertable_row.table_name, old_compression_job_time) INTO old_compression_job_time;
COMMIT;
END;
procedure

DO

Hey @benneharli,

Running a scheduled job as a User-defined Action requires a stored procedure with a specific parameter signature. The backfill script that you’re using from our TimescaleDB-extras repo wasn’t written to be used with the UDA engine.

Instead, you’ll have to create a wrapper stored procedure that takes in the configuration you’re attempting to use (eg. {“staging_table”:“backfill_event_data”, “destination_hypertable”:“event_data”}) and then parse that config input and call the backfill procedure accordingly.

As a very rough example modified from the example shown here (this is not meant to be run directly, modify as you see fit):

CREATE OR REPLACE PROCEDURE backfill_on_schedule (job_id int, config jsonb)
LANGUAGE PLPGSQL
AS $$
DECLARE
  source_table regclass;
  destination_table regclass;
BEGIN
  SELECT jsonb_object_field_text (config, 'staging_table')::text INTO STRICT source_table;
  SELECT jsonb_object_field_text (config, 'destination_hypertable')::text INTO STRICT destination_table;

  IF source_table IS NULL THEN
    RAISE EXCEPTION 'Config must provide the source table';
  END IF;

  CALL decompress_backfill(staging_table=>source_table, destination_hypertable=>destination_table);
END
$$;

You would then schedule this procedure to call the backfill job something like:

SELECT add_job(‘backfill_on_schedule’,‘x days’,’{“staging_table”:“backfill_event_data”, “destination_hypertable”:“event_data”}’, NOW());

Does that help?

1 Like

If that helps? Well, that would be the understatement of the century :slight_smile:

I’ll give it a whirl, but it seems like exactly what I was after!

It seems ::interval needs to be ::text, but other than that, it’s great!

1 Like

Oops! that’s what copy/paste gets you. :slight_smile: Thanks for pointing it out. I updated the example above.

Glad it was helpful! :tada: