Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more metadata to the JobErrorReporter #14395

Merged
merged 5 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,17 @@ public class JobNotifier {
public static final String CONNECTION_DISABLED_WARNING_NOTIFICATION = "Connection Disabled Warning Notification";
public static final String CONNECTION_DISABLED_NOTIFICATION = "Connection Disabled Notification";

private final String connectionPageUrl;
private final ConfigRepository configRepository;
private final TrackingClient trackingClient;
private final WebUrlHelper webUrlHelper;
private final WorkspaceHelper workspaceHelper;

public JobNotifier(final String webappUrl,
public JobNotifier(final WebUrlHelper webUrlHelper,
final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient) {
this.webUrlHelper = webUrlHelper;
this.workspaceHelper = workspaceHelper;
if (webappUrl.endsWith("/")) {
this.connectionPageUrl = String.format("%sconnections/", webappUrl);
} else {
this.connectionPageUrl = String.format("%s/connections/", webappUrl);
}
this.configRepository = configRepository;
this.trackingClient = trackingClient;
}
Expand Down Expand Up @@ -82,7 +78,7 @@ private void notifyJob(final String reason,
final String destinationConnector = destinationDefinition.getName();
final String failReason = Strings.isNullOrEmpty(reason) ? "" : String.format(", as the %s", reason);
final String jobDescription = getJobDescription(job, failReason);
final String logUrl = connectionPageUrl + connectionId;
final String logUrl = webUrlHelper.getConnectionUrl(workspaceId, connectionId);
final ImmutableMap<String, Object> jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job);
final ImmutableMap<String, Object> sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition);
final ImmutableMap<String, Object> destinationMetadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence;

import java.util.UUID;

public class WebUrlHelper {

private final String webAppUrl;

public WebUrlHelper(final String webAppUrl) {
this.webAppUrl = webAppUrl;
}

public String getBaseUrl() {
if (webAppUrl.endsWith("/")) {
return webAppUrl.substring(0, webAppUrl.length() - 1);
}

return webAppUrl;
}

public String getWorkspaceUrl(final UUID workspaceId) {
return String.format("%s/workspaces/%s", getBaseUrl(), workspaceId);
}

public String getConnectionUrl(final UUID workspaceId, final UUID connectionId) {
return String.format("%s/connections/%s", getWorkspaceUrl(workspaceId), connectionId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.WebUrlHelper;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
Expand All @@ -28,24 +29,30 @@ public class JobErrorReporter {
private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_META_KEY = "failure_origin";
private static final String FAILURE_TYPE_META_KEY = "failure_type";
private static final String WORKSPACE_ID_META_KEY = "workspace_id";
private static final String CONNECTION_ID_META_KEY = "connection_id";
private static final String CONNECTION_URL_META_KEY = "connection_url";
private static final String CONNECTOR_NAME_META_KEY = "connector_name";
private static final String CONNECTOR_REPOSITORY_META_KEY = "connector_repository";
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";

private final ConfigRepository configRepository;
private final DeploymentMode deploymentMode;
private final String airbyteVersion;
private final WebUrlHelper webUrlHelper;
private final JobErrorReportingClient jobErrorReportingClient;

public JobErrorReporter(final ConfigRepository configRepository,
final DeploymentMode deploymentMode,
final String airbyteVersion,
final WebUrlHelper webUrlHelper,
final JobErrorReportingClient jobErrorReportingClient) {

this.configRepository = configRepository;
this.deploymentMode = deploymentMode;
this.airbyteVersion = airbyteVersion;
this.webUrlHelper = webUrlHelper;
this.jobErrorReportingClient = jobErrorReportingClient;
}

Expand All @@ -62,12 +69,15 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu
.toList();

final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true);
final String connectionUrl = webUrlHelper.getConnectionUrl(workspace.getWorkspaceId(), connectionId);

for (final FailureReason failureReason : traceMessageFailures) {
final FailureOrigin failureOrigin = failureReason.getFailureOrigin();

final HashMap<String, String> metadata = new HashMap<>();
metadata.put(WORKSPACE_ID_META_KEY, workspace.getWorkspaceId().toString());
metadata.put(CONNECTION_ID_META_KEY, connectionId.toString());
metadata.put(CONNECTION_URL_META_KEY, connectionUrl);
metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion);
metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name());
metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value());
Expand All @@ -80,6 +90,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName());
metadata.put(CONNECTOR_REPOSITORY_META_KEY, sourceDefinition.getDockerRepository());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value());

jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
Expand All @@ -89,6 +100,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName());
metadata.put(CONNECTOR_REPOSITORY_META_KEY, destinationDefinition.getDockerRepository());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value());

jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class JobNotifierTest {
private static final String TEST_DOCKER_TAG = "0.1.0";
private static final UUID WORKSPACE_ID = UUID.randomUUID();

private final WebUrlHelper webUrlHelper = new WebUrlHelper(WEBAPP_URL);

private ConfigRepository configRepository;
private WorkspaceHelper workspaceHelper;
private JobNotifier jobNotifier;
Expand All @@ -59,7 +61,7 @@ void setup() {
workspaceHelper = mock(WorkspaceHelper.class);
trackingClient = mock(TrackingClient.class);

jobNotifier = spy(new JobNotifier(WEBAPP_URL, configRepository, workspaceHelper, trackingClient));
jobNotifier = spy(new JobNotifier(webUrlHelper, configRepository, workspaceHelper, trackingClient));
notificationClient = mock(NotificationClient.class);
when(jobNotifier.getNotificationClient(getSlackNotification())).thenReturn(notificationClient);
}
Expand Down Expand Up @@ -92,7 +94,7 @@ void testFailJob() throws IOException, InterruptedException, JsonValidationExcep
"destination-test",
String.format("sync started on %s, running for 1 day 10 hours 17 minutes 36 seconds, as the JobNotifierTest was running.",
formatter.format(Instant.ofEpochSecond(job.getStartedAtInSecond().get()))),
"http://localhost:8000/connections/" + job.getScope());
String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, job.getScope()));

final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connection_id", UUID.fromString(job.getScope()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence;

import java.util.UUID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class WebUrlHelperTest {

private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID CONNECTION_ID = UUID.randomUUID();

@Test
void testGetBaseUrl() {
final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8000");
Assertions.assertEquals("http://localhost:8000", webUrlHelper.getBaseUrl());
}

@Test
void testGetBaseUrlTrailingSlash() {
final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8001/");
Assertions.assertEquals("http://localhost:8001", webUrlHelper.getBaseUrl());
}

@Test
void testGetWorkspaceUrl() {
final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8000");
final String workspaceUrl = webUrlHelper.getWorkspaceUrl(WORKSPACE_ID);
final String expectedUrl = String.format("http://localhost:8000/workspaces/%s", WORKSPACE_ID);
Assertions.assertEquals(expectedUrl, workspaceUrl);
}

@Test
void testGetConnectionUrl() {
final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8000");
final String connectionUrl = webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID);
final String expectedUrl = String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, CONNECTION_ID);
Assertions.assertEquals(expectedUrl, connectionUrl);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.WebUrlHelper;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -27,27 +28,33 @@

public class JobErrorReporterTest {

private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection";
private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
private static final String AIRBYTE_VERSION = "0.1.40";
private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID();
private static final String SOURCE_DEFINITION_NAME = "stripe";
private static final String SOURCE_DOCKER_REPOSITORY = "airbyte/source-stripe";
private static final String SOURCE_DOCKER_IMAGE = "airbyte/source-stripe:1.2.3";
private static final StandardSourceDefinition.ReleaseStage SOURCE_RELEASE_STAGE = StandardSourceDefinition.ReleaseStage.BETA;
private static final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
private static final String DESTINATION_DEFINITION_NAME = "snowflake";
private static final StandardDestinationDefinition.ReleaseStage DESTINATION_RELEASE_STAGE = StandardDestinationDefinition.ReleaseStage.BETA;
private static final String DESTINATION_DOCKER_REPOSITORY = "airbyte/destination-snowflake";
private static final String DESTINATION_DOCKER_IMAGE = "airbyte/destination-snowflake:1.2.3";
private static final StandardDestinationDefinition.ReleaseStage DESTINATION_RELEASE_STAGE = StandardDestinationDefinition.ReleaseStage.BETA;

private ConfigRepository configRepository;
private JobErrorReportingClient jobErrorReportingClient;
private WebUrlHelper webUrlHelper;
private JobErrorReporter jobErrorReporter;

@BeforeEach
void setup() {
configRepository = mock(ConfigRepository.class);
jobErrorReportingClient = mock(JobErrorReportingClient.class);
jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, jobErrorReportingClient);
webUrlHelper = mock(WebUrlHelper.class);
jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, webUrlHelper, jobErrorReportingClient);
}

@Test
Expand All @@ -74,42 +81,53 @@ void testReportSyncJobFailure() {
Mockito.when(mJobSyncConfig.getSourceDockerImage()).thenReturn(SOURCE_DOCKER_IMAGE);
Mockito.when(mJobSyncConfig.getDestinationDockerImage()).thenReturn(DESTINATION_DOCKER_IMAGE);

Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL);

Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID))
.thenReturn(new StandardSourceDefinition()
.withDockerRepository(SOURCE_DOCKER_REPOSITORY)
.withReleaseStage(SOURCE_RELEASE_STAGE)
.withSourceDefinitionId(SOURCE_DEFINITION_ID)
.withName(SOURCE_DEFINITION_NAME));

Mockito.when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID))
.thenReturn(new StandardDestinationDefinition()
.withDockerRepository(DESTINATION_DOCKER_REPOSITORY)
.withReleaseStage(DESTINATION_RELEASE_STAGE)
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID)
.withName(DESTINATION_DEFINITION_NAME));

final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class);
Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace);
Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID);

jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig);

final Map<String, String> expectedSourceMetadata = Map.of(
"connection_id", CONNECTION_ID.toString(),
"deployment_mode", DEPLOYMENT_MODE.name(),
"airbyte_version", AIRBYTE_VERSION,
"failure_origin", "source",
"failure_type", "system_error",
"connector_definition_id", SOURCE_DEFINITION_ID.toString(),
"connector_name", SOURCE_DEFINITION_NAME,
"connector_release_stage", SOURCE_RELEASE_STAGE.toString());

final Map<String, String> expectedDestinationMetadata = Map.of(
"connection_id", CONNECTION_ID.toString(),
"deployment_mode", DEPLOYMENT_MODE.name(),
"airbyte_version", AIRBYTE_VERSION,
"failure_origin", "destination",
"failure_type", "system_error",
"connector_definition_id", DESTINATION_DEFINITION_ID.toString(),
"connector_name", DESTINATION_DEFINITION_NAME,
"connector_release_stage", DESTINATION_RELEASE_STAGE.toString());
final Map<String, String> expectedSourceMetadata = Map.ofEntries(
Map.entry("workspace_id", WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry("deployment_mode", DEPLOYMENT_MODE.name()),
Map.entry("airbyte_version", AIRBYTE_VERSION),
Map.entry("failure_origin", "source"),
Map.entry("failure_type", "system_error"),
Map.entry("connector_definition_id", SOURCE_DEFINITION_ID.toString()),
Map.entry("connector_repository", SOURCE_DOCKER_REPOSITORY),
Map.entry("connector_name", SOURCE_DEFINITION_NAME),
Map.entry("connector_release_stage", SOURCE_RELEASE_STAGE.toString()));

final Map<String, String> expectedDestinationMetadata = Map.ofEntries(
Map.entry("workspace_id", WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry("deployment_mode", DEPLOYMENT_MODE.name()),
Map.entry("airbyte_version", AIRBYTE_VERSION),
Map.entry("failure_origin", "destination"),
Map.entry("failure_type", "system_error"),
Map.entry("connector_definition_id", DESTINATION_DEFINITION_ID.toString()),
Map.entry("connector_repository", DESTINATION_DOCKER_REPOSITORY),
Map.entry("connector_name", DESTINATION_DEFINITION_NAME),
Map.entry("connector_release_stage", DESTINATION_RELEASE_STAGE.toString()));

Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE,
Expand All @@ -135,6 +153,10 @@ void testReportSyncJobFailureDoesNotThrow() {
.withSourceDefinitionId(SOURCE_DEFINITION_ID)
.withName(SOURCE_DEFINITION_NAME));

final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class);
Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace);
Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID);

Mockito.doThrow(new RuntimeException("some exception"))
.when(jobErrorReportingClient)
.reportJobFailureReason(Mockito.any(), Mockito.eq(sourceFailureReason), Mockito.any(), Mockito.any());
Expand Down
12 changes: 10 additions & 2 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WebUrlHelper;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter;
import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClient;
Expand Down Expand Up @@ -446,8 +447,10 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf

final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig = getContainerOrchestratorConfig(configs);

final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl());

final JobNotifier jobNotifier = new JobNotifier(
configs.getWebappUrl(),
webUrlHelper,
configRepository,
workspaceHelper,
TrackingClientSingleton.get());
Expand All @@ -456,7 +459,12 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf

final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs);
final JobErrorReporter jobErrorReporter =
new JobErrorReporter(configRepository, configs.getDeploymentMode(), configs.getAirbyteVersionOrWarning(), jobErrorReportingClient);
new JobErrorReporter(
configRepository,
configs.getDeploymentMode(),
configs.getAirbyteVersionOrWarning(),
webUrlHelper,
jobErrorReportingClient);

new WorkerApp(
workspaceRoot,
Expand Down