From 1b93653f4eeccf350793c024522d07281eeee940 Mon Sep 17 00:00:00 2001 From: alovew Date: Wed, 21 Dec 2022 12:36:36 -0600 Subject: [PATCH 1/5] add actorCatalogWithCreatedAt --- airbyte-api/src/main/openapi/config.yaml | 33 +++++++++ .../types/ActorCatalogWithCreatedAt.yaml | 23 +++++++ .../config/persistence/ConfigRepository.java | 7 +- .../config/persistence/DbConverter.java | 9 +++ .../server/apis/SourceApiController.java | 6 ++ .../server/handlers/SourceHandler.java | 12 ++++ .../sync/RefreshSchemaActivityImpl.java | 13 ++-- .../activities/RefreshSchemaActivityTest.java | 14 ++-- .../api/generated-api-html/index.html | 69 +++++++++++++++++++ 9 files changed, 169 insertions(+), 17 deletions(-) create mode 100644 airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index b190076e00a8..f0f9c8fa9975 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -599,6 +599,29 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/sources/most_recent_source_actor_catalog: + post: + tags: + - source + summary: Get most recent ActorCatalog for source + operationId: getMostRecentSourceActorCatalog + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SourceIdRequestBody" + required: true + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/ActorCatalog" + "404": + $ref: "#/components/responses/NotFoundResponse" + "422": + $ref: "#/components/responses/InvalidInputResponse" /v1/sources/search: post: tags: @@ -3692,6 +3715,16 @@ components: properties: logType: $ref: "#/components/schemas/LogType" + # ACTOR CATALOG + ActorCatalog: + description: describes a source ActorCatalog + type: object + properties: + updatedAt: + type: integer + format: int64 + catalog: + type: object # SCHEMA CATALOG AirbyteCatalog: description: describes the available schema (catalog). diff --git a/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml new file mode 100644 index 000000000000..885ffa75e061 --- /dev/null +++ b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml @@ -0,0 +1,23 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +title: ActorCatalogWithCreatedAt +description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp. +type: object +additionalProperties: false +required: + - id + - catalog + - catalogHash + - createdAt +properties: + id: + type: string + format: uuid + catalog: + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + catalogHash: + type: string + createdAt: + type: integer + format: int64 diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index cc896bfdb0ba..f338439d7701 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -32,6 +32,7 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; +import io.airbyte.config.ActorCatalogWithCreatedAt; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -1314,14 +1315,14 @@ public Optional getActorCatalog(final UUID actorId, return records.stream().findFirst().map(DbConverter::buildActorCatalog); } - public Optional getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException { - final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) + public Optional getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException { + final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT) .from(ACTOR_CATALOG) .join(ACTOR_CATALOG_FETCH_EVENT) .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); - return records.stream().findFirst().map(DbConverter::buildActorCatalog); + return records.stream().findFirst().map(DbConverter::buildActorCatalogWithCreatedAt); } public Optional getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException { diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index e4b3fcb10031..14236f945006 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -17,6 +17,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; +import io.airbyte.config.ActorCatalogWithCreatedAt; import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -218,6 +219,14 @@ public static ActorCatalog buildActorCatalog(final Record record) { .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)); } + public static ActorCatalogWithCreatedAt buildActorCatalogWithCreatedAt(final Record record) { + return new ActorCatalogWithCreatedAt() + .withId(record.get(ACTOR_CATALOG.ID)) + .withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString())) + .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)) + .withCreatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC)); + } + public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) { return new ActorCatalogFetchEvent() .withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID)) diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java index 595087e0d198..fca860690c60 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java @@ -5,6 +5,7 @@ package io.airbyte.server.apis; import io.airbyte.api.generated.SourceApi; +import io.airbyte.api.model.generated.ActorCatalog; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; @@ -66,6 +67,11 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) { return ApiHelper.execute(() -> sourceHandler.getSource(sourceIdRequestBody)); } + @Override + public Optional getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) { + return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalog(sourceIdRequestBody)); + } + @Override public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { return ApiHelper.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 6a24b5169c60..0613398155a6 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; +import io.airbyte.api.model.generated.ActorCatalog; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.SourceCloneConfiguration; import io.airbyte.api.model.generated.SourceCloneRequestBody; @@ -31,6 +32,7 @@ import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; @@ -132,6 +134,16 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) return buildSourceRead(sourceIdRequestBody.getSourceId()); } + public Optional getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) + throws IOException { + Optional actorCatalog = configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId()); + if(!actorCatalog.isPresent()) { + return Optional.empty(); + } else { + return new ActorCatalog().catalog(actorCatalog.get().getCatalog()).updatedAt(actorCatalog.get().getCreatedAt()); + } + } + public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody) throws JsonValidationException, IOException, ConfigNotFoundException { // read source configuration from db diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index c30631dcf698..bb4c9e2c44d6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -7,11 +7,11 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; +import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.SourceApi; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.persistence.ConfigRepository; import jakarta.inject.Singleton; import java.io.IOException; import java.time.OffsetDateTime; @@ -23,15 +23,15 @@ @Singleton public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { - private final Optional configRepository; + private final ConnectionApi connectionApi; private final SourceApi sourceApi; private final EnvVariableFeatureFlags envVariableFeatureFlags; - public RefreshSchemaActivityImpl(Optional configRepository, + public RefreshSchemaActivityImpl(ConnectionApi connectionApi, SourceApi sourceApi, EnvVariableFeatureFlags envVariableFeatureFlags) { - this.configRepository = configRepository; + this.connectionApi = connectionApi; this.sourceApi = sourceApi; this.envVariableFeatureFlags = envVariableFeatureFlags; } @@ -39,8 +39,7 @@ public RefreshSchemaActivityImpl(Optional configRepository, @Override @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) public boolean shouldRefreshSchema(UUID sourceCatalogId) { - // if job persistence is unavailable, default to skipping the schema refresh - if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) { + if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } @@ -66,7 +65,7 @@ public void refreshSchema(UUID sourceCatalogId, UUID connectionId) { private boolean schemaRefreshRanRecently(UUID sourceCatalogId) { try { - Optional mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId); + Optional mostRecentFetchEvent = Optional.empty(); if (mostRecentFetchEvent.isEmpty()) { return false; } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java index d120bf8c7f45..59ae471ab511 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java @@ -9,12 +9,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.SourceApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; import java.io.IOException; import java.time.OffsetDateTime; @@ -29,7 +29,7 @@ @ExtendWith(MockitoExtension.class) class RefreshSchemaActivityTest { - static private ConfigRepository mConfigRepository; + static private ConnectionApi mConnectionApi; static private SourceApi mSourceApi; static private EnvVariableFeatureFlags mEnvVariableFeatureFlags; @@ -40,16 +40,16 @@ class RefreshSchemaActivityTest { @BeforeEach void setUp() { mSourceApi = mock(SourceApi.class); - mConfigRepository = mock(ConfigRepository.class); + mConnectionApi = mock(ConnectionApi.class); mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); mSourceApi = mock(SourceApi.class); when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags); + refreshSchemaActivity = new RefreshSchemaActivityImpl(mConnectionApi, mSourceApi, mEnvVariableFeatureFlags); } @Test void testShouldRefreshSchemaNoRecentRefresh() throws IOException { - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty()); + when(mConnectionApi.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty()); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @@ -57,7 +57,7 @@ void testShouldRefreshSchemaNoRecentRefresh() throws IOException { void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException { Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond(); ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo); - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + when(mConnectionApi.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @@ -65,7 +65,7 @@ void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException { void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException { Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond(); ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo); - when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + when(mConnectionApi.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index cef598e12e04..3f62b3a69d31 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -319,6 +319,7 @@

Source

  • post /v1/sources/create
  • post /v1/sources/delete
  • post /v1/sources/discover_schema
  • +
  • post /v1/sources/most_recent_source_actor_catalog
  • post /v1/sources/get
  • post /v1/sources/list
  • post /v1/sources/search
  • @@ -6498,6 +6499,65 @@

    422

    InvalidInputExceptionInfo
    +
    +
    + Up +
    post /v1/sources/most_recent_source_actor_catalog
    +
    Get most recent ActorCatalog for source (getMostRecentSourceActorCatalog)
    +
    + + +

    Consumes

    + This API call consumes the following media types via the Content-Type request header: +
      +
    • application/json
    • +
    + +

    Request body

    +
    +
    SourceIdRequestBody SourceIdRequestBody (required)
    + +
    Body Parameter
    + +
    + + + + +

    Return type

    +
    + ActorCatalog + +
    + + + +

    Example data

    +
    Content-Type: application/json
    +
    {
    +  "catalog" : "{}",
    +  "updatedAt" : 0
    +}
    + +

    Produces

    + This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
      +
    • application/json
    • +
    + +

    Responses

    +

    200

    + Successful operation + ActorCatalog +

    404

    + Object with given id was not found. + NotFoundKnownExceptionInfo +

    422

    + Input failed validation + InvalidInputExceptionInfo +
    +
    Up @@ -10016,6 +10076,7 @@

    Models

    Table of Contents

      +
    1. ActorCatalog -
    2. ActorDefinitionResourceRequirements -
    3. AdvancedAuth -
    4. AirbyteCatalog -
    5. @@ -10196,6 +10257,14 @@

      Table of Contents

    6. WorkspaceUpdateName -
    +
    +

    ActorCatalog - Up

    +
    describes a source ActorCatalog
    +
    +
    updatedAt (optional)
    Long format: int64
    +
    catalog (optional)
    +
    +

    ActorDefinitionResourceRequirements - Up

    actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.
    From 43a2d903d5d4008262dbbe3e5a15d0ae6696abcd Mon Sep 17 00:00:00 2001 From: alovew Date: Wed, 28 Dec 2022 11:44:17 -0700 Subject: [PATCH 2/5] use actorCatalogWithUpdatedAt --- airbyte-api/src/main/openapi/config.yaml | 6 ++--- ...At.yaml => ActorCatalogWithUpdatedAt.yaml} | 6 ++--- .../config/persistence/ConfigRepository.java | 6 ++--- .../config/persistence/DbConverter.java | 8 +++--- .../server/apis/SourceApiController.java | 6 ++--- .../server/handlers/SourceHandler.java | 13 +++++----- .../WebBackendConnectionsHandler.java | 10 ++++--- .../WebBackendConnectionsHandlerTest.java | 10 ++++--- .../sync/RefreshSchemaActivityImpl.java | 22 +++++++--------- .../activities/RefreshSchemaActivityTest.java | 26 ++++++++----------- .../api/generated-api-html/index.html | 10 +++---- 11 files changed, 61 insertions(+), 62 deletions(-) rename airbyte-config/config-models/src/main/resources/types/{ActorCatalogWithCreatedAt.yaml => ActorCatalogWithUpdatedAt.yaml} (88%) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index f0f9c8fa9975..e7ed4da19105 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -617,7 +617,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/ActorCatalog" + $ref: "#/components/schemas/ActorCatalogWithUpdatedAt" "404": $ref: "#/components/responses/NotFoundResponse" "422": @@ -3716,8 +3716,8 @@ components: logType: $ref: "#/components/schemas/LogType" # ACTOR CATALOG - ActorCatalog: - description: describes a source ActorCatalog + ActorCatalogWithUpdatedAt: + description: A source actor catalog with the timestamp it was mostly recently updated type: object properties: updatedAt: diff --git a/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml similarity index 88% rename from airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml rename to airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml index 885ffa75e061..812b7c14eef1 100644 --- a/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithCreatedAt.yaml +++ b/airbyte-config/config-models/src/main/resources/types/ActorCatalogWithUpdatedAt.yaml @@ -1,6 +1,6 @@ --- "$schema": http://json-schema.org/draft-07/schema# -title: ActorCatalogWithCreatedAt +title: ActorCatalogWithUpdatedAt description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp. type: object additionalProperties: false @@ -8,7 +8,7 @@ required: - id - catalog - catalogHash - - createdAt + - updatedAt properties: id: type: string @@ -18,6 +18,6 @@ properties: existingJavaType: com.fasterxml.jackson.databind.JsonNode catalogHash: type: string - createdAt: + updatedAt: type: integer format: int64 diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index f338439d7701..f63c7f890b7f 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -32,7 +32,7 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.ActorCatalogWithCreatedAt; +import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -1315,14 +1315,14 @@ public Optional getActorCatalog(final UUID actorId, return records.stream().findFirst().map(DbConverter::buildActorCatalog); } - public Optional getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException { + public Optional getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException { final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT) .from(ACTOR_CATALOG) .join(ACTOR_CATALOG_FETCH_EVENT) .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); - return records.stream().findFirst().map(DbConverter::buildActorCatalogWithCreatedAt); + return records.stream().findFirst().map(DbConverter::buildActorCatalogWithUpdatedAt); } public Optional getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException { diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index 14236f945006..41f1822bbd45 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -17,7 +17,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; -import io.airbyte.config.ActorCatalogWithCreatedAt; +import io.airbyte.config.ActorCatalogWithUpdatedAt; import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; @@ -219,12 +219,12 @@ public static ActorCatalog buildActorCatalog(final Record record) { .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)); } - public static ActorCatalogWithCreatedAt buildActorCatalogWithCreatedAt(final Record record) { - return new ActorCatalogWithCreatedAt() + public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) { + return new ActorCatalogWithUpdatedAt() .withId(record.get(ACTOR_CATALOG.ID)) .withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString())) .withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH)) - .withCreatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC)); + .withUpdatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC)); } public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java index fca860690c60..d0bd6152325a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java @@ -5,7 +5,7 @@ package io.airbyte.server.apis; import io.airbyte.api.generated.SourceApi; -import io.airbyte.api.model.generated.ActorCatalog; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; @@ -68,8 +68,8 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) { } @Override - public Optional getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) { - return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalog(sourceIdRequestBody)); + public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) { + return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(sourceIdRequestBody)); } @Override diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 0613398155a6..fcc9e44e609e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; -import io.airbyte.api.model.generated.ActorCatalog; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.SourceCloneConfiguration; import io.airbyte.api.model.generated.SourceCloneRequestBody; @@ -134,13 +134,14 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) return buildSourceRead(sourceIdRequestBody.getSourceId()); } - public Optional getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) + public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalogWithUpdatedAt(final SourceIdRequestBody sourceIdRequestBody) throws IOException { - Optional actorCatalog = configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId()); - if(!actorCatalog.isPresent()) { - return Optional.empty(); + Optional actorCatalog = + configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId()); + if (actorCatalog.isEmpty()) { + return new ActorCatalogWithUpdatedAt(); } else { - return new ActorCatalog().catalog(actorCatalog.get().getCatalog()).updatedAt(actorCatalog.get().getCreatedAt()); + return new ActorCatalogWithUpdatedAt().updatedAt(actorCatalog.get().getUpdatedAt()).catalog(actorCatalog.get().getCatalog()); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 362afe51c35b..ce7f39d5da1b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -6,8 +6,10 @@ import static java.util.stream.Collectors.toMap; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.AirbyteCatalog; import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; @@ -49,7 +51,6 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; -import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.StandardSync; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -522,12 +523,13 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne if (webBackendConnectionPatch.getSyncCatalog() != null) { // Get the most recent actor catalog fetched for this connection's source and the newly updated sync // catalog - Optional mostRecentActorCatalog = configRepository.getMostRecentActorCatalogForSource(originalConnectionRead.getSourceId()); + SourceIdRequestBody requestBody = new SourceIdRequestBody().sourceId(originalConnectionRead.getSourceId()); + ActorCatalogWithUpdatedAt mostRecentActorCatalog = sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(requestBody); AirbyteCatalog newAirbyteCatalog = webBackendConnectionPatch.getSyncCatalog(); // Get the diff between these two catalogs to check for breaking changes - if (mostRecentActorCatalog.isPresent()) { + if (mostRecentActorCatalog.getCatalog() != null) { final io.airbyte.protocol.models.AirbyteCatalog mostRecentAirbyteCatalog = - Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); + Jsons.object((JsonNode) mostRecentActorCatalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); final CatalogDiff catalogDiff = connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog), CatalogConverter.toProtocol(newAirbyteCatalog)); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index c81bfb97597c..9783086cdf2a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; +import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.AirbyteCatalog; import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; @@ -124,6 +125,8 @@ class WebBackendConnectionsHandlerTest { private StateHandler stateHandler; private WebBackendConnectionsHandler wbHandler; + private SourceHandler sourceHandler; + private SourceRead sourceRead; private ConnectionRead connectionRead; private ConnectionRead brokenConnectionRead; @@ -155,7 +158,7 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio connectionsHandler = mock(ConnectionsHandler.class); stateHandler = mock(StateHandler.class); operationsHandler = mock(OperationsHandler.class); - final SourceHandler sourceHandler = mock(SourceHandler.class); + sourceHandler = mock(SourceHandler.class); final DestinationHandler destinationHandler = mock(DestinationHandler.class); final JobHistoryHandler jobHistoryHandler = mock(JobHistoryHandler.class); configRepository = mock(ConfigRepository.class); @@ -1090,8 +1093,9 @@ void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationExcep new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId)); final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of()); - when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize( - "{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}")))); + + when(sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(any())).thenReturn(new ActorCatalogWithUpdatedAt().catalog(Jsons.deserialize( + "{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}"))); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff); when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId())) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index bb4c9e2c44d6..d3ecbb1709b8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -7,15 +7,14 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; +import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.config.ActorCatalogFetchEvent; import jakarta.inject.Singleton; -import java.io.IOException; import java.time.OffsetDateTime; -import java.util.Optional; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -23,15 +22,11 @@ @Singleton public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { - private final ConnectionApi connectionApi; - private final SourceApi sourceApi; private final EnvVariableFeatureFlags envVariableFeatureFlags; - public RefreshSchemaActivityImpl(ConnectionApi connectionApi, - SourceApi sourceApi, + public RefreshSchemaActivityImpl(SourceApi sourceApi, EnvVariableFeatureFlags envVariableFeatureFlags) { - this.connectionApi = connectionApi; this.sourceApi = sourceApi; this.envVariableFeatureFlags = envVariableFeatureFlags; } @@ -65,12 +60,13 @@ public void refreshSchema(UUID sourceCatalogId, UUID connectionId) { private boolean schemaRefreshRanRecently(UUID sourceCatalogId) { try { - Optional mostRecentFetchEvent = Optional.empty(); - if (mostRecentFetchEvent.isEmpty()) { + SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId); + ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody); + if (mostRecentFetchEvent.getUpdatedAt() == null) { return false; } - return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); - } catch (IOException e) { + return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); + } catch (ApiException e) { // catching this exception because we don't want to block replication due to a failed schema refresh log.info("Encountered an error fetching most recent actor catalog fetch event: ", e); return true; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java index 59ae471ab511..e2168db11f17 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java @@ -4,21 +4,19 @@ package io.airbyte.workers.temporal.scheduling.activities; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.SourceApi; import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; -import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; -import java.io.IOException; import java.time.OffsetDateTime; -import java.util.Optional; import java.util.UUID; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -29,7 +27,6 @@ @ExtendWith(MockitoExtension.class) class RefreshSchemaActivityTest { - static private ConnectionApi mConnectionApi; static private SourceApi mSourceApi; static private EnvVariableFeatureFlags mEnvVariableFeatureFlags; @@ -40,32 +37,31 @@ class RefreshSchemaActivityTest { @BeforeEach void setUp() { mSourceApi = mock(SourceApi.class); - mConnectionApi = mock(ConnectionApi.class); mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); mSourceApi = mock(SourceApi.class); when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - refreshSchemaActivity = new RefreshSchemaActivityImpl(mConnectionApi, mSourceApi, mEnvVariableFeatureFlags); + refreshSchemaActivity = new RefreshSchemaActivityImpl(mSourceApi, mEnvVariableFeatureFlags); } @Test - void testShouldRefreshSchemaNoRecentRefresh() throws IOException { - when(mConnectionApi.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty()); + void testShouldRefreshSchemaNoRecentRefresh() throws ApiException { + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt()); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @Test - void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException { + void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException { Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond(); - ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo); - when(mConnectionApi.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo); + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } @Test - void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException { + void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException { Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond(); - ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo); - when(mConnectionApi.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent)); + ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo); + when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 3f62b3a69d31..f9bb724fb0ec 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -6526,7 +6526,7 @@

    Request body

    Return type

    @@ -6549,7 +6549,7 @@

    Produces

    Responses

    200

    Successful operation - ActorCatalog + ActorCatalogWithUpdatedAt

    404

    Object with given id was not found. NotFoundKnownExceptionInfo @@ -10076,7 +10076,7 @@

    Models

    Table of Contents

      -
    1. ActorCatalog -
    2. +
    3. ActorCatalogWithUpdatedAt -
    4. ActorDefinitionResourceRequirements -
    5. AdvancedAuth -
    6. AirbyteCatalog -
    7. @@ -10258,8 +10258,8 @@

      Table of Contents

    -

    ActorCatalog - Up

    -
    describes a source ActorCatalog
    +

    ActorCatalogWithUpdatedAt - Up

    +
    A source actor catalog with the timestamp it was mostly recently updated
    updatedAt (optional)
    Long format: int64
    catalog (optional)
    From dcf448289f3c6838ade13d7a9518db4e4efbc058 Mon Sep 17 00:00:00 2001 From: alovew Date: Wed, 28 Dec 2022 14:03:17 -0700 Subject: [PATCH 3/5] revert webbackendconnectionhandler changes --- .../airbyte/config/persistence/ConfigRepository.java | 10 ++++++++++ .../server/handlers/WebBackendConnectionsHandler.java | 8 ++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index f63c7f890b7f..b9fa7985bfc0 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -1325,6 +1325,16 @@ public Optional getMostRecentSourceActorCatalog(final return records.stream().findFirst().map(DbConverter::buildActorCatalogWithUpdatedAt); } + public Optional getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException { + final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) + .from(ACTOR_CATALOG) + .join(ACTOR_CATALOG_FETCH_EVENT) + .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) + .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) + .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); + return records.stream().findFirst().map(DbConverter::buildActorCatalog); + } + public Optional getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException { final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG_FETCH_EVENT.asterisk()) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index ce7f39d5da1b..3e47bc8dc9d0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -51,6 +51,7 @@ import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.StandardSync; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -523,13 +524,12 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne if (webBackendConnectionPatch.getSyncCatalog() != null) { // Get the most recent actor catalog fetched for this connection's source and the newly updated sync // catalog - SourceIdRequestBody requestBody = new SourceIdRequestBody().sourceId(originalConnectionRead.getSourceId()); - ActorCatalogWithUpdatedAt mostRecentActorCatalog = sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(requestBody); + Optional mostRecentActorCatalog = configRepository.getMostRecentActorCatalogForSource(originalConnectionRead.getSourceId()); AirbyteCatalog newAirbyteCatalog = webBackendConnectionPatch.getSyncCatalog(); // Get the diff between these two catalogs to check for breaking changes - if (mostRecentActorCatalog.getCatalog() != null) { + if (mostRecentActorCatalog.isPresent()) { final io.airbyte.protocol.models.AirbyteCatalog mostRecentAirbyteCatalog = - Jsons.object((JsonNode) mostRecentActorCatalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); + Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); final CatalogDiff catalogDiff = connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog), CatalogConverter.toProtocol(newAirbyteCatalog)); From d5afe1c190f82883aa76c808c5f03d7d95a04e0f Mon Sep 17 00:00:00 2001 From: alovew Date: Wed, 28 Dec 2022 14:04:00 -0700 Subject: [PATCH 4/5] formatting --- .../config/persistence/ConfigRepository.java | 14 +++++++------- .../handlers/WebBackendConnectionsHandler.java | 2 -- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index b9fa7985bfc0..78c111a6bbf8 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -1327,13 +1327,13 @@ public Optional getMostRecentSourceActorCatalog(final public Optional getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException { final Result records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) - .from(ACTOR_CATALOG) - .join(ACTOR_CATALOG_FETCH_EVENT) - .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) - .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) - .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); - return records.stream().findFirst().map(DbConverter::buildActorCatalog); - } + .from(ACTOR_CATALOG) + .join(ACTOR_CATALOG_FETCH_EVENT) + .on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID)) + .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId)) + .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch()); + return records.stream().findFirst().map(DbConverter::buildActorCatalog); + } public Optional getMostRecentActorCatalogFetchEventForSource(final UUID sourceId) throws IOException { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 3e47bc8dc9d0..362afe51c35b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -6,10 +6,8 @@ import static java.util.stream.Collectors.toMap; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.AirbyteCatalog; import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; From 8965f233eb629400620e02e8f47f5d64318774d4 Mon Sep 17 00:00:00 2001 From: alovew Date: Wed, 28 Dec 2022 14:07:05 -0700 Subject: [PATCH 5/5] revert test changes --- .../handlers/WebBackendConnectionsHandlerTest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 9783086cdf2a..0103d8f0f5fd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.AirbyteCatalog; import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; @@ -124,9 +123,6 @@ class WebBackendConnectionsHandlerTest { private SchedulerHandler schedulerHandler; private StateHandler stateHandler; private WebBackendConnectionsHandler wbHandler; - - private SourceHandler sourceHandler; - private SourceRead sourceRead; private ConnectionRead connectionRead; private ConnectionRead brokenConnectionRead; @@ -158,7 +154,7 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio connectionsHandler = mock(ConnectionsHandler.class); stateHandler = mock(StateHandler.class); operationsHandler = mock(OperationsHandler.class); - sourceHandler = mock(SourceHandler.class); + final SourceHandler sourceHandler = mock(SourceHandler.class); final DestinationHandler destinationHandler = mock(DestinationHandler.class); final JobHistoryHandler jobHistoryHandler = mock(JobHistoryHandler.class); configRepository = mock(ConfigRepository.class); @@ -1094,8 +1090,8 @@ void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationExcep final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of()); - when(sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(any())).thenReturn(new ActorCatalogWithUpdatedAt().catalog(Jsons.deserialize( - "{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}"))); + when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize( + "{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}")))); when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff); when(configRepository.getConfiguredCatalogForConnection(expected.getConnectionId()))