diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 19f810f6d6cd..fa63c660af8a 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -15,10 +15,12 @@ import static io.airbyte.db.instance.configs.jooq.generated.Tables.OPERATION; import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE; import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE_SERVICE_ACCOUNT; +import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; import static org.jooq.impl.DSL.asterisk; import static org.jooq.impl.DSL.groupConcat; import static org.jooq.impl.DSL.noCondition; import static org.jooq.impl.DSL.select; +import static org.jooq.impl.SQLDataType.VARCHAR; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; @@ -82,6 +84,7 @@ import org.jooq.JSONB; import org.jooq.JoinType; import org.jooq.Record; +import org.jooq.Record1; import org.jooq.Record2; import org.jooq.Result; import org.jooq.SelectJoinStep; @@ -761,6 +764,25 @@ public List listWorkspaceDestinationConnection(final UUID return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList()); } + /** + * List workspace IDs with most recently running jobs within a given time window (in hours). + * + * @param timeWindowInHours - integer, e.g. 24, 48, etc + * @return List - list of workspace IDs + * @throws IOException - failed to query data + */ + public List listWorkspacesByMostRecentlyRunningJobs(final int timeWindowInHours) throws IOException { + final Result> records = database.query(ctx -> ctx.selectDistinct(ACTOR.WORKSPACE_ID) + .from(ACTOR) + .join(CONNECTION) + .on(CONNECTION.SOURCE_ID.eq(ACTOR.ID)) + .join(JOBS) + .on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE)) + .where(JOBS.UPDATED_AT.greaterOrEqual(OffsetDateTime.now().minusHours(timeWindowInHours))) + .fetch()); + return records.stream().map(record -> record.get(ACTOR.WORKSPACE_ID)).collect(Collectors.toList()); + } + /** * Returns all active sources using a definition * diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java index 3f8a221fc7db..10ba2848a7e7 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/BaseConfigDatabaseTest.java @@ -11,6 +11,7 @@ import io.airbyte.db.init.DatabaseInitializationException; import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.configs.ConfigsDatabaseTestProvider; +import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.test.utils.DatabaseConnectionHelper; import java.io.IOException; import java.sql.SQLException; @@ -132,10 +133,12 @@ private static void createDbContainer() { container.start(); } - private static void setDb() { + private static void setDb() throws DatabaseInitializationException, IOException { dataSource = DatabaseConnectionHelper.createDataSource(container); dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); - database = new Database(dslContext); + final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(dataSource, dslContext); + database = databaseProviders.createNewConfigsDatabase(); + databaseProviders.createNewJobsDatabase(); } private static void migrateDb() throws IOException, DatabaseInitializationException { diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspaceFilterTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspaceFilterTest.java new file mode 100644 index 000000000000..70429a25688b --- /dev/null +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspaceFilterTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.CONNECTION; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE; +import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.airbyte.db.ExceptionWrappingDatabase; +import io.airbyte.db.instance.configs.jooq.generated.enums.ActorType; +import io.airbyte.db.instance.configs.jooq.generated.enums.NamespaceDefinitionType; +import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage; +import java.io.IOException; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.jooq.JSONB; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class WorkspaceFilterTest extends BaseConfigDatabaseTest { + + private static final UUID SRC_DEF_ID = UUID.randomUUID(); + private static final UUID DST_DEF_ID = UUID.randomUUID(); + private static final UUID ACTOR_ID_0 = UUID.randomUUID(); + private static final UUID ACTOR_ID_1 = UUID.randomUUID(); + private static final UUID ACTOR_ID_2 = UUID.randomUUID(); + private static final UUID ACTOR_ID_3 = UUID.randomUUID(); + private static final UUID CONN_ID_0 = UUID.randomUUID(); + private static final UUID CONN_ID_1 = UUID.randomUUID(); + private static final UUID CONN_ID_2 = UUID.randomUUID(); + private static final UUID CONN_ID_3 = UUID.randomUUID(); + private static final UUID CONN_ID_4 = UUID.randomUUID(); + private static final UUID CONN_ID_5 = UUID.randomUUID(); + private static final UUID WORKSPACE_ID_0 = UUID.randomUUID(); + private static final UUID WORKSPACE_ID_1 = UUID.randomUUID(); + private static final UUID WORKSPACE_ID_2 = UUID.randomUUID(); + private static final UUID WORKSPACE_ID_3 = UUID.randomUUID(); + private ConfigRepository configRepository; + + @BeforeAll + static void setUpAll() throws SQLException { + // create actor_definition + database.transaction(ctx -> ctx.insertInto(ACTOR_DEFINITION, ACTOR_DEFINITION.ID, ACTOR_DEFINITION.NAME, ACTOR_DEFINITION.DOCKER_REPOSITORY, + ACTOR_DEFINITION.DOCKER_IMAGE_TAG, ACTOR_DEFINITION.SPEC, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.RELEASE_STAGE) + .values(SRC_DEF_ID, "srcDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.source, ReleaseStage.beta) + .values(DST_DEF_ID, "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.generally_available) + .values(UUID.randomUUID(), "dstDef", "repository", "tag", JSONB.valueOf("{}"), ActorType.destination, ReleaseStage.alpha) + .execute()); + + // create workspace + database.transaction(ctx -> ctx.insertInto(WORKSPACE, WORKSPACE.ID, WORKSPACE.NAME, WORKSPACE.SLUG, WORKSPACE.INITIAL_SETUP_COMPLETE) + .values(WORKSPACE_ID_0, "ws-0", "ws-0", true) + .values(WORKSPACE_ID_1, "ws-1", "ws-1", true) + .values(WORKSPACE_ID_2, "ws-2", "ws-2", true) + .values(WORKSPACE_ID_3, "ws-3", "ws-3", true) + .execute()); + // create actors + database.transaction( + ctx -> ctx.insertInto(ACTOR, ACTOR.WORKSPACE_ID, ACTOR.ID, ACTOR.ACTOR_DEFINITION_ID, ACTOR.NAME, ACTOR.CONFIGURATION, ACTOR.ACTOR_TYPE) + .values(WORKSPACE_ID_0, ACTOR_ID_0, SRC_DEF_ID, "ACTOR-0", JSONB.valueOf("{}"), ActorType.source) + .values(WORKSPACE_ID_1, ACTOR_ID_1, SRC_DEF_ID, "ACTOR-1", JSONB.valueOf("{}"), ActorType.source) + .values(WORKSPACE_ID_2, ACTOR_ID_2, DST_DEF_ID, "ACTOR-2", JSONB.valueOf("{}"), ActorType.source) + .values(WORKSPACE_ID_3, ACTOR_ID_3, DST_DEF_ID, "ACTOR-3", JSONB.valueOf("{}"), ActorType.source) + .execute()); + // create connections + database.transaction( + ctx -> ctx.insertInto(CONNECTION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, + CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL) + .values(ACTOR_ID_0, ACTOR_ID_1, CONN_ID_0, NamespaceDefinitionType.source, "CONN-0", JSONB.valueOf("{}"), true) + .values(ACTOR_ID_0, ACTOR_ID_2, CONN_ID_1, NamespaceDefinitionType.source, "CONN-1", JSONB.valueOf("{}"), true) + .values(ACTOR_ID_1, ACTOR_ID_2, CONN_ID_2, NamespaceDefinitionType.source, "CONN-2", JSONB.valueOf("{}"), true) + .values(ACTOR_ID_1, ACTOR_ID_2, CONN_ID_3, NamespaceDefinitionType.source, "CONN-3", JSONB.valueOf("{}"), true) + .values(ACTOR_ID_2, ACTOR_ID_3, CONN_ID_4, NamespaceDefinitionType.source, "CONN-4", JSONB.valueOf("{}"), true) + .values(ACTOR_ID_3, ACTOR_ID_1, CONN_ID_5, NamespaceDefinitionType.source, "CONN-5", JSONB.valueOf("{}"), true) + .execute()); + // create jobs + final OffsetDateTime currentTs = OffsetDateTime.now(); + database.transaction(ctx -> ctx.insertInto(JOBS, JOBS.ID, JOBS.UPDATED_AT, JOBS.SCOPE) + .values(0L, currentTs.minusHours(0), CONN_ID_0.toString()) + .values(1L, currentTs.minusHours(5), CONN_ID_0.toString()) + .values(2L, currentTs.minusHours(10), CONN_ID_1.toString()) + .values(3L, currentTs.minusHours(15), CONN_ID_1.toString()) + .values(4L, currentTs.minusHours(20), CONN_ID_2.toString()) + .values(5L, currentTs.minusHours(30), CONN_ID_3.toString()) + .values(6L, currentTs.minusHours(40), CONN_ID_4.toString()) + .values(7L, currentTs.minusHours(50), CONN_ID_4.toString()) + .values(8L, currentTs.minusHours(70), CONN_ID_5.toString()) + .execute()); + } + + @BeforeEach + void beforeEach() { + configRepository = new ConfigRepository(database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)), null); + } + + @Test + @DisplayName("Should return a list of workspace IDs with most recently running jobs") + void testListWorkspacesByMostRecentlyRunningJobs() throws IOException { + final int timeWindowInHours = 48; + /* + * Following function is to filter workspace (IDs) with most recently running jobs within a given + * time window. Step 1: Filter on table JOBS where job's UPDATED_AT timestamp is within the given + * time window. Step 2: Trace back via CONNECTION table and ACTOR table. Step 3: Return workspace + * IDs from ACTOR table. + */ + final List actualResult = configRepository.listWorkspacesByMostRecentlyRunningJobs(timeWindowInHours); + /* + * With the test data provided above, expected outputs for each step: Step 1: `jobs` (IDs) OL, 1L, + * 2L, 3L, 4L, 5L and 6L. Step 2: `connections` (IDs) CONN_ID_0, CONN_ID_1, CONN_ID_2, CONN_ID_3, + * and CONN_ID_4 `actors` (IDs) ACTOR_ID_0, ACTOR_ID_1, and ACTOR_ID_2. Step 3: `workspaces` (IDs) + * WORKSPACE_ID_0, WORKSPACE_ID_1 and WORKSPACE_ID_2. + */ + final List expectedResult = new ArrayList<>(); + expectedResult.add(WORKSPACE_ID_0); + expectedResult.add(WORKSPACE_ID_1); + expectedResult.add(WORKSPACE_ID_2); + assertTrue(expectedResult.size() == actualResult.size() && expectedResult.containsAll(actualResult) && actualResult.containsAll(expectedResult)); + } + +}