Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Anne/re add changes #21079

Merged
merged 7 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,29 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/most_recent_source_actor_catalog:
post:
tags:
- source
summary: Get most recent ActorCatalog for source
operationId: getMostRecentSourceActorCatalog
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ActorCatalogWithUpdatedAt"
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/search:
post:
tags:
Expand Down Expand Up @@ -3692,6 +3715,16 @@ components:
properties:
logType:
$ref: "#/components/schemas/LogType"
# ACTOR CATALOG
ActorCatalogWithUpdatedAt:
description: A source actor catalog with the timestamp it was mostly recently updated
type: object
properties:
updatedAt:
type: integer
format: int64
catalog:
type: object
# SCHEMA CATALOG
AirbyteCatalog:
description: describes the available schema (catalog).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
"$schema": http://json-schema.org/draft-07/schema#
title: ActorCatalogWithUpdatedAt
description: Catalog of an actor with its most recent ActorCatalogFetchEvent created_at timestamp.
type: object
additionalProperties: false
required:
- id
- catalog
- catalogHash
- updatedAt
properties:
id:
type: string
format: uuid
catalog:
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalogHash:
type: string
updatedAt:
type: integer
format: int64
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorCatalogWithUpdatedAt;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
Expand Down Expand Up @@ -1314,6 +1315,16 @@ public Optional<ActorCatalog> getActorCatalog(final UUID actorId,
return records.stream().findFirst().map(DbConverter::buildActorCatalog);
}

public Optional<ActorCatalogWithUpdatedAt> getMostRecentSourceActorCatalog(final UUID sourceId) throws IOException {
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk(), ACTOR_CATALOG_FETCH_EVENT.CREATED_AT)
.from(ACTOR_CATALOG)
.join(ACTOR_CATALOG_FETCH_EVENT)
.on(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID.eq(ACTOR_CATALOG.ID))
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(sourceId))
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1).fetch());
return records.stream().findFirst().map(DbConverter::buildActorCatalogWithUpdatedAt);
}

public Optional<ActorCatalog> getMostRecentActorCatalogForSource(final UUID sourceId) throws IOException {
final Result<Record> records = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
.from(ACTOR_CATALOG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorCatalogWithUpdatedAt;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
Expand Down Expand Up @@ -218,6 +219,14 @@ public static ActorCatalog buildActorCatalog(final Record record) {
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
}

public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Record record) {
return new ActorCatalogWithUpdatedAt()
.withId(record.get(ACTOR_CATALOG.ID))
.withCatalog(Jsons.deserialize(record.get(ACTOR_CATALOG.CATALOG).toString()))
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH))
.withUpdatedAt(record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, LocalDateTime.class).toEpochSecond(ZoneOffset.UTC));
}

public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
return new ActorCatalogFetchEvent()
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.server.apis;

import io.airbyte.api.generated.SourceApi;
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
import io.airbyte.api.model.generated.SourceCreate;
Expand Down Expand Up @@ -66,6 +67,11 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody) {
return ApiHelper.execute(() -> sourceHandler.getSource(sourceIdRequestBody));
}

@Override
public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalog(final SourceIdRequestBody sourceIdRequestBody) {
return ApiHelper.execute(() -> sourceHandler.getMostRecentSourceActorCatalogWithUpdatedAt(sourceIdRequestBody));
}

@Override
public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return ApiHelper.execute(() -> sourceHandler.listSourcesForWorkspace(workspaceIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.SourceCloneConfiguration;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
Expand All @@ -31,6 +32,7 @@
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;

Expand Down Expand Up @@ -132,6 +134,17 @@ public SourceRead getSource(final SourceIdRequestBody sourceIdRequestBody)
return buildSourceRead(sourceIdRequestBody.getSourceId());
}

public ActorCatalogWithUpdatedAt getMostRecentSourceActorCatalogWithUpdatedAt(final SourceIdRequestBody sourceIdRequestBody)
throws IOException {
Optional<io.airbyte.config.ActorCatalogWithUpdatedAt> actorCatalog =
configRepository.getMostRecentSourceActorCatalog(sourceIdRequestBody.getSourceId());
if (actorCatalog.isEmpty()) {
return new ActorCatalogWithUpdatedAt();
} else {
return new ActorCatalogWithUpdatedAt().updatedAt(actorCatalog.get().getUpdatedAt()).catalog(actorCatalog.get().getCatalog());
}
}

public SourceRead cloneSource(final SourceCloneRequestBody sourceCloneRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {
// read source configuration from db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ class WebBackendConnectionsHandlerTest {
private SchedulerHandler schedulerHandler;
private StateHandler stateHandler;
private WebBackendConnectionsHandler wbHandler;

private SourceRead sourceRead;
private ConnectionRead connectionRead;
private ConnectionRead brokenConnectionRead;
Expand Down Expand Up @@ -1090,6 +1089,7 @@ void testUpdateConnectionFixingBreakingSchemaChange() throws JsonValidationExcep
new ConnectionRead().connectionId(expected.getConnectionId()).breakingChange(true).sourceId(sourceId));

final CatalogDiff catalogDiff = new CatalogDiff().transforms(List.of());

when(configRepository.getMostRecentActorCatalogForSource(sourceId)).thenReturn(Optional.of(new ActorCatalog().withCatalog(Jsons.deserialize(
"{\"streams\": [{\"name\": \"cat_names\", \"namespace\": \"public\", \"json_schema\": {\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"number\", \"airbyte_type\": \"integer\"}}}}]}"))));
when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff, catalogDiff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,33 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.client.model.generated.SourceIdRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {

private final Optional<ConfigRepository> configRepository;

private final SourceApi sourceApi;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository,
SourceApi sourceApi,
public RefreshSchemaActivityImpl(SourceApi sourceApi,
EnvVariableFeatureFlags envVariableFeatureFlags) {
this.configRepository = configRepository;
this.sourceApi = sourceApi;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public boolean shouldRefreshSchema(UUID sourceCatalogId) {
// if job persistence is unavailable, default to skipping the schema refresh
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return false;
}

Expand All @@ -66,12 +60,13 @@ public void refreshSchema(UUID sourceCatalogId, UUID connectionId) {

private boolean schemaRefreshRanRecently(UUID sourceCatalogId) {
try {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
if (mostRecentFetchEvent.isEmpty()) {
SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId);
ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody);
if (mostRecentFetchEvent.getUpdatedAt() == null) {
return false;
}
return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
} catch (IOException e) {
return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
} catch (ApiException e) {
// catching this exception because we don't want to block replication due to a failed schema refresh
log.info("Encountered an error fetching most recent actor catalog fetch event: ", e);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@

package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -29,7 +27,6 @@
@ExtendWith(MockitoExtension.class)
class RefreshSchemaActivityTest {

static private ConfigRepository mConfigRepository;
static private SourceApi mSourceApi;
static private EnvVariableFeatureFlags mEnvVariableFeatureFlags;

Expand All @@ -40,32 +37,31 @@ class RefreshSchemaActivityTest {
@BeforeEach
void setUp() {
mSourceApi = mock(SourceApi.class);
mConfigRepository = mock(ConfigRepository.class);
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
mSourceApi = mock(SourceApi.class);
when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags);
refreshSchemaActivity = new RefreshSchemaActivityImpl(mSourceApi, mEnvVariableFeatureFlags);
}

@Test
void testShouldRefreshSchemaNoRecentRefresh() throws IOException {
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.empty());
void testShouldRefreshSchemaNoRecentRefresh() throws ApiException {
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt());
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException {
void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws ApiException {
Long twoDaysAgo = OffsetDateTime.now().minusHours(48l).toEpochSecond();
ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twoDaysAgo);
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent));
ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twoDaysAgo);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

@Test
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException {
void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws ApiException {
Long twelveHoursAgo = OffsetDateTime.now().minusHours(12l).toEpochSecond();
ActorCatalogFetchEvent fetchEvent = new ActorCatalogFetchEvent().withActorCatalogId(UUID.randomUUID()).withCreatedAt(twelveHoursAgo);
when(mConfigRepository.getMostRecentActorCatalogFetchEventForSource(SOURCE_ID)).thenReturn(Optional.ofNullable(fetchEvent));
ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt().updatedAt(twelveHoursAgo);
when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt);
Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID));
}

Expand Down
Loading