Skip to content

Commit

Permalink
add noise to cron scheduling (#20665)
Browse files Browse the repository at this point in the history
* add noise to cron scheduling

* fix tests

* add a workspace id to test cron scheduling jitter

* add unit test and fix scheduling noise for crons
  • Loading branch information
mfsiega-airbyte authored Dec 21, 2022
1 parent 401aa44 commit 16e890a
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;

import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
Expand All @@ -19,6 +20,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
Expand All @@ -32,32 +34,54 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTimeZone;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConfigFetchActivityImpl implements ConfigFetchActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigFetchActivityImpl.class);
private final static long MS_PER_SECOND = 1000L;
private final static long MIN_CRON_INTERVAL_SECONDS = 60;
private static final Set<UUID> SCHEDULING_NOISE_WORKSPACE_IDS = Set.of(
// Testing
UUID.fromString("0ace5e1f-4787-43df-8919-456f5f4d03d1"),
UUID.fromString("20810d92-41a4-4cfd-85db-fb50e77cf36b"),
// Prod
UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
private static final long SCHEDULING_NOISE_CONSTANT = 15;

private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final WorkspaceHelper workspaceHelper;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier);
}

@VisibleForTesting
protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkspaceHelper workspaceHelper,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
}
Expand Down Expand Up @@ -128,15 +152,41 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final StandardSync
+ MIN_CRON_INTERVAL_SECONDS
: currentSecondsSupplier.get()) * MS_PER_SECOND);
final Date nextRunStart = cronExpression.getNextValidTimeAfter(new Date(earliestNextRun));
final Duration timeToWait = Duration.ofSeconds(
Duration timeToWait = Duration.ofSeconds(
Math.max(0, nextRunStart.getTime() / MS_PER_SECOND - currentSecondsSupplier.get()));

timeToWait = addSchedulingNoiseForAllowListedWorkspace(timeToWait, standardSync);
return new ScheduleRetrieverOutput(timeToWait);
} catch (final ParseException e) {
throw (DateTimeException) new DateTimeException(e.getMessage()).initCause(e);
}
}
}

private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, StandardSync standardSync) {
final UUID workspaceId;
try {
workspaceId = workspaceHelper.getWorkspaceForConnectionId(standardSync.getConnectionId());
} catch (JsonValidationException | ConfigNotFoundException e) {
// We tolerate exceptions and fail open by doing nothing.
return timeToWait;
}
if (!SCHEDULING_NOISE_WORKSPACE_IDS.contains(workspaceId)) {
// Only apply to a specific set of workspaces.
return timeToWait;
}
if (!standardSync.getScheduleType().equals(ScheduleType.CRON)) {
// Only apply noise to cron connections.
return timeToWait;
}

// We really do want to add some scheduling noise for this connection.
final long minutesToWait = (long) (Math.random() * SCHEDULING_NOISE_CONSTANT);
LOGGER.debug("Adding {} minutes noise to wait", minutesToWait);
// Note: we add an extra second to make the unit tests pass in case `minutesToWait` was 0.
return timeToWait.plusMinutes(minutesToWait).plusSeconds(1);
}

/**
* @param standardSync
* @param connectionId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Cron;
import io.airbyte.config.Schedule;
Expand All @@ -14,11 +17,13 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Calendar;
import java.util.Optional;
Expand All @@ -31,7 +36,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
Expand All @@ -45,6 +49,9 @@ class ConfigFetchActivityTest {
@Mock
private JobPersistence mJobPersistence;

@Mock
private WorkspaceHelper mWorkspaceHelper;

@Mock
private Job mJob;

Expand All @@ -69,13 +76,14 @@ class ConfigFetchActivityTest {
.withTimeUnit(BasicSchedule.TimeUnit.MINUTES)
.withUnits(5L)));

public static final String UTC = "UTC";
private final static StandardSync standardSyncWithCronScheduleType = new StandardSync()
.withScheduleType(ScheduleType.CRON)
.withStatus(Status.ACTIVE)
.withScheduleData(new ScheduleData()
.withCron(new Cron()
.withCronExpression("0 0 12 * * ?")
.withCronTimeZone("UTC")));
.withCronTimeZone(UTC)));

private final static StandardSync standardSyncWithScheduleDisable = new StandardSync()
.withSchedule(new Schedule()
Expand All @@ -93,7 +101,8 @@ class ConfigFetchActivityTest {
@BeforeEach
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond());
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> Instant.now().getEpochSecond());
}

@Nested
Expand All @@ -102,10 +111,10 @@ class TimeToWaitTest {
@Test
@DisplayName("Test that the job gets scheduled if it is not manual and if it is the first run with legacy schedule schema")
void testFirstJobNonManual() throws IOException, JsonValidationException, ConfigNotFoundException {
Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.empty());

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithLegacySchedule);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -119,7 +128,7 @@ void testFirstJobNonManual() throws IOException, JsonValidationException, Config
@Test
@DisplayName("Test that the job will wait for a long time if it is manual in the legacy schedule schema")
void testManual() throws IOException, JsonValidationException, ConfigNotFoundException {
Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithoutSchedule);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -133,7 +142,7 @@ void testManual() throws IOException, JsonValidationException, ConfigNotFoundExc
@Test
@DisplayName("Test that the job will wait for a long time if it is disabled")
void testDisable() throws IOException, JsonValidationException, ConfigNotFoundException {
Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithScheduleDisable);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -147,7 +156,7 @@ void testDisable() throws IOException, JsonValidationException, ConfigNotFoundEx
@Test
@DisplayName("Test that the connection will wait for a long time if it is deleted")
void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundException {
Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithScheduleDeleted);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -163,13 +172,13 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);

Mockito.when(mJob.getStartedAtInSecond())
when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));

Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithLegacySchedule);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -185,13 +194,13 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10);

Mockito.when(mJob.getStartedAtInSecond())
when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));

Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithLegacySchedule);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -207,7 +216,7 @@ void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotF
@Test
@DisplayName("Test that the job will wait a long time if it is MANUAL scheduleType")
void testManualScheduleType() throws IOException, JsonValidationException, ConfigNotFoundException {
Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithManualScheduleType);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -221,10 +230,10 @@ void testManualScheduleType() throws IOException, JsonValidationException, Confi
@Test
@DisplayName("Test that the job will be immediately scheduled if it is a BASIC_SCHEDULE type on the first run")
void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException, ConfigNotFoundException {
Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.empty());

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithBasicScheduleType);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -240,13 +249,13 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException
void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);

Mockito.when(mJob.getStartedAtInSecond())
when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));

Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithBasicScheduleType);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -260,19 +269,22 @@ void testBasicScheduleSubsequentRun() throws IOException, JsonValidationExceptio
@Test
@DisplayName("Test that the job will wait to be scheduled if it is a CRON type")
void testCronScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone(UTC));
mockRightNow.set(Calendar.HOUR_OF_DAY, 0);
mockRightNow.set(Calendar.MINUTE, 0);
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID());

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L);
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);

Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithCronScheduleType);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -286,20 +298,23 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException
@Test
@DisplayName("Test that the job will only be scheduled once per minimum cron interval")
void testCronScheduleMinimumInterval() throws IOException, JsonValidationException, ConfigNotFoundException {
final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone(UTC));
mockRightNow.set(Calendar.HOUR_OF_DAY, 12);
mockRightNow.set(Calendar.MINUTE, 0);
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID());

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L);
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);

Mockito.when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));

Mockito.when(mConfigRepository.getStandardSync(connectionId))
when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithCronScheduleType);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);
Expand All @@ -310,6 +325,36 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti
.hasHours(24);
}

@Test
@DisplayName("Test that for specific workspace ids, we add some noise in the cron scheduling")
void testCronSchedulingNoise() throws IOException, JsonValidationException, ConfigNotFoundException {
final Calendar mockRightNow = Calendar.getInstance(TimeZone.getTimeZone(UTC));
mockRightNow.set(Calendar.HOUR_OF_DAY, 0);
mockRightNow.set(Calendar.MINUTE, 0);
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));

when(mConfigRepository.getStandardSync(connectionId))
.thenReturn(standardSyncWithCronScheduleType);

final ScheduleRetrieverInput input = new ScheduleRetrieverInput(connectionId);

final ScheduleRetrieverOutput output = configFetchActivity.getTimeToWait(input);

// Note: compareTo returns positive if the left side is greater than the right.
Assertions.assertThat(output.getTimeToWait().compareTo(Duration.ofHours(12)) > 0).isTrue();
}

@Nested
class TestGetMaxAttempt {

Expand Down

0 comments on commit 16e890a

Please sign in to comment.