Skip to content

Commit

Permalink
Metrics Reporter Queries Part 1 (#10663)
Browse files Browse the repository at this point in the history
Add all the simpler queries from https://docs.google.com/document/d/11pEUsHyKUhh4CtV3aReau3SUG-ncEvy6ROJRVln6YB4/edit?usp=sharing.

- Num Pending Jobs
- Num Concurrent Jobs
- Oldest Pending Job
- Oldest Running Job
  • Loading branch information
davinchia authored Mar 2, 2022
1 parent e58f126 commit 44b7ed8
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.jobs.jooq.Tables.JOBS;

import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
import io.airbyte.db.instance.jobs.jooq.enums.JobStatus;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;

/**
* Keep track of all metric queries.
*/
@Slf4j
public class MetricQueries {

public static List<ReleaseStage> jobIdToReleaseStages(final DSLContext ctx, final long jobId) {
Expand Down Expand Up @@ -44,4 +48,53 @@ public static List<ReleaseStage> srcIdAndDestIdToReleaseStages(final DSLContext
.or(ACTOR.ID.eq(dstId)).fetch().getValues(ACTOR_DEFINITION.RELEASE_STAGE);
}

/**
 * Counts the rows of the jobs table whose status is {@code pending}.
 *
 * @param ctx jOOQ context the count query is executed with
 * @return number of jobs currently in the pending status
 */
public static int numberOfPendingJobs(final DSLContext ctx) {
  final var isPending = JOBS.STATUS.eq(JobStatus.pending);
  return ctx.selectCount()
      .from(JOBS)
      .where(isPending)
      .fetchOne(0, int.class);
}

/**
 * Counts the rows of the jobs table whose status is {@code running}.
 *
 * @param ctx jOOQ context the count query is executed with
 * @return number of jobs currently in the running status
 */
public static int numberOfRunningJobs(final DSLContext ctx) {
  final var isRunning = JOBS.STATUS.eq(JobStatus.running);
  return ctx.selectCount()
      .from(JOBS)
      .where(isRunning)
      .fetchOne(0, int.class);
}

/**
 * Reports how long the oldest job in the {@code pending} status has existed.
 *
 * @param ctx jOOQ context the underlying query is executed with
 * @return age in whole seconds, or 0 when no job is pending
 */
public static Long oldestPendingJobAgeSecs(final DSLContext ctx) {
  final Long ageSecs = oldestJobAgeSecs(ctx, JobStatus.pending);
  return ageSecs;
}

/**
 * Reports how long the oldest job in the {@code running} status has existed.
 *
 * @param ctx jOOQ context the underlying query is executed with
 * @return age in whole seconds, or 0 when no job is running
 */
public static Long oldestRunningJobAgeSecs(final DSLContext ctx) {
  final Long ageSecs = oldestJobAgeSecs(ctx, JobStatus.running);
  return ageSecs;
}

/**
 * Finds the oldest job (smallest {@code created_at}) in the given status and returns its age.
 *
 * @param ctx jOOQ context the query is executed with
 * @param status job status to filter on; its literal is embedded in the SQL text
 * @return age of the oldest matching job in whole seconds (fraction dropped), or 0 when no job
 *         has the given status
 */
private static Long oldestJobAgeSecs(final DSLContext ctx, final JobStatus status) {
  final var readableTimeField = "run_duration";
  final var durationSecField = "run_duration_secs";
  // Both aliases are injected through positional arguments everywhere they appear, so renaming
  // one of the constants above cannot leave a stale hard-coded alias behind in the SQL.
  //
  // age() yields a symbolic years/months interval, which is good for logging; extracting the
  // epoch from such an interval treats every month as 30 days. The seconds are therefore taken
  // from the plain timestamp difference, and the ordering uses created_at directly.
  final var query = String.format("""
      SELECT id,
             age(current_timestamp, created_at) AS %1$s,
             extract(epoch FROM (current_timestamp - created_at)) AS %2$s
      FROM jobs
      WHERE status = '%3$s'
      ORDER BY created_at ASC
      LIMIT 1""", readableTimeField, durationSecField, status.getLiteral());
  final var res = ctx.fetch(query);
  // the column is read as a list because the result is addressed by column name.
  final var duration = res.getValues(durationSecField, Double.class);

  if (duration.isEmpty()) {
    return 0L;
  }
  // .get(0) is safe below: the result is non-empty and the query's LIMIT 1 caps it at one row.
  final var id = res.getValues("id", String.class).get(0);
  final var readableTime = res.getValues(readableTimeField, String.class).get(0);
  log.info("oldest job information - id: {}, readable time: {}", id, readableTime);

  // as double can have rounding errors, round down to remove noise.
  return duration.get(0).longValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,21 @@ public enum MetricsRegistry {
KUBE_POD_PROCESS_CREATE_TIME_MILLISECS(
MetricEmittingApps.WORKER,
"kube_pod_process_create_time_millisecs",
"time taken to create a new kube pod process");
"time taken to create a new kube pod process"),
NUM_PENDING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_pending_jobs",
"number of pending jobs"),
NUM_RUNNING_JOBS(
MetricEmittingApps.METRICS_REPORTER,
"num_running_jobs",
"number of running jobs"),
OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
OLDEST_RUNNING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_running_job_age_secs",
"oldest running job in seconds");

public final MetricEmittingApp application;
public final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
import io.airbyte.db.instance.configs.jooq.enums.NamespaceDefinitionType;
import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage;
import io.airbyte.db.instance.jobs.jooq.enums.JobStatus;
import io.airbyte.db.instance.test.TestDatabaseProviders;
import java.io.IOException;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.UUID;
import org.jooq.JSONB;
Expand Down Expand Up @@ -61,11 +64,6 @@ static void setUpAll() throws IOException, SQLException {
configDb.transaction(ctx -> ctx.alterTable(ACTOR).dropForeignKey("actor_workspace_id_fkey").execute());
}

/**
 * Empties the actor table after every test so rows do not leak between test cases.
 */
@AfterEach
void tearDown() throws SQLException {
  // truncate(...) only builds the statement; it has to be executed to reach the database.
  // cascade() is used on the assumption that other tables reference actor through foreign
  // keys, in which case a plain TRUNCATE would be rejected -- TODO confirm against the schema.
  configDb.transaction(ctx -> ctx.truncate(ACTOR).cascade().execute());
}

@Nested
class srcIdAndDestIdToReleaseStages {

Expand Down Expand Up @@ -97,6 +95,11 @@ void shouldReturnNothingIfNotApplicable() throws SQLException {
@Nested
class jobIdToReleaseStages {

/**
 * Truncates the jobs table after every test in this nested class.
 */
@AfterEach
void tearDown() throws SQLException {
  configDb.transaction(
      ctx -> ctx
          .truncate(JOBS)
          .execute());
}

@Test
@DisplayName("should return the right release stages")
void shouldReturnReleaseStages() throws SQLException {
Expand Down Expand Up @@ -133,6 +136,166 @@ void shouldReturnNothingIfNotApplicable() throws SQLException {
assertEquals(0, res.size());
}

/**
 * Tests for {@code MetricQueries.oldestPendingJobAgeSecs}.
 */
@Nested
class oldestPendingJob {

  /** Inserts a job with an empty scope and the given id and status, in its own transaction. */
  private void insertJob(final long id, final JobStatus status) throws SQLException {
    configDb.transaction(
        ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(id, "", status).execute());
  }

  /** Same as {@link #insertJob(long, JobStatus)} but with an explicit creation timestamp. */
  private void insertJob(final long id, final JobStatus status, final OffsetDateTime createdAt) throws SQLException {
    configDb.transaction(
        ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT).values(id, "", status, createdAt)
            .execute());
  }

  @AfterEach
  void tearDown() throws SQLException {
    configDb.transaction(ctx -> ctx.truncate(JOBS).execute());
  }

  @Test
  @DisplayName("should return only the pending job's age in seconds")
  void shouldReturnOnlyPendingSeconds() throws SQLException {
    final var expAgeSecs = 1000;
    final var oldestCreateAt = OffsetDateTime.now().minus(expAgeSecs, ChronoUnit.SECONDS);

    // oldest pending job
    insertJob(1L, JobStatus.pending, oldestCreateAt);
    // second oldest pending job
    insertJob(2L, JobStatus.pending, OffsetDateTime.now());
    // non-pending jobs
    insertJob(3L, JobStatus.running);
    insertJob(4L, JobStatus.failed);

    final var ageSecs = configDb.query(MetricQueries::oldestPendingJobAgeSecs);
    assertEquals(expAgeSecs, ageSecs);
  }

  @Test
  @DisplayName("should not error out or return any result if not applicable")
  void shouldReturnNothingIfNotApplicable() throws SQLException {
    // no pending job at all
    insertJob(1L, JobStatus.succeeded);
    insertJob(2L, JobStatus.running);
    insertJob(3L, JobStatus.failed);

    final var ageSecs = configDb.query(MetricQueries::oldestPendingJobAgeSecs);
    assertEquals(0L, ageSecs);
  }

}

/**
 * Tests for {@code MetricQueries.numberOfRunningJobs} and {@code MetricQueries.numberOfPendingJobs}.
 */
@Nested
class numJobs {

  /** Inserts a job with an empty scope and the given id and status, in its own transaction. */
  private void insertJob(final long id, final JobStatus status) throws SQLException {
    configDb.transaction(
        ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(id, "", status).execute());
  }

  @AfterEach
  void tearDown() throws SQLException {
    configDb.transaction(ctx -> ctx.truncate(JOBS).execute());
  }

  @Test
  void runningJobsShouldReturnCorrectCount() throws SQLException {
    // two running jobs mixed with jobs in other statuses
    insertJob(1L, JobStatus.pending);
    insertJob(2L, JobStatus.failed);
    insertJob(3L, JobStatus.running);
    insertJob(4L, JobStatus.running);

    final var runningCount = configDb.query(MetricQueries::numberOfRunningJobs);
    assertEquals(2, runningCount);
  }

  @Test
  void runningJobsShouldReturnZero() throws SQLException {
    // only non-running jobs
    insertJob(1L, JobStatus.pending);
    insertJob(2L, JobStatus.failed);

    final var runningCount = configDb.query(MetricQueries::numberOfRunningJobs);
    assertEquals(0, runningCount);
  }

  @Test
  void pendingJobsShouldReturnCorrectCount() throws SQLException {
    // two pending jobs mixed with jobs in other statuses
    insertJob(1L, JobStatus.pending);
    insertJob(2L, JobStatus.failed);
    insertJob(3L, JobStatus.pending);
    insertJob(4L, JobStatus.running);

    final var pendingCount = configDb.query(MetricQueries::numberOfPendingJobs);
    assertEquals(2, pendingCount);
  }

  @Test
  void pendingJobsShouldReturnZero() throws SQLException {
    // only non-pending jobs
    insertJob(1L, JobStatus.running);
    insertJob(2L, JobStatus.failed);

    final var pendingCount = configDb.query(MetricQueries::numberOfPendingJobs);
    assertEquals(0, pendingCount);
  }

}

/**
 * Tests for {@code MetricQueries.oldestRunningJobAgeSecs}.
 */
@Nested
class oldestRunningJob {

  /** Inserts a job with an empty scope and the given id and status, in its own transaction. */
  private void insertJob(final long id, final JobStatus status) throws SQLException {
    configDb.transaction(
        ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS).values(id, "", status).execute());
  }

  /** Same as {@link #insertJob(long, JobStatus)} but with an explicit creation timestamp. */
  private void insertJob(final long id, final JobStatus status, final OffsetDateTime createdAt) throws SQLException {
    configDb.transaction(
        ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT).values(id, "", status, createdAt)
            .execute());
  }

  @AfterEach
  void tearDown() throws SQLException {
    configDb.transaction(ctx -> ctx.truncate(JOBS).execute());
  }

  @Test
  @DisplayName("should return only the running job's age in seconds")
  void shouldReturnOnlyRunningSeconds() throws SQLException {
    final var expAgeSecs = 10000;
    final var oldestCreateAt = OffsetDateTime.now().minus(expAgeSecs, ChronoUnit.SECONDS);

    // oldest running job
    insertJob(1L, JobStatus.running, oldestCreateAt);
    // second oldest running job
    insertJob(2L, JobStatus.running, OffsetDateTime.now());
    // non-running jobs
    insertJob(3L, JobStatus.pending);
    insertJob(4L, JobStatus.failed);

    final var ageSecs = configDb.query(MetricQueries::oldestRunningJobAgeSecs);
    assertEquals(expAgeSecs, ageSecs);
  }

  @Test
  @DisplayName("should not error out or return any result if not applicable")
  void shouldReturnNothingIfNotApplicable() throws SQLException {
    // no running job at all
    insertJob(1L, JobStatus.succeeded);
    insertJob(2L, JobStatus.pending);
    insertJob(3L, JobStatus.failed);

    final var ageSecs = configDb.query(MetricQueries::oldestRunningJobAgeSecs);
    assertEquals(0L, ageSecs);
  }

}

}

}
1 change: 1 addition & 0 deletions airbyte-metrics/reporter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {

// Project modules the metrics reporter is compiled against.
dependencies {
implementation project(':airbyte-config:models')
// presumably provides the io.airbyte.db classes used by the reporter -- verify against the module layout
implementation project(':airbyte-db:lib')
implementation project(':airbyte-metrics:lib')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,70 @@

import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.metrics.lib.DatadogClientConfiguration;
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.metrics.lib.MetricQueries;
import io.airbyte.metrics.lib.MetricsRegistry;
import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReporterApp {

public static void main(final String[] args) {
public static void main(final String[] args) throws IOException, InterruptedException {
final Configs configs = new EnvConfigs();

DogStatsDMetricSingleton.initialize(MetricEmittingApps.METRICS_REPORTER, new DatadogClientConfiguration(configs));

final Database configDatabase = new ConfigsDatabaseInstance(
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getInitialized();

final var pollers = Executors.newScheduledThreadPool(4);

log.info("Starting pollers..");
pollers.scheduleAtFixedRate(() -> {
try {
final var pendingJobs = configDatabase.query(MetricQueries::numberOfPendingJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_PENDING_JOBS, pendingJobs);
} catch (final SQLException e) {
e.printStackTrace();
}
}, 0, 15, TimeUnit.SECONDS);
pollers.scheduleAtFixedRate(() -> {
try {
final var runningJobs = configDatabase.query(MetricQueries::numberOfRunningJobs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.NUM_RUNNING_JOBS, runningJobs);
} catch (final SQLException e) {
e.printStackTrace();
}
}, 0, 15, TimeUnit.SECONDS);
pollers.scheduleAtFixedRate(() -> {
try {
final var age = configDatabase.query(MetricQueries::oldestRunningJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_RUNNING_JOB_AGE_SECS, age);
} catch (final SQLException e) {
e.printStackTrace();
}
}, 0, 15, TimeUnit.SECONDS);
pollers.scheduleAtFixedRate(() -> {
try {
final var age = configDatabase.query(MetricQueries::oldestPendingJobAgeSecs);
DogStatsDMetricSingleton.gauge(MetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, age);
} catch (final SQLException e) {
e.printStackTrace();
}
}, 0, 15, TimeUnit.SECONDS);

Thread.sleep(1000_000 * 1000);
}

}

0 comments on commit 44b7ed8

Please sign in to comment.