Skip to content

Commit

Permalink
Add migration for CAggs using time_bucket_ng
Browse files Browse the repository at this point in the history
The function time_bucket_ng is deprecated. This PR adds a migration path
for existing CAggs. Since time_bucket and time_bucket_ng use different
origin values, a custom origin is set if needed to let time_bucket
create the same buckets as created by time_bucket_ng so far.
  • Loading branch information
jnidzwetzki committed Apr 23, 2024
1 parent 8703497 commit c1d7753
Show file tree
Hide file tree
Showing 24 changed files with 5,231 additions and 35 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_6837
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6837 Add migration path for CAggs using time_bucket_ng
5 changes: 5 additions & 0 deletions sql/cagg_migrate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -596,3 +596,8 @@ BEGIN
WHERE mat_hypertable_id OPERATOR(pg_catalog.=) _cagg_data.mat_hypertable_id;
END;
$BODY$;

-- Migrate a CAgg which is using the experimental time_bucket_ng function
-- into a CAgg using the regular time_bucket function
CREATE OR REPLACE PROCEDURE @extschema@.cagg_migrate_to_time_bucket(cagg REGCLASS)
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C;
3 changes: 3 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,6 @@ DROP FUNCTION IF EXISTS _timescaledb_functions.policy_job_error_retention_check(
--
-- END bgw_job_stat_history
--

CREATE PROCEDURE @extschema@.cagg_migrate_to_time_bucket(cagg REGCLASS)
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_migrate_to_time_bucket' LANGUAGE C;
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,6 @@ SET
check_name = 'policy_job_error_retention_check'
WHERE
id = 2;


DROP PROCEDURE IF EXISTS @extschema@.cagg_migrate_to_time_bucket(cagg REGCLASS);
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
CROSSMODULE_WRAPPER(continuous_agg_refresh);
CROSSMODULE_WRAPPER(continuous_agg_validate_query);
CROSSMODULE_WRAPPER(continuous_agg_get_bucket_function);
CROSSMODULE_WRAPPER(continuous_agg_migrate_to_time_bucket);
CROSSMODULE_WRAPPER(cagg_try_repair);

CROSSMODULE_WRAPPER(chunk_freeze_chunk);
Expand Down Expand Up @@ -334,6 +335,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_validate_query = error_no_default_fn_pg_community,
.continuous_agg_get_bucket_function = error_no_default_fn_pg_community,
.continuous_agg_migrate_to_time_bucket = error_no_default_fn_pg_community,
.cagg_try_repair = process_cagg_try_repair,

/* compression */
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ typedef struct CrossModuleFunctions
WithClauseResult *with_clause_options);
PGFunction continuous_agg_validate_query;
PGFunction continuous_agg_get_bucket_function;
PGFunction continuous_agg_migrate_to_time_bucket;
PGFunction cagg_try_repair;

PGFunction compressed_data_send;
Expand Down
2 changes: 1 addition & 1 deletion src/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ extern void ts_extension_invalidate(void);
extern TSDLLEXPORT bool ts_extension_is_loaded(void);
extern void ts_extension_check_version(const char *so_version);
extern void ts_extension_check_server_version(void);
extern Oid ts_extension_schema_oid(void);
extern TSDLLEXPORT Oid ts_extension_schema_oid(void);
extern TSDLLEXPORT char *ts_extension_schema_name(void);
extern const char *ts_experimental_schema_name(void);
extern const char *ts_extension_get_so_name(void);
Expand Down
4 changes: 4 additions & 0 deletions src/time_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

#include "export.h"

#define TIME_BUCKET_NG_DEFAULT_ORIGIN_TIMETZ "2000-01-01 00:00:00+01"
#define TIME_BUCKET_NG_DEFAULT_ORIGIN_TIME "2000-01-01 00:00:00"
#define TIME_BUCKET_NG_DEFAULT_ORIGIN_DATE "2000-01-01"

extern TSDLLEXPORT Datum ts_int16_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_int32_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_int64_bucket(PG_FUNCTION_ARGS);
Expand Down
2 changes: 1 addition & 1 deletion src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ ts_continuous_agg_bucket_on_interval(Oid bucket_function)

/* The function has to be a currently allowed function or one of the deprecated bucketing
* functions */
Assert(func_info->allowed_in_cagg_definition || IS_DEPRECATED_BUCKET_FUNC(func_info));
Assert(func_info->allowed_in_cagg_definition || IS_DEPRECATED_TIME_BUCKET_NG_FUNC(func_info));

Oid first_bucket_arg = func_info->arg_types[0];

Expand Down
5 changes: 3 additions & 2 deletions src/ts_catalog/continuous_agg.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@
SetUserIdAndSecContext(saved_uid, saved_secctx); \
} while (0);

/* Does the function belong to a bucket function that is no longer allowed in CAgg definitions? */
#define IS_DEPRECATED_BUCKET_FUNC(funcinfo) \
/* Does the function belong to a time_bucket_ng function that is no longer allowed
* in CAgg definitions? */
#define IS_DEPRECATED_TIME_BUCKET_NG_FUNC(funcinfo) \
((funcinfo->origin == ORIGIN_TIMESCALE_EXPERIMENTAL) && \
(strcmp("time_bucket_ng", funcinfo->funcname) == 0))

Expand Down
32 changes: 30 additions & 2 deletions tsl/src/continuous_aggs/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ function_allowed_in_cagg_definition(Oid funcid)

/* Allow creation of CAggs with deprecated bucket function in debug builds for testing purposes
*/
if (ts_guc_debug_allow_cagg_with_deprecated_funcs && IS_DEPRECATED_BUCKET_FUNC(finfo))
if (ts_guc_debug_allow_cagg_with_deprecated_funcs && IS_DEPRECATED_TIME_BUCKET_NG_FUNC(finfo))
return true;

return false;
Expand Down Expand Up @@ -264,7 +264,7 @@ caggtimebucket_validate(CAggTimebucketInfo *tbinfo, List *groupClause, List *tar
* deprecated time_bucket_ng function). */
if (!function_allowed_in_cagg_definition(fe->funcid))
{
if (IS_DEPRECATED_BUCKET_FUNC(finfo))
if (IS_DEPRECATED_TIME_BUCKET_NG_FUNC(finfo))
{
if (is_cagg_create)
{
Expand Down Expand Up @@ -1569,3 +1569,31 @@ time_bucket_info_has_fixed_width(const CAggTimebucketInfo *tbinfo)
return tbinfo->bucket_time_width->month == 0 && tbinfo->bucket_time_timezone == NULL;
}
}

ContinuousAgg *
cagg_get_by_relid_or_fail(const Oid cagg_relid)
{
ContinuousAgg *cagg;

if (!OidIsValid(cagg_relid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid continuous aggregate")));

cagg = ts_continuous_agg_find_by_relid(cagg_relid);

if (NULL == cagg)
{
const char *relname = get_rel_name(cagg_relid);

if (relname == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
(errmsg("continuous aggregate does not exist"))));
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errmsg("relation \"%s\" is not a continuous aggregate", relname))));
}

return cagg;
}
1 change: 1 addition & 0 deletions tsl/src/continuous_aggs/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ extern Oid get_watermark_function_oid(void);
extern Oid cagg_get_boundary_converter_funcoid(Oid typoid);

extern bool time_bucket_info_has_fixed_width(const CAggTimebucketInfo *tbinfo);
extern ContinuousAgg *cagg_get_by_relid_or_fail(const Oid cagg_relid);

static inline int64
cagg_get_time_min(const ContinuousAgg *cagg)
Expand Down
30 changes: 1 addition & 29 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
int32 chunk_id, const bool do_merged_refresh,
const InternalTimeRange merged_refresh_window,
const CaggRefreshCallContext callctx);
static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid);
static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx);
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
Expand Down Expand Up @@ -621,33 +620,6 @@ continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
}
}

static ContinuousAgg *
get_cagg_by_relid(const Oid cagg_relid)
{
ContinuousAgg *cagg;

if (!OidIsValid(cagg_relid))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid continuous aggregate")));

cagg = ts_continuous_agg_find_by_relid(cagg_relid);

if (NULL == cagg)
{
const char *relname = get_rel_name(cagg_relid);

if (relname == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
(errmsg("continuous aggregate does not exist"))));
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
(errmsg("relation \"%s\" is not a continuous aggregate", relname))));
}
return cagg;
}

#define REFRESH_FUNCTION_NAME "refresh_continuous_aggregate()"
/*
* Refresh a continuous aggregate across the given window.
Expand All @@ -663,7 +635,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)

ts_feature_flag_check(FEATURE_CAGG);

cagg = get_cagg_by_relid(cagg_relid);
cagg = cagg_get_by_relid_or_fail(cagg_relid);
refresh_window.type = cagg->partition_type;

if (!PG_ARGISNULL(1))
Expand Down
Loading

0 comments on commit c1d7753

Please sign in to comment.