Skip to content

Commit

Permalink
report synchronous check/spec/discover failures to JobErrorReporter (#…
Browse files Browse the repository at this point in the history
…14818)

* report failures for synchronous check/discover, refactor common logic

* allow null workspace, send spec errors

* add failure origin, format

* rm connector_type, fix failing tests

* add tests for other job types

* log instead of throw

* move swallow to common spot

* connector jobs use context instead of passing full config

* sync jobs use context instead of passing raw config

* fix failing test

* fix failing scheduler client test
  • Loading branch information
pedroslopez authored Aug 2, 2022
1 parent bbbd1ad commit f924359
Show file tree
Hide file tree
Showing 14 changed files with 507 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobCheckConnectionConfig;
Expand All @@ -16,6 +17,8 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.job_error_reporter.ConnectorJobReportingContext;
import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
Expand All @@ -26,19 +29,27 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerClient {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class);

private final TemporalClient temporalClient;
private final JobTracker jobTracker;
private final JobErrorReporter jobErrorReporter;
private final OAuthConfigSupplier oAuthConfigSupplier;

public DefaultSynchronousSchedulerClient(final TemporalClient temporalClient,
final JobTracker jobTracker,
final JobErrorReporter jobErrorReporter,
final OAuthConfigSupplier oAuthConfigSupplier) {
this.temporalClient = temporalClient;
this.jobTracker = jobTracker;
this.jobErrorReporter = jobErrorReporter;
this.oAuthConfigSupplier = oAuthConfigSupplier;
}

Expand All @@ -53,10 +64,14 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne
.withConnectionConfiguration(sourceConfiguration)
.withDockerImage(dockerImage);

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);

return execute(
ConfigType.CHECK_CONNECTION_SOURCE,
jobReportingContext,
source.getSourceDefinitionId(),
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
() -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
source.getWorkspaceId());
}
Expand All @@ -73,10 +88,14 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck
.withConnectionConfiguration(destinationConfiguration)
.withDockerImage(dockerImage);

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);

return execute(
ConfigType.CHECK_CONNECTION_DESTINATION,
jobReportingContext,
destination.getDestinationDefinitionId(),
jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
() -> temporalClient.submitCheckConnection(jobId, 0, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
destination.getWorkspaceId());
}
Expand All @@ -92,10 +111,14 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
.withConnectionConfiguration(sourceConfiguration)
.withDockerImage(dockerImage);

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);

return execute(
ConfigType.DISCOVER_SCHEMA,
jobReportingContext,
source.getSourceDefinitionId(),
jobId -> temporalClient.submitDiscoverSchema(UUID.randomUUID(), 0, jobDiscoverCatalogConfig),
() -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig),
ConnectorJobOutput::getDiscoverCatalog,
source.getWorkspaceId());
}
Expand All @@ -104,31 +127,39 @@ public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceC
public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String dockerImage) throws IOException {
final JobGetSpecConfig jobSpecConfig = new JobGetSpecConfig().withDockerImage(dockerImage);

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);

return execute(
ConfigType.GET_SPEC,
jobReportingContext,
null,
jobId -> temporalClient.submitGetSpec(UUID.randomUUID(), 0, jobSpecConfig),
() -> temporalClient.submitGetSpec(jobId, 0, jobSpecConfig),
ConnectorJobOutput::getSpec,
null);
}

@VisibleForTesting
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
final ConnectorJobReportingContext jobContext,
@Nullable final UUID connectorDefinitionId,
final Function<UUID, TemporalResponse<U>> executor,
final Supplier<TemporalResponse<U>> executor,
final Function<U, T> outputMapper,
final UUID workspaceId) {
final long createdAt = Instant.now().toEpochMilli();
final UUID jobId = UUID.randomUUID();
final UUID jobId = jobContext.jobId();
try {
track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null);
final TemporalResponse<U> temporalResponse = executor.apply(jobId);
final TemporalResponse<U> temporalResponse = executor.get();
final Optional<U> jobOutput = temporalResponse.getOutput();
final T mappedOutput = jobOutput.map(outputMapper).orElse(null);
final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;

track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput);
// TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above

if (outputState == JobState.FAILED && jobOutput.isPresent()) {
reportError(configType, jobContext, jobOutput.get(), connectorDefinitionId, workspaceId);
}

final long endedAt = Instant.now().toEpochMilli();
return SynchronousResponse.fromTemporalResponse(
Expand Down Expand Up @@ -177,4 +208,34 @@ private <T> void track(final UUID jobId,

}

private <S, T> void reportError(final ConfigType configType,
final ConnectorJobReportingContext jobContext,
final T jobOutput,
final UUID connectorDefinitionId,
final UUID workspaceId) {
Exceptions.swallow(() -> {
switch (configType) {
case CHECK_CONNECTION_SOURCE -> jobErrorReporter.reportSourceCheckJobFailure(
connectorDefinitionId,
workspaceId,
((ConnectorJobOutput) jobOutput).getFailureReason(),
jobContext);
case CHECK_CONNECTION_DESTINATION -> jobErrorReporter.reportDestinationCheckJobFailure(
connectorDefinitionId,
workspaceId,
((ConnectorJobOutput) jobOutput).getFailureReason(),
jobContext);
case DISCOVER_SCHEMA -> jobErrorReporter.reportDiscoverJobFailure(
connectorDefinitionId,
workspaceId,
((ConnectorJobOutput) jobOutput).getFailureReason(),
jobContext);
case GET_SPEC -> jobErrorReporter.reportSpecJobFailure(
((ConnectorJobOutput) jobOutput).getFailureReason(),
jobContext);
default -> LOGGER.error("Tried to report job failure for type {}, but this job type is not supported", configType);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -29,6 +30,8 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.job_error_reporter.ConnectorJobReportingContext;
import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
Expand All @@ -39,6 +42,7 @@
import java.nio.file.Path;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
Expand Down Expand Up @@ -69,15 +73,17 @@ class DefaultSynchronousSchedulerClientTest {

private TemporalClient temporalClient;
private JobTracker jobTracker;
private JobErrorReporter jobErrorReporter;
private OAuthConfigSupplier oAuthConfigSupplier;
private DefaultSynchronousSchedulerClient schedulerClient;

@BeforeEach
void setup() throws IOException {
temporalClient = mock(TemporalClient.class);
jobTracker = mock(JobTracker.class);
jobErrorReporter = mock(JobErrorReporter.class);
oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier);

when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
Expand All @@ -97,12 +103,13 @@ class ExecuteSynchronousJob {
@Test
void testExecuteJobSuccess() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
final Function<String, String> mapperFunction = output -> output;
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));
when(function.get()).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertEquals("hello", response.getOutput());
Expand All @@ -114,18 +121,20 @@ void testExecuteJobSuccess() {

verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.SUCCEEDED));
verifyNoInteractions(jobErrorReporter);
}

@SuppressWarnings("unchecked")
@Test
void testExecuteMappedOutput() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<Integer>> function = mock(Function.class);
final Supplier<TemporalResponse<Integer>> function = mock(Supplier.class);
final Function<Integer, String> mapperFunction = Object::toString;
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(42, createMetadata(true)));
when(function.get()).thenReturn(new TemporalResponse<>(42, createMetadata(true)));

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertEquals("42", response.getOutput());
Expand All @@ -140,12 +149,13 @@ void testExecuteMappedOutput() {
@Test
void testExecuteJobFailure() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
final Function<String, String> mapperFunction = output -> output;
when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(null, createMetadata(false)));
when(function.get()).thenReturn(new TemporalResponse<>(null, createMetadata(false)));

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertNull(response.getOutput());
Expand All @@ -157,22 +167,26 @@ void testExecuteJobFailure() {

verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED));
verifyNoInteractions(jobErrorReporter);
}

@SuppressWarnings("unchecked")
@Test
void testExecuteRuntimeException() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Function<UUID, TemporalResponse<String>> function = mock(Function.class);
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
final Function<String, String> mapperFunction = output -> output;
when(function.apply(any(UUID.class))).thenThrow(new RuntimeException());
when(function.get()).thenThrow(new RuntimeException());

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3");
assertThrows(
RuntimeException.class,
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID));
() -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function,
mapperFunction, WORKSPACE_ID));

verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED));
verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED));
verifyNoInteractions(jobErrorReporter);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import java.util.UUID;

public record ConnectorJobReportingContext(UUID jobId, String dockerImage) {}
Loading

0 comments on commit f924359

Please sign in to comment.