Skip to content

Commit

Permalink
Use new validation functions
Browse files Browse the repository at this point in the history
Old patch was using old validation functions, but there are already
validation functions that both read and validate the policy, so using
those. Also removing the old `job_config_check` function since that is
no longer use and instead adding a `job_config_check` that calls the
checking function with the configuration.
  • Loading branch information
mkindahl committed Jun 7, 2022
1 parent 2829050 commit 8feec03
Show file tree
Hide file tree
Showing 27 changed files with 216 additions and 303 deletions.
77 changes: 44 additions & 33 deletions src/bgw/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,49 @@ job_execute_procedure(FuncExpr *funcexpr)
ExecuteCallStmt(call, params, false, dest);
}

/**
* Run configuration check validation function.
*
* This will run the configuration check validation function registered for
* the job. If a new job is added, the job_id is going to be zero.
*/
void
ts_bgw_job_run_config_check(Oid check, int32 job_id, Jsonb *config)
{
List *args =
list_make2(makeConst(INT4OID, -1, InvalidOid, 4, Int32GetDatum(job_id), false, true),
makeConst(JSONBOID, -1, InvalidOid, -1, JsonbPGetDatum(config), false, false));
FuncExpr *funcexpr =
makeFuncExpr(check, VOIDOID, args, InvalidOid, InvalidOid, COERCE_EXPLICIT_CALL);

/* Nothing to check if there is no check function provided */
if (!OidIsValid(check))
return;

switch (get_func_prokind(check))
{
case PROKIND_FUNCTION:
job_execute_function(funcexpr);
break;
case PROKIND_PROCEDURE:
job_execute_procedure(funcexpr);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("unsupported function type")));
break;
}
}

/* Run the check function on a configuration. It will generate errors if there
* is anything wrong with the configuration, otherwise just return. If the
* check function does not exist, no checking will be done.*/
static void
job_config_check_mats(BgwJob *job, Jsonb *config)
job_config_check(BgwJob *job, Jsonb *config)
{
const Oid proc_args[] = { INT4OID, JSONBOID };
List *name, *args;
List *name;
Oid proc;
FuncExpr *funcexpr;
bool started = false;

/* Both should either be empty or contain a schema and name */
Expand All @@ -131,24 +164,7 @@ job_config_check_mats(BgwJob *job, Jsonb *config)
name = list_make2(makeString(NameStr(job->fd.check_schema)),
makeString(NameStr(job->fd.check_name)));
proc = LookupFuncName(name, 2, proc_args, false);
args =
list_make2(makeConst(INT4OID, -1, InvalidOid, 4, Int32GetDatum(job->fd.id), false, true),
makeConst(JSONBOID, -1, InvalidOid, -1, JsonbPGetDatum(config), false, false));
funcexpr = makeFuncExpr(proc, VOIDOID, args, InvalidOid, InvalidOid, COERCE_EXPLICIT_CALL);
switch (get_func_prokind(proc))
{
case PROKIND_FUNCTION:
job_execute_function(funcexpr);
break;
case PROKIND_PROCEDURE:
job_execute_procedure(funcexpr);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("unsupported function type")));
break;
}

ts_bgw_job_run_config_check(proc, job->fd.id, config);
if (started)
{
/* if job does its own transaction handling it might not have set a snapshot */
Expand All @@ -175,10 +191,6 @@ bgw_job_from_tupleinfo(TupleInfo *ti, size_t alloc_size)
Assert(alloc_size >= sizeof(BgwJob));
job = MemoryContextAllocZero(ti->mctx, alloc_size);
tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
memcpy(job, GETSTRUCT(tuple), sizeof(FormData_bgw_job));

if (should_free)
heap_freetuple(tuple);

/*
* Using heap_deform_tuple instead of GETSTRUCT since the tuple can
Expand Down Expand Up @@ -241,6 +253,7 @@ bgw_job_from_tupleinfo(TupleInfo *ti, size_t alloc_size)

if (!nulls[AttrNumberGetAttrOffset(Anum_bgw_job_config)])
job->fd.config = DatumGetJsonbP(values[AttrNumberGetAttrOffset(Anum_bgw_job_config)]);

MemoryContextSwitchTo(old_ctx);
if (should_free)
heap_freetuple(tuple);
Expand Down Expand Up @@ -768,10 +781,7 @@ bgw_job_tuple_update_by_id(TupleInfo *ti, void *const data)
repl[AttrNumberGetAttrOffset(Anum_bgw_job_config)] = true;
if (updated_job->fd.config)
{
// ts_cm_functions->job_config_check(&updated_job->fd.proc_schema,
// &updated_job->fd.proc_name,
// updated_job->fd.config);
job_config_check_mats(bgw_job_from_tupleinfo(ti, sizeof(BgwJob)), updated_job->fd.config);
job_config_check(bgw_job_from_tupleinfo(ti, sizeof(BgwJob)), updated_job->fd.config);
values[AttrNumberGetAttrOffset(Anum_bgw_job_config)] =
JsonbPGetDatum(updated_job->fd.config);
}
Expand Down Expand Up @@ -1131,10 +1141,11 @@ ts_bgw_job_run_and_set_next_start(BgwJob *job, job_main_func func, int64 initial
return ret;
}

int ts_bgw_job_insert_relation(
Name application_name, Interval *schedule_interval, Interval *max_runtime,
int32 max_retries, Interval *retry_period, Name proc_schema, Name proc_name, Name check_schema,
Name check_name, Name owner, bool scheduled, int32 hypertable_id, Jsonb *config)
int
ts_bgw_job_insert_relation(Name application_name, Interval *schedule_interval,
Interval *max_runtime, int32 max_retries, Interval *retry_period,
Name proc_schema, Name proc_name, Name check_schema, Name check_name,
Name owner, bool scheduled, int32 hypertable_id, Jsonb *config)
{
Catalog *catalog = ts_catalog_get();
Relation rel;
Expand Down
7 changes: 4 additions & 3 deletions src/bgw/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ extern TimestampTz ts_bgw_job_timeout_at(BgwJob *job, TimestampTz start_time);
extern TSDLLEXPORT bool ts_bgw_job_delete_by_id(int32 job_id);
extern TSDLLEXPORT bool ts_bgw_job_update_by_id(int32 job_id, BgwJob *job);
extern TSDLLEXPORT int32 ts_bgw_job_insert_relation(
Name application_name, Interval *schedule_interval, Interval *max_runtime,
int32 max_retries, Interval *retry_period, Name proc_schema, Name proc_name, Name check_schema,
Name check_name, Name owner, bool scheduled, int32 hypertable_id, Jsonb *config);
Name application_name, Interval *schedule_interval, Interval *max_runtime, int32 max_retries,
Interval *retry_period, Name proc_schema, Name proc_name, Name check_schema, Name check_name,
Name owner, bool scheduled, int32 hypertable_id, Jsonb *config);
extern TSDLLEXPORT void ts_bgw_job_permission_check(BgwJob *job);

extern TSDLLEXPORT void ts_bgw_job_validate_job_owner(Oid owner);

extern bool ts_bgw_job_execute(BgwJob *job);
extern TSDLLEXPORT void ts_bgw_job_run_config_check(Oid check, int32 job_id, Jsonb *config);

extern TSDLLEXPORT Datum ts_bgw_job_entrypoint(PG_FUNCTION_ARGS);
extern void ts_bgw_job_set_scheduler_test_hook(scheduler_test_hook_type hook);
Expand Down
8 changes: 0 additions & 8 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,6 @@ job_execute_default_fn(BgwJob *job)
pg_unreachable();
}

static void
job_config_check_default_fn(Name proc_schema, Name proc_name, Jsonb *config)
{
error_no_default_fn_community();
pg_unreachable();
}

static bool
process_compress_table_default(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options)
Expand Down Expand Up @@ -401,7 +394,6 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.job_delete = error_no_default_fn_pg_community,
.job_run = error_no_default_fn_pg_community,
.job_execute = job_execute_default_fn,
.job_config_check = job_config_check_default_fn,

.move_chunk = error_no_default_fn_pg_community,
.move_chunk_proc = error_no_default_fn_pg_community,
Expand Down
1 change: 0 additions & 1 deletion src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ typedef struct CrossModuleFunctions
PGFunction job_run;

bool (*job_execute)(BgwJob *job);
void (*job_config_check)(Name proc_schema, Name proc_name, Jsonb *config);

void (*create_upper_paths_hook)(PlannerInfo *, UpperRelationKind, RelOptInfo *, RelOptInfo *,
TsRelType input_reltype, Hypertable *ht, void *extra);
Expand Down
6 changes: 5 additions & 1 deletion tsl/src/bgw_policy/compression_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ policy_compression_check(PG_FUNCTION_ARGS)
{
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
PG_RETURN_VOID();
policy_compression_validate(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));

PolicyCompressionData policy_data;
policy_compression_read_and_validate_config(PG_GETARG_JSONB_P(1), &policy_data);
ts_cache_release(policy_data.hcache);

PG_RETURN_VOID();
}

Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/continuous_aggregate_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ policy_refresh_cagg_check(PG_FUNCTION_ARGS)
if (PG_NARGS() != 2 || PG_ARGISNULL(0) || PG_ARGISNULL(1))
PG_RETURN_VOID();

policy_refresh_cagg_validate(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));
policy_refresh_cagg_read_and_validate_config(PG_GETARG_JSONB_P(1), NULL);

PG_RETURN_VOID();
}
Expand Down
104 changes: 1 addition & 103 deletions tsl/src/bgw_policy/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,85 +202,6 @@ check_valid_index(Hypertable *ht, const char *index_name)
ReleaseSysCache(idxtuple);
}

void
policy_reorder_validate(int32 job_id, Jsonb *config)
{
int32 htid = policy_reorder_get_hypertable_id(config);
Hypertable *ht = ts_hypertable_get_by_id(htid);
if (!ht)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("configuration hypertable id %d not found", htid)));

check_valid_index(ht, policy_reorder_get_index_name(config));
}


void
policy_retention_validate(int32 job_id, Jsonb *config)
{
Oid object_relid;
Hypertable *hypertable;
Cache *hcache;
const Dimension *open_dim;

object_relid = ts_hypertable_id_to_relid(policy_retention_get_hypertable_id(config));
hypertable = ts_hypertable_cache_get_cache_and_entry(object_relid, CACHE_FLAG_NONE, &hcache);
open_dim = get_open_dimension_for_hypertable(hypertable);

/* We only care about the errors from this function, so we ignore the
* return value. */
get_window_boundary(open_dim,
config,
policy_retention_get_drop_after_int,
policy_retention_get_drop_after_interval);

ts_cache_release(hcache);
}

void
policy_refresh_cagg_validate(int32 job_id, Jsonb *config)
{
int32 materialization_id;
Hypertable *mat_ht;
const Dimension *open_dim;
Oid dim_type;
int64 refresh_start, refresh_end;

materialization_id = policy_continuous_aggregate_get_mat_hypertable_id(config);
mat_ht = ts_hypertable_get_by_id(materialization_id);
open_dim = get_open_dimension_for_hypertable(mat_ht);
dim_type = ts_dimension_get_partition_type(open_dim);
refresh_start = policy_refresh_cagg_get_refresh_start(open_dim, config);
refresh_end = policy_refresh_cagg_get_refresh_end(open_dim, config);

if (refresh_start >= refresh_end)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid refresh window"),
errdetail("start_offset: %s, end_offset: %s",
ts_internal_to_time_string(refresh_start, dim_type),
ts_internal_to_time_string(refresh_end, dim_type)),
errhint("The start of the window must be before the end.")));
}

void
policy_compression_validate(int32 job_id, Jsonb *config)
{
Cache *hcache;
Oid table_relid = ts_hypertable_id_to_relid(policy_compression_get_hypertable_id(config));
Hypertable *ht = ts_hypertable_cache_get_cache_and_entry(table_relid, CACHE_FLAG_NONE, &hcache);
const Dimension *dim = hyperspace_get_open_dimension(ht->space, 0);

/* We ignore the return value from this function since we just try to read
* the config to see that it does not generate any errors. */
get_window_boundary(dim,
config,
policy_compression_get_compress_after_int,
policy_compression_get_compress_after_interval);
ts_cache_release(hcache);
}

bool
policy_reorder_execute(int32 job_id, Jsonb *config)
{
Expand Down Expand Up @@ -329,13 +250,13 @@ policy_reorder_read_and_validate_config(Jsonb *config, PolicyReorderData *policy
{
int32 htid = policy_reorder_get_hypertable_id(config);
Hypertable *ht = ts_hypertable_get_by_id(htid);
const char *index_name = policy_reorder_get_index_name(config);

if (!ht)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("configuration hypertable id %d not found", htid)));

const char *index_name = policy_reorder_get_index_name(config);
check_valid_index(ht, index_name);

if (policy)
Expand Down Expand Up @@ -749,26 +670,3 @@ job_execute(BgwJob *job)

return true;
}

/*
* Check configuration for a job type.
*/
void
job_config_check(Name proc_schema, Name proc_name, Jsonb *config)
{
if (namestrcmp(proc_schema, INTERNAL_SCHEMA_NAME) == 0)
{
if (namestrcmp(proc_name, "policy_retention") == 0)
policy_retention_read_and_validate_config(config, NULL);
else if (namestrcmp(proc_name, "policy_reorder") == 0)
policy_reorder_read_and_validate_config(config, NULL);
else if (namestrcmp(proc_name, "policy_compression") == 0)
{
PolicyCompressionData policy_data;
policy_compression_read_and_validate_config(config, &policy_data);
ts_cache_release(policy_data.hcache);
}
else if (namestrcmp(proc_name, "policy_refresh_continuous_aggregate") == 0)
policy_refresh_cagg_read_and_validate_config(config, NULL);
}
}
6 changes: 0 additions & 6 deletions tsl/src/bgw_policy/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,9 @@ typedef void (*reorder_func)(Oid tableOid, Oid indexOid, bool verbose, Oid wait_

/* Functions exposed only for testing */
extern bool policy_reorder_execute(int32 job_id, Jsonb *config);
extern void policy_reorder_validate(int32 job_id, Jsonb *config);
extern bool policy_retention_execute(int32 job_id, Jsonb *config);
extern void policy_retention_validate(int32 job_id, Jsonb *config);
extern bool policy_refresh_cagg_execute(int32 job_id, Jsonb *config);
extern void policy_refresh_cagg_validate(int32 job_id, Jsonb *config);
extern bool policy_recompression_execute(int32 job_id, Jsonb *config);
extern void policy_compression_validate(int32 job_id, Jsonb *config);
extern void policy_reorder_read_and_validate_config(Jsonb *config, PolicyReorderData *policy_data);
extern void policy_retention_read_and_validate_config(Jsonb *config,
PolicyRetentionData *policy_data);
Expand All @@ -66,7 +62,5 @@ extern void policy_compression_read_and_validate_config(Jsonb *config,
extern void policy_recompression_read_and_validate_config(Jsonb *config,
PolicyCompressionData *policy_data);
extern bool job_execute(BgwJob *job);
extern void job_config_check(Name proc_schema, Name proc_name, Jsonb *config);
// extern void job_config_check_mats(BgwJob *job, Jsonb *config);

#endif /* TIMESCALEDB_TSL_BGW_POLICY_JOB_H */
5 changes: 2 additions & 3 deletions tsl/src/bgw_policy/job_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ job_add(PG_FUNCTION_ARGS)
errmsg("permission denied for function \"%s\"", func_name),
errhint("Job owner must have EXECUTE privilege on the function.")));

if (check != InvalidOid)
if (OidIsValid(check))
{
check_name_str = get_func_name(check);
if (check_name_str == NULL)
Expand Down Expand Up @@ -110,7 +110,7 @@ job_add(PG_FUNCTION_ARGS)
namestrcpy(&owner_name, GetUserNameFromId(owner, false));

if (config)
job_config_check(&proc_schema, &proc_name, config);
ts_bgw_job_run_config_check(check, 0, config);

job_id = ts_bgw_job_insert_relation(&application_name,
schedule_interval,
Expand All @@ -125,7 +125,6 @@ job_add(PG_FUNCTION_ARGS)
scheduled,
0,
config);

if (!PG_ARGISNULL(3))
{
TimestampTz initial_start = PG_GETARG_TIMESTAMPTZ(3);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/reorder_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ policy_reorder_check(PG_FUNCTION_ARGS)

TS_PREVENT_FUNC_IF_READ_ONLY();

policy_reorder_validate(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));
policy_reorder_read_and_validate_config(PG_GETARG_JSONB_P(1), NULL);

PG_RETURN_VOID();
}
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/bgw_policy/retention_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ policy_retention_check(PG_FUNCTION_ARGS)

TS_PREVENT_FUNC_IF_READ_ONLY();

policy_retention_validate(PG_GETARG_INT32(0), PG_GETARG_JSONB_P(1));
policy_retention_read_and_validate_config(PG_GETARG_JSONB_P(1), NULL);

PG_RETURN_VOID();
}
Expand Down
1 change: 0 additions & 1 deletion tsl/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ CrossModuleFunctions tsl_cm_functions = {
.job_delete = job_delete,
.job_run = job_run,
.job_execute = job_execute,
.job_config_check = job_config_check,

/* gapfill */
.gapfill_marker = gapfill_marker,
Expand Down
Loading

0 comments on commit 8feec03

Please sign in to comment.