diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java index 21e83cec2728..133892fc3995 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricQueries.java @@ -6,15 +6,19 @@ import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION; +import static io.airbyte.db.instance.jobs.jooq.Tables.JOBS; import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import io.airbyte.db.instance.jobs.jooq.enums.JobStatus; import java.util.List; import java.util.UUID; +import lombok.extern.slf4j.Slf4j; import org.jooq.DSLContext; /** * Keep track of all metric queries. */ +@Slf4j public class MetricQueries { public static List jobIdToReleaseStages(final DSLContext ctx, final long jobId) { @@ -44,4 +48,53 @@ public static List srcIdAndDestIdToReleaseStages(final DSLContext .or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE); } + public static int numberOfPendingJobs(final DSLContext ctx) { + return ctx.selectCount().from(JOBS).where(JOBS.STATUS.eq(JobStatus.pending)).fetchOne(0, int.class); + } + + public static int numberOfRunningJobs(final DSLContext ctx) { + return ctx.selectCount().from(JOBS).where(JOBS.STATUS.eq(JobStatus.running)).fetchOne(0, int.class); + } + + public static Long oldestPendingJobAgeSecs(final DSLContext ctx) { + return oldestJobAgeSecs(ctx, JobStatus.pending); + } + + public static Long oldestRunningJobAgeSecs(final DSLContext ctx) { + return oldestJobAgeSecs(ctx, JobStatus.running); + } + + private static Long oldestJobAgeSecs(final DSLContext ctx, final JobStatus status) { + final var readableTimeField = "run_duration"; + final var durationSecField = "run_duration_secs"; + final var query = String.format(""" + with + oldest_job as ( + SELECT id, + age(current_timestamp, created_at) AS %s + FROM jobs + WHERE status = '%s' + ORDER BY run_duration DESC + LIMIT 1) + select id, + run_duration, + extract(epoch from run_duration) as %s + from oldest_job""", readableTimeField, status.getLiteral(), durationSecField); + final var res = ctx.fetch(query); + // unfortunately there are no good Jooq methods for retrieving a single record of a single column + // forcing the List cast. + final var duration = res.getValues(durationSecField, Double.class); + + if (duration.size() == 0) { + return 0L; + } + // .get(0) works in the following code due to the query's SELECT 1. + final var id = res.getValues("id", String.class).get(0); + final var readableTime = res.getValues(readableTimeField, String.class).get(0); + log.info("oldest job information - id: {}, readable time: {}", id, readableTime); + + // as double can have rounding errors, round down to remove noise. + return duration.get(0).longValue(); + } + } diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java index 6b34affca475..3436cec2ec38 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java @@ -61,7 +61,21 @@ public enum MetricsRegistry { KUBE_POD_PROCESS_CREATE_TIME_MILLISECS( MetricEmittingApps.WORKER, "kube_pod_process_create_time_millisecs", - "time taken to create a new kube pod process"); + "time taken to create a new kube pod process"), + NUM_PENDING_JOBS( + MetricEmittingApps.METRICS_REPORTER, + "num_pending_jobs", + "number of pending jobs"), + NUM_RUNNING_JOBS( + MetricEmittingApps.METRICS_REPORTER, + "num_running_jobs", + "number of running jobs"), + OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER, + "oldest_pending_job_age_secs", + "oldest pending job in seconds"), + OLDEST_RUNNING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER, + "oldest_running_job_age_secs", + "oldest running job in seconds"); public final MetricEmittingApp application; public final String metricName; diff --git a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java index b55bd1c51ae5..2d608194a0da 100644 --- a/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java +++ b/airbyte-metrics/lib/src/test/java/io/airbyte/metrics/lib/MetrisQueriesTest.java @@ -14,9 +14,12 @@ import io.airbyte.db.instance.configs.jooq.enums.ActorType; import io.airbyte.db.instance.configs.jooq.enums.NamespaceDefinitionType; import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; +import io.airbyte.db.instance.jobs.jooq.enums.JobStatus; import io.airbyte.db.instance.test.TestDatabaseProviders; import java.io.IOException; import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.UUID; import org.jooq.JSONB; @@ -61,11 +64,6 @@ static void setUpAll() throws IOException, SQLException { configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute()); } - @AfterEach - void tearDown() throws SQLException { - configDb.transaction(ctx -> ctx.truncate(ACTOR)); - } - @Nested class srcIdAndDestIdToReleaseStages { @@ -97,6 +95,11 @@ void shouldReturnNothingIfNotApplicable() throws SQLException { @Nested class jobIdToReleaseStages { + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(JOBS).execute()); + } + @Test @DisplayName("should return the right release stages") void shouldReturnReleaseStages() throws SQLException { @@ -133,6 +136,166 @@ void shouldReturnNothingIfNotApplicable() throws SQLException { assertEquals(0, res.size()); } + @Nested + class oldestPendingJob { + + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(JOBS).execute()); + } + + @Test + @DisplayName("should return only the pending job's age in seconds") + void shouldReturnOnlyPendingSeconds() throws SQLException { + final var expAgeSecs = 1000; + final var oldestCreateAt = OffsetDateTime.now().minus(expAgeSecs, ChronoUnit.SECONDS); + // oldest pending job + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT).values(1L, "", JobStatus.pending, oldestCreateAt) + .execute()); + // second oldest pending job + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT).values(2L, "", JobStatus.pending, OffsetDateTime.now()) + .execute()); + // non-pending jobs + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.running).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, "", JobStatus.failed).execute()); + + final var res = configDb.query(MetricQueries::oldestPendingJobAgeSecs); + assertEquals(1000, res); + } + + @Test + @DisplayName("should not error out or return any result if not applicable") + void shouldReturnNothingIfNotApplicable() throws SQLException { + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.succeeded).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.running).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.failed).execute()); + + final var res = configDb.query(MetricQueries::oldestPendingJobAgeSecs); + assertEquals(0L, res); + } + + } + + @Nested + class numJobs { + + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(JOBS).execute()); + } + + @Test + void runningJobsShouldReturnCorrectCount() throws SQLException { + // non-pending jobs + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.pending).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.running).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, "", JobStatus.running).execute()); + + final var res = configDb.query(MetricQueries::numberOfRunningJobs); + assertEquals(2, res); + } + + @Test + void runningJobsShouldReturnZero() throws SQLException { + // non-pending jobs + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.pending).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute()); + + final var res = configDb.query(MetricQueries::numberOfRunningJobs); + assertEquals(0, res); + } + + @Test + void pendingJobsShouldReturnCorrectCount() throws SQLException { + // non-pending jobs + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.pending).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.pending).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, "", JobStatus.running).execute()); + + final var res = configDb.query(MetricQueries::numberOfPendingJobs); + assertEquals(2, res); + } + + @Test + void pendingJobsShouldReturnZero() throws SQLException { + // non-pending jobs + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.running).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.failed).execute()); + + final var res = configDb.query(MetricQueries::numberOfPendingJobs); + assertEquals(0, res); + } + + } + + @Nested + class oldestRunningJob { + + @AfterEach + void tearDown() throws SQLException { + configDb.transaction(ctx -> ctx.truncate(JOBS).execute()); + } + + @Test + @DisplayName("should return only the running job's age in seconds") + void shouldReturnOnlyRunningSeconds() throws SQLException { + final var expAgeSecs = 10000; + final var oldestCreateAt = OffsetDateTime.now().minus(expAgeSecs, ChronoUnit.SECONDS); + // oldest pending job + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT).values(1L, "", JobStatus.running, oldestCreateAt) + .execute()); + // second oldest pending job + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT).values(2L, "", JobStatus.running, OffsetDateTime.now()) + .execute()); + // non-pending jobs + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.pending).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(4L, "", JobStatus.failed).execute()); + + final var res = configDb.query(MetricQueries::oldestRunningJobAgeSecs); + assertEquals(10000, res); + } + + @Test + @DisplayName("should not error out or return any result if not applicable") + void shouldReturnNothingIfNotApplicable() throws SQLException { + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(1L, "", JobStatus.succeeded).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(2L, "", JobStatus.pending).execute()); + configDb.transaction( + ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(3L, "", JobStatus.failed).execute()); + + final var res = configDb.query(MetricQueries::oldestRunningJobAgeSecs); + assertEquals(0L, res); + } + + } + } } diff --git a/airbyte-metrics/reporter/build.gradle b/airbyte-metrics/reporter/build.gradle index ade24b36df1a..cc1655ab6a4e 100644 --- a/airbyte-metrics/reporter/build.gradle +++ b/airbyte-metrics/reporter/build.gradle @@ -4,6 +4,7 @@ plugins { dependencies { implementation project(':airbyte-config:models') + implementation project(':airbyte-db:lib') implementation project(':airbyte-metrics:lib') } diff --git a/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ReporterApp.java b/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ReporterApp.java index 72e6205151ee..e89ce101a226 100644 --- a/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ReporterApp.java +++ b/airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/ReporterApp.java @@ -6,15 +6,70 @@ import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.metrics.lib.DatadogClientConfiguration; import io.airbyte.metrics.lib.DogStatsDMetricSingleton; import io.airbyte.metrics.lib.MetricEmittingApps; +import io.airbyte.metrics.lib.MetricQueries; +import io.airbyte.metrics.lib.MetricsRegistry; +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class ReporterApp { - public static void main(final String[] args) { + public static void main(final String[] args) throws IOException, InterruptedException { final Configs configs = new EnvConfigs(); + DogStatsDMetricSingleton.initialize(MetricEmittingApps.METRICS_REPORTER, new DatadogClientConfiguration(configs)); + + final Database configDatabase = new ConfigsDatabaseInstance( + configs.getConfigDatabaseUser(), + configs.getConfigDatabasePassword(), + configs.getConfigDatabaseUrl()) + .getInitialized(); + + final var pollers = Executors.newScheduledThreadPool(4); + + log.info("Starting pollers.."); + pollers.scheduleAtFixedRate(() -> { + try { + final var pendingJobs = configDatabase.query(MetricQueries::numberOfPendingJobs); + DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_PENDING_JOBS, pendingJobs); + } catch (final SQLException e) { + e.printStackTrace(); + } + }, 0, 15, TimeUnit.SECONDS); + pollers.scheduleAtFixedRate(() -> { + try { + final var runningJobs = configDatabase.query(MetricQueries::numberOfRunningJobs); + DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_RUNNING_JOBS, runningJobs); + } catch (final SQLException e) { + e.printStackTrace(); + } + }, 0, 15, TimeUnit.SECONDS); + pollers.scheduleAtFixedRate(() -> { + try { + final var age = configDatabase.query(MetricQueries::oldestRunningJobAgeSecs); + DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age); + } catch (final SQLException e) { + e.printStackTrace(); + } + }, 0, 15, TimeUnit.SECONDS); + pollers.scheduleAtFixedRate(() -> { + try { + final var age = configDatabase.query(MetricQueries::oldestPendingJobAgeSecs); + DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age); + } catch (final SQLException e) { + e.printStackTrace(); + } + }, 0, 15, TimeUnit.SECONDS); + + Thread.sleep(1000_000 * 1000); } }