Skip to content

Commit

Permalink
Set resource limits for connector definitions: persistence layer (#10481
Browse files Browse the repository at this point in the history
)
  • Loading branch information
cgardens authored Feb 23, 2022
1 parent 2e4d91e commit 342840d
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.28.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.32.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ActorDefinitionResourceRequirements.yaml
title: ActorDefinitionResourceRequirements
description: actor definition specific resource requirements
type: object
additionalProperties: true
properties:
default:
description: if set, these are the requirements that should be set for ALL jobs run for this actor definition.
"$ref": ResourceRequirements.yaml
jobSpecific:
type: array
items:
"$ref": "#/definitions/JobTypeResourceLimit"
definitions:
JobTypeResourceLimit:
description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
type: object
required:
- jobtype
- resourceRequirements
properties:
jobType:
"$ref": JobType.yaml
resourceRequirements:
"$ref": ResourceRequirements.yaml
14 changes: 14 additions & 0 deletions airbyte-config/models/src/main/resources/types/JobType.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/JobType.yaml
title: JobType
description: enum that describes the different types of jobs that the platform runs.
type: string
enum:
- get_spec
- check_connection
- discover_schema
- sync
- reset_connection
- connection_updater
- replicate
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ResourceRequirements.yaml
title: ResourceRequirements
description: generic configuration for pod source requirements
type: object
additionalProperties: true
properties:
# todo (cgardens) - should be camel case for consistency.
cpu_request:
type: string
cpu_limit:
type: string
memory_request:
type: string
memory_limit:
type: string
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ properties:
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
resourceRequirements:
"$ref": ActorDefinitionResourceRequirements.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ properties:
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
resourceRequirements:
"$ref": ActorDefinitionResourceRequirements.yaml
13 changes: 1 addition & 12 deletions airbyte-config/models/src/main/resources/types/StandardSync.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,4 @@ properties:
manual:
type: boolean
resourceRequirements:
type: object
description: optional resource requirements to run sync workers
additionalProperties: false
properties:
cpu_request:
type: string
cpu_limit:
type: string
memory_request:
type: string
memory_limit:
type: string
"$ref": ResourceRequirements.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
Expand Down Expand Up @@ -398,7 +399,10 @@ private StandardSourceDefinition buildStandardSourceDefinition(final Record reco
.withReleaseStage(record.get(ACTOR_DEFINITION.RELEASE_STAGE) == null ? null
: Enums.toEnum(record.get(ACTOR_DEFINITION.RELEASE_STAGE, String.class), StandardSourceDefinition.ReleaseStage.class).orElseThrow())
.withReleaseDate(record.get(ACTOR_DEFINITION.RELEASE_DATE) == null ? null
: record.get(ACTOR_DEFINITION.RELEASE_DATE).toString());
: record.get(ACTOR_DEFINITION.RELEASE_DATE).toString())
.withResourceRequirements(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null
? null
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class));
}

private List<ConfigWithMetadata<StandardDestinationDefinition>> listStandardDestinationDefinitionWithMetadata() throws IOException {
Expand Down Expand Up @@ -442,7 +446,10 @@ private StandardDestinationDefinition buildStandardDestinationDefinition(final R
.withReleaseStage(record.get(ACTOR_DEFINITION.RELEASE_STAGE) == null ? null
: Enums.toEnum(record.get(ACTOR_DEFINITION.RELEASE_STAGE, String.class), StandardDestinationDefinition.ReleaseStage.class).orElseThrow())
.withReleaseDate(record.get(ACTOR_DEFINITION.RELEASE_DATE) == null ? null
: record.get(ACTOR_DEFINITION.RELEASE_DATE).toString());
: record.get(ACTOR_DEFINITION.RELEASE_DATE).toString())
.withResourceRequirements(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS) == null
? null
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class));
}

private List<ConfigWithMetadata<SourceConnection>> listSourceConnectionWithMetadata() throws IOException {
Expand Down Expand Up @@ -887,6 +894,9 @@ private void writeStandardSourceDefinition(final List<StandardSourceDefinition>
io.airbyte.db.instance.configs.jooq.enums.ReleaseStage.class).orElseThrow())
.set(ACTOR_DEFINITION.RELEASE_DATE, standardSourceDefinition.getReleaseDate() == null ? null
: LocalDate.parse(standardSourceDefinition.getReleaseDate()))
.set(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS,
standardSourceDefinition.getResourceRequirements() == null ? null
: JSONB.valueOf(Jsons.serialize(standardSourceDefinition.getResourceRequirements())))
.set(ACTOR_DEFINITION.UPDATED_AT, timestamp)
.where(ACTOR_DEFINITION.ID.eq(standardSourceDefinition.getSourceDefinitionId()))
.execute();
Expand All @@ -912,6 +922,9 @@ private void writeStandardSourceDefinition(final List<StandardSourceDefinition>
io.airbyte.db.instance.configs.jooq.enums.ReleaseStage.class).orElseThrow())
.set(ACTOR_DEFINITION.RELEASE_DATE, standardSourceDefinition.getReleaseDate() == null ? null
: LocalDate.parse(standardSourceDefinition.getReleaseDate()))
.set(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS,
standardSourceDefinition.getResourceRequirements() == null ? null
: JSONB.valueOf(Jsons.serialize(standardSourceDefinition.getResourceRequirements())))
.set(ACTOR_DEFINITION.CREATED_AT, timestamp)
.set(ACTOR_DEFINITION.UPDATED_AT, timestamp)
.execute();
Expand Down Expand Up @@ -949,6 +962,9 @@ private void writeStandardDestinationDefinition(final List<StandardDestinationDe
io.airbyte.db.instance.configs.jooq.enums.ReleaseStage.class).orElseThrow())
.set(ACTOR_DEFINITION.RELEASE_DATE, standardDestinationDefinition.getReleaseDate() == null ? null
: LocalDate.parse(standardDestinationDefinition.getReleaseDate()))
.set(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS,
standardDestinationDefinition.getResourceRequirements() == null ? null
: JSONB.valueOf(Jsons.serialize(standardDestinationDefinition.getResourceRequirements())))
.set(ACTOR_DEFINITION.UPDATED_AT, timestamp)
.where(ACTOR_DEFINITION.ID.eq(standardDestinationDefinition.getDestinationDefinitionId()))
.execute();
Expand All @@ -970,6 +986,9 @@ private void writeStandardDestinationDefinition(final List<StandardDestinationDe
io.airbyte.db.instance.configs.jooq.enums.ReleaseStage.class).orElseThrow())
.set(ACTOR_DEFINITION.RELEASE_DATE, standardDestinationDefinition.getReleaseDate() == null ? null
: LocalDate.parse(standardDestinationDefinition.getReleaseDate()))
.set(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS,
standardDestinationDefinition.getResourceRequirements() == null ? null
: JSONB.valueOf(Jsons.serialize(standardDestinationDefinition.getResourceRequirements())))
.set(ACTOR_DEFINITION.CREATED_AT, timestamp)
.set(ACTOR_DEFINITION.UPDATED_AT, timestamp)
.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
Expand Down Expand Up @@ -112,7 +113,8 @@ public static List<StandardSourceDefinition> standardSourceDefinitions() {
.withDocumentationUrl("documentation-url-1")
.withIcon("icon-1")
.withSpec(connectorSpecification)
.withTombstone(false);
.withTombstone(false)
.withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2")));
final StandardSourceDefinition standardSourceDefinition2 = new StandardSourceDefinition()
.withSourceDefinitionId(SOURCE_DEFINITION_ID_2)
.withSourceType(SourceType.DATABASE)
Expand Down Expand Up @@ -148,7 +150,8 @@ public static List<StandardDestinationDefinition> standardDestinationDefinitions
.withDocumentationUrl("documentation-url-3")
.withIcon("icon-3")
.withSpec(connectorSpecification)
.withTombstone(false);
.withTombstone(false)
.withResourceRequirements(new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2")));
final StandardDestinationDefinition standardDestinationDefinition2 = new StandardDestinationDefinition()
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID_2)
.withName("random-destination-2")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_35_32_001__AddConnectorDefinitionResourceLimits extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_32_001__AddConnectorDefinitionResourceLimits.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

final DSLContext ctx = DSL.using(context.getConnection());
addResourceReqsToActorDefs(ctx);
}

public static void addResourceReqsToActorDefs(final DSLContext ctx) {
ctx.alterTable("actor_definition")
.addColumnIfNotExists(DSL.field("resource_requirements", SQLDataType.JSONB.nullable(true)))
.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ create table "public"."actor_definition"(
"tombstone" bool not null default false,
"release_stage" release_stage null,
"release_date" date null,
"resource_requirements" jsonb null,
constraint "actor_definition_pkey"
primary key ("id")
);
Expand Down

0 comments on commit 342840d

Please sign in to comment.