Skip to content

Commit

Permalink
Fix NULL start value handling in CAgg refresh
Browse files Browse the repository at this point in the history
The CAgg refresh job did not properly handle a NULL start_offset when the
continuous aggregate uses a time_bucket function with a variable bucket
width. This led to the creation of invalid invalidation records and to
'timestamp out of range' errors during the next refresh.

Fixes: timescale#5474
  • Loading branch information
jnidzwetzki committed Mar 5, 2024
1 parent a1cd7c4 commit 2858110
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 8 deletions.
1 change: 1 addition & 0 deletions .unreleased/bugfix_6729
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes: #6729 Fix NULL start value handling in CAgg refresh
11 changes: 9 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,20 @@ get_time_from_config(const Dimension *dim, const Jsonb *config, const char *json
}

int64
policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config, bool *start_isnull)
policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim,
const Jsonb *config, bool *start_isnull)
{
int64 res = get_time_from_config(dim, config, POL_REFRESH_CONF_KEY_START_OFFSET, start_isnull);

/* interpret NULL as min value for that type */
if (*start_isnull)
return ts_time_get_min(ts_dimension_get_partition_type(dim));
{
Oid type = ts_dimension_get_partition_type(dim);

return ts_continuous_agg_bucket_width_variable(cagg) ? ts_time_get_nobegin_or_min(type) :
ts_time_get_min(type);
}

return res;
}

Expand Down
4 changes: 2 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ extern Datum policy_refresh_cagg_check(PG_FUNCTION_ARGS);
extern Datum policy_refresh_cagg_remove(PG_FUNCTION_ARGS);

int32 policy_continuous_aggregate_get_mat_hypertable_id(const Jsonb *config);
int64 policy_refresh_cagg_get_refresh_start(const Dimension *dim, const Jsonb *config,
bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_start(const ContinuousAgg *cagg, const Dimension *dim,
const Jsonb *config, bool *start_isnull);
int64 policy_refresh_cagg_get_refresh_end(const Dimension *dim, const Jsonb *config,
bool *end_isnull);
bool policy_refresh_cagg_refresh_start_lt(int32 materialization_id, Oid cmp_type,
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,11 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
errmsg("configuration materialization hypertable id %d not found",
materialization_id)));

ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);

open_dim = get_open_dimension_for_hypertable(mat_ht, true);
dim_type = ts_dimension_get_partition_type(open_dim);
refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config, &start_isnull);
refresh_start = policy_refresh_cagg_get_refresh_start(cagg, open_dim, config, &start_isnull);
refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config, &end_isnull);

if (refresh_start >= refresh_end)
Expand All @@ -420,7 +422,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->refresh_window.type = dim_type;
policy_data->refresh_window.start = refresh_start;
policy_data->refresh_window.end = refresh_end;
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);
policy_data->cagg = cagg;
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
}
Expand Down
41 changes: 40 additions & 1 deletion tsl/test/expected/cagg_invalidation.out
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ WHERE cagg_id = :cond_10_id;

-- should trigger two individual refreshes
CALL refresh_continuous_aggregate('cond_10', 0, 200);
-- Allow at most 5 individual invalidations per refreshe
-- Allow at most 5 individual invalidations per refresh
SET timescaledb.materializations_per_refresh_window=5;
-- Insert into every second bucket
INSERT INTO conditions VALUES (20, 1, 1.0);
Expand Down Expand Up @@ -1226,6 +1226,7 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200);
WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window"
DETAIL: Expected an integer but current value is "-".
\set VERBOSITY terse
RESET timescaledb.materializations_per_refresh_window;
-- Test refresh with undefined invalidation threshold and variable sized buckets
CREATE TABLE timestamp_ht (
time timestamptz NOT NULL,
Expand Down Expand Up @@ -1275,3 +1276,41 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts
FROM temperature_4h_2
GROUP BY 1 ORDER BY 1;
NOTICE: continuous aggregate "temperature_1month_hierarchical_ts" is already up-to-date
---------------------------------------------------------------------
--- Issue 5474
---------------------------------------------------------------------
CREATE TABLE i5474 (
time timestamptz NOT NULL,
sensor_id integer NOT NULL,
cpu double precision NOT NULL,
temperature double precision NOT NULL);
SELECT create_hypertable('i5474','time');
create_hypertable
---------------------
(16,public,i5474,t)
(1 row)

CREATE MATERIALIZED VIEW i5474_summary_daily
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time, 'AWST') AS bucket,
sensor_id,
avg(cpu) AS avg_cpu
FROM i5474
GROUP BY bucket, sensor_id;
NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
SELECT add_continuous_aggregate_policy('i5474_summary_daily',
start_offset => NULL,
end_offset => INTERVAL '10 MINUTES',
schedule_interval => INTERVAL '1 MINUTE'
) new_job_id \gset
-- Check that start_offset = NULL is handled properly by the refresh job...
CALL run_job(:new_job_id);
-- ...and the CAgg can be refreshed afterward
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00');
NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0);
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
-- CAgg should be up-to-date now
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');
NOTICE: continuous aggregate "i5474_summary_daily" is already up-to-date
40 changes: 39 additions & 1 deletion tsl/test/sql/cagg_invalidation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ WHERE cagg_id = :cond_10_id;
-- should trigger two individual refreshes
CALL refresh_continuous_aggregate('cond_10', 0, 200);

-- Allow at most 5 individual invalidations per refreshe
-- Allow at most 5 individual invalidations per refresh
SET timescaledb.materializations_per_refresh_window=5;

-- Insert into every second bucket
Expand Down Expand Up @@ -719,6 +719,7 @@ SET timescaledb.materializations_per_refresh_window='-';
INSERT INTO conditions VALUES (140, 1, 1.0);
CALL refresh_continuous_aggregate('cond_10', 0, 200);
\set VERBOSITY terse
RESET timescaledb.materializations_per_refresh_window;

-- Test refresh with undefined invalidation threshold and variable sized buckets
CREATE TABLE timestamp_ht (
Expand Down Expand Up @@ -765,3 +766,40 @@ CREATE MATERIALIZED VIEW temperature_1month_hierarchical_ts
SELECT time_bucket('1 month', bucket_4h, 'Europe/Berlin'), avg(average)
FROM temperature_4h_2
GROUP BY 1 ORDER BY 1;

---------------------------------------------------------------------
--- Issue 5474
---------------------------------------------------------------------
CREATE TABLE i5474 (
time timestamptz NOT NULL,
sensor_id integer NOT NULL,
cpu double precision NOT NULL,
temperature double precision NOT NULL);

SELECT create_hypertable('i5474','time');

CREATE MATERIALIZED VIEW i5474_summary_daily
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time, 'AWST') AS bucket,
sensor_id,
avg(cpu) AS avg_cpu
FROM i5474
GROUP BY bucket, sensor_id;

SELECT add_continuous_aggregate_policy('i5474_summary_daily',
start_offset => NULL,
end_offset => INTERVAL '10 MINUTES',
schedule_interval => INTERVAL '1 MINUTE'
) new_job_id \gset

-- Check that start_offset = NULL is handled properly by the refresh job...
CALL run_job(:new_job_id);

-- ...and the CAgg can be refreshed afterward
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-03-21 05:00:00+00');
INSERT INTO i5474 (time, sensor_id, cpu, temperature) VALUES ('2000-01-01 05:00:00+00', 1, 1.0, 1.0);
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');

-- CAgg should be up-to-date now
CALL refresh_continuous_aggregate('i5474_summary_daily', NULL, '2023-01-01 01:00:00+00');

0 comments on commit 2858110

Please sign in to comment.