Skip to content

Commit

Permalink
Remove the use of ConfigPersistence for ActorCatalog operation (#10387)
Browse files Browse the repository at this point in the history
* Skip ConfigPersistence for ActorCatalog operations

* Fix catalog insertion logic

- ActorCatalog and ActorCatalogFetchEvent are stored within the same
  transation.
- The function writing catalog now automatically handles deduplication.
- Fixed function visibility: helper function to handle ActorCatalog
  insertion are now private.

* Fix fetch catalog query

take the catalog associated with the latest fetch event in case where
multiple event are present for the same config, actorId, actor version.

* Fix name of columns used for insert

* Add testing on deduplication of catalogs

* Add javadoc for actor catalog functions

* Rename sourceId to actorId

* Fix formatting
  • Loading branch information
malikdiarra authored and etsybaev committed Mar 5, 2022
1 parent 577a1b9 commit dc58c2e
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -41,6 +43,7 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand All @@ -53,6 +56,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -627,35 +636,111 @@ public List<ActorCatalog> listActorCatalogs()
return actorCatalogs;
}

public void writeCatalog(final AirbyteCatalog catalog,
final UUID sourceId,
final String configurationHash,
final String connectorVersion)
throws JsonValidationException, IOException {
private Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash, final DSLContext context) {
final Result<Record2<UUID, JSONB>> records = context.select(ACTOR_CATALOG.ID, ACTOR_CATALOG.CATALOG)
.from(ACTOR_CATALOG)
.where(ACTOR_CATALOG.CATALOG_HASH.eq(catalogHash)).fetch();

final Map<UUID, AirbyteCatalog> result = new HashMap<>();
for (final Record record : records) {
final AirbyteCatalog catalog = Jsons.deserialize(
record.get(ACTOR_CATALOG.CATALOG).toString(), AirbyteCatalog.class);
result.put(record.get(ACTOR_CATALOG.ID), catalog);
}
return result;
}

/**
* Store an Airbyte catalog in DB if it is not present already
*
* Checks in the config DB if the catalog is present already, if so returns it identifier. It is not
* present, it is inserted in DB with a new identifier and that identifier is returned.
*
* @param airbyteCatalog An Airbyte catalog to cache
* @param context
* @return the db identifier for the cached catalog.
*/
private UUID getOrInsertActorCatalog(final AirbyteCatalog airbyteCatalog,
final DSLContext context) {
final OffsetDateTime timestamp = OffsetDateTime.now();
final HashFunction hashFunction = Hashing.murmur3_32_fixed();
final String catalogHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes(
final String catalogHash = hashFunction.hashBytes(Jsons.serialize(airbyteCatalog).getBytes(
Charsets.UTF_8)).toString();
ActorCatalog actorCatalog = new ActorCatalog()
.withCatalog(Jsons.jsonNode(catalog))
.withId(UUID.randomUUID())
.withCatalogHash(catalogHash);
final Optional<ActorCatalog> existingCatalog = findExistingCatalog(actorCatalog);
if (existingCatalog.isPresent()) {
actorCatalog = existingCatalog.get();
} else {
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG,
actorCatalog.getId().toString(),
actorCatalog);
}
final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent()
.withActorCatalogId(actorCatalog.getId())
.withId(UUID.randomUUID())
.withConfigHash(configurationHash)
.withConnectorVersion(connectorVersion)
.withActorId(sourceId);
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
actorCatalogFetchEvent.getId().toString(),
actorCatalogFetchEvent);
final Map<UUID, AirbyteCatalog> catalogs = findCatalogByHash(catalogHash, context);

for (final Map.Entry<UUID, AirbyteCatalog> entry : catalogs.entrySet()) {
if (entry.getValue().equals(airbyteCatalog)) {
return entry.getKey();
}
}

final UUID catalogId = UUID.randomUUID();
context.insertInto(ACTOR_CATALOG)
.set(ACTOR_CATALOG.ID, catalogId)
.set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(airbyteCatalog)))
.set(ACTOR_CATALOG.CATALOG_HASH, catalogHash)
.set(ACTOR_CATALOG.CREATED_AT, timestamp)
.set(ACTOR_CATALOG.MODIFIED_AT, timestamp).execute();
return catalogId;
}

public Optional<AirbyteCatalog> getActorCatalog(final UUID actorId,
final String actorVersion,
final String configHash)
throws IOException {
final Result<Record1<JSONB>> records = database.transaction(ctx -> ctx.select(ACTOR_CATALOG.CATALOG)
.from(ACTOR_CATALOG).join(ACTOR_CATALOG_FETCH_EVENT)
.on(ACTOR_CATALOG.ID.eq(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID))
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(actorId))
.and(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION.eq(actorVersion))
.and(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH.eq(configHash))
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1)).fetch();

if (records.size() >= 1) {
final JSONB record = records.get(0).get(ACTOR_CATALOG.CATALOG);
return Optional.of(Jsons.deserialize(record.toString(), AirbyteCatalog.class));
}
return Optional.empty();

}

/**
* Stores source catalog information.
*
* This function is called each time the schema of a source is fetched. This can occur because the
* source is set up for the first time, because the configuration or version of the connector
* changed or because the user explicitly requested a schema refresh. Schemas are stored separately
* and de-duplicated upon insertion. Once a schema has been successfully stored, a call to
* getActorCatalog(actorId, connectionVersion, configurationHash) will return the most recent schema
* stored for those parameters.
*
* @param catalog
* @param actorId
* @param connectorVersion
* @param configurationHash
* @return The identifier (UUID) of the fetch event inserted in the database
* @throws IOException
*/
public UUID writeActorCatalogFetchEvent(final AirbyteCatalog catalog,
final UUID actorId,
final String connectorVersion,
final String configurationHash)
throws IOException {
final OffsetDateTime timestamp = OffsetDateTime.now();
final UUID fetchEventID = UUID.randomUUID();
database.transaction(ctx -> {
final UUID catalogId = getOrInsertActorCatalog(catalog, ctx);
return ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
.set(ACTOR_CATALOG_FETCH_EVENT.ID, fetchEventID)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorId)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, catalogId)
.set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, configurationHash)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, connectorVersion)
.set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp)
.set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp).execute();
});

return fetchEventID;
}

public int countConnectionsForWorkspace(final UUID workspaceId) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.spy;

Expand All @@ -12,18 +13,25 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -97,4 +105,54 @@ void testWorkspaceCountConnections() throws IOException {
assertEquals(MockData.sourceConnections().size(), configRepository.countSourcesForWorkspace(workspaceId));
}

@Test
void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException {

final StandardWorkspace workspace = MockData.standardWorkspace();

final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withSourceType(SourceType.DATABASE)
.withDockerRepository("docker-repo")
.withDockerImageTag("1.2.0")
.withName("sourceDefinition");
configRepository.writeStandardSourceDefinition(sourceDefinition);

final SourceConnection source = new SourceConnection()
.withSourceDefinitionId(sourceDefinition.getSourceDefinitionId())
.withSourceId(UUID.randomUUID())
.withName("SomeConnector")
.withWorkspaceId(workspace.getWorkspaceId())
.withConfiguration(Jsons.deserialize("{}"));
final ConnectorSpecification specification = new ConnectorSpecification()
.withConnectionSpecification(Jsons.deserialize("{}"));
configRepository.writeSourceConnection(source, specification);

final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
configRepository.writeActorCatalogFetchEvent(
actorCatalog, source.getSourceId(), "1.2.0", "ConfigHash");

final Optional<AirbyteCatalog> catalog =
configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "ConfigHash");
assertTrue(catalog.isPresent());
assertEquals(actorCatalog, catalog.get());
assertFalse(configRepository.getSourceCatalog(source.getSourceId(), "1.3.0", "ConfigHash").isPresent());
assertFalse(configRepository.getSourceCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash").isPresent());

configRepository.writeActorCatalogFetchEvent(actorCatalog, source.getSourceId(), "1.3.0", "ConfigHash");
final Optional<AirbyteCatalog> catalogNewConnectorVersion =
configRepository.getActorCatalog(source.getSourceId(), "1.3.0", "ConfigHash");
assertTrue(catalogNewConnectorVersion.isPresent());
assertEquals(actorCatalog, catalogNewConnectorVersion.get());

configRepository.writeActorCatalogFetchEvent(actorCatalog, source.getSourceId(), "1.2.0", "OtherConfigHash");
final Optional<AirbyteCatalog> catalogNewConfig =
configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash");
assertTrue(catalogNewConfig.isPresent());
assertEquals(actorCatalog, catalogNewConfig.get());

final int catalogDbEntry = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class);
assertEquals(1, catalogDbEntry);
}

}

0 comments on commit dc58c2e

Please sign in to comment.