
Add datadog tracking to record schema validation errors (#13393)
* Add Datadog tracking for record schema validation errors, and pass in the environment variables needed for Datadog tracking in workers
alovew authored Aug 8, 2022
1 parent acc38c4 commit 12270cc
Showing 13 changed files with 128 additions and 45 deletions.
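At a glance, the wiring this commit introduces looks roughly like the sketch below (not part of the diff; the docker image and stream name are illustrative): the worker initializes the metric client from the environment, wraps it in the new WorkerMetricReporter, and passes the reporter into DefaultReplicationWorker, which emits one count per stream that hits schema validation errors.

import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.workers.WorkerMetricReporter;

public class SchemaValidationMetricsSketch {

  public static void main(final String[] args) {
    // Reads METRIC_CLIENT, DD_AGENT_HOST and DD_DOGSTATSD_PORT from the environment.
    MetricClientFactory.initialize(MetricEmittingApps.WORKER);
    final MetricClient metricClient = MetricClientFactory.getMetricClient();

    // The source connector's docker image is split into repo and version tags.
    final WorkerMetricReporter metricReporter =
        new WorkerMetricReporter(metricClient, "airbyte/source-postgres:1.0.0"); // illustrative image

    // Inside DefaultReplicationWorker this is called once per stream with validation errors.
    metricReporter.trackSchemaValidationError("users"); // illustrative stream name
  }

}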
@@ -89,6 +89,8 @@ public class EnvConfigs implements Configs {
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET";
public static final String PUBLISH_METRICS = "PUBLISH_METRICS";
public static final String DD_AGENT_HOST = "DD_AGENT_HOST";
public static final String DD_DOGSTATSD_PORT = "DD_DOGSTATSD_PORT";
private static final String CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
private static final String CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS = "CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS";
private static final String JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
@@ -97,8 +99,6 @@ public class EnvConfigs implements Configs {
private static final String CONTAINER_ORCHESTRATOR_SECRET_NAME = "CONTAINER_ORCHESTRATOR_SECRET_NAME";
private static final String CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH = "CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH";
private static final String CONTAINER_ORCHESTRATOR_IMAGE = "CONTAINER_ORCHESTRATOR_IMAGE";
private static final String DD_AGENT_HOST = "DD_AGENT_HOST";
private static final String DD_DOGSTATSD_PORT = "DD_DOGSTATSD_PORT";
public static final String DD_CONSTANT_TAGS = "DD_CONSTANT_TAGS";
public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
@@ -132,7 +132,7 @@ public class EnvConfigs implements Configs {
private static final String MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = "MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE";
private static final String MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = "MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE";

private static final String METRIC_CLIENT = "METRIC_CLIENT";
public static final String METRIC_CLIENT = "METRIC_CLIENT";
private static final String OTEL_COLLECTOR_ENDPOINT = "OTEL_COLLECTOR_ENDPOINT";

// job-type-specific overrides
@@ -798,7 +798,7 @@ public String getDDDogStatsDPort() {

@Override
public List<String> getDDConstantTags() {
String tagsString = getEnvOrDefault(DD_CONSTANT_TAGS, "");
final String tagsString = getEnvOrDefault(DD_CONSTANT_TAGS, "");
return Splitter.on(",")
.splitToStream(tagsString)
.filter(s -> !s.trim().isBlank())
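A small worked example (not in the commit) of what the Splitter chain in getDDConstantTags() above does with a sample DD_CONSTANT_TAGS value; the tag values are made up and the terminal collector is assumed, since the diff cuts off before the end of the method.

import com.google.common.base.Splitter;
import java.util.List;

public class ConstantTagsSketch {

  public static void main(final String[] args) {
    final String tagsString = "env:prod,team:platform,"; // illustrative DD_CONSTANT_TAGS value
    final List<String> tags = Splitter.on(",")
        .splitToStream(tagsString)
        .filter(s -> !s.trim().isBlank()) // drops the empty entry after the trailing comma
        .toList(); // assumed collector; not shown in the hunk above
    System.out.println(tags); // [env:prod, team:platform]
  }

}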
1 change: 1 addition & 0 deletions airbyte-container-orchestrator/build.gradle
@@ -18,6 +18,7 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-scheduler:scheduler-persistence')
implementation project(':airbyte-workers')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'org.mockito:mockito-inline:2.13.0'
testImplementation libs.postgresql
@@ -9,11 +9,15 @@
import io.airbyte.config.Configs;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DefaultReplicationWorker;
import io.airbyte.workers.general.ReplicationWorker;
@@ -96,6 +100,10 @@ public Optional<String> runJob() throws Exception {
featureFlags.useStreamCapableState())
: new DefaultAirbyteSource(workerConfigs, sourceLauncher);

MetricClientFactory.initialize(MetricEmittingApps.WORKER);
final MetricClient metricClient = MetricClientFactory.getMetricClient();
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

log.info("Setting up replication worker...");
final ReplicationWorker replicationWorker = new DefaultReplicationWorker(
jobRunConfig.getJobId(),
@@ -104,7 +112,8 @@
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)));
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter);

log.info("Running replication worker...");
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
@@ -57,11 +57,16 @@ public synchronized static MetricClient getMetricClient() {
*/
public static synchronized void initialize(final MetricEmittingApp metricEmittingApp) {
if (metricClient != null) {
throw new RuntimeException("You cannot initialize configuration more than once.");
LOGGER.warn("Metric client is already initialized to " + configs.getMetricClient());
return;
}

if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) {
initializeDatadogMetricClient(metricEmittingApp);
if (configs.getDDAgentHost() == null || configs.getDDDogStatsDPort() == null) {
throw new RuntimeException("DD_AGENT_HOST is null or DD_DOGSTATSD_PORT is null. Both are required to use the DataDog Metric Client");
} else {
initializeDatadogMetricClient(metricEmittingApp);
}
} else if (OTEL_METRIC_CLIENT.equals(configs.getMetricClient())) {
initializeOpenTelemetryMetricClient(metricEmittingApp);
} else {
@@ -90,7 +95,7 @@ public String host() {
* Returning null for default get function because the host has been overridden above.
*/
@Override
public String get(String key) {
public String get(final String key) {
return null;
}

@@ -105,7 +110,7 @@ public String get(String key) {
public static MeterRegistry getMeterRegistry() {

if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) {
StatsdConfig config = getDatadogStatsDConfig();
final StatsdConfig config = getDatadogStatsDConfig();
return new StatsdMeterRegistry(config, Clock.SYSTEM);
}

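A minimal sketch (not part of the commit) of the initialization behavior after this change, assuming METRIC_CLIENT selects the Datadog client in the environment: a missing DD_AGENT_HOST or DD_DOGSTATSD_PORT now fails fast, and a repeated initialize() call only logs a warning instead of throwing.

import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;

public class MetricClientInitSketch {

  public static void main(final String[] args) {
    // Throws a RuntimeException if the Datadog client is selected but DD_AGENT_HOST
    // or DD_DOGSTATSD_PORT is not set; otherwise configures the chosen client.
    MetricClientFactory.initialize(MetricEmittingApps.WORKER);

    // A second call no longer throws; it logs a warning and returns early.
    MetricClientFactory.initialize(MetricEmittingApps.WORKER);
  }

}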
@@ -96,18 +96,18 @@ public enum OssMetricsRegistry implements MetricsRegistry {
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),

TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER,
"temporal_workflow_attempt",
"count of the number of workflow attempts"),

TEMPORAL_WORKFLOW_SUCCESS(MetricEmittingApps.WORKER,
"temporal_workflow_success",
"count of the number of successful workflow syncs."),

TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures");
"count of the number of workflow failures"),
NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
"record_schema_validation_error",
"number of record schema validation errors");

private final MetricEmittingApp application;
private final String metricName;
@@ -35,15 +35,6 @@ void testMetricClientFactoryCreateSuccess() {
});
}

@Test
@DisplayName("Should throw error if MetricClientFactory create a metric client multiple times;")
void testMetricClientFactoryCreateMultipleTimesThrows() {
Assertions.assertThrows(RuntimeException.class, () -> {
MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER);
MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER);
});
}

@Test
@DisplayName("Should not return null if metric client not specified;")
void testMicroMeterRegistryRuturnsNullForEmptyClientConfig() {
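Since the double-initialization test above is removed, a hypothetical replacement (not in the commit) inside the same test class could assert the new, softer behavior:

  @Test
  @DisplayName("Should not throw if MetricClientFactory is initialized more than once;")
  void testMetricClientFactoryInitializeMultipleTimesDoesNotThrow() {
    Assertions.assertDoesNotThrow(() -> {
      MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER);
      MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER);
    });
  }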
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;

import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.OssMetricsRegistry;

public class WorkerMetricReporter {

private final String dockerRepo;
private final String dockerVersion;
private final MetricClient metricClient;

public WorkerMetricReporter(final MetricClient metricClient, final String dockerImage) {
final String[] dockerImageInfo = dockerImage.split(":");
this.dockerRepo = dockerImageInfo[0];
this.dockerVersion = dockerImageInfo.length > 1 ? dockerImageInfo[1] : "";
this.metricClient = metricClient;
}

public void trackSchemaValidationError(final String stream) {
metricClient.count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1, new MetricAttribute("docker_repo", dockerRepo),
new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream));
}

}
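A hypothetical unit test (not part of the commit) for the new class, using a mocked MetricClient; it assumes MetricAttribute has value-based equality, and the docker image and stream name are illustrative.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.workers.WorkerMetricReporter;
import org.junit.jupiter.api.Test;

class WorkerMetricReporterTest {

  @Test
  void tracksSchemaValidationErrorWithDockerAndStreamTags() {
    final MetricClient metricClient = mock(MetricClient.class);
    final WorkerMetricReporter reporter = new WorkerMetricReporter(metricClient, "airbyte/source-faker:0.1.0");

    reporter.trackSchemaValidationError("users");

    // One count per reported stream, tagged with the parsed repo, version and stream name.
    verify(metricClient).count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1,
        new MetricAttribute("docker_repo", "airbyte/source-faker"),
        new MetricAttribute("docker_version", "0.1.0"),
        new MetricAttribute("stream", "users"));
  }

}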
@@ -78,14 +78,16 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final AtomicBoolean cancelled;
private final AtomicBoolean hasFailed;
private final RecordSchemaValidator recordSchemaValidator;
private final WorkerMetricReporter metricReporter;

public DefaultReplicationWorker(final String jobId,
final int attempt,
final AirbyteSource source,
final AirbyteMapper mapper,
final AirbyteDestination destination,
final MessageTracker messageTracker,
final RecordSchemaValidator recordSchemaValidator) {
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
this.jobId = jobId;
this.attempt = attempt;
this.source = source;
@@ -94,6 +96,7 @@ public DefaultReplicationWorker(final String jobId,
this.messageTracker = messageTracker;
this.executors = Executors.newFixedThreadPool(2);
this.recordSchemaValidator = recordSchemaValidator;
this.metricReporter = metricReporter;

this.cancelled = new AtomicBoolean(false);
this.hasFailed = new AtomicBoolean(false);
@@ -112,7 +115,7 @@ public DefaultReplicationWorker(final String jobId,
* @throws WorkerException
*/
@Override
public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);

// todo (cgardens) - this should not be happening in the worker. this is configuration information
@@ -154,7 +157,7 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
@@ -293,7 +296,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteMapper mapper,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator) {
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
@@ -337,6 +341,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
if (!validationErrors.isEmpty()) {
validationErrors.forEach((stream, errorPair) -> {
LOGGER.warn("Schema validation errors found for stream {}. Error messages: {}", stream, errorPair.getLeft());
metricReporter.trackSchemaValidationError(stream);
});
}

@@ -7,6 +7,7 @@
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.workers.WorkerApp;
@@ -282,6 +283,11 @@ public void create(final Map<String, String> allLabels,

}

final EnvConfigs envConfigs = new EnvConfigs();
envVars.add(new EnvVar(EnvConfigs.METRIC_CLIENT, envConfigs.getMetricClient(), null));
envVars.add(new EnvVar(EnvConfigs.DD_AGENT_HOST, envConfigs.getDDAgentHost(), null));
envVars.add(new EnvVar(EnvConfigs.DD_DOGSTATSD_PORT, envConfigs.getDDDogStatsDPort(), null));
envVars.add(new EnvVar(EnvConfigs.PUBLISH_METRICS, Boolean.toString(envConfigs.getPublishMetrics()), null));
envVars.add(new EnvVar(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(useStreamCapableState), null));
final List<ContainerPort> containerPorts = KubePodProcess.createContainerPortList(portMap);
containerPorts.add(new ContainerPort(WorkerApp.KUBE_HEARTBEAT_PORT, null, null, null, null));
@@ -72,6 +72,7 @@ public LauncherWorker(final UUID connectionId,
final ResourceRequirements resourceRequirements,
final Class<OUTPUT> outputClass,
final Supplier<ActivityExecutionContext> activityContext) {

this.connectionId = connectionId;
this.application = application;
this.podNamePrefix = podNamePrefix;
@@ -42,6 +42,10 @@ public class OrchestratorConstants {
EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT,
EnvConfigs.JOB_DEFAULT_ENV_MAP,
EnvConfigs.LOCAL_ROOT,
EnvConfigs.PUBLISH_METRICS,
EnvConfigs.DD_AGENT_HOST,
EnvConfigs.DD_DOGSTATSD_PORT,
EnvConfigs.METRIC_CLIENT,
LOG_LEVEL,
LogClientSingleton.GCS_LOG_BUCKET,
LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS,
@@ -17,6 +17,9 @@
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.scheduler.persistence.JobPersistence;
@@ -26,6 +29,7 @@
import io.airbyte.workers.WorkerApp.ContainerOrchestratorConfig;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DefaultReplicationWorker;
import io.airbyte.workers.internal.AirbyteMessageTracker;
@@ -210,6 +214,9 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage())
? new EmptyAirbyteSource(useStreamCapableState)
: new DefaultAirbyteSource(workerConfigs, sourceLauncher);
MetricClientFactory.initialize(MetricEmittingApps.WORKER);
final MetricClient metricClient = MetricClientFactory.getMetricClient();
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

return new DefaultReplicationWorker(
jobRunConfig.getJobId(),
@@ -218,7 +225,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)));
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter);
};
}


