Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for temporal workflow resets #15016

Merged
merged 8 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
import com.timgroup.statsd.StatsDClient;
import io.airbyte.config.Configs;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,11 +20,17 @@
* <p>
* Open source users are free to turn this on and consume the same metrics.
* <p>
* This class is intended to be used in conjection with {@link Configs#getPublishMetrics()}.
* This class is intended to be used in conjunction with {@link Configs#getPublishMetrics()}.
* <p>
* Any {@link MetricAttribute}s provided with the metric data are sent as tags created by joining
* the {@code key} and {@code value} property of each {@link MetricAttribute} with a
* {@link #TAG_DELIMITER} delimiter.
*/
@Slf4j
public class DogStatsDMetricClient implements MetricClient {

private static final String TAG_DELIMITER = ":";

private boolean instancePublish = false;
private StatsDClient statsDClient;

Expand Down Expand Up @@ -62,19 +70,19 @@ public synchronized void shutdown() {
*
* @param metric
* @param amt to adjust.
* @param tags
* @param attributes
*/
@Override
public void count(final MetricsRegistry metric, final long amt, final String... tags) {
public void count(final MetricsRegistry metric, final long amt, final MetricAttribute... attributes) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, count {} not emitted", metric);
return;
}

log.info("publishing count, name: {}, value: {}, tags: {}", metric, amt, tags);
statsDClient.count(metric.getMetricName(), amt, tags);
log.info("publishing count, name: {}, value: {}, attributes: {}", metric, amt, attributes);
statsDClient.count(metric.getMetricName(), amt, toTags(attributes));
}
}

Expand All @@ -83,34 +91,44 @@ public void count(final MetricsRegistry metric, final long amt, final String...
*
* @param metric
* @param val to record.
* @param tags
* @param attributes
*/
@Override
public void gauge(final MetricsRegistry metric, final double val, final String... tags) {
public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, gauge {} not emitted", metric);
return;
}

log.info("publishing gauge, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.gauge(metric.getMetricName(), val, tags);
log.info("publishing gauge, name: {}, value: {}, attributes: {}", metric, val, attributes);
statsDClient.gauge(metric.getMetricName(), val, toTags(attributes));
}
}

@Override
public void distribution(MetricsRegistry metric, double val, final String... tags) {
public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
if (instancePublish) {
if (statsDClient == null) {
// do not loudly fail to prevent application disruption
log.warn("singleton not initialized, distribution {} not emitted", metric);
return;
}

log.info("recording distribution, name: {}, value: {}, tags: {}", metric, val, tags);
statsDClient.distribution(metric.getMetricName(), val, tags);
log.info("recording distribution, name: {}, value: {}, attributes: {}", metric, val, attributes);
statsDClient.distribution(metric.getMetricName(), val, toTags(attributes));
}
}

/**
* Converts each {@link MetricAttribute} tuple to a list of tags consumable by StatsD.
*
* @param attributes An array of {@link MetricAttribute} tuples.
* @return An array of tag values.
*/
private String[] toTags(final MetricAttribute... attributes) {
return Stream.of(attributes).map(a -> String.join(TAG_DELIMITER, a.key(), a.value())).collect(Collectors.toList()).toArray(new String[] {});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

/**
* Custom tuple that represents a key/value pair to be included with a metric.
jdpgrailsdev marked this conversation as resolved.
Show resolved Hide resolved
* <p>
* It is up to each {@link MetricClient} implementation to decide what data from this record is used
* when generating a metric. See the specific implementations of the {@link MetricClient} interface
* for actual usage.
*/
public record MetricAttribute(String key, String value) {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ public interface MetricClient {
*
* @param metric
* @param val to record.
* @param tags
* @param attributes
*/
void count(MetricsRegistry metric, long val, final String... tags);
void count(MetricsRegistry metric, long val, final MetricAttribute... attributes);

/**
* Record the latest value for a gauge.
*
* @param metric
* @param val to record.
* @param tags
* @param attributes
*/
void gauge(MetricsRegistry metric, double val, final String... tags);
void gauge(MetricsRegistry metric, double val, final MetricAttribute... attributes);

/*
* Accepts value on the metrics, and report the distribution of these values. Useful to analysis how
Expand All @@ -35,9 +35,9 @@ public interface MetricClient {
*
* @param val to record.
*
* @param tags
* @param attributes
*/
void distribution(MetricsRegistry metric, double val, final String... tags);
void distribution(MetricsRegistry metric, double val, final MetricAttribute... attributes);

/*
* Reset initialization. Can be used in a unit test to reset metric client state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@
*/
public class MetricTags {

private static final String RELEASE_STAGE = "release_stage";
private static final String FAILURE_ORIGIN = "failure_origin";
private static final String JOB_STATUS = "job_status";
public static final String CONNECTION_ID = "connection_id";
public static final String FAILURE_ORIGIN = "failure_origin";
public static final String JOB_ID = "job_id";
public static final String JOB_STATUS = "job_status";
public static final String RELEASE_STAGE = "release_stage";
public static final String RESET_WORKFLOW_FAILURE_CAUSE = "failure_cause";
public static final String WORKFLOW_TYPE = "workflow_type";

public static String getReleaseStage(final ReleaseStage stage) {
return tagDelimit(RELEASE_STAGE, stage.getLiteral());
return stage.getLiteral();
}

public static String getFailureOrigin(final FailureOrigin origin) {
return tagDelimit(FAILURE_ORIGIN, origin.value());
return origin.value();
}

public static String getJobStatus(final JobStatus status) {
return tagDelimit(JOB_STATUS, status.getLiteral());
}

private static String tagDelimit(final String tagName, final String tagVal) {
return String.join(":", tagName, tagVal);
return status.getLiteral();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
public class NotImplementedMetricClient implements MetricClient {

@Override
public void count(MetricsRegistry metric, long val, String... tags) {
public void count(final MetricsRegistry metric, final long val, final MetricAttribute... attributes) {
// Not Implemented.
}

@Override
public void gauge(MetricsRegistry metric, double val, String... tags) {
public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
// Not Implemented.
}

@Override
public void distribution(MetricsRegistry metric, double val, String... tags) {
public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
// Not Implemented.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,58 +27,54 @@
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

/**
* Implementation of the {@link MetricClient} that sends the provided metric data to an
* OpenTelemetry compliant metrics store.
* <p>
* Any {@link MetricAttribute}s provided along with the metric data are passed as key/value pairs
* annotating the metric.
*/
public class OpenTelemetryMetricClient implements MetricClient {

private Meter meter;
private SdkMeterProvider meterProvider;

@Override
public void count(MetricsRegistry metric, long val, String... tags) {
LongCounter counter = meter
public void count(final MetricsRegistry metric, final long val, final MetricAttribute... attributes) {
final LongCounter counter = meter
.counterBuilder(metric.getMetricName())
.setDescription(metric.getMetricDescription())
.build();

AttributesBuilder attributesBuilder = Attributes.builder();
for (String tag : tags) {
attributesBuilder.put(stringKey(tag), tag);
}

final AttributesBuilder attributesBuilder = buildAttributes(attributes);
counter.add(val, attributesBuilder.build());
}

@Override
public void gauge(MetricsRegistry metric, double val, String... tags) {
AttributesBuilder attributesBuilder = Attributes.builder();
for (String tag : tags) {
attributesBuilder.put(stringKey(tag), tag);
}
public void gauge(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
final AttributesBuilder attributesBuilder = buildAttributes(attributes);
meter.gaugeBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription())
.buildWithCallback(measurement -> measurement.record(val, attributesBuilder.build()));
}

@Override
public void distribution(MetricsRegistry metric, double val, String... tags) {
DoubleHistogram histogramMeter = meter.histogramBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()).build();
AttributesBuilder attributesBuilder = Attributes.builder();

for (String tag : tags) {
attributesBuilder.put(stringKey(tag), tag);
}
public void distribution(final MetricsRegistry metric, final double val, final MetricAttribute... attributes) {
final DoubleHistogram histogramMeter = meter.histogramBuilder(metric.getMetricName()).setDescription(metric.getMetricDescription()).build();
final AttributesBuilder attributesBuilder = buildAttributes(attributes);
histogramMeter.record(val, attributesBuilder.build());
}

public void initialize(MetricEmittingApp metricEmittingApp, String otelEndpoint) {
Resource resource = Resource.getDefault().toBuilder().put(SERVICE_NAME, metricEmittingApp.getApplicationName()).build();
public void initialize(final MetricEmittingApp metricEmittingApp, final String otelEndpoint) {
final Resource resource = Resource.getDefault().toBuilder().put(SERVICE_NAME, metricEmittingApp.getApplicationName()).build();

SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
final SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor
.builder(OtlpGrpcSpanExporter.builder().setEndpoint(otelEndpoint).build())
.build())
.setResource(resource)
.build();
MetricExporter metricExporter = OtlpGrpcMetricExporter.builder()
final MetricExporter metricExporter = OtlpGrpcMetricExporter.builder()
.setEndpoint(otelEndpoint).build();
initialize(metricEmittingApp, metricExporter, sdkTracerProvider, resource);
}
Expand All @@ -89,13 +85,17 @@ SdkMeterProvider getSdkMeterProvider() {
}

@VisibleForTesting
void initialize(MetricEmittingApp metricEmittingApp, MetricExporter metricExporter, SdkTracerProvider sdkTracerProvider, Resource resource) {
void initialize(
final MetricEmittingApp metricEmittingApp,
final MetricExporter metricExporter,
final SdkTracerProvider sdkTracerProvider,
final Resource resource) {
meterProvider = SdkMeterProvider.builder()
.registerMetricReader(PeriodicMetricReader.builder(metricExporter).build())
.setResource(resource)
.build();

OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
final OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setMeterProvider(meterProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
Expand All @@ -110,4 +110,12 @@ public void shutdown() {
resetForTest();
}

private AttributesBuilder buildAttributes(final MetricAttribute... attributes) {
final AttributesBuilder attributesBuilder = Attributes.builder();
for (final MetricAttribute attribute : attributes) {
attributesBuilder.put(stringKey(attribute.key()), attribute.value());
}
return attributesBuilder;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,19 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"oldest running job in seconds"),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),

TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER,
"temporal_workflow_attempt",
"count of the number of workflow attempts"),

TEMPORAL_WORKFLOW_SUCCESS(MetricEmittingApps.WORKER,
"temporal_workflow_success",
"count of the number of successful workflow syncs."),

TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void testCountSuccess() {
@Test
@DisplayName("Tags should be passed into metrics")
void testCountWithTagSuccess() {
openTelemetryMetricClient.count(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1, TAG);
openTelemetryMetricClient.count(OssMetricsRegistry.KUBE_POD_PROCESS_CREATE_TIME_MILLISECS, 1, new MetricAttribute(TAG, TAG));

metricProvider.forceFlush();
final List<MetricData> metricDataList = metricExporter.getFinishedMetricItems();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.metrics.lib.MetricTags;
Expand Down Expand Up @@ -52,7 +53,8 @@ public enum ToEmit {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
MetricClientFactory.getMetricClient().distribution(
OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(), MetricTags.getJobStatus(pair.getLeft()));
OssMetricsRegistry.OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS, pair.getRight(),
new MetricAttribute(MetricTags.JOB_STATUS, MetricTags.getJobStatus(pair.getLeft())));
}
}), 1, TimeUnit.HOURS);

Expand Down
Loading