Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed schedule bug #5225

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ typedef.list
/test/testcluster
/test/log
/test/temp_schedule
/build
/build*
**/GPATH
**/GTAGS
**/GRTAGS
Expand Down
3 changes: 3 additions & 0 deletions sql/job_api.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.alter_job_set_hypertable_id(
hypertable REGCLASS )
RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_job_alter_set_hypertable_id'
LANGUAGE C VOLATILE;

CREATE OR REPLACE FUNCTION @extschema@.get_next_scheduled_slot(schedint INTERVAL, finish_time TIMESTAMPTZ, start_time TIMESTAMPTZ, timezone TEXT = NULL)
RETURNS TIMESTAMPTZ AS '@MODULE_PATHNAME@', 'ts_get_next_scheduled_slot' LANGUAGE C VOLATILE;
58 changes: 34 additions & 24 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,51 +179,61 @@ get_next_scheduled_execution_slot(BgwJob *job, TimestampTz finish_time)
Datum timebucket_fini, result, offset;
Datum schedint_datum = IntervalPGetDatum(&job->fd.schedule_interval);

// print_timestamptz_datum(finish_time, "finish_time is");

if (job->fd.timezone == NULL)
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start));

timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(job->fd.initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
/* offset: initial_start - bucket_start */
offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(job->fd.initial_start), offset);

timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
timebucket_fini = DirectFunctionCall2(timestamptz_pl_interval,
timebucket_fini,
offset);

/* two cases now: either this time bucket is less than the next multiple, or it is the next multiple */
result = timebucket_fini;

// print_timestamptz_datum(result, "result is before loop");

while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);
}
// print_timestamptz_datum(result, "result is after loop");
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,

timebucket_fini = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(job->fd.initial_start));
CStringGetTextDatum(tz));

/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);

offset = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start),
CStringGetTextDatum(tz));
}
/* offset: initial_start - time_bucket_init */
offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(job->fd.initial_start), offset);

result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, offset);

offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(job->fd.initial_start), offset);
/* if we have a month component, the origin doesn't work so we must manually
include the offset */
if (job->fd.schedule_interval.month)
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, offset);
while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);
}
}
/*
* adding the schedule interval above to get the next bucket might still not hit
* the next bucket if we are crossing DST. So we can end up with a next_start value
* that is actually less than the finish time of the job. Hence, we have to make sure
* the next scheduled slot we compute is in the future and not in the past
*/
while (DatumGetTimestampTz(result) <= finish_time)
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);

return DatumGetTimestampTz(result);
}
Expand Down
3 changes: 3 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ CROSSMODULE_WRAPPER(create_distributed_restore_point);
CROSSMODULE_WRAPPER(hypertable_distributed_set_replication_factor);
CROSSMODULE_WRAPPER(health_check);

CROSSMODULE_WRAPPER(get_next_scheduled_slot);

TS_FUNCTION_INFO_V1(ts_dist_set_id);
Datum
ts_dist_set_id(PG_FUNCTION_ARGS)
Expand Down Expand Up @@ -555,6 +557,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.hypertable_distributed_set_replication_factor = error_no_default_fn_pg_community,
.update_compressed_chunk_relstats = update_compressed_chunk_relstats_default,
.health_check = error_no_default_fn_pg_community,
.get_next_scheduled_slot = error_no_default_fn_pg_community,
};

TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ typedef struct CrossModuleFunctions
PGFunction chunks_drop_stale;
void (*update_compressed_chunk_relstats)(Oid uncompressed_relid, Oid compressed_relid);
PGFunction health_check;
PGFunction get_next_scheduled_slot;
} CrossModuleFunctions;

extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
Expand Down
128 changes: 99 additions & 29 deletions test/src/bgw/scheduler_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ static const char *test_job_type_names[_MAX_TEST_JOB_TYPE] = {
[TEST_JOB_TYPE_JOB_4] = "bgw_test_job_4",
};

// static void
// print_timestamptz_datum(Datum tstz_datum, const char *name)
// {
// elog(NOTICE, "%s is %s", name, DatumGetCString(DirectFunctionCall1(timestamptz_out, tstz_datum)));
// }

/*
* logic is the following
* when there is no month component, the origin works as expected
* but when there is a month component, then the origin does not work
* for those cases, we only have month components
*/
/* this is copied from the job_stat/get_next_scheduled_execution_slot */
extern Datum
ts_test_next_scheduled_execution_slot(PG_FUNCTION_ARGS)
Expand All @@ -69,54 +81,112 @@ ts_test_next_scheduled_execution_slot(PG_FUNCTION_ARGS)

Datum timebucket_fini, result, offset;
Datum schedint_datum = IntervalPGetDatum(schedule_interval);
Interval one_month = { .day = 0, .time = 0, .month = 1, };

// print_timestamptz_datum(initial_start, "initial_start");
// print_timestamptz_datum(finish_time, "finish_time is");

if (timezone == NULL)
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));
// then we have months or multiples of months, no days or less
if (schedule_interval->month > 0)
{
Datum timebucket_init = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));
timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
// TimestampTz timestamp_bucket_fini = DatumGetTimestampTz(timebucket_fini);
// TimestampTz timestamp_bucket_init = DatumGetTimestampTz(timebucket_init);
// get the number of months between them
Datum year_init = DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_init);
Datum year_fini = DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_fini);

Datum month_init = DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_init);
Datum month_fini = DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_fini);

// convert everything to months
float8 month_diff = DatumGetFloat8(year_fini)*12 + DatumGetFloat8(month_fini) - ( DatumGetFloat8(year_init)*12 + DatumGetFloat8(month_init) );

Datum months_to_add = DirectFunctionCall2(interval_mul, IntervalPGetDatum(&one_month), Float8GetDatum(month_diff));
// good, add the months now
result = DirectFunctionCall2(timestamptz_pl_interval, initial_start, months_to_add);
while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);
}

timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
}
else
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));

/* offset: initial_start - bucket_start */
offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(initial_start), offset);

timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
/* always the next bucket */
timebucket_fini = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
timebucket_fini = DirectFunctionCall2(timestamptz_pl_interval,
timebucket_fini,
offset);

/* two cases now: either this time bucket is less than the next multiple, or it is the next multiple */
result = timebucket_fini;

// print_timestamptz_datum(result, "result is before loop");

while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);
}
}
// print_timestamptz_datum(result, "result is after loop");
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,

// Datum timebucket_init = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
// schedint_datum,
// TimestampTzGetDatum(initial_start),
// CStringGetTextDatum(tz));

timebucket_fini = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(initial_start));
CStringGetTextDatum(tz));

/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);

offset = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start),
CStringGetTextDatum(tz));
/* offset: initial_start - time_bucket_init */
offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(initial_start), offset);

result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, offset);
// offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(initial_start), timebucket_init);
// /* always the next time_bucket */
// result = DirectFunctionCall5(ts_timestamptz_timezone_bucket, schedint_datum,
// timebucket_fini,
// CStringGetTextDatum(tz),
// TimestampTzGetDatum(initial_start), offset);

while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);
}
}

offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(initial_start), offset);
/* if we have a month component, the origin doesn't work so we must manually
include the offset */
if (schedule_interval->month)
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, offset);
}
/*
* adding the schedule interval above to get the next bucket might still not hit
* the next bucket if we are crossing DST. So we can end up with a next_start value
* that is actually less than the finish time of the job. Hence, we have to make sure
* the next scheduled slot we compute is in the future and not in the past
*/
while (DatumGetTimestampTz(result) <= finish_time)
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);

return result;
return DatumGetTimestampTz(result);
}

extern Datum
Expand Down
Loading