Skip to content

Commit

Permalink
Fix reported bug
Browse files Browse the repository at this point in the history
This patch properly handles the next_start calculation
in the case of fixed_schedules with month components.

Internally, time_bucket with origin is used to calculate
the next start. However, in the case of month intervals, the
timestamp calculated for a bucket is always aligned on the first
day of the month, regardless of origin.
Therefore, we handle these cases differently.

Fixes #5216
  • Loading branch information
konskov committed Jan 30, 2023
1 parent 5661ff1 commit f50a087
Show file tree
Hide file tree
Showing 8 changed files with 1,167 additions and 83 deletions.
124 changes: 83 additions & 41 deletions src/bgw/job_stat.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,61 +170,103 @@ typedef struct
} JobResultCtx;

/*
* time_bucket(schedule_interval, finish_time, origin => initial_start)
* logic is the following
* Ideally we would return
* time_bucket(schedule_interval, finish_time, origin => initial_start, timezone).
* That is what we return when the schedule interval does not have month components.
* However, when there is a month component in the schedule interval,
* then supplying the origin in time_bucket
* does not work and the returned bucket is aligned on the start of the month.
* In those cases, we only have month components. So we compute the difference in
* months between the initial_start's timebucket and the finish time's bucket.
*/
static TimestampTz
get_next_scheduled_execution_slot(BgwJob *job, TimestampTz finish_time)
{
Assert(job->fd.fixed_schedule == true);
Datum timebucket_fini, result, offset;
Datum schedint_datum = IntervalPGetDatum(&job->fd.schedule_interval);
Datum timebucket_fini, timebucket_init, result;

if (job->fd.timezone == NULL)
Interval one_month = {
.day = 0,
.time = 0,
.month = 1,
};

if (job->fd.schedule_interval.month > 0)
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start));

timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(job->fd.initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
if (job->fd.timezone == NULL)
{
timebucket_init = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start));
timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz));

timebucket_init = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start),
CStringGetTextDatum(tz));
}
// always the next bucket:
timebucket_fini =
DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
// get the number of months between them
Datum year_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_init);
Datum year_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_fini);

Datum month_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_init);
Datum month_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_fini);

// convert everything to months
float8 month_diff = DatumGetFloat8(year_fini) * 12 + DatumGetFloat8(month_fini) -
(DatumGetFloat8(year_init) * 12 + DatumGetFloat8(month_init));

Datum months_to_add = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&one_month),
Float8GetDatum(month_diff));

result = DirectFunctionCall2(timestamptz_pl_interval, job->fd.initial_start, months_to_add);
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(job->fd.initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);

offset = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(job->fd.initial_start),
CStringGetTextDatum(tz));
if (job->fd.timezone == NULL)
{
// it is safe to use the origin in time_bucket calculation
timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(job->fd.initial_start));
result = timebucket_fini;
}
else
{
char *tz = text_to_cstring(job->fd.timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(job->fd.initial_start));
result = timebucket_fini;
}
}

offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(job->fd.initial_start), offset);
/* if we have a month component, the origin doesn't work so we must manually
include the offset */
if (job->fd.schedule_interval.month)
while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, offset);
}
/*
* adding the schedule interval above to get the next bucket might still not hit
* the next bucket if we are crossing DST. So we can end up with a next_start value
* that is actually less than the finish time of the job. Hence, we have to make sure
* the next scheduled slot we compute is in the future and not in the past
*/
while (DatumGetTimestampTz(result) <= finish_time)
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);

}
return DatumGetTimestampTz(result);
}

Expand Down
115 changes: 74 additions & 41 deletions test/src/bgw/scheduler_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,56 +67,89 @@ ts_test_next_scheduled_execution_slot(PG_FUNCTION_ARGS)
TimestampTz initial_start = PG_GETARG_TIMESTAMPTZ(2);
text *timezone = PG_ARGISNULL(3) ? NULL : PG_GETARG_TEXT_PP(3);

Datum timebucket_fini, result, offset;
Datum timebucket_fini, timebucket_init, result;
Datum schedint_datum = IntervalPGetDatum(schedule_interval);
Interval one_month = {
.day = 0,
.time = 0,
.month = 1,
};

if (timezone == NULL)
if (schedule_interval->month > 0)
{
offset = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));

timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
if (timezone == NULL)
{
timebucket_init = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start));
timebucket_fini = DirectFunctionCall2(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time));
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz));

timebucket_init = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start),
CStringGetTextDatum(tz));
}
// always the next bucket:
timebucket_fini =
DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);
// get the number of months between them
Datum year_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_init);
Datum year_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("year"), timebucket_fini);

Datum month_init =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_init);
Datum month_fini =
DirectFunctionCall2(timestamptz_part, CStringGetTextDatum("month"), timebucket_fini);

// convert everything to months
float8 month_diff = DatumGetFloat8(year_fini) * 12 + DatumGetFloat8(month_fini) -
(DatumGetFloat8(year_init) * 12 + DatumGetFloat8(month_init));

Datum months_to_add = DirectFunctionCall2(interval_mul,
IntervalPGetDatum(&one_month),
Float8GetDatum(month_diff));
// good, add the months now
result = DirectFunctionCall2(timestamptz_pl_interval, initial_start, months_to_add);
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(initial_start));
/* always the next time_bucket */
result = DirectFunctionCall2(timestamptz_pl_interval, timebucket_fini, schedint_datum);

offset = DirectFunctionCall3(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(initial_start),
CStringGetTextDatum(tz));
if (timezone == NULL)
{
// it is safe to use the origin in time_bucket calculation
timebucket_fini = DirectFunctionCall3(ts_timestamptz_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
TimestampTzGetDatum(initial_start));
result = timebucket_fini;
}
else
{
char *tz = text_to_cstring(timezone);
timebucket_fini = DirectFunctionCall4(ts_timestamptz_timezone_bucket,
schedint_datum,
TimestampTzGetDatum(finish_time),
CStringGetTextDatum(tz),
TimestampTzGetDatum(initial_start));
result = timebucket_fini;
}
}

offset = DirectFunctionCall2(timestamp_mi, TimestampTzGetDatum(initial_start), offset);
/* if we have a month component, the origin doesn't work so we must manually
include the offset */
if (schedule_interval->month)
while (result <= TimestampTzGetDatum(finish_time))
{
result = DirectFunctionCall2(timestamptz_pl_interval, result, offset);
}
/*
* adding the schedule interval above to get the next bucket might still not hit
* the next bucket if we are crossing DST. So we can end up with a next_start value
* that is actually less than the finish time of the job. Hence, we have to make sure
* the next scheduled slot we compute is in the future and not in the past
*/
while (DatumGetTimestampTz(result) <= finish_time)
result = DirectFunctionCall2(timestamptz_pl_interval, result, schedint_datum);

return result;
}
return DatumGetTimestampTz(result);
}

extern Datum
Expand Down
Loading

0 comments on commit f50a087

Please sign in to comment.