diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java index 59b93cb425f5..5e16ec715f52 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java @@ -25,6 +25,7 @@ import java.time.format.FormatStyle; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; @@ -79,9 +80,9 @@ private void notifyJob(final String reason, final String failReason = Strings.isNullOrEmpty(reason) ? "" : String.format(", as the %s", reason); final String jobDescription = getJobDescription(job, failReason); final String logUrl = webUrlHelper.getConnectionUrl(workspaceId, connectionId); - final ImmutableMap jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job); - final ImmutableMap sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); - final ImmutableMap destinationMetadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); + final Map jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job); + final Map sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); + final Map destinationMetadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); for (final Notification notification : notifications) { final NotificationClient notificationClient = getNotificationClient(notification); try { diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java index 2f5aeabec6a1..b139b8a87546 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplier.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; import io.airbyte.analytics.TrackingClient; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; @@ -25,6 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +55,7 @@ public JsonNode injectSourceOAuthParameters(final UUID sourceDefinitionId, final .ifPresent(sourceOAuthParameter -> { if (injectOAuthParameters(sourceDefinition.getName(), sourceDefinition.getSpec(), sourceOAuthParameter.getConfiguration(), sourceConnectorConfig)) { - final ImmutableMap metadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); + final Map metadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata)); } }); @@ -75,7 +75,7 @@ public JsonNode injectDestinationOAuthParameters(final UUID destinationDefinitio .ifPresent(destinationOAuthParameter -> { if (injectOAuthParameters(destinationDefinition.getName(), destinationDefinition.getSpec(), destinationOAuthParameter.getConfiguration(), destinationConnectorConfig)) { - final ImmutableMap metadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); + final Map metadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); Exceptions.swallow(() -> trackingClient.track(workspaceId, "OAuth Injection - Backend", metadata)); } }); diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java index 83a977824ae7..0cfcfb2a617a 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java @@ -11,8 +11,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import io.airbyte.analytics.TrackingClient; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; @@ -35,6 +33,7 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -84,10 +83,10 @@ public void trackCheckConnectionSource(final UUID jobId, final JobState jobState, final StandardCheckConnectionOutput output) { Exceptions.swallow(() -> { - final ImmutableMap checkConnMetadata = generateCheckConnectionMetadata(output); - final ImmutableMap jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_SOURCE); - final ImmutableMap sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId); - final ImmutableMap stateMetadata = generateStateMetadata(jobState); + final Map checkConnMetadata = generateCheckConnectionMetadata(output); + final Map jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_SOURCE); + final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId); + final Map stateMetadata = generateStateMetadata(jobState); track(workspaceId, MoreMaps.merge(checkConnMetadata, jobMetadata, sourceDefMetadata, stateMetadata)); }); @@ -99,10 +98,10 @@ public void trackCheckConnectionDestination(final UUID jobId, final JobState jobState, final StandardCheckConnectionOutput output) { Exceptions.swallow(() -> { - final ImmutableMap checkConnMetadata = generateCheckConnectionMetadata(output); - final ImmutableMap jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_DESTINATION); - final ImmutableMap destinationDefinitionMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId); - final ImmutableMap stateMetadata = generateStateMetadata(jobState); + final Map checkConnMetadata = generateCheckConnectionMetadata(output); + final Map jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.CHECK_CONNECTION_DESTINATION); + final Map destinationDefinitionMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId); + final Map stateMetadata = generateStateMetadata(jobState); track(workspaceId, MoreMaps.merge(checkConnMetadata, jobMetadata, destinationDefinitionMetadata, stateMetadata)); }); @@ -110,9 +109,9 @@ public void trackCheckConnectionDestination(final UUID jobId, public void trackDiscover(final UUID jobId, final UUID sourceDefinitionId, final UUID workspaceId, final JobState jobState) { Exceptions.swallow(() -> { - final ImmutableMap jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.DISCOVER_SCHEMA); - final ImmutableMap sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId); - final ImmutableMap stateMetadata = generateStateMetadata(jobState); + final Map jobMetadata = generateJobMetadata(jobId.toString(), ConfigType.DISCOVER_SCHEMA); + final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId); + final Map stateMetadata = generateStateMetadata(jobState); track(workspaceId, MoreMaps.merge(jobMetadata, sourceDefMetadata, stateMetadata)); }); @@ -127,14 +126,12 @@ public void trackSync(final Job job, final JobState jobState) { final long jobId = job.getId(); final UUID connectionId = UUID.fromString(job.getScope()); final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); - final UUID sourceDefinitionId = sourceDefinition.getSourceDefinitionId(); final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); - final UUID destinationDefinitionId = destinationDefinition.getDestinationDefinitionId(); final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), configType, job.getAttemptsCount()); - final Map jobAttemptMetadata = generateJobAttemptMetadata(job.getId(), jobState); - final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinitionId); - final Map destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId); + final Map jobAttemptMetadata = generateJobAttemptMetadata(jobId, jobState); + final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinition); + final Map destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinition); final Map syncMetadata = generateSyncMetadata(connectionId); final Map stateMetadata = generateStateMetadata(jobState); final Map syncConfigMetadata = generateSyncConfigMetadata( @@ -155,6 +152,38 @@ public void trackSync(final Job job, final JobState jobState) { }); } + public void trackSyncForInternalFailure(final Long jobId, + final UUID connectionId, + final Integer attempts, + final JobState jobState, + final Exception e) { + Exceptions.swallow(() -> { + final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); + final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); + + final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), null, attempts); + final Map jobAttemptMetadata = generateJobAttemptMetadata(jobId, jobState); + final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinition); + final Map destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinition); + final Map syncMetadata = generateSyncMetadata(connectionId); + final Map stateMetadata = generateStateMetadata(jobState); + final Map generalMetadata = Map.of("connection_id", connectionId, "internal_error_cause", e.getMessage(), + "internal_error_type", e.getClass().getName()); + + final UUID workspaceId = workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(jobId); + + track(workspaceId, + MoreMaps.merge( + jobMetadata, + jobAttemptMetadata, + sourceDefMetadata, + destinationDefMetadata, + syncMetadata, + stateMetadata, + generalMetadata)); + }); + } + private Map generateSyncConfigMetadata(final JobConfig config, final JsonNode sourceConfigSchema, final JsonNode destinationConfigSchema) { @@ -293,8 +322,8 @@ private Map generateSyncMetadata(final UUID connectionId) throws return MoreMaps.merge(TrackingMetadata.generateSyncMetadata(standardSync), operationUsage, streamCountData); } - private static ImmutableMap generateStateMetadata(final JobState jobState) { - final Builder metadata = ImmutableMap.builder(); + private static Map generateStateMetadata(final JobState jobState) { + final Map metadata = new HashMap<>(); if (JobState.STARTED.equals(jobState)) { metadata.put("attempt_stage", "STARTED"); @@ -303,7 +332,7 @@ private static ImmutableMap generateStateMetadata(final JobState metadata.put("attempt_completion_status", jobState); } - return metadata.build(); + return Collections.unmodifiableMap(metadata); } /** @@ -312,46 +341,54 @@ private static ImmutableMap generateStateMetadata(final JobState * job with a failed check. Because of this, tracking just the job attempt status does not capture * the whole picture. The `check_connection_outcome` field tracks this. */ - private ImmutableMap generateCheckConnectionMetadata(final StandardCheckConnectionOutput output) { + private Map generateCheckConnectionMetadata(final StandardCheckConnectionOutput output) { if (output == null) { - return ImmutableMap.of(); + return Map.of(); } - final Builder metadata = ImmutableMap.builder(); - metadata.put("check_connection_outcome", output.getStatus().toString()); - return metadata.build(); + return Map.of("check_connection_outcome", output.getStatus().toString()); } - private ImmutableMap generateDestinationDefinitionMetadata(final UUID destinationDefinitionId) - throws ConfigNotFoundException, IOException, JsonValidationException { + private Map generateDestinationDefinitionMetadata(final UUID destinationDefinitionId) + throws JsonValidationException, ConfigNotFoundException, IOException { final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId); + return generateDestinationDefinitionMetadata(destinationDefinition); + } + + private Map generateDestinationDefinitionMetadata(final StandardDestinationDefinition destinationDefinition) { return TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); } - private ImmutableMap generateSourceDefinitionMetadata(final UUID sourceDefinitionId) - throws ConfigNotFoundException, IOException, JsonValidationException { + private Map generateSourceDefinitionMetadata(final UUID sourceDefinitionId) + throws JsonValidationException, ConfigNotFoundException, IOException { final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId); + return generateSourceDefinitionMetadata(sourceDefinition); + } + + private Map generateSourceDefinitionMetadata(final StandardSourceDefinition sourceDefinition) { return TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); } - private ImmutableMap generateJobMetadata(final String jobId, final ConfigType configType) { + private Map generateJobMetadata(final String jobId, final ConfigType configType) { return generateJobMetadata(jobId, configType, 0); } - private ImmutableMap generateJobMetadata(final String jobId, final ConfigType configType, final int attempt) { - final Builder metadata = ImmutableMap.builder(); - metadata.put("job_type", configType); + private Map generateJobMetadata(final String jobId, final ConfigType configType, final int attempt) { + final Map metadata = new HashMap<>(); + if (configType != null) { + metadata.put("job_type", configType); + } metadata.put("job_id", jobId); metadata.put("attempt_id", attempt); - return metadata.build(); + return Collections.unmodifiableMap(metadata); } - private ImmutableMap generateJobAttemptMetadata(final long jobId, final JobState jobState) throws IOException { + private Map generateJobAttemptMetadata(final long jobId, final JobState jobState) throws IOException { final Job job = jobPersistence.getJob(jobId); if (jobState != JobState.STARTED) { return TrackingMetadata.generateJobAttemptMetadata(job); } else { - return ImmutableMap.of(); + return Map.of(); } } @@ -362,7 +399,7 @@ private void track(final UUID workspaceId, final Map metadata) if (workspaceId != null) { final StandardWorkspace standardWorkspace = configRepository.getStandardWorkspace(workspaceId, true); if (standardWorkspace != null && standardWorkspace.getName() != null) { - final Map standardTrackingMetadata = ImmutableMap.of( + final Map standardTrackingMetadata = Map.of( "workspace_id", workspaceId, "workspace_name", standardWorkspace.getName()); diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java index bdb448db38ca..15ec10da0048 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java @@ -26,13 +26,14 @@ import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.util.Strings; public class TrackingMetadata { - public static ImmutableMap generateSyncMetadata(final StandardSync standardSync) { + public static Map generateSyncMetadata(final StandardSync standardSync) { final Builder metadata = ImmutableMap.builder(); metadata.put("connection_id", standardSync.getConnectionId()); @@ -75,7 +76,7 @@ public static ImmutableMap generateSyncMetadata(final StandardSy return metadata.build(); } - public static ImmutableMap generateDestinationDefinitionMetadata(final StandardDestinationDefinition destinationDefinition) { + public static Map generateDestinationDefinitionMetadata(final StandardDestinationDefinition destinationDefinition) { final Builder metadata = ImmutableMap.builder(); metadata.put("connector_destination", destinationDefinition.getName()); metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId()); @@ -87,7 +88,7 @@ public static ImmutableMap generateDestinationDefinitionMetadata return metadata.build(); } - public static ImmutableMap generateSourceDefinitionMetadata(final StandardSourceDefinition sourceDefinition) { + public static Map generateSourceDefinitionMetadata(final StandardSourceDefinition sourceDefinition) { final Builder metadata = ImmutableMap.builder(); metadata.put("connector_source", sourceDefinition.getName()); metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId()); @@ -99,7 +100,7 @@ public static ImmutableMap generateSourceDefinitionMetadata(fina return metadata.build(); } - public static ImmutableMap generateJobAttemptMetadata(final Job job) { + public static Map generateJobAttemptMetadata(final Job job) { final Builder metadata = ImmutableMap.builder(); if (job != null) { final List attempts = job.getAttempts(); diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index 8f05b23d4ec4..4cb784585817 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -88,6 +88,12 @@ class JobTrackerTest { private static final String ATTEMPT_ID = "attempt_id"; private static final String METADATA = "metadata"; private static final String SOME = "some"; + private static final String ATTEMPT_STAGE_KEY = "attempt_stage"; + private static final String CONNECTOR_SOURCE_KEY = "connector_source"; + private static final String CONNECTOR_SOURCE_DEFINITION_ID_KEY = "connector_source_definition_id"; + private static final String CONNECTOR_SOURCE_DOCKER_REPOSITORY_KEY = "connector_source_docker_repository"; + private static final String CONNECTOR_SOURCE_VERSION_KEY = "connector_source_version"; + private static final String FREQUENCY_KEY = "frequency"; private static final long SYNC_START_TIME = 1000L; private static final long SYNC_END_TIME = 10000L; @@ -97,14 +103,14 @@ class JobTrackerTest { private static final long LONG_JOB_ID = 10L; // for sync the job id is a long not a uuid. private static final ImmutableMap STARTED_STATE_METADATA = ImmutableMap.builder() - .put("attempt_stage", "STARTED") + .put(ATTEMPT_STAGE_KEY, "STARTED") .build(); private static final ImmutableMap SUCCEEDED_STATE_METADATA = ImmutableMap.builder() - .put("attempt_stage", "ENDED") + .put(ATTEMPT_STAGE_KEY, "ENDED") .put("attempt_completion_status", JobState.SUCCEEDED) .build(); private static final ImmutableMap FAILED_STATE_METADATA = ImmutableMap.builder() - .put("attempt_stage", "ENDED") + .put(ATTEMPT_STAGE_KEY, "ENDED") .put("attempt_completion_status", JobState.FAILED) .build(); private static final ImmutableMap ATTEMPT_METADATA = ImmutableMap.builder() @@ -186,10 +192,10 @@ void testTrackCheckConnectionSource() throws ConfigNotFoundException, IOExceptio .put(JOB_TYPE, ConfigType.CHECK_CONNECTION_SOURCE) .put(JOB_ID_KEY, JOB_ID.toString()) .put(ATTEMPT_ID, 0) - .put("connector_source", SOURCE_DEF_NAME) - .put("connector_source_definition_id", UUID1) - .put("connector_source_docker_repository", CONNECTOR_REPOSITORY) - .put("connector_source_version", CONNECTOR_VERSION) + .put(CONNECTOR_SOURCE_KEY, SOURCE_DEF_NAME) + .put(CONNECTOR_SOURCE_DEFINITION_ID_KEY, UUID1) + .put(CONNECTOR_SOURCE_DOCKER_REPOSITORY_KEY, CONNECTOR_REPOSITORY) + .put(CONNECTOR_SOURCE_VERSION_KEY, CONNECTOR_VERSION) .build(); when(configRepository.getStandardSourceDefinition(UUID1)) @@ -246,10 +252,10 @@ void testTrackDiscover() throws ConfigNotFoundException, IOException, JsonValida .put(JOB_TYPE, ConfigType.DISCOVER_SCHEMA) .put(JOB_ID_KEY, JOB_ID.toString()) .put(ATTEMPT_ID, 0) - .put("connector_source", SOURCE_DEF_NAME) - .put("connector_source_definition_id", UUID1) - .put("connector_source_docker_repository", CONNECTOR_REPOSITORY) - .put("connector_source_version", CONNECTOR_VERSION) + .put(CONNECTOR_SOURCE_KEY, SOURCE_DEF_NAME) + .put(CONNECTOR_SOURCE_DEFINITION_ID_KEY, UUID1) + .put(CONNECTOR_SOURCE_DOCKER_REPOSITORY_KEY, CONNECTOR_REPOSITORY) + .put(CONNECTOR_SOURCE_VERSION_KEY, CONNECTOR_VERSION) .build(); when(configRepository.getStandardSourceDefinition(UUID1)) @@ -269,6 +275,78 @@ void testTrackSync() throws ConfigNotFoundException, IOException, JsonValidation testAsynchronous(ConfigType.SYNC, SYNC_CONFIG_METADATA); } + @Test + void testTrackSyncForInternalFailure() throws JsonValidationException, ConfigNotFoundException, IOException { + final Long jobId = 12345L; + final Integer attemptNumber = 2; + final JobState jobState = JobState.SUCCEEDED; + final Exception exception = new IOException("test"); + + when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(jobId)).thenReturn(WORKSPACE_ID); + when(configRepository.getStandardSync(CONNECTION_ID)) + .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true).withCatalog(CATALOG)); + when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) + .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); + when(configRepository.getStandardSync(CONNECTION_ID)) + .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(false).withCatalog(CATALOG) + .withSchedule(new Schedule().withUnits(1L).withTimeUnit(TimeUnit.MINUTES))); + when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID)) + .thenReturn(new StandardSourceDefinition() + .withSourceDefinitionId(UUID1) + .withName(SOURCE_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(SOURCE_SPEC)); + when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID)) + .thenReturn(new StandardDestinationDefinition() + .withDestinationDefinitionId(UUID2) + .withName(DESTINATION_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(DESTINATION_SPEC)); + when(configRepository.getStandardSourceDefinition(UUID1)) + .thenReturn(new StandardSourceDefinition() + .withSourceDefinitionId(UUID1) + .withName(SOURCE_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(SOURCE_SPEC)); + when(configRepository.getStandardDestinationDefinition(UUID2)) + .thenReturn(new StandardDestinationDefinition() + .withDestinationDefinitionId(UUID2) + .withName(DESTINATION_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(DESTINATION_SPEC)); + + jobTracker.trackSyncForInternalFailure(jobId, CONNECTION_ID, attemptNumber, jobState, exception); + final Map metadata = new LinkedHashMap(); + metadata.put("namespace_definition", NamespaceDefinitionType.SOURCE); + metadata.put("number_of_streams", 1); + metadata.put("internal_error_type", exception.getClass().getName()); + metadata.put(CONNECTOR_SOURCE_KEY, SOURCE_DEF_NAME); + metadata.put("internal_error_cause", exception.getMessage()); + metadata.put(FREQUENCY_KEY, "1 min"); + metadata.put(CONNECTOR_SOURCE_DEFINITION_ID_KEY, UUID1); + metadata.put("workspace_id", WORKSPACE_ID); + metadata.put(CONNECTOR_SOURCE_DOCKER_REPOSITORY_KEY, CONNECTOR_REPOSITORY); + metadata.put(ATTEMPT_STAGE_KEY, "ENDED"); + metadata.put("attempt_completion_status", jobState); + metadata.put("connection_id", CONNECTION_ID); + metadata.put(JOB_ID_KEY, String.valueOf(jobId)); + metadata.put(CONNECTOR_SOURCE_VERSION_KEY, CONNECTOR_VERSION); + metadata.put("connector_destination_version", CONNECTOR_VERSION); + metadata.put("attempt_id", attemptNumber); + metadata.put("connector_destination", DESTINATION_DEF_NAME); + metadata.put("operation_count", 0); + metadata.put("connector_destination_docker_repository", CONNECTOR_REPOSITORY); + metadata.put("table_prefix", false); + metadata.put("workspace_name", WORKSPACE_NAME); + metadata.put("connector_destination_definition_id", UUID2); + + verify(trackingClient).track(WORKSPACE_ID, JobTracker.MESSAGE_NAME, metadata); + } + @Test void testTrackReset() throws ConfigNotFoundException, IOException, JsonValidationException { testAsynchronous(ConfigType.RESET_CONNECTION); @@ -295,7 +373,7 @@ void testAsynchronous(final ConfigType configType, final Map add .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); final Map manualMetadata = MoreMaps.merge( metadata, - ImmutableMap.of("frequency", "manual"), + Map.of(FREQUENCY_KEY, "manual"), additionalExpectedMetadata); assertCorrectMessageForEachState((jobState) -> jobTracker.trackSync(job, jobState), manualMetadata); @@ -305,7 +383,7 @@ void testAsynchronous(final ConfigType configType, final Map add .withSchedule(new Schedule().withUnits(1L).withTimeUnit(TimeUnit.MINUTES))); final Map scheduledMetadata = MoreMaps.merge( metadata, - ImmutableMap.of("frequency", "1 min"), + Map.of(FREQUENCY_KEY, "1 min"), additionalExpectedMetadata); assertCorrectMessageForEachState((jobState) -> jobTracker.trackSync(job, jobState), scheduledMetadata); } @@ -406,7 +484,7 @@ void testAsynchronousAttemptWithFailures(final ConfigType configType, final Map< void testAsynchronousAttempt(final ConfigType configType, final Job job, final Map additionalExpectedMetadata) throws ConfigNotFoundException, IOException, JsonValidationException { - final ImmutableMap metadata = getJobMetadata(configType, LONG_JOB_ID); + final Map metadata = getJobMetadata(configType, LONG_JOB_ID); // test when frequency is manual. when(configRepository.getStandardSync(CONNECTION_ID)) .thenReturn(new StandardSync().withConnectionId(CONNECTION_ID).withManual(true).withCatalog(CATALOG)); @@ -416,7 +494,7 @@ void testAsynchronousAttempt(final ConfigType configType, final Job job, final M final Map manualMetadata = MoreMaps.merge( ATTEMPT_METADATA, metadata, - ImmutableMap.of("frequency", "manual"), + Map.of(FREQUENCY_KEY, "manual"), additionalExpectedMetadata); jobTracker.trackSync(job, JobState.SUCCEEDED); @@ -575,10 +653,10 @@ private ImmutableMap getJobMetadata(final ConfigType configType, .put(JOB_ID_KEY, String.valueOf(jobId)) .put(ATTEMPT_ID, 700) .put("connection_id", CONNECTION_ID) - .put("connector_source", SOURCE_DEF_NAME) - .put("connector_source_definition_id", UUID1) - .put("connector_source_docker_repository", CONNECTOR_REPOSITORY) - .put("connector_source_version", CONNECTOR_VERSION) + .put(CONNECTOR_SOURCE_KEY, SOURCE_DEF_NAME) + .put(CONNECTOR_SOURCE_DEFINITION_ID_KEY, UUID1) + .put(CONNECTOR_SOURCE_DOCKER_REPOSITORY_KEY, CONNECTOR_REPOSITORY) + .put(CONNECTOR_SOURCE_VERSION_KEY, CONNECTOR_VERSION) .put("connector_destination", DESTINATION_DEF_NAME) .put("connector_destination_definition_id", UUID2) .put("connector_destination_docker_repository", CONNECTOR_REPOSITORY) diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java index 84322e8f4ef7..437c508e1bee 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java @@ -8,10 +8,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableMap; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSync; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.Test; @@ -35,12 +35,12 @@ void testNulls() { when(standardSync.getCatalog()).thenReturn(mock(ConfiguredAirbyteCatalog.class)); // try to generate metadata - final ImmutableMap expected = ImmutableMap.of( + final Map expected = Map.of( "connection_id", connectionId, "frequency", "manual", "operation_count", 0, "table_prefix", false); - final ImmutableMap actual = TrackingMetadata.generateSyncMetadata(standardSync); + final Map actual = TrackingMetadata.generateSyncMetadata(standardSync); assertEquals(expected, actual); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java index 39614ed1bf38..4310a61dd88a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/OAuthHandler.java @@ -4,7 +4,6 @@ package io.airbyte.server.handlers; -import com.google.common.collect.ImmutableMap; import io.airbyte.analytics.TrackingClient; import io.airbyte.api.model.generated.CompleteDestinationOAuthRequest; import io.airbyte.api.model.generated.CompleteSourceOauthRequest; @@ -56,7 +55,7 @@ public OAuthConsentRead getSourceOAuthConsent(final SourceOauthConsentRequest so configRepository.getStandardSourceDefinition(sourceDefinitionIdRequestBody.getSourceDefinitionId()); final OAuthFlowImplementation oAuthFlowImplementation = oAuthImplementationFactory.create(sourceDefinition); final ConnectorSpecification spec = sourceDefinition.getSpec(); - final ImmutableMap metadata = generateSourceMetadata(sourceDefinitionIdRequestBody.getSourceDefinitionId()); + final Map metadata = generateSourceMetadata(sourceDefinitionIdRequestBody.getSourceDefinitionId()); final OAuthConsentRead result; if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) { result = new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getSourceConsentUrl( @@ -85,7 +84,7 @@ public OAuthConsentRead getDestinationOAuthConsent(final DestinationOauthConsent configRepository.getStandardDestinationDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); final OAuthFlowImplementation oAuthFlowImplementation = oAuthImplementationFactory.create(destinationDefinition); final ConnectorSpecification spec = destinationDefinition.getSpec(); - final ImmutableMap metadata = generateDestinationMetadata(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); + final Map metadata = generateDestinationMetadata(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); final OAuthConsentRead result; if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) { result = new OAuthConsentRead().consentUrl(oAuthFlowImplementation.getDestinationConsentUrl( @@ -114,7 +113,7 @@ public Map completeSourceOAuth(final CompleteSourceOauthRequest configRepository.getStandardSourceDefinition(oauthSourceRequestBody.getSourceDefinitionId()); final OAuthFlowImplementation oAuthFlowImplementation = oAuthImplementationFactory.create(sourceDefinition); final ConnectorSpecification spec = sourceDefinition.getSpec(); - final ImmutableMap metadata = generateSourceMetadata(oauthSourceRequestBody.getSourceDefinitionId()); + final Map metadata = generateSourceMetadata(oauthSourceRequestBody.getSourceDefinitionId()); final Map result; if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) { result = oAuthFlowImplementation.completeSourceOAuth( @@ -146,7 +145,7 @@ public Map completeDestinationOAuth(final CompleteDestinationOAu configRepository.getStandardDestinationDefinition(oauthDestinationRequestBody.getDestinationDefinitionId()); final OAuthFlowImplementation oAuthFlowImplementation = oAuthImplementationFactory.create(destinationDefinition); final ConnectorSpecification spec = destinationDefinition.getSpec(); - final ImmutableMap metadata = generateDestinationMetadata(oauthDestinationRequestBody.getDestinationDefinitionId()); + final Map metadata = generateDestinationMetadata(oauthDestinationRequestBody.getDestinationDefinitionId()); final Map result; if (OAuthConfigSupplier.hasOAuthConfigSpecification(spec)) { result = oAuthFlowImplementation.completeDestinationOAuth( @@ -196,13 +195,13 @@ public void setDestinationInstancewideOauthParams(final SetInstancewideDestinati configRepository.writeDestinationOAuthParam(param); } - private ImmutableMap generateSourceMetadata(final UUID sourceDefinitionId) + private Map generateSourceMetadata(final UUID sourceDefinitionId) throws JsonValidationException, ConfigNotFoundException, IOException { final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId); return TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); } - private ImmutableMap generateDestinationMetadata(final UUID destinationDefinitionId) + private Map generateDestinationMetadata(final UUID destinationDefinitionId) throws JsonValidationException, ConfigNotFoundException, IOException { final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId); return TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 8f24e13d7c04..9e7e78bdc420 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -166,7 +166,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr if (workflowState.isDeleted()) { if (workflowState.isRunning()) { log.info("Cancelling the current running job because a connection deletion was requested"); - reportCancelled(); + reportCancelled(connectionUpdaterInput.getConnectionId()); } log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow."); deleteConnectionBeforeTerminatingTheWorkflow(); @@ -231,7 +231,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn final GeneratedJobInput jobInputs = getJobInput(); - reportJobStarting(); + reportJobStarting(connectionUpdaterInput.getConnectionId()); StandardSyncOutput standardSyncOutput = null; try { @@ -291,11 +291,13 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput( workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber(), + connectionUpdaterInput.getConnectionId(), standardSyncOutput)); } else { runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber( workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber(), + connectionUpdaterInput.getConnectionId(), standardSyncOutput)); } @@ -317,12 +319,14 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput( workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber(), + connectionUpdaterInput.getConnectionId(), standardSyncOutput, FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess()))); } else { runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput( workflowInternalState.getJobId(), workflowInternalState.getAttemptNumber(), + connectionUpdaterInput.getConnectionId(), standardSyncOutput, FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess()))); } @@ -340,7 +344,8 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, } else { final String failureReason = failureType == FailureType.CONFIG_ERROR ? "Connection Check Failed " + connectionId : "Job failed after too many retries for connection " + connectionId; - runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobFailure, new JobFailureInput(connectionUpdaterInput.getJobId(), failureReason)); + runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobFailure, new JobFailureInput(connectionUpdaterInput.getJobId(), + connectionUpdaterInput.getConnectionId(), connectionUpdaterInput.getAttemptNumber(), failureReason)); final int autoDisableConnectionVersion = Workflow.getVersion("auto_disable_failing_connection", Workflow.DEFAULT_VERSION, AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION); @@ -744,12 +749,14 @@ private String getSyncTaskQueue() { /** * Report the job as started in the job tracker and set it as running in the workflow internal * state. + * + * @param connectionId The connection ID associated with this execution of the workflow. */ - private void reportJobStarting() { + private void reportJobStarting(final UUID connectionId) { runMandatoryActivity( jobCreationAndStatusUpdateActivity::reportJobStart, new ReportJobStartInput( - workflowInternalState.getJobId())); + workflowInternalState.getJobId(), connectionId)); workflowState.setRunning(true); } @@ -822,14 +829,14 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() { */ private void reportCancelledAndContinueWith(final boolean skipSchedulingNextRun, final ConnectionUpdaterInput connectionUpdaterInput) { if (workflowInternalState.getJobId() != null && workflowInternalState.getAttemptNumber() != null) { - reportCancelled(); + reportCancelled(connectionUpdaterInput.getConnectionId()); } resetNewConnectionInput(connectionUpdaterInput); connectionUpdaterInput.setSkipScheduling(skipSchedulingNextRun); prepareForNextRunAndContinueAsNew(connectionUpdaterInput); } - private void reportCancelled() { + private void reportCancelled(final UUID connectionId) { final Long jobId = workflowInternalState.getJobId(); final Integer attemptNumber = workflowInternalState.getAttemptNumber(); final Set failures = workflowInternalState.getFailures(); @@ -842,12 +849,14 @@ private void reportCancelled() { new JobCancelledInput( jobId, attemptNumber, + connectionId, FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess))); } else { runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber, new JobCancelledInputWithAttemptNumber( jobId, attemptNumber, + connectionId, FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess))); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java index eb3427285a82..6393d2643ff8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivity.java @@ -96,6 +96,7 @@ class JobSuccessInput { private long jobId; private int attemptId; + private UUID connectionId; private StandardSyncOutput standardSyncOutput; } @@ -113,6 +114,7 @@ class JobSuccessInputWithAttemptNumber { private long jobId; private int attemptNumber; + private UUID connectionId; private StandardSyncOutput standardSyncOutput; } @@ -129,6 +131,8 @@ class JobSuccessInputWithAttemptNumber { class JobFailureInput { private long jobId; + private UUID connectionId; + private int attemptNumber; private String reason; } @@ -146,6 +150,7 @@ class AttemptFailureInput { private long jobId; private int attemptId; + private UUID connectionId; private StandardSyncOutput standardSyncOutput; private AttemptFailureSummary attemptFailureSummary; @@ -164,6 +169,7 @@ class AttemptNumberFailureInput { private long jobId; private int attemptNumber; + private UUID connectionId; private StandardSyncOutput standardSyncOutput; private AttemptFailureSummary attemptFailureSummary; @@ -182,6 +188,7 @@ class JobCancelledInput { private long jobId; private int attemptId; + private UUID connectionId; private AttemptFailureSummary attemptFailureSummary; } @@ -199,6 +206,7 @@ class JobCancelledInputWithAttemptNumber { private long jobId; private int attemptNumber; + private UUID connectionId; private AttemptFailureSummary attemptFailureSummary; } @@ -215,6 +223,7 @@ class JobCancelledInputWithAttemptNumber { class ReportJobStartInput { private long jobId; + private UUID connectionId; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 96d79afd5f9e..dcdf9475b8cf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -207,6 +207,7 @@ public void jobSuccess(final JobSuccessInput input) { emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.SUCCEEDED); } catch (final IOException e) { + trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptId(), JobStatus.SUCCEEDED, e); throw new RetryableException(e); } } @@ -216,6 +217,7 @@ public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber i jobSuccess(new JobSuccessInput( input.getJobId(), input.getAttemptNumber(), + input.getConnectionId(), input.getStandardSyncOutput())); } @@ -228,7 +230,6 @@ public void jobFailure(final JobFailureInput input) { jobNotifier.failJob(input.getReason(), job); emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); - trackCompletion(job, JobStatus.FAILED); final UUID connectionId = UUID.fromString(job.getScope()); final JobSyncConfig jobSyncConfig = job.getConfig().getSync(); @@ -236,7 +237,9 @@ public void jobFailure(final JobFailureInput input) { new SyncJobReportingContext(jobId, jobSyncConfig.getSourceDockerImage(), jobSyncConfig.getDestinationDockerImage()); job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary) .ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, jobContext)); + trackCompletion(job, JobStatus.FAILED); } catch (final IOException e) { + trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptNumber(), JobStatus.FAILED, e); throw new RetryableException(e); } } @@ -274,6 +277,7 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu attemptFailure(new AttemptFailureInput( input.getJobId(), input.getAttemptNumber(), + input.getConnectionId(), input.getStandardSyncOutput(), input.getAttemptFailureSummary())); } @@ -288,10 +292,11 @@ public void jobCancelled(final JobCancelledInput input) { jobPersistence.cancelJob(jobId); final Job job = jobPersistence.getJob(jobId); - trackCompletion(job, JobStatus.FAILED); emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_CANCELLED_BY_RELEASE_STAGE, jobId); jobNotifier.failJob("Job was cancelled", job); + trackCompletion(job, JobStatus.FAILED); } catch (final IOException e) { + trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptId(), JobStatus.FAILED, e); throw new RetryableException(e); } } @@ -301,6 +306,7 @@ public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumb jobCancelled(new JobCancelledInput( input.getJobId(), input.getAttemptNumber(), + input.getConnectionId(), input.getAttemptFailureSummary())); } @@ -370,4 +376,12 @@ private void trackCompletion(final Job job, final io.airbyte.workers.JobStatus s jobTracker.trackSync(job, Enums.convertTo(status, JobState.class)); } + private void trackCompletionForInternalFailure(final Long jobId, + final UUID connectionId, + final Integer attemptId, + final io.airbyte.workers.JobStatus status, + final Exception e) { + jobTracker.trackSyncForInternalFailure(jobId, connectionId, attemptId, Enums.convertTo(status, JobState.class), e); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index a24fbf6e9ffb..27598d2b34ca 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -6,6 +6,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.config.AttemptFailureSummary; @@ -123,6 +125,7 @@ class JobCreationAndStatusUpdateActivityTest { private static final int ATTEMPT_NUMBER = 1; private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1"); private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2"); + private static final String TEST_EXCEPTION_MESSAGE = "test"; private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput() .withStandardSyncSummary( @@ -207,7 +210,7 @@ void createAttempt() throws IOException { final AttemptCreationOutput output = jobCreationAndStatusUpdateActivity.createNewAttempt(new AttemptCreationInput( JOB_ID)); - Mockito.verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath); + verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath); Assertions.assertThat(output.getAttemptId()).isEqualTo(ATTEMPT_ID); } } @@ -255,7 +258,7 @@ void createAttemptNumber() throws IOException { final AttemptNumberCreationOutput output = jobCreationAndStatusUpdateActivity.createNewAttemptNumber(new AttemptCreationInput( JOB_ID)); - Mockito.verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath); + verify(mLogClientSingleton).setJobMdc(mWorkerEnvironment, mLogConfigs, mPath); Assertions.assertThat(output.getAttemptNumber()).isEqualTo(ATTEMPT_NUMBER); } } @@ -279,23 +282,26 @@ class Update { @Test void setJobSuccess() throws IOException { - jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput)); + jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, standardSyncOutput)); - Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats(), + verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats(), jobOutput.getSync().getNormalizationSummary()); - Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); - Mockito.verify(mJobNotifier).successJob(Mockito.any()); - Mockito.verify(mJobtracker).trackSync(Mockito.any(), eq(JobState.SUCCEEDED)); + verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); + verify(mJobNotifier).successJob(Mockito.any()); + verify(mJobtracker).trackSync(Mockito.any(), eq(JobState.SUCCEEDED)); } @Test void setJobSuccessWrapException() throws IOException { - Mockito.doThrow(new IOException()) + final IOException exception = new IOException(TEST_EXCEPTION_MESSAGE); + Mockito.doThrow(exception) .when(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, null))) + Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, null))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); + + verify(mJobtracker, times(1)).trackSyncForInternalFailure(JOB_ID, CONNECTION_ID, ATTEMPT_ID, JobState.SUCCEEDED, exception); } @Test @@ -318,48 +324,55 @@ void setJobFailure() throws IOException { Mockito.when(mJobPersistence.getJob(JOB_ID)) .thenReturn(mJob); - jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason")); + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, CONNECTION_ID, 1, "reason")); - Mockito.verify(mJobPersistence).failJob(JOB_ID); - Mockito.verify(mJobNotifier).failJob(eq("reason"), Mockito.any()); - Mockito.verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any()); + verify(mJobPersistence).failJob(JOB_ID); + verify(mJobNotifier).failJob(eq("reason"), Mockito.any()); + verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any()); } @Test void setJobFailureWrapException() throws IOException { - Mockito.doThrow(new IOException()) + final Exception exception = new IOException(TEST_EXCEPTION_MESSAGE); + Mockito.doThrow(exception) .when(mJobPersistence).failJob(JOB_ID); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, ""))) + Assertions + .assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, CONNECTION_ID, ATTEMPT_NUMBER, ""))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); + + verify(mJobtracker, times(1)).trackSyncForInternalFailure(JOB_ID, CONNECTION_ID, ATTEMPT_NUMBER, JobState.FAILED, exception); } @Test void setAttemptFailure() throws IOException { - jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, standardSyncOutput, failureSummary)); + jobCreationAndStatusUpdateActivity + .attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, standardSyncOutput, failureSummary)); - Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID); - Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats(), + verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID); + verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats(), jobOutput.getSync().getNormalizationSummary()); - Mockito.verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary); + verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary); } @Test void setAttemptFailureWrapException() throws IOException { - Mockito.doThrow(new IOException()) + final Exception exception = new IOException(TEST_EXCEPTION_MESSAGE); + Mockito.doThrow(exception) .when(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID); Assertions .assertThatThrownBy( - () -> jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, null, failureSummary))) + () -> jobCreationAndStatusUpdateActivity + .attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, null, failureSummary))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); } @Test void setJobCancelled() throws IOException { - jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, failureSummary)); + jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, failureSummary)); // attempt must be failed before job is cancelled, or else job state machine is not respected final InOrder orderVerifier = Mockito.inOrder(mJobPersistence); @@ -370,12 +383,16 @@ void setJobCancelled() throws IOException { @Test void setJobCancelledWrapException() throws IOException { - Mockito.doThrow(new IOException()) + final Exception exception = new IOException(); + Mockito.doThrow(exception) .when(mJobPersistence).cancelJob(JOB_ID); - Assertions.assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, null))) + Assertions + .assertThatThrownBy(() -> jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, null))) .isInstanceOf(RetryableException.class) .hasCauseInstanceOf(IOException.class); + + verify(mJobtracker, times(1)).trackSyncForInternalFailure(JOB_ID, CONNECTION_ID, ATTEMPT_ID, JobState.FAILED, exception); } @Test @@ -395,16 +412,16 @@ void ensureCleanJobState() throws IOException { jobCreationAndStatusUpdateActivity.ensureCleanJobState(new EnsureCleanJobStateInput(CONNECTION_ID)); - Mockito.verify(mJobPersistence).failJob(runningJob.getId()); - Mockito.verify(mJobPersistence).failJob(pendingJob.getId()); - Mockito.verify(mJobPersistence).failAttempt(runningJob.getId(), runningAttemptNumber); - Mockito.verify(mJobPersistence).writeAttemptFailureSummary(eq(runningJob.getId()), eq(runningAttemptNumber), any()); - Mockito.verify(mJobPersistence).getJob(runningJob.getId()); - Mockito.verify(mJobPersistence).getJob(pendingJob.getId()); - Mockito.verify(mJobNotifier).failJob(any(), eq(runningJob)); - Mockito.verify(mJobNotifier).failJob(any(), eq(pendingJob)); - Mockito.verify(mJobtracker).trackSync(runningJob, JobState.FAILED); - Mockito.verify(mJobtracker).trackSync(pendingJob, JobState.FAILED); + verify(mJobPersistence).failJob(runningJob.getId()); + verify(mJobPersistence).failJob(pendingJob.getId()); + verify(mJobPersistence).failAttempt(runningJob.getId(), runningAttemptNumber); + verify(mJobPersistence).writeAttemptFailureSummary(eq(runningJob.getId()), eq(runningAttemptNumber), any()); + verify(mJobPersistence).getJob(runningJob.getId()); + verify(mJobPersistence).getJob(pendingJob.getId()); + verify(mJobNotifier).failJob(any(), eq(runningJob)); + verify(mJobNotifier).failJob(any(), eq(pendingJob)); + verify(mJobtracker).trackSync(runningJob, JobState.FAILED); + verify(mJobtracker).trackSync(pendingJob, JobState.FAILED); Mockito.verifyNoMoreInteractions(mJobPersistence, mJobNotifier, mJobtracker); }