diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 2d32af0a93aa..76e6990230cd 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -89,6 +89,8 @@ public class EnvConfigs implements Configs { private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE"; public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET"; public static final String PUBLISH_METRICS = "PUBLISH_METRICS"; + public static final String DD_AGENT_HOST = "DD_AGENT_HOST"; + public static final String DD_DOGSTATSD_PORT = "DD_DOGSTATSD_PORT"; private static final String CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION"; private static final String CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS = "CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS"; private static final String JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION"; @@ -97,8 +99,6 @@ public class EnvConfigs implements Configs { private static final String CONTAINER_ORCHESTRATOR_SECRET_NAME = "CONTAINER_ORCHESTRATOR_SECRET_NAME"; private static final String CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH = "CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH"; private static final String CONTAINER_ORCHESTRATOR_IMAGE = "CONTAINER_ORCHESTRATOR_IMAGE"; - private static final String DD_AGENT_HOST = "DD_AGENT_HOST"; - private static final String DD_DOGSTATSD_PORT = "DD_DOGSTATSD_PORT"; public static final String DD_CONSTANT_TAGS = "DD_CONSTANT_TAGS"; public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME"; public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION"; @@ -132,7 +132,7 @@ public class EnvConfigs implements Configs { private static final String MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = "MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE"; private static final String MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = "MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE"; - private static final String METRIC_CLIENT = "METRIC_CLIENT"; + public static final String METRIC_CLIENT = "METRIC_CLIENT"; private static final String OTEL_COLLECTOR_ENDPOINT = "OTEL_COLLECTOR_ENDPOINT"; // job-type-specific overrides @@ -798,7 +798,7 @@ public String getDDDogStatsDPort() { @Override public List getDDConstantTags() { - String tagsString = getEnvOrDefault(DD_CONSTANT_TAGS, ""); + final String tagsString = getEnvOrDefault(DD_CONSTANT_TAGS, ""); return Splitter.on(",") .splitToStream(tagsString) .filter(s -> !s.trim().isBlank()) diff --git a/airbyte-container-orchestrator/build.gradle b/airbyte-container-orchestrator/build.gradle index d694f6466ecd..d2f6196b9319 100644 --- a/airbyte-container-orchestrator/build.gradle +++ b/airbyte-container-orchestrator/build.gradle @@ -18,6 +18,7 @@ dependencies { implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-scheduler:scheduler-persistence') implementation project(':airbyte-workers') + implementation project(':airbyte-metrics:metrics-lib') testImplementation 'org.mockito:mockito-inline:2.13.0' testImplementation libs.postgresql diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java index 8bc6f0686405..2f6901a36f27 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java @@ -9,11 +9,15 @@ import io.airbyte.config.Configs; import io.airbyte.config.ReplicationOutput; import io.airbyte.config.StandardSyncInput; +import io.airbyte.metrics.lib.MetricClient; +import io.airbyte.metrics.lib.MetricClientFactory; +import io.airbyte.metrics.lib.MetricEmittingApps; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.general.DefaultReplicationWorker; import io.airbyte.workers.general.ReplicationWorker; @@ -96,6 +100,10 @@ public Optional runJob() throws Exception { featureFlags.useStreamCapableState()) : new DefaultAirbyteSource(workerConfigs, sourceLauncher); + MetricClientFactory.initialize(MetricEmittingApps.WORKER); + final MetricClient metricClient = MetricClientFactory.getMetricClient(); + final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); + log.info("Setting up replication worker..."); final ReplicationWorker replicationWorker = new DefaultReplicationWorker( jobRunConfig.getJobId(), @@ -104,7 +112,8 @@ public Optional runJob() throws Exception { new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()), new DefaultAirbyteDestination(workerConfigs, destinationLauncher), new AirbyteMessageTracker(), - new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput))); + new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), + metricReporter); log.info("Running replication worker..."); final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId()); diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClientFactory.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClientFactory.java index 18cf95c97efe..543ed7acbf2d 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClientFactory.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricClientFactory.java @@ -57,11 +57,16 @@ public synchronized static MetricClient getMetricClient() { */ public static synchronized void initialize(final MetricEmittingApp metricEmittingApp) { if (metricClient != null) { - throw new RuntimeException("You cannot initialize configuration more than once."); + LOGGER.warn("Metric client is already initialized to " + configs.getMetricClient()); + return; } if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) { - initializeDatadogMetricClient(metricEmittingApp); + if (configs.getDDAgentHost() == null || configs.getDDDogStatsDPort() == null) { + throw new RuntimeException("DD_AGENT_HOST is null or DD_DOGSTATSD_PORT is null. Both are required to use the DataDog Metric Client"); + } else { + initializeDatadogMetricClient(metricEmittingApp); + } } else if (OTEL_METRIC_CLIENT.equals(configs.getMetricClient())) { initializeOpenTelemetryMetricClient(metricEmittingApp); } else { @@ -90,7 +95,7 @@ public String host() { * Returning null for default get function because the host has been overridden above. */ @Override - public String get(String key) { + public String get(final String key) { return null; } @@ -105,7 +110,7 @@ public String get(String key) { public static MeterRegistry getMeterRegistry() { if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) { - StatsdConfig config = getDatadogStatsDConfig(); + final StatsdConfig config = getDatadogStatsDConfig(); return new StatsdMeterRegistry(config, Clock.SYSTEM); } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index d717a64ac339..0d9c37125772 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -96,18 +96,18 @@ public enum OssMetricsRegistry implements MetricsRegistry { OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER, "overall_job_runtime_in_last_hour_by_terminal_state_secs", "overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."), - TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER, "temporal_workflow_attempt", "count of the number of workflow attempts"), - TEMPORAL_WORKFLOW_SUCCESS(MetricEmittingApps.WORKER, "temporal_workflow_success", "count of the number of successful workflow syncs."), - TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER, "temporal_workflow_failure", - "count of the number of workflow failures"); + "count of the number of workflow failures"), + NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER, + "record_schema_validation_error", + "number of record schema validation errors"); private final MetricEmittingApp application; private final String metricName; diff --git a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricClientFactoryTest.java b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricClientFactoryTest.java index 60299f377b30..14e49846797f 100644 --- a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricClientFactoryTest.java +++ b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/MetricClientFactoryTest.java @@ -35,15 +35,6 @@ void testMetricClientFactoryCreateSuccess() { }); } - @Test - @DisplayName("Should throw error if MetricClientFactory create a metric client multiple times;") - void testMetricClientFactoryCreateMultipleTimesThrows() { - Assertions.assertThrows(RuntimeException.class, () -> { - MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER); - MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER); - }); - } - @Test @DisplayName("Should not return null if metric client not specified;") void testMicroMeterRegistryRuturnsNullForEmptyClientConfig() { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerMetricReporter.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerMetricReporter.java new file mode 100644 index 000000000000..7ba8794a25b7 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerMetricReporter.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers; + +import io.airbyte.metrics.lib.MetricAttribute; +import io.airbyte.metrics.lib.MetricClient; +import io.airbyte.metrics.lib.OssMetricsRegistry; + +public class WorkerMetricReporter { + + private final String dockerRepo; + private final String dockerVersion; + private final MetricClient metricClient; + + public WorkerMetricReporter(final MetricClient metricClient, final String dockerImage) { + final String[] dockerImageInfo = dockerImage.split(":"); + this.dockerRepo = dockerImageInfo[0]; + this.dockerVersion = dockerImageInfo.length > 1 ? dockerImageInfo[1] : ""; + this.metricClient = metricClient; + } + + public void trackSchemaValidationError(final String stream) { + metricClient.count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1, new MetricAttribute("docker_repo", dockerRepo), + new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream)); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 1fb914a65747..4b5e26425a1a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -78,6 +78,7 @@ public class DefaultReplicationWorker implements ReplicationWorker { private final AtomicBoolean cancelled; private final AtomicBoolean hasFailed; private final RecordSchemaValidator recordSchemaValidator; + private final WorkerMetricReporter metricReporter; public DefaultReplicationWorker(final String jobId, final int attempt, @@ -85,7 +86,8 @@ public DefaultReplicationWorker(final String jobId, final AirbyteMapper mapper, final AirbyteDestination destination, final MessageTracker messageTracker, - final RecordSchemaValidator recordSchemaValidator) { + final RecordSchemaValidator recordSchemaValidator, + final WorkerMetricReporter metricReporter) { this.jobId = jobId; this.attempt = attempt; this.source = source; @@ -94,6 +96,7 @@ public DefaultReplicationWorker(final String jobId, this.messageTracker = messageTracker; this.executors = Executors.newFixedThreadPool(2); this.recordSchemaValidator = recordSchemaValidator; + this.metricReporter = metricReporter; this.cancelled = new AtomicBoolean(false); this.hasFailed = new AtomicBoolean(false); @@ -112,7 +115,7 @@ public DefaultReplicationWorker(final String jobId, * @throws WorkerException */ @Override - public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException { + public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException { LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt); // todo (cgardens) - this should not be happening in the worker. this is configuration information @@ -154,7 +157,7 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo }); final CompletableFuture replicationThreadFuture = CompletableFuture.runAsync( - getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator), + getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter), executors).whenComplete((msg, ex) -> { if (ex != null) { if (ex.getCause() instanceof SourceException) { @@ -293,7 +296,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, final AirbyteMapper mapper, final MessageTracker messageTracker, final Map mdc, - final RecordSchemaValidator recordSchemaValidator) { + final RecordSchemaValidator recordSchemaValidator, + final WorkerMetricReporter metricReporter) { return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); @@ -337,6 +341,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, if (!validationErrors.isEmpty()) { validationErrors.forEach((stream, errorPair) -> { LOGGER.warn("Schema validation errors found for stream {}. Error messages: {}", stream, errorPair.getLeft()); + metricReporter.trackSchemaValidationError(stream); }); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java index 92c6708cb306..ac79c44c0dfc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -7,6 +7,7 @@ import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.workers.WorkerApp; @@ -282,6 +283,11 @@ public void create(final Map allLabels, } + final EnvConfigs envConfigs = new EnvConfigs(); + envVars.add(new EnvVar(EnvConfigs.METRIC_CLIENT, envConfigs.getMetricClient(), null)); + envVars.add(new EnvVar(EnvConfigs.DD_AGENT_HOST, envConfigs.getDDAgentHost(), null)); + envVars.add(new EnvVar(EnvConfigs.DD_DOGSTATSD_PORT, envConfigs.getDDDogStatsDPort(), null)); + envVars.add(new EnvVar(EnvConfigs.PUBLISH_METRICS, Boolean.toString(envConfigs.getPublishMetrics()), null)); envVars.add(new EnvVar(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(useStreamCapableState), null)); final List containerPorts = KubePodProcess.createContainerPortList(portMap); containerPorts.add(new ContainerPort(WorkerApp.KUBE_HEARTBEAT_PORT, null, null, null, null)); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index 8f7dea9b6308..3dd1f602fdaf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -72,6 +72,7 @@ public LauncherWorker(final UUID connectionId, final ResourceRequirements resourceRequirements, final Class outputClass, final Supplier activityContext) { + this.connectionId = connectionId; this.application = application; this.podNamePrefix = podNamePrefix; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java index d18e1564cc9a..02afc3b94516 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java @@ -42,6 +42,10 @@ public class OrchestratorConstants { EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT, EnvConfigs.JOB_DEFAULT_ENV_MAP, EnvConfigs.LOCAL_ROOT, + EnvConfigs.PUBLISH_METRICS, + EnvConfigs.DD_AGENT_HOST, + EnvConfigs.DD_DOGSTATSD_PORT, + EnvConfigs.METRIC_CLIENT, LOG_LEVEL, LogClientSingleton.GCS_LOG_BUCKET, LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 4a84a0ac9988..618c67f6deda 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -17,6 +17,9 @@ import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.metrics.lib.MetricClient; +import io.airbyte.metrics.lib.MetricClientFactory; +import io.airbyte.metrics.lib.MetricEmittingApps; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.scheduler.persistence.JobPersistence; @@ -26,6 +29,7 @@ import io.airbyte.workers.WorkerApp.ContainerOrchestratorConfig; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.general.DefaultReplicationWorker; import io.airbyte.workers.internal.AirbyteMessageTracker; @@ -210,6 +214,9 @@ private CheckedSupplier, Exception> WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource(useStreamCapableState) : new DefaultAirbyteSource(workerConfigs, sourceLauncher); + MetricClientFactory.initialize(MetricEmittingApps.WORKER); + final MetricClient metricClient = MetricClientFactory.getMetricClient(); + final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); return new DefaultReplicationWorker( jobRunConfig.getJobId(), @@ -218,7 +225,8 @@ private CheckedSupplier, Exception> new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()), new DefaultAirbyteDestination(workerConfigs, destinationLauncher), new AirbyteMessageTracker(), - new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput))); + new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), + metricReporter); }; } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index 405706678356..ac6780532266 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -38,6 +38,8 @@ import io.airbyte.config.WorkerSourceConfig; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.metrics.lib.MetricClient; +import io.airbyte.metrics.lib.MetricClientFactory; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.validation.json.JsonSchemaValidator; @@ -94,6 +96,8 @@ class DefaultReplicationWorkerTest { private WorkerDestinationConfig destinationConfig; private AirbyteMessageTracker messageTracker; private RecordSchemaValidator recordSchemaValidator; + private MetricClient metricClient; + private WorkerMetricReporter workerMetricReporter; @SuppressWarnings("unchecked") @BeforeEach @@ -113,6 +117,8 @@ void setup() throws Exception { destination = mock(AirbyteDestination.class); messageTracker = mock(AirbyteMessageTracker.class); recordSchemaValidator = mock(RecordSchemaValidator.class); + metricClient = MetricClientFactory.getMetricClient(); + workerMetricReporter = new WorkerMetricReporter(metricClient, "docker_image:v1.0.0"); when(source.isFinished()).thenReturn(false, false, false, true); when(destination.isFinished()).thenReturn(false, false, false, true); @@ -138,7 +144,8 @@ void test() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); worker.run(syncInput, jobRoot); @@ -163,7 +170,8 @@ void testInvalidSchema() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); worker.run(syncInput, jobRoot); @@ -188,7 +196,8 @@ void testSourceNonZeroExitValue() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); assertTrue(output.getFailures().stream().anyMatch(f -> f.getFailureOrigin().equals(FailureOrigin.SOURCE))); @@ -207,7 +216,8 @@ void testReplicationRunnableSourceFailure() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); @@ -228,7 +238,8 @@ void testReplicationRunnableDestinationFailure() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); @@ -248,7 +259,8 @@ void testReplicationRunnableDestinationFailureViaTraceMessage() throws Exception mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertTrue(output.getFailures().stream() @@ -269,7 +281,8 @@ void testReplicationRunnableWorkerFailure() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); @@ -288,7 +301,8 @@ void testDestinationNonZeroExitValue() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); @@ -308,7 +322,8 @@ void testDestinationRunnableDestinationFailure() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); @@ -329,7 +344,8 @@ void testDestinationRunnableWorkerFailure() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput output = worker.run(syncInput, jobRoot); assertEquals(ReplicationStatus.FAILED, output.getReplicationAttemptSummary().getStatus()); @@ -351,7 +367,8 @@ void testLoggingInThreads() throws IOException, WorkerException { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); worker.run(syncInput, jobRoot); @@ -391,7 +408,8 @@ void testCancellation() throws InterruptedException { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final Thread workerThread = new Thread(() -> { try { @@ -432,7 +450,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput actual = worker.run(syncInput, jobRoot); final ReplicationOutput replicationOutput = new ReplicationOutput() @@ -485,7 +504,8 @@ void testPopulatesStateOnFailureIfAvailable() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput actual = worker.run(syncInput, jobRoot); assertNotNull(actual); @@ -503,7 +523,8 @@ void testRetainsStateOnFailureIfNewStateNotAvailable() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput actual = worker.run(syncInput, jobRoot); @@ -529,7 +550,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput actual = worker.run(syncInput, jobRoot); final SyncStats expectedTotalStats = new SyncStats() @@ -565,7 +587,8 @@ void testDoesNotPopulatesStateOnFailureIfNotAvailable() throws Exception { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); final ReplicationOutput actual = worker.run(syncInputWithoutState, jobRoot); @@ -584,7 +607,8 @@ void testDoesNotPopulateOnIrrecoverableFailure() { mapper, destination, messageTracker, - recordSchemaValidator); + recordSchemaValidator, + workerMetricReporter); assertThrows(WorkerException.class, () -> worker.run(syncInput, jobRoot)); }