Move to DogStatsD. #10238

Merged
merged 9 commits into from
Feb 11, 2022
2 changes: 2 additions & 0 deletions airbyte-metrics/build.gradle
@@ -10,4 +10,6 @@ dependencies {
 implementation 'io.prometheus:simpleclient_hotspot:0.12.0' // basic client instrumentation
 implementation 'io.prometheus:simpleclient_httpserver:0.12.0' // basic server to serve prometheus port
 implementation 'io.prometheus:simpleclient_pushgateway:0.12.0' // push libs for basic server
+
+implementation 'com.datadoghq:java-dogstatsd-client:4.0.0'
 }
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics;

import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
import com.timgroup.statsd.StatsDClient;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

/**
* Light wrapper around the DogStatsD client to make using the client slightly more ergonomic.
* <p>
* This class mainly exists to help Airbyte instrument/debug applications on Airbyte Cloud.
* <p>
* Open source users are free to turn this on and consume the same metrics.
*/
@Slf4j
public class DogstatsdMetricSingleton {

private static DogstatsdMetricSingleton instance;
private final StatsDClient statsDClient;
private final boolean instancePublish;

public DogstatsdMetricSingleton(final String appName, final boolean publish) {
instancePublish = publish;
statsDClient = new NonBlockingStatsDClientBuilder()
.prefix(appName)
.hostname(System.getenv("DD_AGENT_HOST"))
.port(Integer.parseInt(System.getenv("DD_DOGSTATSD_PORT")))
.build();
}
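
Editorial note: the constructor above passes `System.getenv("DD_DOGSTATSD_PORT")` straight to `Integer.parseInt`, which throws a `NumberFormatException` when the variable is unset. The following is a minimal sketch of resolving both settings with fallbacks. The class `DogstatsdEnv`, its method names, and the choice of `localhost` as a fallback host are hypothetical and not part of this PR; 8125 is the Agent's default DogStatsD port.

```java
import java.util.Map;

/** Hypothetical helper, not part of the PR: resolves DogStatsD connection settings with fallbacks. */
public class DogstatsdEnv {

  // Assumption: when no host is configured, the Agent runs on the same machine.
  static final String DEFAULT_HOST = "localhost";
  // Default DogStatsD port used by the Datadog Agent.
  static final int DEFAULT_PORT = 8125;

  /** Returns DD_AGENT_HOST from the given environment, or the fallback when unset or blank. */
  static String host(final Map<String, String> env) {
    final String value = env.get("DD_AGENT_HOST");
    return value == null || value.isBlank() ? DEFAULT_HOST : value;
  }

  /** Returns DD_DOGSTATSD_PORT as an int, or the fallback when unset or blank. */
  static int port(final Map<String, String> env) {
    final String value = env.get("DD_DOGSTATSD_PORT");
    if (value == null || value.isBlank()) {
      return DEFAULT_PORT;
    }
    try {
      return Integer.parseInt(value.trim());
    } catch (final NumberFormatException e) {
      // Fail with a message that names the offending variable.
      throw new IllegalArgumentException("DD_DOGSTATSD_PORT must be an integer, got: " + value, e);
    }
  }

  public static void main(final String[] args) {
    // System.getenv() returns the process environment as a map.
    System.out.println(host(System.getenv()) + ":" + port(System.getenv()));
  }
}
```

Taking the environment as a `Map` argument rather than calling `System.getenv` inside the helper keeps the parsing testable without touching the real process environment.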

public static synchronized DogstatsdMetricSingleton getInstance() {
if (instance == null) {
throw new RuntimeException("You must initialize configuration with the initializeMonitoringServiceDaemon() method before getting an instance.");
Contributor

Should this say .. with the initialize() method?

Also, what is the downside of combining getInstance() and initialize() into the same method? The behavior would be that if instance == null, it initializes and returns. If instance != null, it just returns. That approach feels like it would be a bit more ergonomic, no?

Contributor Author

Ah yes, good catch on the error message.

This is a small thing: the main downside is that initialize takes more parameters than getInstance, so a combined method would have to take all of those parameters. Because of this, I prefer to keep the two separate for now.

Contributor

@davinchia I'm curious how you are thinking about consumers of this class. If I want to use the DogstatsdMetricSingleton, how do I know whether it has been initialized? Is a consumer expected to wrap getInstance in a try-catch, catch the RuntimeException thrown when the singleton has not been initialized yet, and call initialize in that case? That doesn't feel very ergonomic to me, but let me know if I'm thinking about this wrong.

Contributor Author

Good question. I'm trying to follow the general singleton pattern, where initialize is called in the main method and getInstance is called everywhere else. Although it's possible to catch the RuntimeException, most folks don't, since it's usually understood that the singleton is initialized when the JVM first starts up.

I agree that having to call getInstance is somewhat annoying, but this is a fairly common pattern and I don't think it's worth trying to get rid of.

I can't see how to keep a simple method signature if we combine initialize and getInstance, and I didn't want to spend more time playing with this.

Happy to discuss more/review any ideas/approaches you come up with!
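
Editorial note: the pattern described in this thread (initialize once from main, call getInstance everywhere else) can be sketched as follows. `ExampleMetricSingleton`, `describe()`, and `resetForTesting()` are hypothetical names used only for illustration; this is not the PR's class and it does not talk to DogStatsD.

```java
/** Hypothetical stand-in for the initialize/getInstance pattern; not the PR's class. */
public class ExampleMetricSingleton {

  private static ExampleMetricSingleton instance;

  private final String appName;
  private final boolean publish;

  private ExampleMetricSingleton(final String appName, final boolean publish) {
    this.appName = appName;
    this.publish = publish;
  }

  /** Called once, typically from main(), before any other code asks for the instance. */
  public static synchronized void initialize(final String appName, final boolean publish) {
    if (instance != null) {
      throw new IllegalStateException("You cannot initialize configuration more than once.");
    }
    // The instance is created even when publish is false, so getInstance() never throws afterwards.
    instance = new ExampleMetricSingleton(appName, publish);
  }

  /** Called everywhere else; takes no parameters because configuration was fixed at startup. */
  public static synchronized ExampleMetricSingleton getInstance() {
    if (instance == null) {
      throw new IllegalStateException("You must call initialize() before getting an instance.");
    }
    return instance;
  }

  /** Illustration-only hook so tests can start from a clean state. */
  static synchronized void resetForTesting() {
    instance = null;
  }

  public String describe() {
    return appName + (publish ? " (publishing)" : " (no-op)");
  }

  public static void main(final String[] args) {
    initialize("example-app", false);
    System.out.println(getInstance().describe());
  }
}
```

One difference from the diff at this commit: the PR's initialize only assigns `instance` when `publish` is true, so getInstance would still throw after initializing with publishing disabled. The sketch always creates the instance and leaves the no-op decision to the instance itself.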

}
return instance;
}

public synchronized static void initialize(final String appName, final Map<String, String> mdc, final boolean publish) {
Contributor

Why is the MDC important here? Shouldn't the initialization be run from some thread that has the MDC configured?

Contributor Author

Good point. I think it's left over from when I was testing. You're right that we don't need this anymore. Will remove.

if (instance != null) {
throw new RuntimeException("You cannot initialize configuration more than once.");
}
if (publish) {
MDC.setContextMap(mdc);
log.info("Starting DogStatsD client..");
// The non-blocking client sends metrics from a background thread, so callers are not blocked.
instance = new DogstatsdMetricSingleton(appName, publish);
}
}

/**
* Increment or decrement a counter.
*
* @param name of counter.
* @param amt to adjust.
* @param tags
*/
public void count(final String name, final double amt, final String... tags) {
if (instancePublish) {
log.info("publishing count, name: {}, value: {}", name, amt);
statsDClient.count(name, amt, tags);
}
}

/**
* Record the latest value for a gauge.
*
* @param name of gauge.
* @param val to record.
* @param tags
*/
public void gauge(final String name, final double val, final String... tags) {
if (instancePublish) {
log.info("publishing gauge, name: {}, value: {}", name, val);
statsDClient.gauge(name, val, tags);
}
}

/**
* Submit a single execution time aggregated locally by the Agent. Use this if approximate stats are
* sufficient.
*
* @param name of histogram.
* @param val of time to record.
* @param tags
*/
public void recordTimeLocal(final String name, final double val, final String... tags) {
if (instancePublish) {
log.info("recording histogram, name: {}, value: {}", name, val);
statsDClient.histogram(name, val, tags);
}
}

/**
* Submit a single execution time aggregated globally by Datadog. Use this for precise stats.
*
* @param name of distribution.
* @param val of time to record.
* @param tags
*/
public void recordTimeGlobal(final String name, final double val, final String... tags) {
if (instancePublish) {
log.info("recording distribution, name: {}, value: {}", name, val);
statsDClient.distribution(name, val, tags);
}
}

/**
* Wrapper of {@link #recordTimeGlobal(String, double, String...)} with a runnable for convenience.
*
* @param name
* @param runnable
* @param tags
*/
public void recordTimeGlobal(final String name, final Runnable runnable, final String... tags) {
final long start = System.currentTimeMillis();
runnable.run();
final long end = System.currentTimeMillis();
final long val = end - start;
recordTimeGlobal(name, val, tags);
}

}
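
Editorial note: the Runnable overload of recordTimeGlobal above measures elapsed time with `System.currentTimeMillis()` and records nothing if the runnable throws. A minimal sketch of the same idea using the monotonic `System.nanoTime()` and a `finally` block follows. `TimedRunner`, `recordTime`, and the `ObjLongConsumer` recorder are hypothetical stand-ins for the wrapper and the StatsD client, not code from this PR.

```java
import java.util.function.ObjLongConsumer;

/** Hypothetical sketch, not part of the PR: times a task and reports the duration. */
public class TimedRunner {

  /** Runs the task and reports elapsed milliseconds to the recorder, even if the task throws. */
  static void recordTime(final String name, final Runnable task, final ObjLongConsumer<String> recorder) {
    // nanoTime is monotonic, so the difference is unaffected by wall-clock adjustments.
    final long start = System.nanoTime();
    try {
      task.run();
    } finally {
      final long elapsedMs = (System.nanoTime() - start) / 1_000_000;
      recorder.accept(name, elapsedMs);
    }
  }

  static void sleepQuietly(final long millis) {
    try {
      Thread.sleep(millis);
    } catch (final InterruptedException e) {
      // Restore the interrupt flag rather than swallowing it.
      Thread.currentThread().interrupt();
    }
  }

  public static void main(final String[] args) {
    recordTime("example.sleep", () -> sleepQuietly(20),
        (name, ms) -> System.out.println(name + " took " + ms + " ms"));
  }
}
```

Whether a failed run should be recorded under the same metric name is a design choice; tagging failures separately is another reasonable option.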

This file was deleted.

@@ -27,20 +27,21 @@
 * <p>
 * Open source users are free to turn this on and consume the same metrics.
 */
-public class MetricSingleton {
+@Deprecated
+public class PrometheusMetricSingleton {

-private static final Logger LOGGER = LoggerFactory.getLogger(MetricSingleton.class);
-private static MetricSingleton instance;
+private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusMetricSingleton.class);
+private static PrometheusMetricSingleton instance;

 private final Map<String, Gauge> nameToGauge = new HashMap<>();
 private final Map<String, Counter> nameToCounter = new HashMap<>();
 private final Map<String, Histogram> nameToHistogram = new HashMap<>();

 private HTTPServer monitoringDaemon;

-private MetricSingleton() {}
+private PrometheusMetricSingleton() {}

-public static synchronized MetricSingleton getInstance() {
+public static synchronized PrometheusMetricSingleton getInstance() {
 if (instance == null) {
 throw new RuntimeException("You must initialize configuration with the initializeMonitoringServiceDaemon() method before getting an instance.");
 }
@@ -193,7 +194,7 @@ public synchronized static void initializeMonitoringServiceDaemon(final String m
 if (instance != null) {
 throw new RuntimeException("You cannot initialize configuration more than once.");
 }
-instance = new MetricSingleton();
+instance = new PrometheusMetricSingleton();
 if (publish) {
 try {
 MDC.setContextMap(mdc);
@@ -25,7 +25,7 @@
 * Since Prometheus publishes metrics at a specific port, we can test our wrapper by querying the
 * port and validating the response.
 */
-public class MetricSingletonTest {
+public class PrometheusMetricSingletonTest {

 private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
 .version(HttpClient.Version.HTTP_1_1)
@@ -41,25 +41,26 @@ public static void setUp() throws IOException {
 availPort = socket.getLocalPort();
 }

-MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true);
+PrometheusMetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true);
 }

 @AfterAll
 public static void tearDown() {
-MetricSingleton.getInstance().closeMonitoringServiceDaemon();
+PrometheusMetricSingleton.getInstance().closeMonitoringServiceDaemon();
 }

 @Nested
 class Validation {

 @Test
 public void testNameWithDashFails() {
-assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed"));
+assertThrows(RuntimeException.class,
+() -> PrometheusMetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed"));
 }

 @Test
 public void testNoDescriptionFails() {
-assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("good_name", 0.0, null));
+assertThrows(RuntimeException.class, () -> PrometheusMetricSingleton.getInstance().incrementCounter("good_name", 0.0, null));
 }

 }
@@ -69,7 +70,7 @@ public void testCounter() throws InterruptedException, IOException {
 final var metricName = "test_counter";
 final var rand = new Random();
 for (int i = 0; i < 5; i++) {
-MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter");
+PrometheusMetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter");
 Thread.sleep(500);
 }

@@ -82,7 +83,7 @@ public void testGauge() throws InterruptedException, IOException {
 final var metricName = "test_gauge";
 final var rand = new Random();
 for (int i = 0; i < 5; i++) {
-MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge");
+PrometheusMetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge");
 Thread.sleep(500);
 }

@@ -95,7 +96,7 @@ public void testTimer() throws InterruptedException, IOException {
 final var metricName = "test_timer";
 final var rand = new Random();
 for (int i = 0; i < 5; i++) {
-MetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time");
+PrometheusMetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time");
 Thread.sleep(500);
 }

@@ -29,7 +29,7 @@
 import io.airbyte.db.Database;
 import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
 import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
-import io.airbyte.metrics.MetricSingleton;
+import io.airbyte.metrics.PrometheusMetricSingleton;
 import io.airbyte.scheduler.models.Job;
 import io.airbyte.scheduler.models.JobStatus;
 import io.airbyte.scheduler.persistence.DefaultJobPersistence;
@@ -273,7 +273,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
 final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot, configs);

 final Map<String, String> mdc = MDC.getCopyOfContextMap();
-MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics());
+PrometheusMetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics());
davinchia marked this conversation as resolved.

 LOGGER.info("Launching scheduler...");
 new SchedulerApp(