diff --git a/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json b/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json
index 29d407b46c66..03cfbc512555 100644
--- a/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json
+++ b/airbyte-notification/src/main/resources/customerio/auto_disable_notification_template.json
@@ -8,7 +8,7 @@
},
"message_data": {
"email_title": "Connection Auto-Disabled",
- "email_body": "Your connection from %s to %s was disabled because it failed consecutively 100 times or that there were only failed jobs in the past 14 days.
Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
+ "email_body": "Your connection from %s to %s was disabled because it failed 100 times consecutively or that there were only failed jobs in the past 14 days.
Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
},
"disable_message_retention": false,
diff --git a/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json b/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json
index 3157b664f148..2c1b0242a315 100644
--- a/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json
+++ b/airbyte-notification/src/main/resources/customerio/auto_disable_warning_notification_template.json
@@ -8,7 +8,7 @@
},
"message_data": {
"email_title": "Connection Auto-Disabled Warning",
- "email_body": "Your connection from %s to %s is about to be disabled because it failed consecutively 50 times or that there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.
Please address the failing issues and re-enable the connection. The most recent attempted %s You can access its logs here: %s"
+ "email_body": "Your connection from %s to %s is about to be disabled because it failed 50 times consecutively or that there were only failed jobs in the past 7 days. Once it has failed 100 times consecutively or has been failing for 14 days in a row, the connection will be automatically disabled.
The most recent attempted %s You can access its logs here: %s"
},
"disable_message_retention": false,
diff --git a/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobWithStatusAndTimestamp.java b/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobWithStatusAndTimestamp.java
new file mode 100644
index 000000000000..5c82d3c42a84
--- /dev/null
+++ b/airbyte-scheduler/models/src/main/java/io/airbyte/scheduler/models/JobWithStatusAndTimestamp.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.scheduler.models;
+
+import java.util.Objects;
+
+public class JobWithStatusAndTimestamp {
+
+ private final long id;
+ private final JobStatus status;
+ private final long createdAtInSecond;
+ private final long updatedAtInSecond;
+
+ public JobWithStatusAndTimestamp(final long id,
+ final JobStatus status,
+ final long createdAtInSecond,
+ final long updatedAtInSecond) {
+ this.id = id;
+ this.status = status;
+ this.createdAtInSecond = createdAtInSecond;
+ this.updatedAtInSecond = updatedAtInSecond;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public long getCreatedAtInSecond() {
+ return createdAtInSecond;
+ }
+
+ public long getUpdatedAtInSecond() {
+ return updatedAtInSecond;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final JobWithStatusAndTimestamp jobWithStatusAndTimestamp = (JobWithStatusAndTimestamp) o;
+ return id == jobWithStatusAndTimestamp.id &&
+ status == jobWithStatusAndTimestamp.status &&
+ createdAtInSecond == jobWithStatusAndTimestamp.createdAtInSecond &&
+ updatedAtInSecond == jobWithStatusAndTimestamp.updatedAtInSecond;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, status, createdAtInSecond, updatedAtInSecond);
+ }
+
+ @Override
+ public String toString() {
+ return "Job{" +
+ "id=" + id +
+ ", status=" + status +
+ ", createdAtInSecond=" + createdAtInSecond +
+ ", updatedAtInSecond=" + updatedAtInSecond +
+ '}';
+ }
+
+}
diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
index 4118cda02534..ebcbc93423ed 100644
--- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
+++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
@@ -32,6 +32,7 @@
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
+import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Path;
@@ -386,18 +387,24 @@ public List listJobsWithStatus(final ConfigType configType, final JobStatus
}
@Override
- public List listJobStatusWithConnection(final UUID connectionId, final Set configTypes, final Instant jobCreatedAtTimestamp)
+ public List listJobStatusAndTimestampWithConnection(final UUID connectionId,
+ final Set configTypes,
+ final Instant jobCreatedAtTimestamp)
throws IOException {
final LocalDateTime timeConvertedIntoLocalDateTime = LocalDateTime.ofInstant(jobCreatedAtTimestamp, ZoneOffset.UTC);
- final String JobStatusSelect = "SELECT status FROM jobs ";
+ final String JobStatusSelect = "SELECT id, status, created_at, updated_at FROM jobs ";
return jobDatabase.query(ctx -> ctx
.fetch(JobStatusSelect + "WHERE " +
"scope = ? AND " +
"CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND " +
"created_at >= ? ORDER BY created_at DESC", connectionId.toString(), timeConvertedIntoLocalDateTime))
.stream()
- .map(r -> JobStatus.valueOf(r.get("status", String.class).toUpperCase()))
+ .map(r -> new JobWithStatusAndTimestamp(
+ r.get("id", Long.class),
+ JobStatus.valueOf(r.get("status", String.class).toUpperCase()),
+ r.get("created_at", Long.class) / 1000,
+ r.get("updated_at", Long.class) / 1000))
.toList();
}
diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java
index 678d9f1ce817..1bab93efe285 100644
--- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java
+++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java
@@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.map.MoreMaps;
+import io.airbyte.config.CustomerioNotificationConfiguration;
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.StandardDestinationDefinition;
@@ -26,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,20 +79,10 @@ private void notifyJob(final String reason,
try {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
- final Instant jobStartedDate = Instant.ofEpochSecond(job.getStartedAtInSecond().orElse(job.getCreatedAtInSecond()));
- final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).withZone(ZoneId.systemDefault());
- final Instant jobUpdatedDate = Instant.ofEpochSecond(job.getUpdatedAtInSecond());
- final Instant adjustedJobUpdatedDate = jobUpdatedDate.equals(jobStartedDate) ? Instant.now() : jobUpdatedDate;
- final Duration duration = Duration.between(jobStartedDate, adjustedJobUpdatedDate);
- final String durationString = formatDurationPart(duration.toDaysPart(), "day")
- + formatDurationPart(duration.toHoursPart(), "hour")
- + formatDurationPart(duration.toMinutesPart(), "minute")
- + formatDurationPart(duration.toSecondsPart(), "second");
final String sourceConnector = String.format("%s version %s", sourceDefinition.getName(), sourceDefinition.getDockerImageTag());
final String destinationConnector = String.format("%s version %s", destinationDefinition.getName(), destinationDefinition.getDockerImageTag());
final String failReason = Strings.isNullOrEmpty(reason) ? "" : String.format(", as the %s", reason);
- final String jobDescription =
- String.format("sync started on %s, running for%s%s.", formatter.format(jobStartedDate), durationString, failReason);
+ final String jobDescription = getJobDescription(job, failReason);
final String logUrl = connectionPageUrl + connectionId;
final ImmutableMap jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job);
final ImmutableMap sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
@@ -114,24 +106,31 @@ private void notifyJob(final String reason,
workspaceId,
action,
MoreMaps.merge(jobMetadata, sourceMetadata, destinationMetadata, notificationMetadata.build()));
- if (FAILURE_NOTIFICATION.equals(action)) {
- if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl)) {
- LOGGER.warn("Failed to successfully notify failure: {}", notification);
- }
- } else if (SUCCESS_NOTIFICATION.equals(action)) {
- if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl)) {
- LOGGER.warn("Failed to successfully notify success: {}", notification);
- }
- // alert message currently only supported by email through customer.io
- } else if (CONNECTION_DISABLED_NOTIFICATION.equals(action) && notification.getNotificationType().equals(NotificationType.CUSTOMERIO)) {
- if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, logUrl)) {
- LOGGER.warn("Failed to successfully notify auto-disable connection: {}", notification);
- }
- } else if (CONNECTION_DISABLED_WARNING_NOTIFICATION.equals(action)
- && notification.getNotificationType().equals(NotificationType.CUSTOMERIO)) {
- if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, logUrl)) {
- LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", notification);
- }
+
+ switch (action) {
+ case FAILURE_NOTIFICATION:
+ if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl)) {
+ LOGGER.warn("Failed to successfully notify failure: {}", notification);
+ }
+ break;
+ case SUCCESS_NOTIFICATION:
+ if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl)) {
+ LOGGER.warn("Failed to successfully notify success: {}", notification);
+ }
+ break;
+ case CONNECTION_DISABLED_NOTIFICATION:
+ if (notification.getNotificationType().equals(NotificationType.CUSTOMERIO)
+ && !notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription,
+ logUrl)) {
+ LOGGER.warn("Failed to successfully notify auto-disable connection: {}", notification);
+ }
+ break;
+ case CONNECTION_DISABLED_WARNING_NOTIFICATION:
+ if (notification.getNotificationType().equals(NotificationType.CUSTOMERIO)
+ && !notificationClient.notifyConnectionDisableWarning(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription,
+ logUrl)) {
+ LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", notification);
+ }
}
} catch (final Exception e) {
LOGGER.error("Failed to notify: {} due to an exception", notification, e);
@@ -154,6 +153,62 @@ public void notifyJobByEmail(final String reason, final String action, final Job
}
}
+ // This method allows for the alert to be sent without the customerio configuration set in the
+ // database
+ // This is only needed because there is no UI element to allow for users to create that
+ // configuration.
+ // Once that exists, this can be removed and we should be using `notifyJobByEmail`.
+ // The alert is sent to the email associated with the workspace.
+ public void autoDisableConnectionAlertWithoutCustomerioConfig(final String action, final Job job) {
+ try {
+ final UUID workspaceId = workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(job.getId());
+ final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true);
+
+ final Notification customerioNotification = new Notification()
+ .withNotificationType(NotificationType.CUSTOMERIO)
+ .withCustomerioConfiguration(new CustomerioNotificationConfiguration());
+ final NotificationClient notificationClient = getNotificationClient(customerioNotification);
+
+ final UUID connectionId = UUID.fromString(job.getScope());
+ final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
+ final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
+ final String sourceConnector = String.format("%s version %s", sourceDefinition.getName(), sourceDefinition.getDockerImageTag());
+ final String destinationConnector = String.format("%s version %s", destinationDefinition.getName(), destinationDefinition.getDockerImageTag());
+ final String logUrl = connectionPageUrl + connectionId;
+ final String jobDescription = getJobDescription(job, "");
+
+ switch (action) {
+ case CONNECTION_DISABLED_NOTIFICATION:
+ if (!notificationClient.notifyConnectionDisabled(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription, logUrl)) {
+ LOGGER.warn("Failed to successfully notify auto-disable connection: {}", customerioNotification);
+ }
+ break;
+ case CONNECTION_DISABLED_WARNING_NOTIFICATION:
+ if (!notificationClient.notifyConnectionDisableWarning(workspace.getEmail(), sourceConnector, destinationConnector, jobDescription,
+ logUrl)) {
+ LOGGER.warn("Failed to successfully notify auto-disable connection warning: {}", customerioNotification);
+ }
+ break;
+ default:
+ LOGGER.error(
+ "Incorrect action supplied, this method only supports Connection Disabled Notification and Connection Disabled Warning Notification.");
+ }
+ } catch (final Exception e) {
+ LOGGER.error("Unable to send auto disable alert:", e);
+ }
+ }
+
+ private String getJobDescription(final Job job, final String reason) {
+ final Instant jobStartedDate = Instant.ofEpochSecond(job.getStartedAtInSecond().orElse(job.getCreatedAtInSecond()));
+ final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).withZone(ZoneId.systemDefault());
+ final Instant jobUpdatedDate = Instant.ofEpochSecond(job.getUpdatedAtInSecond());
+ final Instant adjustedJobUpdatedDate = jobUpdatedDate.equals(jobStartedDate) ? Instant.now() : jobUpdatedDate;
+ final Duration duration = Duration.between(jobStartedDate, adjustedJobUpdatedDate);
+ final String durationString = DurationFormatUtils.formatDurationWords(duration.toMillis(), true, true);
+
+ return String.format("sync started on %s, running for %s%s.", formatter.format(jobStartedDate), durationString, reason);
+ }
+
public void failJob(final String reason, final Job job) {
notifyJob(reason, FAILURE_NOTIFICATION, job);
}
diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
index 69ba6ab67a13..bb2eb6863afa 100644
--- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
+++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
@@ -12,6 +12,7 @@
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
+import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
@@ -164,11 +165,14 @@ public interface JobPersistence {
* @param connectionId The ID of the connection
* @param configTypes The types of jobs
* @param jobCreatedAtTimestamp The timestamp after which you want the jobs
- * @return List of job statuses from a specific connection that have attempts after the provided
- * timestamp, sorted by jobs' createAt in descending order
+ * @return List of jobs that only include information regarding id, status, timestamps from a
+ * specific connection that have attempts after the provided timestamp, sorted by jobs'
+ * createAt in descending order
* @throws IOException
*/
- List listJobStatusWithConnection(UUID connectionId, Set configTypes, Instant jobCreatedAtTimestamp)
+ List listJobStatusAndTimestampWithConnection(UUID connectionId,
+ Set configTypes,
+ Instant jobCreatedAtTimestamp)
throws IOException;
Optional getLastReplicationJob(UUID connectionId) throws IOException;
diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java
index 49395c1df973..4b37f42cb240 100644
--- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java
+++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java
@@ -37,6 +37,7 @@
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
+import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
@@ -1355,11 +1356,11 @@ private Job addStateToJob(final Job job) throws IOException, SQLException {
}
@Nested
- @DisplayName("When listing job statuses with specified connection id and timestamp")
- class ListJobStatusWithConnection {
+ @DisplayName("When listing job statuses and timestamps with specified connection id and timestamp")
+ class ListJobStatusAndTimestampWithConnection {
@Test
- @DisplayName("Should list only job statuses of specified connection id")
+ @DisplayName("Should list only job statuses and timestamps of specified connection id")
public void testConnectionIdFiltering() throws IOException {
jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, DEFAULT_MINIMUM_AGE_IN_DAYS, DEFAULT_EXCESSIVE_NUMBER_OF_JOBS,
DEFAULT_MINIMUM_RECENCY_COUNT);
@@ -1369,7 +1370,7 @@ public void testConnectionIdFiltering() throws IOException {
final long wrongConnectionSyncJobId = jobPersistence.enqueueJob(UUID.randomUUID().toString(), SYNC_JOB_CONFIG).orElseThrow();
final int wrongSyncJobAttemptNumber0 = jobPersistence.createAttempt(wrongConnectionSyncJobId, LOG_PATH);
jobPersistence.failAttempt(wrongConnectionSyncJobId, wrongSyncJobAttemptNumber0);
- assertEquals(0, jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH).size());
+ assertEquals(0, jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH).size());
// create a connection with relevant connection id
final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
@@ -1377,9 +1378,10 @@ public void testConnectionIdFiltering() throws IOException {
jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0);
// check to see current status of only relevantly scoped job
- final List jobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
- assertEquals(jobStatuses.size(), 1);
- assertEquals(JobStatus.INCOMPLETE, jobStatuses.get(0));
+ final List jobs =
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
+ assertEquals(jobs.size(), 1);
+ assertEquals(JobStatus.INCOMPLETE, jobs.get(0).getStatus());
}
@Test
@@ -1395,9 +1397,10 @@ public void testTimestampFiltering() throws IOException {
jobPersistence.failJob(syncJobId);
// Check to see current status of all jobs from beginning of time, expecting only 1 job
- final List jobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
- assertEquals(jobStatuses.size(), 1);
- assertEquals(JobStatus.FAILED, jobStatuses.get(0));
+ final List initialJobs =
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
+ assertEquals(initialJobs.size(), 1);
+ assertEquals(JobStatus.FAILED, initialJobs.get(0).getStatus());
// Edit time supplier to return later time
final Instant timeAfterFirstJob = NOW.plusSeconds(60);
@@ -1410,22 +1413,25 @@ public void testTimestampFiltering() throws IOException {
// Check to see current status of all jobs from beginning of time, expecting both jobs in createAt
// descending order (most recent first)
- final List allQueryJobStatuses =
- jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
- assertEquals(2, allQueryJobStatuses.size());
- assertEquals(JobStatus.SUCCEEDED, allQueryJobStatuses.get(0));
- assertEquals(JobStatus.FAILED, allQueryJobStatuses.get(1));
+ final List allQueryJobs =
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
+ assertEquals(2, allQueryJobs.size());
+ assertEquals(JobStatus.SUCCEEDED, allQueryJobs.get(0).getStatus());
+ assertEquals(JobStatus.FAILED, allQueryJobs.get(1).getStatus());
// Look up jobs with a timestamp after the first job. Expecting only the second job status
- final List timestampFilteredJobStatuses =
- jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterFirstJob);
- assertEquals(1, timestampFilteredJobStatuses.size());
- assertEquals(JobStatus.SUCCEEDED, timestampFilteredJobStatuses.get(0));
+ final List timestampFilteredJobs =
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterFirstJob);
+ assertEquals(1, timestampFilteredJobs.size());
+ assertEquals(JobStatus.SUCCEEDED, timestampFilteredJobs.get(0).getStatus());
+ assertTrue(timeAfterFirstJob.getEpochSecond() <= timestampFilteredJobs.get(0).getCreatedAtInSecond());
+ assertTrue(timeAfterFirstJob.getEpochSecond() <= timestampFilteredJobs.get(0).getUpdatedAtInSecond());
// Check to see if timestamp filtering is working by only looking up jobs with timestamp after
// second job. Expecting no job status output
final Instant timeAfterSecondJob = timeAfterFirstJob.plusSeconds(60);
- assertEquals(0, jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterSecondJob).size());
+ assertEquals(0,
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), timeAfterSecondJob).size());
}
@Test
@@ -1453,12 +1459,12 @@ public void testMultipleJobStatusTypes() throws IOException {
// Check to see current status of all jobs from beginning of time, expecting all jobs in createAt
// descending order (most recent first)
- final List allJobStatuses =
- jobPersistence.listJobStatusWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
- assertEquals(3, allJobStatuses.size());
- assertEquals(JobStatus.CANCELLED, allJobStatuses.get(0));
- assertEquals(JobStatus.SUCCEEDED, allJobStatuses.get(1));
- assertEquals(JobStatus.FAILED, allJobStatuses.get(2));
+ final List allJobs =
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, Sets.newHashSet(ConfigType.SYNC), Instant.EPOCH);
+ assertEquals(3, allJobs.size());
+ assertEquals(JobStatus.CANCELLED, allJobs.get(0).getStatus());
+ assertEquals(JobStatus.SUCCEEDED, allJobs.get(1).getStatus());
+ assertEquals(JobStatus.FAILED, allJobs.get(2).getStatus());
}
@Test
@@ -1482,10 +1488,11 @@ public void testMultipleConfigTypes() throws IOException {
jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
// expect order to be from most recent to least recent
- final List allJobStatuses = jobPersistence.listJobStatusWithConnection(CONNECTION_ID, configTypes, Instant.EPOCH);
- assertEquals(2, allJobStatuses.size());
- assertEquals(JobStatus.INCOMPLETE, allJobStatuses.get(0));
- assertEquals(JobStatus.FAILED, allJobStatuses.get(1));
+ final List allJobs =
+ jobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, configTypes, Instant.EPOCH);
+ assertEquals(2, allJobs.size());
+ assertEquals(JobStatus.INCOMPLETE, allJobs.get(0).getStatus());
+ assertEquals(JobStatus.FAILED, allJobs.get(1).getStatus());
}
}
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
index 1da1cd3676de..ab077bf5d570 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -181,7 +181,7 @@ private void registerConnectionManager(final WorkerFactory factory) {
jobCreator),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper),
- new AutoDisableConnectionActivityImpl(configRepository, jobPersistence, featureFlags, configs));
+ new AutoDisableConnectionActivityImpl(configRepository, jobPersistence, featureFlags, configs, jobNotifier));
}
private void registerSync(final WorkerFactory factory) {
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
index c166f29f5b43..96b31fdc2e1b 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
@@ -13,6 +13,7 @@
import io.airbyte.workers.temporal.exception.RetryableException;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput;
+import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionOutput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
@@ -266,7 +267,11 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
if (autoDisableConnectionVersion != Workflow.DEFAULT_VERSION) {
final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
new AutoDisableConnectionActivityInput(connectionId, Instant.ofEpochMilli(Workflow.currentTimeMillis()));
- runMandatoryActivity(autoDisableConnectionActivity::autoDisableFailingConnection, autoDisableConnectionActivityInput);
+ final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput(
+ autoDisableConnectionActivity::autoDisableFailingConnection, autoDisableConnectionActivityInput);
+ if (output.isDisabled()) {
+ log.info("Auto-disabled for constantly failing for Connection {}", connectionId);
+ }
}
resetNewConnectionInput(connectionUpdaterInput);
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java
index f78e04995c3a..101e9b12e451 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java
@@ -5,19 +5,27 @@
package io.airbyte.workers.temporal.scheduling.activities;
import static io.airbyte.scheduler.models.Job.REPLICATION_TYPES;
+import static io.airbyte.scheduler.persistence.JobNotifier.CONNECTION_DISABLED_NOTIFICATION;
+import static io.airbyte.scheduler.persistence.JobNotifier.CONNECTION_DISABLED_WARNING_NOTIFICATION;
+import static java.time.temporal.ChronoUnit.DAYS;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.Configs;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
+import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
+import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
+import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
+import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.exception.RetryableException;
-import java.time.temporal.ChronoUnit;
+import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
@@ -28,6 +36,7 @@ public class AutoDisableConnectionActivityImpl implements AutoDisableConnectionA
private JobPersistence jobPersistence;
private FeatureFlags featureFlags;
private Configs configs;
+ private JobNotifier jobNotifier;
// Given a connection id and current timestamp, this activity will set a connection to INACTIVE if
// either:
@@ -35,56 +44,136 @@ public class AutoDisableConnectionActivityImpl implements AutoDisableConnectionA
// limit
// - all the jobs in the past `configs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()` days are
// failures, and that the connection's first job is at least that many days old
+ // Notifications will be sent if a connection is disabled or warned if it has reached halfway to
+ // disable limits
@Override
public AutoDisableConnectionOutput autoDisableFailingConnection(final AutoDisableConnectionActivityInput input) {
if (featureFlags.autoDisablesFailingConnections()) {
try {
final int maxDaysOfOnlyFailedJobs = configs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable();
- final List jobStatuses = jobPersistence.listJobStatusWithConnection(input.getConnectionId(), REPLICATION_TYPES,
- input.getCurrTimestamp().minus(maxDaysOfOnlyFailedJobs, ChronoUnit.DAYS));
+ final int maxDaysOfOnlyFailedJobsBeforeWarning = maxDaysOfOnlyFailedJobs / 2;
+ final int maxFailedJobsInARowBeforeConnectionDisableWarning = configs.getMaxFailedJobsInARowBeforeConnectionDisable() / 2;
+ final long currTimestampInSeconds = input.getCurrTimestamp().getEpochSecond();
+ final Job lastJob = jobPersistence.getLastReplicationJob(input.getConnectionId())
+ .orElseThrow(() -> new Exception("Auto-Disable Connection should not have been attempted if can't get latest replication job."));
+
+ final List jobs = jobPersistence.listJobStatusAndTimestampWithConnection(input.getConnectionId(),
+ REPLICATION_TYPES, input.getCurrTimestamp().minus(maxDaysOfOnlyFailedJobs, DAYS));
int numFailures = 0;
+ Optional successTimestamp = Optional.empty();
- // jobs are sorted from most recent to least recent
- for (final JobStatus jobStatus : jobStatuses) {
+ for (final JobWithStatusAndTimestamp job : jobs) {
+ final JobStatus jobStatus = job.getStatus();
if (jobStatus == JobStatus.FAILED) {
numFailures++;
- if (numFailures == configs.getMaxFailedJobsInARowBeforeConnectionDisable())
- break;
} else if (jobStatus == JobStatus.SUCCEEDED) {
- return new AutoDisableConnectionOutput(false);
+ successTimestamp = Optional.of(job.getUpdatedAtInSecond());
+ break;
}
}
- // if the jobs in the last 14 days don't include any succeeded or failed jobs (e.g. only cancelled
- // jobs), do not auto-disable
if (numFailures == 0) {
return new AutoDisableConnectionOutput(false);
+ } else if (numFailures == configs.getMaxFailedJobsInARowBeforeConnectionDisable()) {
+ // disable connection if max consecutive failed jobs limit has been hit
+ disableConnection(input.getConnectionId(), lastJob);
+ return new AutoDisableConnectionOutput(true);
+ } else if (numFailures == maxFailedJobsInARowBeforeConnectionDisableWarning) {
+ // warn if number of consecutive failures hits 50% of MaxFailedJobsInARow
+ jobNotifier.autoDisableConnectionAlertWithoutCustomerioConfig(CONNECTION_DISABLED_WARNING_NOTIFICATION, lastJob);
+ return new AutoDisableConnectionOutput(false);
}
- // if the very first job of a connection fails, it will hit the condition of "only failed jobs in
- // the past `maxDaysOfOnlyFailedJobs` days", to avoid this behavior, we ensure that this condition
- // is only taken into account if the connection has a job that's at least `maxDaysOfOnlyFailedJobs`
- // days old
- if (numFailures != configs.getMaxFailedJobsInARowBeforeConnectionDisable()) {
- final Optional optionalFirstJob = jobPersistence.getFirstReplicationJob(input.getConnectionId());
- if (optionalFirstJob.isPresent()) {
- final long timeBetweenCurrTimestampAndFirstJob = input.getCurrTimestamp().getEpochSecond()
- - optionalFirstJob.get().getCreatedAtInSecond();
- if (timeBetweenCurrTimestampAndFirstJob <= TimeUnit.DAYS.toSeconds(maxDaysOfOnlyFailedJobs)) {
- return new AutoDisableConnectionOutput(false);
- }
- }
+ // calculate the number of days this connection first tried a replication job, used to ensure not to
+ // disable or warn for `maxDaysOfOnlyFailedJobs` if the first job is younger than
+ // `maxDaysOfOnlyFailedJobs` days, This avoids cases such as "the very first job run was a failure".
+ final Job firstJob = jobPersistence.getFirstReplicationJob(input.getConnectionId())
+ .orElseThrow(() -> new Exception("Auto-Disable Connection should not have been attempted if no replication job has been run."));
+ final int numDaysSinceFirstReplicationJob = getDaysSinceTimestamp(currTimestampInSeconds, firstJob.getCreatedAtInSecond());
+ final boolean firstReplicationOlderThanMaxDisableDays = numDaysSinceFirstReplicationJob >= maxDaysOfOnlyFailedJobs;
+ final boolean noPreviousSuccess = successTimestamp.isEmpty();
+
+ // disable connection if only failed jobs in the past maxDaysOfOnlyFailedJobs days
+ if (firstReplicationOlderThanMaxDisableDays && noPreviousSuccess) {
+ disableConnection(input.getConnectionId(), lastJob);
+ return new AutoDisableConnectionOutput(true);
+ }
+
+ final boolean firstReplicationOlderThanMaxDisableWarningDays = numDaysSinceFirstReplicationJob >= maxDaysOfOnlyFailedJobsBeforeWarning;
+ final boolean successOlderThanPrevFailureByMaxWarningDays = // set to true if no previous success is found
+ noPreviousSuccess || getDaysSinceTimestamp(currTimestampInSeconds, successTimestamp.get()) >= maxDaysOfOnlyFailedJobsBeforeWarning;
+
+ // send warning if there are only failed jobs in the past maxDaysOfOnlyFailedJobsBeforeWarning days
+ // _unless_ a warning should have already been sent in the previous failure
+ if (firstReplicationOlderThanMaxDisableWarningDays && successOlderThanPrevFailureByMaxWarningDays) {
+ sendWarningIfNotPreviouslySent(successTimestamp, maxDaysOfOnlyFailedJobsBeforeWarning, firstJob, lastJob, jobs, numFailures);
}
- final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId());
- standardSync.setStatus(Status.INACTIVE);
- configRepository.writeStandardSync(standardSync);
} catch (final Exception e) {
throw new RetryableException(e);
}
}
- return new AutoDisableConnectionOutput(true);
+ return new AutoDisableConnectionOutput(false);
+ }
+
+ private void sendWarningIfNotPreviouslySent(final Optional successTimestamp,
+ final int maxDaysOfOnlyFailedJobsBeforeWarning,
+ final Job firstJob,
+ final Job lastJob,
+ final List jobs,
+ final int numFailures) {
+ if (numFailures > 1 && checkIfWarningPreviouslySent(successTimestamp, maxDaysOfOnlyFailedJobsBeforeWarning, firstJob, jobs)) {
+ return;
+ }
+ jobNotifier.autoDisableConnectionAlertWithoutCustomerioConfig(CONNECTION_DISABLED_WARNING_NOTIFICATION, lastJob);
+ }
+
+ // Checks to see if warning should have been sent in the previous failure, if so skip sending of
+ // warning to avoid spam
+ // Assume warning has been sent if either of the following is true:
+ // 1. no success found in the time span and the previous failure occurred
+ // maxDaysOfOnlyFailedJobsBeforeWarning days after the first job
+ // 2. success found and the previous failure occurred maxDaysOfOnlyFailedJobsBeforeWarning days
+ // after that success
+ private boolean checkIfWarningPreviouslySent(final Optional successTimestamp,
+ final int maxDaysOfOnlyFailedJobsBeforeWarning,
+ final Job firstJob,
+ final List jobs) {
+ if (jobs.size() <= 1)
+ return false;
+
+ // get previous failed job (skipping first job since that's considered "current" job)
+ JobWithStatusAndTimestamp prevFailedJob = jobs.get(1);
+ for (int i = 2; i < jobs.size(); i++) {
+ if (prevFailedJob.getStatus() == JobStatus.FAILED)
+ break;
+ prevFailedJob = jobs.get(i);
+ }
+
+ final boolean successExists = successTimestamp.isPresent();
+ boolean successOlderThanPrevFailureByMaxWarningDays = false;
+ if (successExists) {
+ successOlderThanPrevFailureByMaxWarningDays =
+ getDaysSinceTimestamp(prevFailedJob.getUpdatedAtInSecond(), successTimestamp.get()) >= maxDaysOfOnlyFailedJobsBeforeWarning;
+ }
+ final boolean prevFailureOlderThanFirstJobByMaxWarningDays =
+ getDaysSinceTimestamp(prevFailedJob.getUpdatedAtInSecond(), firstJob.getUpdatedAtInSecond()) >= maxDaysOfOnlyFailedJobsBeforeWarning;
+
+ return (successExists && successOlderThanPrevFailureByMaxWarningDays)
+ || (!successExists && prevFailureOlderThanFirstJobByMaxWarningDays);
+ }
+
+ private int getDaysSinceTimestamp(final long currentTimestampInSeconds, final long timestampInSeconds) {
+ return Math.toIntExact(TimeUnit.SECONDS.toDays(currentTimestampInSeconds - timestampInSeconds));
+ }
+
+ private void disableConnection(final UUID connectionId, final Job lastJob) throws JsonValidationException, IOException, ConfigNotFoundException {
+ final StandardSync standardSync = configRepository.getStandardSync(connectionId);
+ standardSync.setStatus(Status.INACTIVE);
+ configRepository.writeStandardSync(standardSync);
+
+ jobNotifier.autoDisableConnectionAlertWithoutCustomerioConfig(CONNECTION_DISABLED_NOTIFICATION, lastJob);
}
}
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java
index 4d74a84f4a5c..9aadf0279269 100644
--- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java
+++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java
@@ -14,6 +14,7 @@
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput;
+import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionOutput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.GetMaxAttemptOutput;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
@@ -136,6 +137,9 @@ public void setUp() {
new IntegrationLauncherConfig(),
new IntegrationLauncherConfig(),
new StandardSyncInput()));
+
+ Mockito.when(mAutoDisableConnectionActivity.autoDisableFailingConnection(Mockito.any()))
+ .thenReturn(new AutoDisableConnectionOutput(false));
}
@AfterEach
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java
index 7513eb391683..287f4694c796 100644
--- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java
+++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java
@@ -7,6 +7,10 @@
import static io.airbyte.config.EnvConfigs.DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE;
import static io.airbyte.config.EnvConfigs.DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE;
import static io.airbyte.scheduler.models.Job.REPLICATION_TYPES;
+import static io.airbyte.scheduler.persistence.JobNotifier.CONNECTION_DISABLED_NOTIFICATION;
+import static io.airbyte.scheduler.persistence.JobNotifier.CONNECTION_DISABLED_WARNING_NOTIFICATION;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.Configs;
@@ -16,6 +20,8 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
+import io.airbyte.scheduler.models.JobWithStatusAndTimestamp;
+import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput;
@@ -28,7 +34,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
-import org.assertj.core.api.Assertions;
+import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@ -53,6 +59,9 @@ class AutoDisableConnectionActivityTest {
@Mock
private Configs mConfigs;
+ @Mock
+ private JobNotifier mJobNotifier;
+
@Mock
private Job mJob;
@@ -63,115 +72,193 @@ class AutoDisableConnectionActivityTest {
private static final Instant CURR_INSTANT = Instant.now();
private static final AutoDisableConnectionActivityInput ACTIVITY_INPUT = new AutoDisableConnectionActivityInput(CONNECTION_ID, CURR_INSTANT);
private static final int MAX_FAILURE_JOBS_IN_A_ROW = DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE;
+ private static final int MAX_DAYS_OF_ONLY_FAILED_JOBS = DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE;
+ private static final int MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_WARNING = DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE / 2;
+
+ private static final JobWithStatusAndTimestamp FAILED_JOB =
+ new JobWithStatusAndTimestamp(1, JobStatus.FAILED, CURR_INSTANT.getEpochSecond(), CURR_INSTANT.getEpochSecond());
+ private static final JobWithStatusAndTimestamp SUCCEEDED_JOB =
+ new JobWithStatusAndTimestamp(1, JobStatus.SUCCEEDED, CURR_INSTANT.getEpochSecond(), CURR_INSTANT.getEpochSecond());
+ private static final JobWithStatusAndTimestamp CANCELLED_JOB =
+ new JobWithStatusAndTimestamp(1, JobStatus.CANCELLED, CURR_INSTANT.getEpochSecond(), CURR_INSTANT.getEpochSecond());
private final StandardSync standardSync = new StandardSync();
@BeforeEach
- void setUp() {
+ void setUp() throws IOException {
standardSync.setStatus(Status.ACTIVE);
Mockito.when(mFeatureFlags.autoDisablesFailingConnections()).thenReturn(true);
- Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE);
+ Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
+ Mockito.when(mJobPersistence.getLastReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
+ }
+
+ // test warnings
+
+ @Test
+ @DisplayName("Test that a notification warning is sent for connections that have failed `MAX_FAILURE_JOBS_IN_A_ROW / 2` times")
+ public void testWarningNotificationsForAutoDisablingMaxNumFailures() throws IOException {
+ // from most recent to least recent: MAX_FAILURE_JOBS_IN_A_ROW/2 and 1 success
+ final List jobs = new ArrayList<>(Collections.nCopies(MAX_FAILURE_JOBS_IN_A_ROW / 2, FAILED_JOB));
+ jobs.add(SUCCEEDED_JOB);
+
+ Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);
+
+ final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.only()).autoDisableConnectionAlertWithoutCustomerioConfig(eq(CONNECTION_DISABLED_WARNING_NOTIFICATION),
+ Mockito.any());
+ }
+
+ @Test
+ @DisplayName("Test that a notification warning is sent after only failed jobs in last `MAX_DAYS_OF_STRAIGHT_FAILURE / 2` days")
+ public void testWarningNotificationsForAutoDisablingMaxDaysOfFailure() throws IOException {
+ Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS)))
+ .thenReturn(Collections.singletonList(FAILED_JOB));
+
+ Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
+ Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(
+ CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_WARNING));
+
+ final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.only()).autoDisableConnectionAlertWithoutCustomerioConfig(eq(CONNECTION_DISABLED_WARNING_NOTIFICATION),
+ Mockito.any());
+ }
+
+ @Test
+ @DisplayName("Test that a notification warning is not sent after one was just sent for failing multiple days")
+ public void testWarningNotificationsDoesNotSpam() throws IOException {
+ final List jobs = new ArrayList<>(Collections.nCopies(2, FAILED_JOB));
+ final long mJobCreateOrUpdatedInSeconds = CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_WARNING);
+
+ Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);
+
+ Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
+ Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(mJobCreateOrUpdatedInSeconds);
+ Mockito.when(mJob.getUpdatedAtInSecond()).thenReturn(mJobCreateOrUpdatedInSeconds);
+
+ final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnectionAlertWithoutCustomerioConfig(Mockito.anyString(),
+ Mockito.any());
}
+ @Test
+ @DisplayName("Test that the connection is _not_ disabled and no warning is sent after only failed jobs and oldest job is less than `MAX_DAYS_OF_STRAIGHT_FAILURE / 2 `days old")
+ public void testOnlyFailuresButFirstJobYoungerThanMaxDaysWarning() throws IOException {
+ Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS)))
+ .thenReturn(Collections.singletonList(FAILED_JOB));
+
+ Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
+ Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(CURR_INSTANT.getEpochSecond());
+
+ final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnectionAlertWithoutCustomerioConfig(Mockito.anyString(), Mockito.any());
+ }
+
+ // test should disable / shouldn't disable cases
+
@Test
@DisplayName("Test that the connection is disabled after MAX_FAILURE_JOBS_IN_A_ROW straight failures")
public void testMaxFailuresInARow() throws IOException, JsonValidationException, ConfigNotFoundException {
// from most recent to least recent: MAX_FAILURE_JOBS_IN_A_ROW and 1 success
- final List jobStatuses = new ArrayList<>(Collections.nCopies(MAX_FAILURE_JOBS_IN_A_ROW, JobStatus.FAILED));
- jobStatuses.add(JobStatus.SUCCEEDED);
+ final List jobs = new ArrayList<>(Collections.nCopies(MAX_FAILURE_JOBS_IN_A_ROW, FAILED_JOB));
+ jobs.add(SUCCEEDED_JOB);
- Mockito.when(mJobPersistence.listJobStatusWithConnection(CONNECTION_ID, REPLICATION_TYPES,
- CURR_INSTANT.minus(DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, ChronoUnit.DAYS))).thenReturn(jobStatuses);
Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);
Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
- Assertions.assertThat(output.isDisabled()).isTrue();
- Assertions.assertThat(standardSync.getStatus()).isEqualTo(Status.INACTIVE);
+ assertThat(output.isDisabled()).isTrue();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.INACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.only()).autoDisableConnectionAlertWithoutCustomerioConfig(eq(CONNECTION_DISABLED_NOTIFICATION),
+ Mockito.any());
}
@Test
@DisplayName("Test that the connection is _not_ disabled after MAX_FAILURE_JOBS_IN_A_ROW - 1 straight failures")
public void testLessThanMaxFailuresInARow() throws IOException {
// from most recent to least recent: MAX_FAILURE_JOBS_IN_A_ROW-1 and 1 success
- final List jobStatuses = new ArrayList<>(Collections.nCopies(MAX_FAILURE_JOBS_IN_A_ROW - 1, JobStatus.FAILED));
- jobStatuses.add(JobStatus.SUCCEEDED);
+ final List jobs = new ArrayList<>(Collections.nCopies(MAX_FAILURE_JOBS_IN_A_ROW - 1, FAILED_JOB));
+ jobs.add(SUCCEEDED_JOB);
- Mockito.when(mJobPersistence.listJobStatusWithConnection(CONNECTION_ID, REPLICATION_TYPES,
- CURR_INSTANT.minus(DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, ChronoUnit.DAYS))).thenReturn(jobStatuses);
Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(jobs);
+ Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(
+ CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS));
final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
- Assertions.assertThat(output.isDisabled()).isFalse();
- Assertions.assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+
+ // check that no notification has been sent
+ Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnectionAlertWithoutCustomerioConfig(Mockito.anyString(), Mockito.any());
}
@Test
@DisplayName("Test that the connection is _not_ disabled after 0 jobs in last MAX_DAYS_OF_STRAIGHT_FAILURE days")
public void testNoRuns() throws IOException {
- Mockito.when(mJobPersistence.listJobStatusWithConnection(CONNECTION_ID, REPLICATION_TYPES,
- CURR_INSTANT.minus(DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, ChronoUnit.DAYS))).thenReturn(Collections.emptyList());
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS))).thenReturn(Collections.emptyList());
final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
- Assertions.assertThat(output.isDisabled()).isFalse();
- Assertions.assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnectionAlertWithoutCustomerioConfig(Mockito.anyString(), Mockito.any());
}
@Test
@DisplayName("Test that the connection is disabled after only failed jobs in last MAX_DAYS_OF_STRAIGHT_FAILURE days")
public void testOnlyFailuresInMaxDays() throws IOException, JsonValidationException, ConfigNotFoundException {
- final int maxDaysOfOnlyFailedJobsBeforeConnectionDisable = 1;
-
- Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(maxDaysOfOnlyFailedJobsBeforeConnectionDisable);
- Mockito.when(mJobPersistence.listJobStatusWithConnection(CONNECTION_ID, REPLICATION_TYPES,
- CURR_INSTANT.minus(maxDaysOfOnlyFailedJobsBeforeConnectionDisable, ChronoUnit.DAYS)))
- .thenReturn(Collections.singletonList(JobStatus.FAILED));
+ Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(MAX_DAYS_OF_ONLY_FAILED_JOBS);
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS)))
+ .thenReturn(Collections.singletonList(FAILED_JOB));
- Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
- // set first job created at to older than DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE
- // days
- Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(Instant.MIN.getEpochSecond());
-
- Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
-
- final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
- Assertions.assertThat(output.isDisabled()).isTrue();
- Assertions.assertThat(standardSync.getStatus()).isEqualTo(Status.INACTIVE);
- }
-
- @Test
- @DisplayName("Test that the connection is _not_ disabled after only failed jobs and oldest job is less than MAX_DAYS_OF_STRAIGHT_FAILURE days old")
- public void testOnlyFailuresButFirstJobYoungerThanMaxDays() throws IOException, JsonValidationException, ConfigNotFoundException {
- final int maxDaysOfOnlyFailedJobsBeforeConnectionDisable = 1;
-
- Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(maxDaysOfOnlyFailedJobsBeforeConnectionDisable);
- Mockito.when(mJobPersistence.listJobStatusWithConnection(CONNECTION_ID, REPLICATION_TYPES,
- CURR_INSTANT.minus(maxDaysOfOnlyFailedJobsBeforeConnectionDisable, ChronoUnit.DAYS)))
- .thenReturn(Collections.singletonList(JobStatus.FAILED));
-
Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob));
- Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(CURR_INSTANT.getEpochSecond());
- Mockito.when(mConfigs.getMaxFailedJobsInARowBeforeConnectionDisable()).thenReturn(MAX_FAILURE_JOBS_IN_A_ROW);
+ Mockito.when(mJob.getCreatedAtInSecond()).thenReturn(
+ CURR_INSTANT.getEpochSecond() - TimeUnit.DAYS.toSeconds(MAX_DAYS_OF_ONLY_FAILED_JOBS));
+ Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
- Assertions.assertThat(output.isDisabled()).isFalse();
- Assertions.assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ assertThat(output.isDisabled()).isTrue();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.INACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.only()).autoDisableConnectionAlertWithoutCustomerioConfig(eq(CONNECTION_DISABLED_NOTIFICATION),
+ Mockito.any());
}
@Test
@DisplayName("Test that the connection is _not_ disabled after only cancelled jobs")
- public void testIgnoreOnlyCancelledRuns() throws IOException, JsonValidationException, ConfigNotFoundException {
- final int maxDaysOfOnlyFailedJobsBeforeConnectionDisable = 1;
-
- Mockito.when(mConfigs.getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable()).thenReturn(maxDaysOfOnlyFailedJobsBeforeConnectionDisable);
- Mockito.when(mJobPersistence.listJobStatusWithConnection(CONNECTION_ID, REPLICATION_TYPES,
- CURR_INSTANT.minus(maxDaysOfOnlyFailedJobsBeforeConnectionDisable, ChronoUnit.DAYS)))
- .thenReturn(Collections.singletonList(JobStatus.CANCELLED));
+ public void testIgnoreOnlyCancelledRuns() throws IOException {
+ Mockito.when(mJobPersistence.listJobStatusAndTimestampWithConnection(CONNECTION_ID, REPLICATION_TYPES,
+ CURR_INSTANT.minus(MAX_DAYS_OF_ONLY_FAILED_JOBS, ChronoUnit.DAYS)))
+ .thenReturn(Collections.singletonList(CANCELLED_JOB));
final AutoDisableConnectionOutput output = autoDisableActivity.autoDisableFailingConnection(ACTIVITY_INPUT);
- Assertions.assertThat(output.isDisabled()).isFalse();
- Assertions.assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ assertThat(output.isDisabled()).isFalse();
+ assertThat(standardSync.getStatus()).isEqualTo(Status.ACTIVE);
+ Mockito.verify(mJobNotifier, Mockito.never()).autoDisableConnectionAlertWithoutCustomerioConfig(Mockito.anyString(), Mockito.any());
}
}