Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐞 Fix db config persistence unique constraint conflict #5846

Merged
merged 12 commits into from
Sep 4, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigSchemaMigrationSupport;
Expand All @@ -50,6 +51,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
Expand Down Expand Up @@ -121,8 +123,6 @@ public <T> List<T> listConfigs(AirbyteConfig configType, Class<T> clazz) throws

@Override
public <T> void writeConfig(AirbyteConfig configType, String configId, T config) throws IOException {
LOGGER.info("Upserting {} record {}", configType, configId);

database.transaction(ctx -> {
boolean isExistingConfig = ctx.fetchExists(select()
.from(AIRBYTE_CONFIGS)
Expand All @@ -131,27 +131,9 @@ public <T> void writeConfig(AirbyteConfig configType, String configId, T config)
OffsetDateTime timestamp = OffsetDateTime.now();

if (isExistingConfig) {
int updateCount = ctx.update(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
if (updateCount != 0 && updateCount != 1) {
LOGGER.warn("{} config {} has been updated; updated record count: {}", configType, configId, updateCount);
}

return null;
}

int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_ID, configId)
.set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType.name())
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config)))
.set(AIRBYTE_CONFIGS.CREATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.execute();
if (insertionCount != 1) {
LOGGER.warn("{} config {} has been inserted; insertion record count: {}", configType, configId, insertionCount);
updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId);
} else {
insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configType.getIdFieldName());
}

return null;
Expand Down Expand Up @@ -215,20 +197,25 @@ public Map<String, Stream<JsonNode>> dumpConfigs() throws IOException {
* @return the number of inserted records: 1 on success, or 0 if a record with the same config type and id already exists.
*/
@VisibleForTesting
int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String idFieldName) {
int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, @Nullable String idFieldName) {
String configId = idFieldName == null
? UUID.randomUUID().toString()
: configJson.get(idFieldName).asText();
LOGGER.info("Inserting {} record {}", configType, configId);

ctx.insertInto(AIRBYTE_CONFIGS)
int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_ID, configId)
.set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(AIRBYTE_CONFIGS.CREATED_AT, timestamp)
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.onConflict(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID)
.doNothing()
.execute();
return 1;
if (insertionCount != 1) {
LOGGER.warn("{} config {} already exists (insertion record count: {})", configType, configId, insertionCount);
}
return insertionCount;
}

/**
Expand All @@ -238,11 +225,15 @@ int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configTy
int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String configId) {
LOGGER.info("Updating {} record {}", configType, configId);

return ctx.update(AIRBYTE_CONFIGS)
int updateCount = ctx.update(AIRBYTE_CONFIGS)
.set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson)))
.set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp)
.where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId))
.execute();
if (updateCount != 1) {
LOGGER.warn("{} config {} is not updated (updated record count: {})", configType, configId, updateCount);
}
return updateCount;
}

@VisibleForTesting
Expand All @@ -268,12 +259,14 @@ void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence
LOGGER.info("Config database data loading completed with {} records", insertionCount);
}

private static class ConnectorInfo {
static class ConnectorInfo {

private final String connectorDefinitionId;
private final String dockerImageTag;
final String dockerRepository;
final String connectorDefinitionId;
final String dockerImageTag;

private ConnectorInfo(String connectorDefinitionId, String dockerImageTag) {
private ConnectorInfo(String dockerRepository, String connectorDefinitionId, String dockerImageTag) {
this.dockerRepository = dockerRepository;
this.connectorDefinitionId = connectorDefinitionId;
this.dockerImageTag = dockerImageTag;
}
Expand Down Expand Up @@ -334,7 +327,8 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
AirbyteConfig configType,
List<T> latestDefinitions,
Set<String> connectorRepositoriesInUse,
Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap) {
Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap)
throws IOException {
int newCount = 0;
int updatedCount = 0;
for (T latestDefinition : latestDefinitions) {
Expand Down Expand Up @@ -364,7 +358,8 @@ private <T> ConnectorCounter updateConnectorDefinitions(DSLContext ctx,
* repository name instead of definition id because connectors can be added manually by
* users, and are not always the same as those in the seed.
*/
private Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ctx) {
@VisibleForTesting
Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ctx) {
Copy link
Contributor

@sherifnada sherifnada Sep 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this return the info keyed by definition? connector repository is not guaranteed to be a primary key. Or at least maybe it should return a multimap?

Copy link
Contributor Author

@tuliren tuliren Sep 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we cannot key it by definition. The purpose is to upgrade the connector version, and the version is included in the definition. If we key by definition, we cannot check which definition needs the upgrade efficiently.

Why is the connector repository not unique? Do we allow multiple versions of repos in the same workspace? That seems chaotic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair points, let's resolve after the weekend. Enjoy the long weekend!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it also makes sense for different connectors to have the same repository with different names. In that case, I can change it to a multi-map.

Sure, this can wait after the weekend.

Field<String> repoField = field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR).as("repository");
Field<String> versionField = field("config_blob ->> 'dockerImageTag'", SQLDataType.VARCHAR).as("version");
return ctx.select(AIRBYTE_CONFIGS.CONFIG_ID, repoField, versionField)
Expand All @@ -373,7 +368,20 @@ private Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(DSLContext ct
.fetch().stream()
.collect(Collectors.toMap(
row -> row.getValue(repoField),
row -> new ConnectorInfo(row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField))));
row -> new ConnectorInfo(row.getValue(repoField), row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField)),
// when there are duplicated connector definitions, return the latest one
(c1, c2) -> {
AirbyteVersion v1 = new AirbyteVersion(c1.dockerImageTag);
AirbyteVersion v2 = new AirbyteVersion(c2.dockerImageTag);
LOGGER.warn("Duplicated connector version found for {}: {} ({}) vs {} ({})",
c1.dockerRepository, c1.dockerImageTag, c1.connectorDefinitionId, c2.dockerImageTag, c2.connectorDefinitionId);
int comparison = v1.patchVersionCompareTo(v2);
if (comparison >= 0) {
return c1;
} else {
return c2;
}
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ public void testNoUpdateForUsedConnector() throws Exception {

// create a sync to mark the destination as used
StandardSync s3Sync = new StandardSync()
.withConnectionId(UUID.randomUUID())
.withSourceId(SOURCE_GITHUB.getSourceDefinitionId())
.withDestinationId(destinationS3V2.getDestinationDefinitionId());
configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID.randomUUID().toString(), s3Sync);
configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, s3Sync.getConnectionId().toString(), s3Sync);

configPersistence.loadData(seedPersistence);
// s3 destination is not updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -120,4 +124,98 @@ public void testDumpConfigs() throws Exception {
assertSameConfigDump(expected, actual);
}

@Test
public void testGetConnectorRepositoryToInfoMap() throws Exception {
  String repository = "airbyte/duplicated-connector";
  String olderTag = "0.1.10";
  String newerTag = "0.2.0";

  // Two source definitions sharing one docker repository, differing only in version.
  StandardSourceDefinition olderSource = new StandardSourceDefinition()
      .withSourceDefinitionId(UUID.randomUUID())
      .withDockerRepository(repository)
      .withDockerImageTag(olderTag);
  StandardSourceDefinition newerSource = new StandardSourceDefinition()
      .withSourceDefinitionId(UUID.randomUUID())
      .withDockerRepository(repository)
      .withDockerImageTag(newerTag);
  writeSource(configPersistence, olderSource);
  writeSource(configPersistence, newerSource);

  Map<String, ConnectorInfo> infoMap = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx));

  // Duplicated connector definitions collapse to a single entry carrying the latest version.
  assertEquals(newerTag, infoMap.get(repository).dockerImageTag);
}

@Test
public void testInsertConfigRecord() throws Exception {
  OffsetDateTime now = OffsetDateTime.now();
  UUID sourceDefinitionId = UUID.randomUUID();
  String repository = "airbyte/test-connector";

  // First insertion: the record does not exist yet, so exactly one row is written.
  StandardSourceDefinition initialSource = new StandardSourceDefinition()
      .withSourceDefinitionId(sourceDefinitionId)
      .withDockerRepository(repository)
      .withDockerImageTag("0.1.2");
  int inserted = database.query(ctx -> configPersistence.insertConfigRecord(
      ctx,
      now,
      ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
      Jsons.jsonNode(initialSource),
      ConfigSchema.STANDARD_SOURCE_DEFINITION.getIdFieldName()));
  assertEquals(1, inserted);
  // An unrelated source is written to confirm it is left untouched by the conflict below.
  writeSource(configPersistence, SOURCE_GITHUB);
  assertRecordCount(2);
  assertHasSource(initialSource);
  assertHasSource(SOURCE_GITHUB);

  // Second insertion with the same definition id: the conflict is ignored and nothing changes.
  StandardSourceDefinition conflictingSource = new StandardSourceDefinition()
      .withSourceDefinitionId(sourceDefinitionId)
      .withDockerRepository(repository)
      .withDockerImageTag("0.1.5");
  inserted = database.query(ctx -> configPersistence.insertConfigRecord(
      ctx,
      now,
      ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
      Jsons.jsonNode(conflictingSource),
      ConfigSchema.STANDARD_SOURCE_DEFINITION.getIdFieldName()));
  assertEquals(0, inserted);
  assertRecordCount(2);
  assertHasSource(initialSource);
  assertHasSource(SOURCE_GITHUB);
}

@Test
public void testUpdateConfigRecord() throws Exception {
  OffsetDateTime timestamp = OffsetDateTime.now();
  UUID definitionId = UUID.randomUUID();
  String connectorRepository = "airbyte/test-connector";

  StandardSourceDefinition oldSource = new StandardSourceDefinition()
      .withSourceDefinitionId(definitionId)
      .withDockerRepository(connectorRepository)
      .withDockerImageTag("0.3.5");
  writeSource(configPersistence, oldSource);
  // write an irrelevant source to make sure that it is not changed
  writeSource(configPersistence, SOURCE_GITHUB);
  assertRecordCount(2);
  assertHasSource(oldSource);
  assertHasSource(SOURCE_GITHUB);

  // Use a DIFFERENT image tag so the update is observable; with identical old and new
  // records, a broken (no-op) update would still pass the assertions below.
  StandardSourceDefinition newSource = new StandardSourceDefinition()
      .withSourceDefinitionId(definitionId)
      .withDockerRepository(connectorRepository)
      .withDockerImageTag("0.3.6");
  int updated = database.query(ctx -> configPersistence.updateConfigRecord(
      ctx,
      timestamp,
      ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
      Jsons.jsonNode(newSource),
      definitionId.toString()));
  // Exactly one row matches the (config_type, config_id) key and must be updated.
  assertEquals(1, updated);
  assertRecordCount(2);
  assertHasSource(newSource);
  assertHasSource(SOURCE_GITHUB);
}

}