Skip to content

Commit

Permalink
Copy job attempt state to configs database (airbytehq#7219)
Browse files Browse the repository at this point in the history
* Add migration to create latest state table

* Log migration name

* Expose db variables to airbyte-db

* Implement migration

* Fix migration test

* temp

* Rebase on master

* Save state in temporal (airbytehq#7253)

* Copy state to airbyte_configs table

* Add standard sync state

* Move state methods to config repository

* Add unit tests

* Fix unit tests

* Register standard sync state in migration

* Add comment

* Use config model instead of json node

* Add comments

* Remove unnecessary method

* Fix migration query

* Remove unused config database

* Move persist statement and log the call

* Update dev doc

* Add unit tests for sync workflow

Co-authored-by: Charles <giardina.charles@gmail.com>
  • Loading branch information
2 people authored and schlattk committed Jan 4, 2022
1 parent bcb51a4 commit 864c2f7
Show file tree
Hide file tree
Showing 51 changed files with 1,278 additions and 440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum ConfigSchema implements AirbyteConfig {
destinationConnection -> destinationConnection.getDestinationId().toString(),
"destinationId"),

// sync
// sync (i.e. connection)
STANDARD_SYNC("StandardSync.yaml",
StandardSync.class,
standardSync -> standardSync.getConnectionId().toString(),
Expand All @@ -46,6 +46,10 @@ public enum ConfigSchema implements AirbyteConfig {
StandardSyncOperation.class,
standardSyncOperation -> standardSyncOperation.getOperationId().toString(),
"operationId"),
STANDARD_SYNC_STATE("StandardSyncState.yaml",
StandardSyncState.class,
standardSyncState -> standardSyncState.getConnectionId().toString(),
"connectionId"),

SOURCE_OAUTH_PARAM("SourceOAuthParameter.yaml", SourceOAuthParameter.class,
sourceOAuthParameter -> sourceOAuthParameter.getOauthParameterId().toString(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardSyncState.yaml
title: StandardSyncState
description: The current state of a connection (i.e. StandardSync).
type: object
additionalProperties: false
required:
- connectionId
properties:
connectionId:
type: string
format: uuid
description: This is a foreign key that references a connection (i.e. StandardSync).
state:
"$ref": State.yaml
description: The current (latest) connection state.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.config.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.client.util.Preconditions;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
Expand All @@ -19,7 +20,9 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
Expand All @@ -42,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class ConfigRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class);
Expand Down Expand Up @@ -431,6 +435,30 @@ public List<DestinationOAuthParameter> listDestinationOAuthParam() throws JsonVa
return persistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class);
}

public Optional<State> getConnectionState(final UUID connectionId) throws IOException {
try {
final StandardSyncState connectionState = persistence.getConfig(
ConfigSchema.STANDARD_SYNC_STATE,
connectionId.toString(),
StandardSyncState.class);
return Optional.of(connectionState.getState());
} catch (final ConfigNotFoundException e) {
return Optional.empty();
} catch (final JsonValidationException e) {
throw new IllegalStateException(e);
}
}

public void updateConnectionState(final UUID connectionId, final State state) throws IOException {
LOGGER.info("Updating connection {} state: {}", connectionId, state);
final StandardSyncState connectionState = new StandardSyncState().withConnectionId(connectionId).withState(state);
try {
persistence.writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState);
} catch (final JsonValidationException e) {
throw new IllegalStateException(e);
}
}

/**
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
* string/jsonnode into the AirbyteConfig, Stream<Object<AirbyteConfig.getClassName()>
Expand All @@ -453,6 +481,7 @@ public void replaceAllConfigsDeserializing(final Map<String, Stream<JsonNode>> c

public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final boolean dryRun) throws IOException {
if (longLivedSecretPersistence.isPresent()) {
Preconditions.checkNotNull(specFetcherFn);
final var augmentedMap = new HashMap<>(configs);

// get all source defs so that we can use their specs when storing secrets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public DatabaseConfigPersistence(final Database database) {
* If this is a migration deployment from an old version that relies on file system config
* persistence, copy the existing configs from local files.
*/
public void migrateFileConfigs(final Configs serverConfigs) throws IOException {
public DatabaseConfigPersistence migrateFileConfigs(final Configs serverConfigs) throws IOException {
database.transaction(ctx -> {
final boolean isInitialized = ctx.fetchExists(AIRBYTE_CONFIGS);
if (isInitialized) {
Expand All @@ -77,6 +77,8 @@ public void migrateFileConfigs(final Configs serverConfigs) throws IOException {

return null;
});

return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -186,7 +185,7 @@ public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final

@Override
public void loadData(final ConfigPersistence seedPersistence) throws IOException {
throw new UnsupportedEncodingException("This method is not supported in this implementation");
// this method is not supported in this implementation, but needed in tests; do nothing
}

private <T> T getConfigInternal(final AirbyteConfig configType, final String configId, final Class<T> clazz)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -34,6 +41,11 @@ void setup() {
new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence));
}

@AfterEach
void cleanUp() {
reset(configPersistence);
}

@Test
void testWorkspaceWithNullTombstone() throws ConfigNotFoundException, IOException, JsonValidationException {
assertReturnsWorkspace(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID));
Expand All @@ -55,4 +67,34 @@ void assertReturnsWorkspace(final StandardWorkspace workspace) throws ConfigNotF
assertEquals(workspace, configRepository.getStandardWorkspace(WORKSPACE_ID, true));
}

@Test
void testGetConnectionState() throws Exception {
final UUID connectionId = UUID.randomUUID();
final State state = new State().withState(Jsons.deserialize("{ \"cursor\": 1000 }"));
final StandardSyncState connectionState = new StandardSyncState().withConnectionId(connectionId).withState(state);

when(configPersistence.getConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), StandardSyncState.class))
.thenThrow(new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_STATE, connectionId));
assertEquals(Optional.empty(), configRepository.getConnectionState(connectionId));

reset(configPersistence);
when(configPersistence.getConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), StandardSyncState.class))
.thenReturn(connectionState);
assertEquals(Optional.of(state), configRepository.getConnectionState(connectionId));
}

@Test
void testUpdateConnectionState() throws Exception {
final UUID connectionId = UUID.randomUUID();
final State state1 = new State().withState(Jsons.deserialize("{ \"cursor\": 1 }"));
final StandardSyncState connectionState1 = new StandardSyncState().withConnectionId(connectionId).withState(state1);
final State state2 = new State().withState(Jsons.deserialize("{ \"cursor\": 2 }"));
final StandardSyncState connectionState2 = new StandardSyncState().withConnectionId(connectionId).withState(state2);

configRepository.updateConnectionState(connectionId, state1);
verify(configPersistence, times(1)).writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState1);
configRepository.updateConnectionState(connectionId, state2);
verify(configPersistence, times(1)).writeConfig(ConfigSchema.STANDARD_SYNC_STATE, connectionId.toString(), connectionState2);
}

}
1 change: 1 addition & 0 deletions airbyte-db/lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {

implementation project(':airbyte-protocol:models')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-config:models')
implementation "org.flywaydb:flyway-core:7.14.0"
implementation "org.testcontainers:postgresql:1.15.3"
// These are required because gradle might be using lower version of Jna from other
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs;

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import io.airbyte.config.ConfigSchema;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.DatabaseMigrator;
import io.airbyte.db.instance.test.TestDatabaseProvider;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.jooq.JSONB;

public class ConfigsDatabaseTestProvider implements TestDatabaseProvider {

private final String user;
private final String password;
private final String jdbcUrl;

public ConfigsDatabaseTestProvider(final String user, final String password, final String jdbcUrl) {
this.user = user;
this.password = password;
this.jdbcUrl = jdbcUrl;
}

@Override
public Database create(final boolean runMigration) throws IOException {
final Database database = new ConfigsDatabaseInstance(user, password, jdbcUrl)
.getAndInitialize();

if (runMigration) {
final DatabaseMigrator migrator = new ConfigsDatabaseMigrator(
database,
ConfigsDatabaseTestProvider.class.getSimpleName());
migrator.createBaseline();
migrator.migrate();
}

// The configs database is considered ready only if there are some seed records.
// So we need to create at least one record here.
final OffsetDateTime timestamp = OffsetDateTime.now();
new ExceptionWrappingDatabase(database).transaction(ctx -> ctx.insertInto(table("airbyte_configs"))
.set(field("config_id"), UUID.randomUUID().toString())
.set(field("config_type"), ConfigSchema.STATE.name())
.set(field("config_blob"), JSONB.valueOf("{}"))
.set(field("created_at"), timestamp)
.set(field("updated_at"), timestamp)
.execute());

return database;
}

}
Loading

0 comments on commit 864c2f7

Please sign in to comment.