Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove production use of DatabaseConfigPersistence #19275

Merged
merged 7 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
* This class can be used to store DB queries for persisting configs that we may want to reuse
* across this package.
* <p>
* Currently this class is used to move write queries out of {@link DatabaseConfigPersistence} so
* that they can be reused/composed in {@link ConfigRepository}.
* Currently this class is used to move write queries out of {@link ConfigPersistence} so that they
* can be reused/composed in {@link ConfigRepository}.
*/
@SuppressWarnings("PMD.CognitiveComplexity")
public class ConfigWriter {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.configs.jooq.generated.tables.Actor;
import io.airbyte.db.instance.configs.jooq.generated.tables.ActorDefinition;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.StreamDescriptor;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -72,11 +74,7 @@ public ConfigWithMetadata<StandardSync> getStandardSyncWithMetadata(final UUID c
}

public List<StandardSync> listStandardSync() throws IOException {
return listStandardSyncWithMetadata().stream().map(ConfigWithMetadata::getConfig).toList();
}

public List<ConfigWithMetadata<StandardSync>> listStandardSyncWithMetadata() throws IOException {
return listStandardSyncWithMetadata(Optional.empty());
return listStandardSyncWithMetadata(Optional.empty()).stream().map(ConfigWithMetadata::getConfig).toList();
}

public void writeStandardSync(final StandardSync standardSync) throws IOException {
Expand All @@ -86,6 +84,12 @@ public void writeStandardSync(final StandardSync standardSync) throws IOExceptio
});
}

/**
* Deletes a connection (sync) and all of dependent resources (state and connection_operations).
*
* @param standardSyncId - id of the sync (a.k.a. connection_id)
* @throws IOException - error while accessing db.
*/
public void deleteStandardSync(final UUID standardSyncId) throws IOException {
database.transaction(ctx -> {
PersistenceHelpers.deleteConfig(CONNECTION_OPERATION, CONNECTION_OPERATION.CONNECTION_ID, standardSyncId, ctx);
Expand Down Expand Up @@ -118,6 +122,11 @@ public void clearUnsupportedProtocolVersionFlag(final UUID actorDefinitionId,
});
}

public List<StreamDescriptor> getAllStreamsForConnection(final UUID connectionId) throws ConfigNotFoundException, IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved from ConfigRepository

final StandardSync standardSync = getStandardSync(connectionId);
return CatalogHelpers.extractStreamDescriptors(standardSync.getCatalog());
}

private List<ConfigWithMetadata<StandardSync>> listStandardSyncWithMetadata(final Optional<UUID> configId) throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(CONNECTION);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static io.airbyte.config.ConfigSchema.STANDARD_DESTINATION_DEFINITION;
import static io.airbyte.config.ConfigSchema.STANDARD_SOURCE_DEFINITION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.ReleaseStage;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.persistence.ActorDefinitionMigrator.ConnectorInfo;
import io.airbyte.db.ExceptionWrappingDatabase;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ActorDefinitionMigratorTest extends BaseConfigDatabaseTest {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests from DatabaseConfigPersistenceTest that were specific to migrator.


public static final String DEFAULT_PROTOCOL_VERSION = "0.2.0";
protected static final StandardSourceDefinition SOURCE_POSTGRES = new StandardSourceDefinition()
.withName("Postgres")
.withSourceDefinitionId(UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750"))
.withDockerRepository("airbyte/source-postgres")
.withDockerImageTag("0.3.11")
.withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
.withIcon("postgresql.svg")
.withSourceType(SourceType.DATABASE)
.withTombstone(false);
protected static final StandardSourceDefinition SOURCE_CUSTOM = new StandardSourceDefinition()
.withName("Custom")
.withSourceDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750"))
.withDockerRepository("airbyte/custom")
.withDockerImageTag("0.3.11")
.withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
.withIcon("postgresql.svg")
.withSourceType(SourceType.DATABASE)
.withCustom(true)
.withReleaseStage(ReleaseStage.CUSTOM)
.withTombstone(false);
protected static final StandardDestinationDefinition DESTINATION_S3 = new StandardDestinationDefinition()
.withName("S3")
.withDestinationDefinitionId(UUID.fromString("4816b78f-1489-44c1-9060-4b19d5fa9362"))
.withDockerRepository("airbyte/destination-s3")
.withDockerImageTag("0.1.12")
.withDocumentationUrl("https://docs.airbyte.io/integrations/destinations/s3")
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
.withTombstone(false);
protected static final StandardDestinationDefinition DESTINATION_CUSTOM = new StandardDestinationDefinition()
.withName("Custom")
.withDestinationDefinitionId(UUID.fromString("baba338e-5647-4c0b-adf4-da0e75f5a750"))
.withDockerRepository("airbyte/custom")
.withDockerImageTag("0.3.11")
.withDocumentationUrl("https://docs.airbyte.io/integrations/sources/postgres")
.withIcon("postgresql.svg")
.withCustom(true)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
.withTombstone(false);

private ActorDefinitionMigrator migrator;
private ConfigRepository configRepository;

@BeforeEach
void setup() throws SQLException {
truncateAllTables();

migrator = new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database));
configRepository = new ConfigRepository(database, migrator, null);
}

private void writeSource(final StandardSourceDefinition source) throws Exception {
configRepository.writeStandardSourceDefinition(source);
}

@Test
void testGetConnectorRepositoryToInfoMap() throws Exception {
final String connectorRepository = "airbyte/duplicated-connector";
final String oldVersion = "0.1.10";
final String newVersion = DEFAULT_PROTOCOL_VERSION;
final StandardSourceDefinition source1 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withName("source-1")
.withDockerRepository(connectorRepository)
.withDockerImageTag(oldVersion);
final StandardSourceDefinition source2 = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withName("source-2")
.withDockerRepository(connectorRepository)
.withDockerImageTag(newVersion);
writeSource(source1);
writeSource(source2);
final Map<String, ConnectorInfo> result = database.query(ctx -> migrator.getConnectorRepositoryToInfoMap(ctx));
// when there are duplicated connector definitions, the one with the latest version should be
// retrieved
assertEquals(newVersion, result.get(connectorRepository).dockerImageTag);
}

@Test
void testHasNewVersion() {
assertTrue(ActorDefinitionMigrator.hasNewVersion("0.1.99", DEFAULT_PROTOCOL_VERSION));
assertFalse(ActorDefinitionMigrator.hasNewVersion("invalid_version", "0.1.2"));
}

@Test
void testHasNewPatchVersion() {
assertFalse(ActorDefinitionMigrator.hasNewPatchVersion("0.1.99", DEFAULT_PROTOCOL_VERSION));
assertFalse(ActorDefinitionMigrator.hasNewPatchVersion("invalid_version", "0.3.1"));
assertTrue(ActorDefinitionMigrator.hasNewPatchVersion("0.1.0", "0.1.3"));
}

@Test
void testGetNewFields() {
final JsonNode o1 = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
final JsonNode o2 = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3 }");
assertEquals(Collections.emptySet(), ActorDefinitionMigrator.getNewFields(o1, o1));
assertEquals(Collections.singleton("field3"), ActorDefinitionMigrator.getNewFields(o1, o2));
assertEquals(Collections.singleton("field2"), ActorDefinitionMigrator.getNewFields(o2, o1));
}

@Test
void testGetDefinitionWithNewFields() {
final JsonNode current = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2 }");
final JsonNode latest = Jsons.deserialize("{ \"field1\": 1, \"field3\": 3, \"field4\": 4 }");
final Set<String> newFields = Set.of("field3");

assertEquals(current, ActorDefinitionMigrator.getDefinitionWithNewFields(current, latest, Collections.emptySet()));

final JsonNode currentWithNewFields = Jsons.deserialize("{ \"field1\": 1, \"field2\": 2, \"field3\": 3 }");
assertEquals(currentWithNewFields, ActorDefinitionMigrator.getDefinitionWithNewFields(current, latest, newFields));
}

@Test
void testActorDefinitionReleaseDate() throws Exception {
final UUID definitionId = UUID.randomUUID();
final String connectorRepository = "airbyte/test-connector";

// when the record does not exist, it is inserted
final StandardSourceDefinition sourceDef = new StandardSourceDefinition()
.withSourceDefinitionId(definitionId)
.withDockerRepository(connectorRepository)
.withDockerImageTag("0.1.2")
.withName("random-name")
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
.withTombstone(false);
writeSource(sourceDef);
assertEquals(sourceDef, configRepository.getStandardSourceDefinition(sourceDef.getSourceDefinitionId()));
}

@Test
void filterCustomSource() {
final Map<String, ConnectorInfo> sourceMap = new HashMap<>();
final String nonCustomKey = "non-custom";
final String customKey = "custom";
sourceMap.put(nonCustomKey, new ConnectorInfo("id", Jsons.jsonNode(SOURCE_POSTGRES)));
sourceMap.put(customKey, new ConnectorInfo("id", Jsons.jsonNode(SOURCE_CUSTOM)));

final Map<String, ConnectorInfo> filteredSrcMap = migrator.filterCustomConnector(sourceMap, STANDARD_SOURCE_DEFINITION);

assertThat(filteredSrcMap).containsOnlyKeys(nonCustomKey);
}

@Test
void filterCustomDestination() {
final Map<String, ConnectorInfo> sourceMap = new HashMap<>();
final String nonCustomKey = "non-custom";
final String customKey = "custom";
sourceMap.put(nonCustomKey, new ConnectorInfo("id", Jsons.jsonNode(DESTINATION_S3)));
sourceMap.put(customKey, new ConnectorInfo("id", Jsons.jsonNode(DESTINATION_CUSTOM)));

final Map<String, ConnectorInfo> filteredDestMap = migrator.filterCustomConnector(sourceMap, STANDARD_DESTINATION_DEFINITION);

assertThat(filteredDestMap).containsOnlyKeys(nonCustomKey);
}

}
Loading