Skip to content

Commit

Permalink
Fix next_start calculation for fixed schedules
Browse files Browse the repository at this point in the history
This patch fixes several issues with next_start calculation.

- Previously, the offset was added twice in some cases.
This is fixed by this patch.

- Additionally, schedule intervals with month components
were not handled correctly.
Internally, time_bucket with origin is used to calculate
the next start. However, in the case of month intervals, the
timestamp calculated for a bucket is always aligned on the first
day of the month, regardless of origin.
Therefore, previously the result was aligned with origin by adding
the difference between origin and its respective time bucket.
This difference was computed as a fixed length interval in terms
of days and time. That computation led to incorrect computation of
next start occasionally, for example when a job should be executed on
the last day of a month.
That is fixed by adding an appropriate interval of months to
initial_start and letting Postgres handle this computation properly.

Fixes #5216
  • Loading branch information
konskov committed Feb 8, 2023
1 parent caf79e0 commit 6291a6d
Show file tree
Hide file tree
Showing 9 changed files with 1,169 additions and 83 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ accidentally triggering the load of a previous DB version.**
* #5218 Add role-level security to job error log
* #5214 Fix use of prepared statement in async module
* #5290 Compression can't be enabled on continuous aggregates when segmentby/orderby columns need quotation
* #5239 Fix next_start calculation for fixed schedules

## 2.9.3 (2023-02-03)

Expand All @@ -35,6 +36,7 @@ upgrade as soon as possible.
* @ssmoss for reporting issues on continuous aggregates
* @jaskij for reporting the compliation issue that occurred with clang


## 2.9.2 (2023-01-26)

This release contains bug fixes since the 2.9.1 release.
Expand Down
126 changes: 85 additions & 41 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,61 +170,105 @@ typedef struct
} JobResultCtx;

/*
* time_bucket(schedule_interval, finish_time, origin => initial_start)
* logic is the following
* Ideally we would return
* time_bucket(schedule_interval, finish_time, origin => initial_start, timezone).
* That is what we return when the schedule interval does not have month components.
* However, when there is a month component in the schedule interval,
* then supplying the origin in time_bucket
* does not work and the returned bucket is aligned on the start of the month.
* In those cases, we only have month components. So we compute the difference in
* months between the initial_start's timebucket and the finish time's bucket.
*/
static TimestampTz
get_next_scheduled_execution_slot(BgwJob *job, TimestampTz finish_time)
{
Assert(job->fd.fixed_schedule == true);
Datum timebucket_fini, result, offset;
Datum schedint_datum = IntervalPGetDatum(&job->fd.schedule_interval);
Datum timebucket_fini, timebucket_init, result;

if (job->fd.timezone == NULL)
Interval one_month = {
.day = 0,
.time = 0,
.month = 1,
};

if (job->fd.schedule_interval.month > 0)
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start));

timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(job->fd.initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
if (job->fd.timezone == NULL)
{
timebucket_init = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start));
timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz));

timebucket_init = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start),
CStringGetTextDatum(tz));
}
/* always the next bucket */
timebucket_fini =
DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
/* get the number of months between them */
Datum year_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_init);
Datum year_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_fini);

Datum month_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_init);
Datum month_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_fini);

/* convert everything to months */
float8 month_diff = DatumGetFloat8(year_fini) * 12 + DatumGetFloat8(month_fini) -
(DatumGetFloat8(year_init) * 12 + DatumGetFloat8(month_init));

Datum months_to_add = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&one_month),
Float8GetDatum(month_diff));

result = DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(job->fd.initial_start),
months_to_add);
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(job->fd.initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);

offset = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start),
CStringGetTextDatum(tz));
if (job->fd.timezone == NULL)
{
/* it is safe to use the origin in time_bucket calculation */
timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(job->fd.initial_start));
result = timebucket_fini;
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(job->fd.initial_start));
result = timebucket_fini;
}
}

offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(job->fd.initial_start), offset);
/* if we have a month component, the origin doesn't work so we must manually
include the offset */
if (job->fd.schedule_interval.month)
while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, offset);
}
/*
* adding the schedule interval above to get the next bucket might still not hit
* the next bucket if we are crossing DST. So we can end up with a next_start value
* that is actually less than the finish time of the job. Hence, we have to make sure
* the next scheduled slot we compute is in the future and not in the past
*/
while (DatumGetTimestampTz(result) <= finish_time)
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);

}
return DatumGetTimestampTz(result);
}

Expand Down
116 changes: 75 additions & 41 deletions test/src/bgw/scheduler_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,56 +67,90 @@ ts_test_next_scheduled_execution_slot(PG_FUNCTION_ARGS)
TimestampTz initial_start = PG_GETARG_TIMESTAMPTZ(2);
text *timezone = PG_ARGISNULL(3) ? NULL : PG_GETARG_TEXT_PP(3);

Datum timebucket_fini, result, offset;
Datum timebucket_fini, timebucket_init, result;
Datum schedint_datum = IntervalPGetDatum(schedule_interval);
Interval one_month = {
.day = 0,
.time = 0,
.month = 1,
};

if (timezone == NULL)
if (schedule_interval->month > 0)
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));

timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
if (timezone == NULL)
{
timebucket_init = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));
timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz));

timebucket_init = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start),
CStringGetTextDatum(tz));
}
/* always the next bucket */
timebucket_fini =
DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
/* get the number of months between them */
Datum year_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_init);
Datum year_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_fini);

Datum month_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_init);
Datum month_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_fini);

/* convert everything to months */
float8 month_diff = DatumGetFloat8(year_fini) * 12 + DatumGetFloat8(month_fini) -
(DatumGetFloat8(year_init) * 12 + DatumGetFloat8(month_init));

Datum months_to_add = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&one_month),
Float8GetDatum(month_diff));
result = DirectFunctionCall2(timestamptz_pl_interval,
TimestampTzGetDatum(initial_start),
months_to_add);
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);

offset = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start),
CStringGetTextDatum(tz));
if (timezone == NULL)
{
/* it is safe to use the origin in time_bucket calculation */
timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(initial_start));
result = timebucket_fini;
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(initial_start));
result = timebucket_fini;
}
}

offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(initial_start), offset);
/* if we have a month component, the origin doesn't work so we must manually
include the offset */
if (schedule_interval->month)
while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, offset);
}
/*
* adding the schedule interval above to get the next bucket might still not hit
* the next bucket if we are crossing DST. So we can end up with a next_start value
* that is actually less than the finish time of the job. Hence, we have to make sure
* the next scheduled slot we compute is in the future and not in the past
*/
while (DatumGetTimestampTz(result) <= finish_time)
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);

return result;
}
return DatumGetTimestampTz(result);
}

extern Datum
Expand Down
Loading

0 comments on commit 6291a6d

Please sign in to comment.