diff --git a/airbyte-metrics/build.gradle b/airbyte-metrics/build.gradle index 924f6e781378..49a8cccd8dbe 100644 --- a/airbyte-metrics/build.gradle +++ b/airbyte-metrics/build.gradle @@ -10,4 +10,6 @@ dependencies { implementation 'io.prometheus:simpleclient_hotspot:0.12.0' // basic client instrumentation implementation 'io.prometheus:simpleclient_httpserver:0.12.0' // basic server to serve prometheus port implementation 'io.prometheus:simpleclient_pushgateway:0.12.0' // push libs for basic server + + implementation 'com.datadoghq:java-dogstatsd-client:4.0.0' } diff --git a/airbyte-metrics/src/main/java/io/airbyte/metrics/DogstatsdMetricSingleton.java b/airbyte-metrics/src/main/java/io/airbyte/metrics/DogstatsdMetricSingleton.java new file mode 100644 index 000000000000..0bf54812a80b --- /dev/null +++ b/airbyte-metrics/src/main/java/io/airbyte/metrics/DogstatsdMetricSingleton.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.metrics; + +import com.timgroup.statsd.NonBlockingStatsDClientBuilder; +import com.timgroup.statsd.StatsDClient; +import lombok.extern.slf4j.Slf4j; + +/** + * Light wrapper around the DogsStatsD client to make using the client slightly more ergonomic. + *

+ * This class mainly exists to help Airbyte instrument/debug application on Airbyte Cloud. + *

+ * Open source users are free to turn this on and consume the same metrics. + */ +@Slf4j +public class DogstatsdMetricSingleton { + + private static DogstatsdMetricSingleton instance; + private final StatsDClient statsDClient; + private final boolean instancePublish; + + public DogstatsdMetricSingleton(final String appName, final boolean publish) { + instancePublish = publish; + statsDClient = new NonBlockingStatsDClientBuilder() + .prefix(appName) + .hostname(System.getenv("DD_AGENT_HOST")) + .port(Integer.parseInt(System.getenv("DD_DOGSTATSD_PORT"))) + .build(); + } + + public static synchronized DogstatsdMetricSingleton getInstance() { + if (instance == null) { + throw new RuntimeException("You must initialize configuration with the initialize() method before getting an instance."); + } + return instance; + } + + public synchronized static void initialize(final String appName, final boolean publish) { + if (instance != null) { + throw new RuntimeException("You cannot initialize configuration more than once."); + } + if (publish) { + log.info("Starting DogStatsD client.."); + // The second constructor argument ('true') makes this server start as a separate daemon thread. + // http://prometheus.github.io/client_java/io/prometheus/client/exporter/HTTPServer.html#HTTPServer-int-boolean- + instance = new DogstatsdMetricSingleton(appName, publish); + } + } + + /** + * Increment or decrement a counter. + * + * @param name of counter. + * @param amt to adjust. + * @param tags + */ + public void count(final String name, final double amt, final String... tags) { + if (instancePublish) { + log.info("publishing count, name: {}, value: {}", name, amt); + statsDClient.count(name, amt, tags); + } + } + + /** + * Record the latest value for a gauge. + * + * @param name of gauge. + * @param val to record. + * @param tags + */ + public void gauge(final String name, final double val, final String... tags) { + if (instancePublish) { + log.info("publishing gauge, name: {}, value: {}", name, val); + statsDClient.gauge(name, val, tags); + } + } + + /** + * Submit a single execution time aggregated locally by the Agent. Use this if approximate stats are + * sufficient. + * + * @param name of histogram. + * @param val of time to record. + * @param tags + */ + public void recordTimeLocal(final String name, final double val, final String... tags) { + if (instancePublish) { + log.info("recording histogram, name: {}, value: {}", name, val); + statsDClient.histogram(name, val, tags); + } + } + + /** + * Submit a single execution time aggregated globally by Datadog. Use this for precise stats. + * + * @param name of distribution. + * @param val of time to record. + * @param tags + */ + public void recordTimeGlobal(final String name, final double val, final String... tags) { + if (instancePublish) { + log.info("recording distribution, name: {}, value: {}", name, val); + statsDClient.distribution(name, val, tags); + } + } + + /** + * Wrapper of {@link #recordTimeGlobal(String, double, String...)} with a runnable for convenience. + * + * @param name + * @param runnable + * @param tags + */ + public void recordTimeGlobal(final String name, final Runnable runnable, final String... tags) { + final long start = System.currentTimeMillis(); + runnable.run(); + final long end = System.currentTimeMillis(); + final long val = end - start; + recordTimeGlobal(name, val, tags); + } + +} diff --git a/airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusBaseExample.java b/airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusBaseExample.java deleted file mode 100644 index ebe85ad4746a..000000000000 --- a/airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusBaseExample.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.metrics; - -import io.prometheus.client.Counter; -import io.prometheus.client.Gauge; -import io.prometheus.client.Histogram; -import io.prometheus.client.exporter.HTTPServer; -import java.io.IOException; - -public class PrometheusBaseExample { - - // The following represents a class we want to add instrumentation - // (metrics) to: - public static class AnInstrumentedClass { - - // Number and type of metrics per class is left on discretion - // of a Developer. - // A "namespace()" here sets the prefix of a metric. - static final Counter counter = Counter.build().namespace("app_prom_java").name("my_counter").help("This is my counter").register(); - static final Gauge gauge = Gauge.build().name("test_metric_gauges").help("test scheduler metric").register(); - static final Histogram histogram = Histogram.build().namespace("app_prom_java").name("my_histogram").help("This is my histogram").register(); - // static final Summary summary = - // Summary.build().namespace("app_prom_java").name("my_summary").help("This is my - // summary").register(); - - public static void doSomething() { - // Here goes some business logic. Whenever we want to report - // something to a monitoring system -- we update a corresponding - // metrics object, i.e.: - - // counter.inc(rand(0, 5)); - - gauge.set(rand(-5, 10)); - // histogram.observe(rand(0, 5)); - // summary.observe(rand(0, 5)); - } - - private static double rand(final double min, final double max) { - return min + (Math.random() * (max - min)); - } - - } - - public static void main(final String[] args) { - try { - // The second constructor argument ('true') makes this server start as a separate daemon thread. - // http://prometheus.github.io/client_java/io/prometheus/client/exporter/HTTPServer.html#HTTPServer-int-boolean- - new HTTPServer(Integer.parseInt("8081"), true); - } catch (final IOException e) {} - - // The following block along with an instance of the instrumented - // class simulates activity inside instrumented class object, which - // we may track later by watching metrics' values: - while (true) { - try { - AnInstrumentedClass.doSomething(); - - Thread.sleep(1000); - } catch (final InterruptedException e) { - e.printStackTrace(); - } - } - - } - -} diff --git a/airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java b/airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusMetricSingleton.java similarity index 95% rename from airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java rename to airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusMetricSingleton.java index 2b19dbb18630..96f1882bb7bb 100644 --- a/airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java +++ b/airbyte-metrics/src/main/java/io/airbyte/metrics/PrometheusMetricSingleton.java @@ -27,10 +27,11 @@ *

* Open source users are free to turn this on and consume the same metrics. */ -public class MetricSingleton { +@Deprecated +public class PrometheusMetricSingleton { - private static final Logger LOGGER = LoggerFactory.getLogger(MetricSingleton.class); - private static MetricSingleton instance; + private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusMetricSingleton.class); + private static PrometheusMetricSingleton instance; private final Map nameToGauge = new HashMap<>(); private final Map nameToCounter = new HashMap<>(); @@ -38,9 +39,9 @@ public class MetricSingleton { private HTTPServer monitoringDaemon; - private MetricSingleton() {} + private PrometheusMetricSingleton() {} - public static synchronized MetricSingleton getInstance() { + public static synchronized PrometheusMetricSingleton getInstance() { if (instance == null) { throw new RuntimeException("You must initialize configuration with the initializeMonitoringServiceDaemon() method before getting an instance."); } @@ -193,7 +194,7 @@ public synchronized static void initializeMonitoringServiceDaemon(final String m if (instance != null) { throw new RuntimeException("You cannot initialize configuration more than once."); } - instance = new MetricSingleton(); + instance = new PrometheusMetricSingleton(); if (publish) { try { MDC.setContextMap(mdc); diff --git a/airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java b/airbyte-metrics/src/test/java/io/airbyte/metrics/PrometheusMetricSingletonTest.java similarity index 75% rename from airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java rename to airbyte-metrics/src/test/java/io/airbyte/metrics/PrometheusMetricSingletonTest.java index 3577eb1c003b..7392d4d9700d 100644 --- a/airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java +++ b/airbyte-metrics/src/test/java/io/airbyte/metrics/PrometheusMetricSingletonTest.java @@ -25,7 +25,7 @@ * Since Prometheus publishes metrics at a specific port, we can test our wrapper by querying the * port and validating the response. */ -public class MetricSingletonTest { +public class PrometheusMetricSingletonTest { private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_1_1) @@ -41,12 +41,12 @@ public static void setUp() throws IOException { availPort = socket.getLocalPort(); } - MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true); + PrometheusMetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true); } @AfterAll public static void tearDown() { - MetricSingleton.getInstance().closeMonitoringServiceDaemon(); + PrometheusMetricSingleton.getInstance().closeMonitoringServiceDaemon(); } @Nested @@ -54,12 +54,13 @@ class Validation { @Test public void testNameWithDashFails() { - assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed")); + assertThrows(RuntimeException.class, + () -> PrometheusMetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed")); } @Test public void testNoDescriptionFails() { - assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("good_name", 0.0, null)); + assertThrows(RuntimeException.class, () -> PrometheusMetricSingleton.getInstance().incrementCounter("good_name", 0.0, null)); } } @@ -69,7 +70,7 @@ public void testCounter() throws InterruptedException, IOException { final var metricName = "test_counter"; final var rand = new Random(); for (int i = 0; i < 5; i++) { - MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter"); + PrometheusMetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter"); Thread.sleep(500); } @@ -82,7 +83,7 @@ public void testGauge() throws InterruptedException, IOException { final var metricName = "test_gauge"; final var rand = new Random(); for (int i = 0; i < 5; i++) { - MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge"); + PrometheusMetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge"); Thread.sleep(500); } @@ -95,7 +96,7 @@ public void testTimer() throws InterruptedException, IOException { final var metricName = "test_timer"; final var rand = new Random(); for (int i = 0; i < 5; i++) { - MetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time"); + PrometheusMetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time"); Thread.sleep(500); } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 7541ca1c4c3b..5b17ae6bbe4a 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -29,7 +29,7 @@ import io.airbyte.db.Database; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.db.instance.jobs.JobsDatabaseInstance; -import io.airbyte.metrics.MetricSingleton; +import io.airbyte.metrics.PrometheusMetricSingleton; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.DefaultJobPersistence; @@ -273,7 +273,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot, configs); final Map mdc = MDC.getCopyOfContextMap(); - MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics()); + PrometheusMetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics()); LOGGER.info("Launching scheduler..."); new SchedulerApp(