diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index b190076e00a8..e7ed4da19105 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -599,6 +599,29 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/sources/most_recent_source_actor_catalog: + post: + tags: + - source + summary: Get most recent ActorCatalog for source + operationId: getMostRecentSourceActorCatalog + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SourceIdRequestBody" + required: true + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/ActorCatalogWithUpdatedAt" + "404": + $ref: "#/components/responses/NotFoundResponse" + "422": + $ref: "#/components/responses/InvalidInputResponse" /v1/sources/search: post: tags: @@ -3692,6 +3715,16 @@ components: properties: logType: $ref: "#/components/schemas/LogType" + # ACTOR CATALOG + ActorCatalogWithUpdatedAt: + description: A source actor catalog with the timestamp it was most recently updated + type: object + properties: + updatedAt: + type: integer + format: int64 + catalog: + type: object # SCHEMA CATALOG AirbyteCatalog: description: describes the available schema (catalog). diff --git a/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml new file mode 100644 index 000000000000..812b7c14eef1 --- /dev/null +++ b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml @@ -0,0 +1,23 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +title: ActorCatalogWithUpdatedAt +description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp. 
+type: object +additionalProperties: false +required: + - id + - catalog + - catalogHash + - updatedAt +properties: + id: + type: string + format: uuid + catalog: + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + catalogHash: + type: string + updatedAt: + type: integer + format: int64 diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index cc896bfdb0ba..78c111a6bbf8 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -32,6 +32,7 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; +import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -1314,6 +1315,16 @@ public Optional getActorCatalog(final UUID actorId, return records.stream().findFirst().map(DbConverter::buildActorCatalog); } + public Optional getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException { + final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT) + .from(ACTOR_CATALOG) + .join(ACTOR_CATALOG_FETCH_EVENT) + .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) + .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) + .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); + return records.stream().findFirst().map(DbConverter::buildActorCatalogWithUpdatedAt); + } + public Optional getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException { final Result records = database.query(ctx -> 
ctx.select(ACTOR_CATALOG.asterisk()) .from(ACTOR_CATALOG) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index e4b3fcb10031..41f1822bbd45 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -17,6 +17,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; +import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -218,6 +219,14 @@ public static ActorCatalog buildActorCatalog(final Record record) { .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)); } + public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) { + return new ActorCatalogWithUpdatedAt() + .withId(record.get(ACTOR_CATALOG.ID)) + .withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString())) + .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)) + .withUpdatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC)); + } + public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) { return new ActorCatalogFetchEvent() .withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID)) diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java index 595087e0d198..d0bd6152325a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java +++ 
b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java @@ -5,6 +5,7 @@ package io.airbyte.server.apis; import io.airbyte.api.generated.SourceApi; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; @@ -66,6 +67,11 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) { return ApiHelper.execute(() -> sourceHandler.getSource(sourceIdRequestBody)); } + @Override + public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) { + return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(sourceIdRequestBody)); + } + @Override public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { return ApiHelper.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 6a24b5169c60..fcc9e44e609e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.SourceCloneConfiguration; import io.airbyte.api.model.generated.SourceCloneRequestBody; @@ -31,6 +32,7 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; @@ -132,6 +134,17 @@ public 
SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) return buildSourceRead(sourceIdRequestBody.getSourceId()); } + public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalogWithUpdatedAt(final SourceIdRequestBody sourceIdRequestBody) + throws IOException { + Optional actorCatalog = + configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId()); + if (actorCatalog.isEmpty()) { + return new ActorCatalogWithUpdatedAt(); + } else { + return new ActorCatalogWithUpdatedAt().updatedAt(actorCatalog.get().getUpdatedAt()).catalog(actorCatalog.get().getCatalog()); + } + } + public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody) throws JsonValidationException, IOException, ConfigNotFoundException { // read source configuration from db diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index c81bfb97597c..0103d8f0f5fd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -123,7 +123,6 @@ class WebBackendConnectionsHandlerTest { private SchedulerHandler schedulerHandler; private StateHandler stateHandler; private WebBackendConnectionsHandler wbHandler; - private SourceRead sourceRead; private ConnectionRead connectionRead; private ConnectionRead brokenConnectionRead; @@ -1090,6 +1089,7 @@ void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationExcep new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId)); final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of()); + when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize( "{\"streams\": 
[{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}")))); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index c30631dcf698..d3ecbb1709b8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -8,14 +8,13 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; +import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.persistence.ConfigRepository; import jakarta.inject.Singleton; -import java.io.IOException; import java.time.OffsetDateTime; -import java.util.Optional; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -23,15 +22,11 @@ @Singleton public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { - private final Optional configRepository; - private final SourceApi sourceApi; private final EnvVariableFeatureFlags envVariableFeatureFlags; - public RefreshSchemaActivityImpl(Optional configRepository, - SourceApi sourceApi, + public RefreshSchemaActivityImpl(SourceApi sourceApi, EnvVariableFeatureFlags envVariableFeatureFlags) { - this.configRepository = configRepository; this.sourceApi = sourceApi; this.envVariableFeatureFlags = 
envVariableFeatureFlags; } @@ -39,8 +34,7 @@ public RefreshSchemaActivityImpl(Optional configRepository, @Override @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) public boolean shouldRefreshSchema(UUID sourceCatalogId) { - // if job persistence is unavailable, default to skipping the schema refresh - if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) { + if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } @@ -66,12 +60,13 @@ public void refreshSchema(UUID sourceCatalogId, UUID connectionId) { private boolean schemaRefreshRanRecently(UUID sourceCatalogId) { try { - Optional mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId); - if (mostRecentFetchEvent.isEmpty()) { + SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId); + ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody); + if (mostRecentFetchEvent.getUpdatedAt() == null) { return false; } - return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); - } catch (IOException e) { + return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); + } catch (ApiException e) { // catching this exception because we don't want to block replication due to a failed schema refresh log.info("Encountered an error fetching most recent actor catalog fetch event: ", e); return true; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java index d120bf8c7f45..e2168db11f17 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ 
b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -11,14 +12,11 @@ import io.airbyte.api.client.generated.SourceApi; import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; -import java.io.IOException; import java.time.OffsetDateTime; -import java.util.Optional; import java.util.UUID; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -29,7 +27,6 @@ @ExtendWith(MockitoExtension.class) class RefreshSchemaActivityTest { - static private ConfigRepository mConfigRepository; static private SourceApi mSourceApi; static private EnvVariableFeatureFlags mEnvVariableFeatureFlags; @@ -40,32 +37,31 @@ class RefreshSchemaActivityTest { @BeforeEach void setUp() { mSourceApi = mock(SourceApi.class); - mConfigRepository = mock(ConfigRepository.class); mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); mSourceApi = mock(SourceApi.class); when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags); + refreshSchemaActivity = new RefreshSchemaActivityImpl(mSourceApi, mEnvVariableFeatureFlags); } @Test - void testShouldRefreshSchemaNoRecentRefresh() throws IOException { - 
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty()); + void testShouldRefreshSchemaNoRecentRefresh() throws ApiException { + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt()); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @Test - void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException { + void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException { Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond(); - ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo); - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo); + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @Test - void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException { + void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException { Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond(); - ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo); - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo); + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); 
Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index cef598e12e04..f9bb724fb0ec 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -319,6 +319,7 @@

Source

  • post /v1/sources/create
  • post /v1/sources/delete
  • post /v1/sources/discover_schema
  • +
  • post /v1/sources/most_recent_source_actor_catalog
  • post /v1/sources/get
  • post /v1/sources/list
  • post /v1/sources/search
  • @@ -6498,6 +6499,65 @@

    422

    InvalidInputExceptionInfo
    +
    +
    + Up +
    post /v1/sources/most_recent_source_actor_catalog
    +
    Get most recent ActorCatalog for source (getMostRecentSourceActorCatalog)
    +
    + + +

    Consumes

    + This API call consumes the following media types via the Content-Type request header: +
      +
    • application/json
    • +
    + +

    Request body

    +
    +
    SourceIdRequestBody SourceIdRequestBody (required)
    + +
    Body Parameter
    + +
    + + + + +

    Return type

    + + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "catalog" : "{}",
    +  "updatedAt" : 0
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful operation + ActorCatalogWithUpdatedAt +

    404

    + Object with given id was not found. + NotFoundKnownExceptionInfo +

    422

    + Input failed validation + InvalidInputExceptionInfo +
    +
    Up @@ -10016,6 +10076,7 @@

    Models

    Table of Contents

      +
    1. ActorCatalogWithUpdatedAt -
    2. ActorDefinitionResourceRequirements -
    3. AdvancedAuth -
    4. AirbyteCatalog -
    5. @@ -10196,6 +10257,14 @@

      Table of Contents

    6. WorkspaceUpdateName -
    +
    +

    ActorCatalogWithUpdatedAt - Up

    +
    A source actor catalog with the timestamp it was most recently updated
    +
    +
    updatedAt (optional)
    Long format: int64
    +
    catalog (optional)
    +
    +

    ActorDefinitionResourceRequirements - Up

    actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overridden by the job type specific configurations. if not set, the platform will use defaults. these values will be overridden by configuration at the connection level.