From 3a2f7db103485dfadad36b06b2b590f047b81c75 Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Mon, 4 Jul 2022 11:42:15 -0400 Subject: [PATCH 1/5] add workspace_id and connector_repository as tags --- .../job_error_reporter/JobErrorReporter.java | 5 +++++ .../job_error_reporter/JobErrorReporterTest.java | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java index c82cae5dcd95..3d65f3816227 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java @@ -28,8 +28,10 @@ public class JobErrorReporter { private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version"; private static final String FAILURE_ORIGIN_META_KEY = "failure_origin"; private static final String FAILURE_TYPE_META_KEY = "failure_type"; + private static final String WORKSPACE_ID_META_KEY = "workspace_id"; private static final String CONNECTION_ID_META_KEY = "connection_id"; private static final String CONNECTOR_NAME_META_KEY = "connector_name"; + private static final String CONNECTOR_REPOSITORY_META_KEY = "connector_repository"; private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id"; private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage"; @@ -67,6 +69,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu final FailureOrigin failureOrigin = failureReason.getFailureOrigin(); final HashMap metadata = new HashMap<>(); + metadata.put(WORKSPACE_ID_META_KEY, workspace.getWorkspaceId().toString()); metadata.put(CONNECTION_ID_META_KEY, connectionId.toString()); metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion); metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name()); @@ -80,6 +83,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString()); metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName()); + metadata.put(CONNECTOR_REPOSITORY_META_KEY, sourceDefinition.getDockerRepository()); metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()); jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata); @@ -89,6 +93,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString()); metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName()); + metadata.put(CONNECTOR_REPOSITORY_META_KEY, destinationDefinition.getDockerRepository()); metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value()); jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata); diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java index ae99ad02ad53..7194f6d4a625 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java @@ -27,17 +27,20 @@ public class JobErrorReporterTest { + private static final UUID WORKSPACE_ID = UUID.randomUUID(); private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS; private static final String AIRBYTE_VERSION = "0.1.40"; private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID(); private static final String SOURCE_DEFINITION_NAME = "stripe"; + private static final String SOURCE_DOCKER_REPOSITORY = "airbyte/source-stripe"; private static final String SOURCE_DOCKER_IMAGE = "airbyte/source-stripe:1.2.3"; private static final StandardSourceDefinition.ReleaseStage SOURCE_RELEASE_STAGE = StandardSourceDefinition.ReleaseStage.BETA; private static final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID(); private static final String DESTINATION_DEFINITION_NAME = "snowflake"; - private static final StandardDestinationDefinition.ReleaseStage DESTINATION_RELEASE_STAGE = StandardDestinationDefinition.ReleaseStage.BETA; + private static final String DESTINATION_DOCKER_REPOSITORY = "airbyte/destination-snowflake"; private static final String DESTINATION_DOCKER_IMAGE = "airbyte/destination-snowflake:1.2.3"; + private static final StandardDestinationDefinition.ReleaseStage DESTINATION_RELEASE_STAGE = StandardDestinationDefinition.ReleaseStage.BETA; private ConfigRepository configRepository; private JobErrorReportingClient jobErrorReportingClient; @@ -76,38 +79,45 @@ void testReportSyncJobFailure() { Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID)) .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPOSITORY) .withReleaseStage(SOURCE_RELEASE_STAGE) .withSourceDefinitionId(SOURCE_DEFINITION_ID) .withName(SOURCE_DEFINITION_NAME)); Mockito.when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID)) .thenReturn(new StandardDestinationDefinition() + .withDockerRepository(DESTINATION_DOCKER_REPOSITORY) .withReleaseStage(DESTINATION_RELEASE_STAGE) .withDestinationDefinitionId(DESTINATION_DEFINITION_ID) .withName(DESTINATION_DEFINITION_NAME)); final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); + Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig); final Map expectedSourceMetadata = Map.of( + "workspace_id", WORKSPACE_ID.toString(), "connection_id", CONNECTION_ID.toString(), "deployment_mode", DEPLOYMENT_MODE.name(), "airbyte_version", AIRBYTE_VERSION, "failure_origin", "source", "failure_type", "system_error", "connector_definition_id", SOURCE_DEFINITION_ID.toString(), + "connector_repository", SOURCE_DOCKER_REPOSITORY, "connector_name", SOURCE_DEFINITION_NAME, "connector_release_stage", SOURCE_RELEASE_STAGE.toString()); final Map expectedDestinationMetadata = Map.of( + "workspace_id", WORKSPACE_ID.toString(), "connection_id", CONNECTION_ID.toString(), "deployment_mode", DEPLOYMENT_MODE.name(), "airbyte_version", AIRBYTE_VERSION, "failure_origin", "destination", "failure_type", "system_error", "connector_definition_id", DESTINATION_DEFINITION_ID.toString(), + "connector_repository", DESTINATION_DOCKER_REPOSITORY, "connector_name", DESTINATION_DEFINITION_NAME, "connector_release_stage", DESTINATION_RELEASE_STAGE.toString()); From dd8993b71c73928423954956cdc93c7a50138a08 Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Mon, 4 Jul 2022 12:20:06 -0400 Subject: [PATCH 2/5] add tag for connection url --- .../scheduler/persistence/WebUrlHelper.java | 28 ++++++++++ .../job_error_reporter/JobErrorReporter.java | 7 +++ .../persistence/WebUrlHelperTest.java | 38 +++++++++++++ .../JobErrorReporterTest.java | 56 +++++++++++-------- .../java/io/airbyte/workers/WorkerApp.java | 10 +++- 5 files changed, 114 insertions(+), 25 deletions(-) create mode 100644 airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java create mode 100644 airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java new file mode 100644 index 000000000000..34c6a966a325 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java @@ -0,0 +1,28 @@ +package io.airbyte.scheduler.persistence; + +import java.util.UUID; + +public class WebUrlHelper { + + private final String webAppUrl; + + public WebUrlHelper(final String webAppUrl) { + this.webAppUrl = webAppUrl; + } + + public String getBaseUrl() { + if(webAppUrl.endsWith("/")) { + return webAppUrl.substring(0, webAppUrl.length() - 1); + } + + return webAppUrl; + } + + public String getWorkspaceUrl(final UUID workspaceId) { + return String.format("%s/workspaces/%s", getBaseUrl(), workspaceId); + } + + public String getConnectionUrl(final UUID workspaceId, final UUID connectionId) { + return String.format("%s/connections/%s", getWorkspaceUrl(workspaceId), connectionId); + } +} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java index 3d65f3816227..5d2c5ce621be 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java @@ -13,6 +13,7 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.scheduler.persistence.WebUrlHelper; import java.util.HashMap; import java.util.List; import java.util.UUID; @@ -30,6 +31,7 @@ public class JobErrorReporter { private static final String FAILURE_TYPE_META_KEY = "failure_type"; private static final String WORKSPACE_ID_META_KEY = "workspace_id"; private static final String CONNECTION_ID_META_KEY = "connection_id"; + private static final String CONNECTION_URL_META_KEY = "connection_url"; private static final String CONNECTOR_NAME_META_KEY = "connector_name"; private static final String CONNECTOR_REPOSITORY_META_KEY = "connector_repository"; private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id"; @@ -38,16 +40,19 @@ public class JobErrorReporter { private final ConfigRepository configRepository; private final DeploymentMode deploymentMode; private final String airbyteVersion; + private final WebUrlHelper webUrlHelper; private final JobErrorReportingClient jobErrorReportingClient; public JobErrorReporter(final ConfigRepository configRepository, final DeploymentMode deploymentMode, final String airbyteVersion, + final WebUrlHelper webUrlHelper, final JobErrorReportingClient jobErrorReportingClient) { this.configRepository = configRepository; this.deploymentMode = deploymentMode; this.airbyteVersion = airbyteVersion; + this.webUrlHelper = webUrlHelper; this.jobErrorReportingClient = jobErrorReportingClient; } @@ -64,6 +69,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu .toList(); final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true); + final String connectionUrl = webUrlHelper.getConnectionUrl(workspace.getWorkspaceId(), connectionId); for (final FailureReason failureReason : traceMessageFailures) { final FailureOrigin failureOrigin = failureReason.getFailureOrigin(); @@ -71,6 +77,7 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu final HashMap metadata = new HashMap<>(); metadata.put(WORKSPACE_ID_META_KEY, workspace.getWorkspaceId().toString()); metadata.put(CONNECTION_ID_META_KEY, connectionId.toString()); + metadata.put(CONNECTION_URL_META_KEY, connectionUrl); metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion); metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name()); metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value()); diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java new file mode 100644 index 000000000000..937a76060390 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java @@ -0,0 +1,38 @@ +package io.airbyte.scheduler.persistence; + +import java.util.UUID; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class WebUrlHelperTest { + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID CONNECTION_ID = UUID.randomUUID(); + + @Test + void testGetBaseUrl() { + final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8000"); + Assertions.assertEquals("http://localhost:8000", webUrlHelper.getBaseUrl()); + } + + @Test + void testGetBaseUrlTrailingSlash() { + final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8001/"); + Assertions.assertEquals("http://localhost:8001", webUrlHelper.getBaseUrl()); + } + + @Test + void testGetWorkspaceUrl() { + final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8000"); + final String workspaceUrl = webUrlHelper.getWorkspaceUrl(WORKSPACE_ID); + final String expectedUrl = String.format("http://localhost:8000/workspaces/%s", WORKSPACE_ID); + Assertions.assertEquals(expectedUrl, workspaceUrl); + } + + @Test + void testGetConnectionUrl() { + final WebUrlHelper webUrlHelper = new WebUrlHelper("http://localhost:8000"); + final String connectionUrl = webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID); + final String expectedUrl = String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, CONNECTION_ID); + Assertions.assertEquals(expectedUrl, connectionUrl); + } +} diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java index 7194f6d4a625..42f397c7c021 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java @@ -17,6 +17,7 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.scheduler.persistence.WebUrlHelper; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,6 +30,7 @@ public class JobErrorReporterTest { private static final UUID WORKSPACE_ID = UUID.randomUUID(); private static final UUID CONNECTION_ID = UUID.randomUUID(); + private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection"; private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS; private static final String AIRBYTE_VERSION = "0.1.40"; private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID(); @@ -44,13 +46,15 @@ public class JobErrorReporterTest { private ConfigRepository configRepository; private JobErrorReportingClient jobErrorReportingClient; + private WebUrlHelper webUrlHelper; private JobErrorReporter jobErrorReporter; @BeforeEach void setup() { configRepository = mock(ConfigRepository.class); jobErrorReportingClient = mock(JobErrorReportingClient.class); - jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, jobErrorReportingClient); + webUrlHelper = mock(WebUrlHelper.class); + jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, webUrlHelper, jobErrorReportingClient); } @Test @@ -77,6 +81,8 @@ void testReportSyncJobFailure() { Mockito.when(mJobSyncConfig.getSourceDockerImage()).thenReturn(SOURCE_DOCKER_IMAGE); Mockito.when(mJobSyncConfig.getDestinationDockerImage()).thenReturn(DESTINATION_DOCKER_IMAGE); + Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL); + Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID)) .thenReturn(new StandardSourceDefinition() .withDockerRepository(SOURCE_DOCKER_REPOSITORY) @@ -97,29 +103,31 @@ void testReportSyncJobFailure() { jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig); - final Map expectedSourceMetadata = Map.of( - "workspace_id", WORKSPACE_ID.toString(), - "connection_id", CONNECTION_ID.toString(), - "deployment_mode", DEPLOYMENT_MODE.name(), - "airbyte_version", AIRBYTE_VERSION, - "failure_origin", "source", - "failure_type", "system_error", - "connector_definition_id", SOURCE_DEFINITION_ID.toString(), - "connector_repository", SOURCE_DOCKER_REPOSITORY, - "connector_name", SOURCE_DEFINITION_NAME, - "connector_release_stage", SOURCE_RELEASE_STAGE.toString()); - - final Map expectedDestinationMetadata = Map.of( - "workspace_id", WORKSPACE_ID.toString(), - "connection_id", CONNECTION_ID.toString(), - "deployment_mode", DEPLOYMENT_MODE.name(), - "airbyte_version", AIRBYTE_VERSION, - "failure_origin", "destination", - "failure_type", "system_error", - "connector_definition_id", DESTINATION_DEFINITION_ID.toString(), - "connector_repository", DESTINATION_DOCKER_REPOSITORY, - "connector_name", DESTINATION_DEFINITION_NAME, - "connector_release_stage", DESTINATION_RELEASE_STAGE.toString()); + final Map expectedSourceMetadata = Map.ofEntries( + Map.entry("workspace_id", WORKSPACE_ID.toString()), + Map.entry("connection_id", CONNECTION_ID.toString()), + Map.entry("connection_url", CONNECTION_URL), + Map.entry("deployment_mode", DEPLOYMENT_MODE.name()), + Map.entry("airbyte_version", AIRBYTE_VERSION), + Map.entry("failure_origin", "source"), + Map.entry("failure_type", "system_error"), + Map.entry("connector_definition_id", SOURCE_DEFINITION_ID.toString()), + Map.entry("connector_repository", SOURCE_DOCKER_REPOSITORY), + Map.entry("connector_name", SOURCE_DEFINITION_NAME), + Map.entry("connector_release_stage", SOURCE_RELEASE_STAGE.toString())); + + final Map expectedDestinationMetadata = Map.ofEntries( + Map.entry("workspace_id", WORKSPACE_ID.toString()), + Map.entry("connection_id", CONNECTION_ID.toString()), + Map.entry("connection_url", CONNECTION_URL), + Map.entry("deployment_mode", DEPLOYMENT_MODE.name()), + Map.entry("airbyte_version", AIRBYTE_VERSION), + Map.entry("failure_origin", "destination"), + Map.entry("failure_type", "system_error"), + Map.entry("connector_definition_id", DESTINATION_DEFINITION_ID.toString()), + Map.entry("connector_repository", DESTINATION_DOCKER_REPOSITORY), + Map.entry("connector_name", DESTINATION_DEFINITION_NAME), + Map.entry("connector_release_stage", DESTINATION_RELEASE_STAGE.toString())); Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata); Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 6b6305495490..a3352a5e41a6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -41,6 +41,7 @@ import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.persistence.WebUrlHelper; import io.airbyte.scheduler.persistence.WorkspaceHelper; import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClient; @@ -446,6 +447,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf final Optional containerOrchestratorConfig = getContainerOrchestratorConfig(configs); + final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); + final JobNotifier jobNotifier = new JobNotifier( configs.getWebappUrl(), configRepository, @@ -456,7 +459,12 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); final JobErrorReporter jobErrorReporter = - new JobErrorReporter(configRepository, configs.getDeploymentMode(), configs.getAirbyteVersionOrWarning(), jobErrorReportingClient); + new JobErrorReporter( + configRepository, + configs.getDeploymentMode(), + configs.getAirbyteVersionOrWarning(), + webUrlHelper, + jobErrorReportingClient); new WorkerApp( workspaceRoot, From 7bacab408379c77a0f47aa5fed688f329bfe1751 Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Mon, 4 Jul 2022 12:27:48 -0400 Subject: [PATCH 3/5] fix urls for job notifier --- .../airbyte/scheduler/persistence/JobNotifier.java | 12 ++++-------- .../scheduler/persistence/JobNotifierTest.java | 6 ++++-- .../src/main/java/io/airbyte/workers/WorkerApp.java | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java index b2896a6fdfd0..c190c3ecc98a 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobNotifier.java @@ -39,21 +39,17 @@ public class JobNotifier { public static final String CONNECTION_DISABLED_WARNING_NOTIFICATION = "Connection Disabled Warning Notification"; public static final String CONNECTION_DISABLED_NOTIFICATION = "Connection Disabled Notification"; - private final String connectionPageUrl; private final ConfigRepository configRepository; private final TrackingClient trackingClient; + private final WebUrlHelper webUrlHelper; private final WorkspaceHelper workspaceHelper; - public JobNotifier(final String webappUrl, + public JobNotifier(final WebUrlHelper webUrlHelper, final ConfigRepository configRepository, final WorkspaceHelper workspaceHelper, final TrackingClient trackingClient) { + this.webUrlHelper = webUrlHelper; this.workspaceHelper = workspaceHelper; - if (webappUrl.endsWith("/")) { - this.connectionPageUrl = String.format("%sconnections/", webappUrl); - } else { - this.connectionPageUrl = String.format("%s/connections/", webappUrl); - } this.configRepository = configRepository; this.trackingClient = trackingClient; } @@ -82,7 +78,7 @@ private void notifyJob(final String reason, final String destinationConnector = destinationDefinition.getName(); final String failReason = Strings.isNullOrEmpty(reason) ? "" : String.format(", as the %s", reason); final String jobDescription = getJobDescription(job, failReason); - final String logUrl = connectionPageUrl + connectionId; + final String logUrl = webUrlHelper.getConnectionUrl(workspaceId, connectionId); final ImmutableMap jobMetadata = TrackingMetadata.generateJobAttemptMetadata(job); final ImmutableMap sourceMetadata = TrackingMetadata.generateSourceDefinitionMetadata(sourceDefinition); final ImmutableMap destinationMetadata = TrackingMetadata.generateDestinationDefinitionMetadata(destinationDefinition); diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java index a310000fa61f..01abe668d711 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java @@ -47,6 +47,8 @@ class JobNotifierTest { private static final String TEST_DOCKER_TAG = "0.1.0"; private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private final WebUrlHelper webUrlHelper = new WebUrlHelper(WEBAPP_URL); + private ConfigRepository configRepository; private WorkspaceHelper workspaceHelper; private JobNotifier jobNotifier; @@ -59,7 +61,7 @@ void setup() { workspaceHelper = mock(WorkspaceHelper.class); trackingClient = mock(TrackingClient.class); - jobNotifier = spy(new JobNotifier(WEBAPP_URL, configRepository, workspaceHelper, trackingClient)); + jobNotifier = spy(new JobNotifier(webUrlHelper, configRepository, workspaceHelper, trackingClient)); notificationClient = mock(NotificationClient.class); when(jobNotifier.getNotificationClient(getSlackNotification())).thenReturn(notificationClient); } @@ -92,7 +94,7 @@ void testFailJob() throws IOException, InterruptedException, JsonValidationExcep "destination-test", String.format("sync started on %s, running for 1 day 10 hours 17 minutes 36 seconds, as the JobNotifierTest was running.", formatter.format(Instant.ofEpochSecond(job.getStartedAtInSecond().get()))), - "http://localhost:8000/connections/" + job.getScope()); + String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, job.getScope())); final Builder metadata = ImmutableMap.builder(); metadata.put("connection_id", UUID.fromString(job.getScope())); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index a3352a5e41a6..76cad1d6b885 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -450,7 +450,7 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); final JobNotifier jobNotifier = new JobNotifier( - configs.getWebappUrl(), + webUrlHelper, configRepository, workspaceHelper, TrackingClientSingleton.get()); From 7d840b00669434a98468221702905fca31e724a8 Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Mon, 4 Jul 2022 12:28:10 -0400 Subject: [PATCH 4/5] format --- .../io/airbyte/scheduler/persistence/WebUrlHelper.java | 7 ++++++- .../io/airbyte/scheduler/persistence/WebUrlHelperTest.java | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java index 34c6a966a325..fd9145c2f3a2 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/WebUrlHelper.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.scheduler.persistence; import java.util.UUID; @@ -11,7 +15,7 @@ public WebUrlHelper(final String webAppUrl) { } public String getBaseUrl() { - if(webAppUrl.endsWith("/")) { + if (webAppUrl.endsWith("/")) { return webAppUrl.substring(0, webAppUrl.length() - 1); } @@ -25,4 +29,5 @@ public String getWorkspaceUrl(final UUID workspaceId) { public String getConnectionUrl(final UUID workspaceId, final UUID connectionId) { return String.format("%s/connections/%s", getWorkspaceUrl(workspaceId), connectionId); } + } diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java index 937a76060390..d5e80775b737 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/WebUrlHelperTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.scheduler.persistence; import java.util.UUID; @@ -5,6 +9,7 @@ import org.junit.jupiter.api.Test; public class WebUrlHelperTest { + private static final UUID WORKSPACE_ID = UUID.randomUUID(); private static final UUID CONNECTION_ID = UUID.randomUUID(); @@ -35,4 +40,5 @@ void testGetConnectionUrl() { final String expectedUrl = String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, CONNECTION_ID); Assertions.assertEquals(expectedUrl, connectionUrl); } + } From a9d4a0896030d6880659c83541f5cd87220f03ad Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Mon, 4 Jul 2022 12:58:54 -0400 Subject: [PATCH 5/5] fix failing test --- .../persistence/job_error_reporter/JobErrorReporterTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java index 42f397c7c021..8580a440a7fd 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java @@ -153,6 +153,10 @@ void testReportSyncJobFailureDoesNotThrow() { .withSourceDefinitionId(SOURCE_DEFINITION_ID) .withName(SOURCE_DEFINITION_NAME)); + final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); + Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); + Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); + Mockito.doThrow(new RuntimeException("some exception")) .when(jobErrorReportingClient) .reportJobFailureReason(Mockito.any(), Mockito.eq(sourceFailureReason), Mockito.any(), Mockito.any());