Skip to content

Commit

Permalink
Fix if_not_exists behavior for CAgg policy with NULL offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
konskov committed Jan 17, 2024
1 parent 15c14bd commit 55ae29c
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .unreleased/bugfix_5688
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Implements: #6531 Fix if_not_exists behavior for CAgg policy with NULL offsets

Fixes: #5688
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_AFTER,
partitioning_type,
compress_after_type,
compress_after_datum);
compress_after_datum,
false /* isnull */);
else
{
Assert(created_before != NULL);
Expand All @@ -221,7 +222,8 @@ policy_compression_add_internal(Oid user_rel_oid, Datum compress_after_datum,
POL_COMPRESSION_CONF_KEY_COMPRESS_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
IntervalPGetDatum(created_before),
false /* isnull */);
}

if (is_equal)
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,14 @@ policy_refresh_cagg_add_internal(Oid cagg_oid, Oid start_offset_type, NullableDa
POL_REFRESH_CONF_KEY_START_OFFSET,
cagg->partition_type,
policyconf.offset_start.type,
policyconf.offset_start.value) &&
policyconf.offset_start.value,
policyconf.offset_start.isnull) &&
policy_config_check_hypertable_lag_equality(existing->fd.config,
POL_REFRESH_CONF_KEY_END_OFFSET,
cagg->partition_type,
policyconf.offset_end.type,
policyconf.offset_end.value))
policyconf.offset_end.value,
policyconf.offset_end.isnull))
{
/* If all arguments are the same, do nothing */
ereport(NOTICE,
Expand Down
28 changes: 25 additions & 3 deletions tsl/src/bgw_policy/policy_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "jsonb_utils.h"
#include "policy_utils.h"
#include "time_utils.h"
#include "policies_v2.h"

/* Helper function to compare jsonb label value in the config
* with passed in value.
Expand All @@ -30,18 +31,33 @@
*/
bool
policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
Oid partitioning_type, Oid lag_type, Datum lag_datum)
Oid partitioning_type, Oid lag_type, Datum lag_datum,
bool isnull)
{
/*
* start_offset and end_offset for CAgg policies are allowed to have NULL values
* In that case, config_value will be NULL but this is not an error
*/

bool null_ok = (strcmp(json_label, POL_REFRESH_CONF_KEY_END_OFFSET) == 0 ||
strcmp(json_label, POL_REFRESH_CONF_KEY_START_OFFSET) == 0);

if (IS_INTEGER_TYPE(partitioning_type) && lag_type != INTERVALOID)
{
bool found;
int64 config_value = ts_jsonb_get_int64_field(config, json_label, &found);

if (!found)
if (!found && !null_ok)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for existing job", json_label)));

if (!found && isnull)
return true;

if ((!found && !isnull) || (found && isnull))
return false;

switch (lag_type)
{
case INT2OID:
Expand All @@ -59,11 +75,17 @@ policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_labe
if (lag_type != INTERVALOID)
return false;
Interval *config_value = ts_jsonb_get_interval_field(config, json_label);
if (config_value == NULL)
if (config_value == NULL && !null_ok)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not find %s in config for job", json_label)));

if (config_value == NULL && isnull)
return true;

if ((config_value == NULL && !isnull) || (config_value != NULL && isnull))
return false;

return DatumGetBool(
DirectFunctionCall2(interval_eq, IntervalPGetDatum(config_value), lag_datum));
}
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/bgw_policy/policy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#include <postgres.h>
#include "job.h"
bool policy_config_check_hypertable_lag_equality(Jsonb *config, const char *json_label,
Oid dim_type, Oid lag_type, Datum lag_datum);
Oid dim_type, Oid lag_type, Datum lag_datum,
bool isnull);
int64 subtract_integer_from_now_internal(int64 interval, Oid time_dim_type, Oid now_func,
bool *overflow);
Datum subtract_interval_from_now(Interval *lag, Oid time_dim_type);
Expand Down
6 changes: 4 additions & 2 deletions tsl/src/bgw_policy/retention_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
POL_RETENTION_CONF_KEY_DROP_AFTER,
partitioning_type,
window_type,
window_datum);
window_datum,
false /* isnull */);
else
{
Assert(created_before != NULL);
Expand All @@ -227,7 +228,8 @@ policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
partitioning_type,
INTERVALOID,
IntervalPGetDatum(created_before));
IntervalPGetDatum(created_before),
false /* isnull */);
}

if (is_equal)
Expand Down
139 changes: 127 additions & 12 deletions tsl/test/expected/cagg_policy.out
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,14 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
\set VERBOSITY default
SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for existing job
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('int_tab');
ERROR: "int_tab" is not a continuous aggregate
SELECT remove_continuous_aggregate_policy('mat_m1');
Expand All @@ -282,6 +289,63 @@ SELECT remove_continuous_aggregate_policy('mat_m1');

(1 row)

-- add with NULL offset, readd with NULL offset
SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1010
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
NOTICE: continuous aggregate policy already exists for "mat_m1", skipping
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('mat_m1');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1011
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
NOTICE: continuous aggregate policy already exists for "mat_m1", skipping
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true);
WARNING: continuous aggregate policy already exists for "mat_m1"
DETAIL: A policy already exists with different arguments.
HINT: Remove the existing policy before adding a new one.
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('mat_m1');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

--this one will fail
SELECT remove_continuous_aggregate_policy('mat_m1');
ERROR: continuous aggregate policy not found for "mat_m1"
Expand Down Expand Up @@ -523,7 +587,7 @@ SELECT timescaledb_experimental.show_policies('max_mat_view_date');
SELECT add_retention_policy('continuous_agg_max_mat_date', '25 days'::interval);
add_retention_policy
----------------------
1025
1027
(1 row)

SELECT timescaledb_experimental.alter_policies('max_mat_view_date', refresh_start_offset =>'25 days'::interval);
Expand Down Expand Up @@ -613,13 +677,13 @@ SELECT add_job('custom_func','1h', config:='{"type":"function"}'::jsonb, initial
SELECT _timescaledb_functions.alter_job_set_hypertable_id( :job_id, 'max_mat_view_date'::regclass);
alter_job_set_hypertable_id
-----------------------------
1027
1029
(1 row)

SELECT * FROM timescaledb_information.jobs WHERE job_id != 1 ORDER BY 1;
job_id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | config | next_start | initial_start | hypertable_schema | hypertable_name | check_schema | check_name
--------+----------------------------+-------------------+-------------+-------------+--------------+-------------+-------------+-------------------+-----------+----------------+----------------------+------------------------------+------------------------------+-----------------------+----------------------------+--------------+------------
1027 | User-Defined Action [1027] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | t | {"type": "function"} | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:00 1999 PST | _timescaledb_internal | _materialized_hypertable_6 | |
1029 | User-Defined Action [1029] | @ 1 hour | @ 0 | -1 | @ 5 mins | public | custom_func | default_perm_user | t | t | {"type": "function"} | Fri Dec 31 16:00:00 1999 PST | Fri Dec 31 16:00:00 1999 PST | _timescaledb_internal | _materialized_hypertable_6 | |
(1 row)

SELECT timescaledb_experimental.remove_all_policies('max_mat_view_date', true); -- ignore custom job
Expand Down Expand Up @@ -665,7 +729,7 @@ DETAIL: The start and end offsets must cover at least two buckets in the valid
SELECT add_continuous_aggregate_policy('max_mat_view_date', '13 days', '-1 day', '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1028
1030
(1 row)

SELECT remove_continuous_aggregate_policy('max_mat_view_date');
Expand All @@ -678,7 +742,7 @@ SELECT remove_continuous_aggregate_policy('max_mat_view_date');
SELECT add_continuous_aggregate_policy('max_mat_view_date', NULL, NULL, '1 day'::interval);
add_continuous_aggregate_policy
---------------------------------
1029
1031
(1 row)

SELECT remove_continuous_aggregate_policy('max_mat_view_date');
Expand Down Expand Up @@ -721,7 +785,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '1000000 years', '1 day' , '1 h'::interval);
add_continuous_aggregate_policy
---------------------------------
1031
1033
(1 row)

SELECT remove_continuous_aggregate_policy('max_mat_view_timestamp');
Expand Down Expand Up @@ -767,7 +831,12 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;

\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '15 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_offset in config for job
WARNING: continuous aggregate policy already exists for "max_mat_view_timestamp"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
\set ON_ERROR_STOP 1
Expand Down Expand Up @@ -1086,13 +1155,13 @@ ERROR: compress_after value for compression policy should be greater than the s
SELECT add_compression_policy('mat_smallint', 5::smallint);
add_compression_policy
------------------------
1051
1053
(1 row)

SELECT add_compression_policy('mat_bigint', 20::bigint);
add_compression_policy
------------------------
1052
1054
(1 row)

-- end of coverage tests
Expand Down Expand Up @@ -1137,6 +1206,52 @@ SELECT time_bucket('1 day', time) as dayb, device_id,
FROM metrics
GROUP BY 1, 2
WITH NO DATA;
-- this was previously crashing
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true);
add_continuous_aggregate_policy
---------------------------------
1055
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
WARNING: continuous aggregate policy already exists for "metrics_cagg"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('metrics_cagg');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true);
add_continuous_aggregate_policy
---------------------------------
1056
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
NOTICE: continuous aggregate policy already exists for "metrics_cagg", skipping
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
WARNING: continuous aggregate policy already exists for "metrics_cagg"
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)

SELECT remove_continuous_aggregate_policy('metrics_cagg');
remove_continuous_aggregate_policy
------------------------------------

(1 row)

--can set compression policy only after setting up refresh policy --
\set ON_ERROR_STOP 0
SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
Expand All @@ -1155,7 +1270,7 @@ ERROR: cannot use "compress_created_before" with continuous aggregate "metrics_
SELECT add_compression_policy('metrics_cagg', '8 day'::interval) AS "COMP_JOB" ;
COMP_JOB
----------
1054
1058
(1 row)

SELECT remove_compression_policy('metrics_cagg');
Expand Down Expand Up @@ -1297,7 +1412,7 @@ SELECT set_integer_now_func('t', 'unix_now');
SELECT add_retention_policy('t', 20);
add_retention_policy
----------------------
1056
1060
(1 row)

CREATE MATERIALIZED VIEW cagg(a, sumb) WITH (timescaledb.continuous)
Expand Down
18 changes: 18 additions & 0 deletions tsl/test/sql/cagg_policy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ SELECT add_continuous_aggregate_policy('mat_m1', 20, 10, '1h'::interval, if_not_

SELECT remove_continuous_aggregate_policy('int_tab');
SELECT remove_continuous_aggregate_policy('mat_m1');
-- add with NULL offset, readd with NULL offset
SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_m1', 20, NULL, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
SELECT remove_continuous_aggregate_policy('mat_m1');
SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_m1', NULL, 20, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_m1', NULL, NULL, '1h'::interval, if_not_exists=>true);
SELECT remove_continuous_aggregate_policy('mat_m1');

--this one will fail
SELECT remove_continuous_aggregate_policy('mat_m1');
SELECT remove_continuous_aggregate_policy('mat_m1', if_not_exists=>true);
Expand Down Expand Up @@ -533,6 +543,14 @@ FROM metrics
GROUP BY 1, 2
WITH NO DATA;

-- this was previously crashing
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, NULL, '1 h'::interval, if_not_exists => true);
SELECT add_continuous_aggregate_policy('metrics_cagg', '7 day'::interval, '1 day'::interval, '1 h'::interval, if_not_exists => true);
SELECT remove_continuous_aggregate_policy('metrics_cagg');
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, '1 day'::interval, '1h'::interval, if_not_exists=>true); -- same param values, so we get a NOTICE
SELECT add_continuous_aggregate_policy('metrics_cagg', NULL, NULL, '1h'::interval, if_not_exists=>true); -- different values, so we get a WARNING
SELECT remove_continuous_aggregate_policy('metrics_cagg');
--can set compression policy only after setting up refresh policy --
\set ON_ERROR_STOP 0
SELECT add_compression_policy('metrics_cagg', '1 day'::interval);
Expand Down

0 comments on commit 55ae29c

Please sign in to comment.