Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy job attempt state to configs database #7219

Merged
merged 24 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.config.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.client.util.Preconditions;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
Expand Down Expand Up @@ -42,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class ConfigRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class);
Expand Down Expand Up @@ -453,6 +455,7 @@ public void replaceAllConfigsDeserializing(final Map<String, Stream<JsonNode>> c

public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
if (longLivedSecretPersistence.isPresent()) {
Preconditions.checkNotNull(specFetcherFn);
final var augmentedMap = new HashMap<>(configs);

// get all source defs so that we can use their specs when storing secrets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public DatabaseConfigPersistence(final Database database) {
* If this is a migration deployment from an old version that relies on file system config
* persistence, copy the existing configs from local files.
*/
public void migrateFileConfigs(final Configs serverConfigs) throws IOException {
public DatabaseConfigPersistence migrateFileConfigs(final Configs serverConfigs) throws IOException {
database.transaction(ctx -> {
final boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS);
if (isInitialized) {
Expand All @@ -77,6 +77,8 @@ public void migrateFileConfigs(final Configs serverConfigs) throws IOException {

return null;
});

return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -186,7 +185,7 @@ public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final

@Override
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
throw new UnsupportedEncodingException("This method is not supported in this implementation");
// this method is not supported in this implementation, but needed in tests; do nothing
}

private <T> T getConfigInternal(final AirbyteConfig configType, final String configId, final Class<T> clazz)
Expand Down
1 change: 1 addition & 0 deletions airbyte-db/lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-config:models')
implementation "org.flywaydb:flyway-core:7.14.0"
implementation "org.testcontainers:postgresql:1.15.3"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs;

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.test.TestDatabaseProvider;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.jooq.JSONB;

public class ConfigsDatabaseTestProvider implements TestDatabaseProvider {

private final String user;
private final String password;
private final String jdbcUrl;

public ConfigsDatabaseTestProvider(String user, String password, String jdbcUrl) {
this.user = user;
this.password = password;
this.jdbcUrl = jdbcUrl;
}

@Override
public Database create(final boolean runMigration) throws IOException {
final Database database = new ConfigsDatabaseInstance(user, password, jdbcUrl)
.getAndInitialize();

if (runMigration) {
final DatabaseMigrator migrator = new ConfigsDatabaseMigrator(
database,
ConfigsDatabaseTestProvider.class.getSimpleName());
migrator.createBaseline();
migrator.migrate();
}

// The configs database is considered ready only if there are some seed records.
// So we need to create at least one record here.
OffsetDateTime timestamp = OffsetDateTime.now();
new ExceptionWrappingDatabase(database).transaction(ctx -> ctx.insertInto(table("airbyte_configs"))
.set(field("config_id"), UUID.randomUUID().toString())
.set(field("config_type"), "STANDARD_SOURCE_DEFINITION")
.set(field("config_blob"), JSONB.valueOf("{}"))
.set(field("created_at"), timestamp)
.set(field("updated_at"), timestamp)
.execute());

return database;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.db.Database;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import java.io.IOException;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.Record2;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Create a new table to store the latest job state for each standard sync.
* <li>Column sync_id: the connectionId in StandardSync</><li>Column state: a json node representing
* a State object</li>
*/
public class V0_29_21_001__Store_last_sync_state extends BaseJavaMigration {

private static final String MIGRATION_NAME = "Configs db migration 0.29.21.001";
private static final Logger LOGGER = LoggerFactory.getLogger(V0_29_21_001__Store_last_sync_state.class);

// sync state table
static final Table<?> SYNC_STATE_TABLE = DSL.table("sync_state");
static final Field<UUID> COLUMN_SYNC_ID = DSL.field("sync_id", SQLDataType.UUID.nullable(false));
static final Field<JSONB> COLUMN_STATE = DSL.field("state", SQLDataType.JSONB.nullable(false));
static final Field<OffsetDateTime> COLUMN_CREATED_AT = DSL.field("created_at",
SQLDataType.OFFSETDATETIME.nullable(false).defaultValue(DSL.currentOffsetDateTime()));
static final Field<OffsetDateTime> COLUMN_UPDATED_AT = DSL.field("updated_at",
SQLDataType.OFFSETDATETIME.nullable(false).defaultValue(DSL.currentOffsetDateTime()));

private final Configs configs;

public V0_29_21_001__Store_last_sync_state() {
this.configs = new EnvConfigs();
}

@VisibleForTesting
V0_29_21_001__Store_last_sync_state(final Configs configs) {
this.configs = configs;
}

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
final DSLContext ctx = DSL.using(context.getConnection());

createTable(ctx);

final Optional<Database> jobsDatabase = getJobsDatabase(configs);
if (jobsDatabase.isPresent()) {
copyData(ctx, getSyncToStateMap(jobsDatabase.get()), OffsetDateTime.now());
}
}

@VisibleForTesting
static void createTable(final DSLContext ctx) {
ctx.createTableIfNotExists(SYNC_STATE_TABLE)
.column(COLUMN_SYNC_ID)
.column(COLUMN_STATE)
.column(COLUMN_CREATED_AT)
.column(COLUMN_UPDATED_AT)
.execute();
ctx.createUniqueIndexIfNotExists(String.format("%s_sync_id_idx", SYNC_STATE_TABLE))
.on(SYNC_STATE_TABLE, Collections.singleton(COLUMN_SYNC_ID))
.execute();
}

@VisibleForTesting
static void copyData(final DSLContext ctx, final Map<String, JsonNode> syncToStateMap, final OffsetDateTime timestamp) {
for (final Map.Entry<String, JsonNode> entry : syncToStateMap.entrySet()) {
ctx.insertInto(SYNC_STATE_TABLE)
.set(COLUMN_SYNC_ID, UUID.fromString(entry.getKey()))
.set(COLUMN_STATE, JSONB.valueOf(Jsons.serialize(entry.getValue())))
.set(COLUMN_CREATED_AT, timestamp)
.set(COLUMN_UPDATED_AT, timestamp)
// This migration is idempotent. If the record for a sync_id already exists,
// it means that the migration has already been run before. Abort insertion.
.onDuplicateKeyIgnore()
.execute();
}
}

/**
* This migration requires a connection to the job database, which may be a separate database from
* the config database. However, the job database only exists in production, not in development or
* test. We use the job database environment variables to determine how to connect to the job
* database. This approach is not 100% reliable. However, it is better than doing half of the
* migration here (creating the table), and the rest of the work during server start up (copying the
* data from the job database).
*/
@VisibleForTesting
static Optional<Database> getJobsDatabase(final Configs configs) {
try {
// If the environment variables exist, it means the migration is run in production.
// Connect to the official job database.
final Database jobsDatabase = new JobsDatabaseInstance(
configs.getDatabaseUser(),
configs.getDatabasePassword(),
configs.getDatabaseUrl())
.getInitialized();
LOGGER.info("[{}] Connected to jobs database: {}", MIGRATION_NAME, configs.getDatabaseUrl());
return Optional.of(jobsDatabase);
} catch (final IllegalArgumentException e) {
// If the environment variables do not exist, it means the migration is run in development.
// Connect to a mock job database, because we don't need to copy any data in test.
LOGGER.info("[{}] This is the dev environment; there is no jobs database", MIGRATION_NAME);
return Optional.empty();
} catch (final IOException e) {
throw new RuntimeException("Cannot connect to jobs database", e);
}
}

/**
* @return a map from sync id to last job attempt state.
*/
@VisibleForTesting
static Map<String, JsonNode> getSyncToStateMap(final Database jobsDatabase) throws SQLException {
final Table<?> jobsTable = DSL.table("jobs");
final Field<Long> jobIdField = DSL.field("jobs.id", SQLDataType.BIGINT);
final Field<String> syncIdField = DSL.field("jobs.scope", SQLDataType.VARCHAR);

final Table<?> attemptsTable = DSL.table("attempts");
final Field<Long> attemptJobIdField = DSL.field("attempts.job_id", SQLDataType.BIGINT);
final Field<Integer> attemptNumberField = DSL.field("attempts.attempt_number", SQLDataType.INTEGER);

// output schema: JobOutput.yaml
// sync schema: StandardSyncOutput.yaml
// state schema: State.yaml
final Field<JSONB> attemptStateField = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB);

return jobsDatabase.query(ctx -> ctx
.select(syncIdField, attemptStateField)
.from(attemptsTable)
.innerJoin(jobsTable)
.on(jobIdField.eq(attemptJobIdField))
.where(DSL.row(attemptJobIdField, attemptNumberField).in(
// for each job id, find the last attempt with a state
DSL.select(attemptJobIdField, DSL.max(attemptNumberField))
.from(attemptsTable)
.where(attemptStateField.isNotNull())
.groupBy(attemptJobIdField)))
.fetch()
.stream()
.collect(Collectors.toMap(
Record2::value1,
r -> Jsons.deserialize(r.value2().data()))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public static void createNextMigrationFile(final String dbIdentifier, final Flyw

final String template = MoreResources.readResource("migration_template.txt");
final String newMigration = template.replace("<db-name>", dbIdentifier)
.replace("<version-id>", versionId)
.replace("<description>", description)
.replaceAll("<version-id>", versionId)
.replaceAll("<description>", description)
.strip();

final String fileName = String.format("V%s__%s.java", versionId, description);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs;

import io.airbyte.db.Database;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.test.TestDatabaseProvider;
import java.io.IOException;

public class JobsDatabaseTestProvider implements TestDatabaseProvider {

private final String user;
private final String password;
private final String jdbcUrl;

public JobsDatabaseTestProvider(String user, String password, String jdbcUrl) {
this.user = user;
this.password = password;
this.jdbcUrl = jdbcUrl;
}

@Override
public Database create(final boolean runMigration) throws IOException {
final Database jobsDatabase = new JobsDatabaseInstance(user, password, jdbcUrl)
.getAndInitialize();

if (runMigration) {
final DatabaseMigrator migrator = new JobsDatabaseMigrator(
jobsDatabase,
JobsDatabaseTestProvider.class.getSimpleName());
migrator.createBaseline();
migrator.migrate();
}

return jobsDatabase;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_29_15_001__Add_temporalWorkflowId_col_to_Attempts.class);

@Override
public void migrate(final Context context) throws Exception {
// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

final DSLContext ctx = DSL.using(context.getConnection());
ctx.alterTable("attempts")
.addColumnIfNotExists(DSL.field("temporal_workflow_id", SQLDataType.VARCHAR(256).nullable(true)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.test;

import io.airbyte.db.Database;
import java.io.IOException;

public interface TestDatabaseProvider {

Database create(final boolean runMigration) throws IOException;

}
Loading