Skip to content

Commit

Permalink
refresh before syncs when feature flag is on (#19888)
Browse files Browse the repository at this point in the history
* refresh before syncs when feature flag is on
  • Loading branch information
alovew authored Dec 6, 2022
1 parent 5d936cc commit 2b045a9
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp
.execute();
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);

writeSyncStats(now, syncStats, attemptId, ctx);
if (syncStats != null) {
writeSyncStats(now, syncStats, attemptId, ctx);
}

if (normalizationSummary != null) {
ctx.insertInto(NORMALIZATION_SUMMARIES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.workers.temporal.sync.NormalizationActivity;
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivity;
import io.airbyte.workers.temporal.sync.PersistStateActivity;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivity;
import io.airbyte.workers.temporal.sync.ReplicationActivity;
import io.airbyte.workers.temporal.sync.WebhookOperationActivity;
import io.micronaut.context.annotation.Factory;
Expand Down Expand Up @@ -110,9 +111,11 @@ public List<Object> syncActivities(
final DbtTransformationActivity dbtTransformationActivity,
final PersistStateActivity persistStateActivity,
final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity,
final WebhookOperationActivity webhookOperationActivity) {
final WebhookOperationActivity webhookOperationActivity,
final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity) {
return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity,
webhookOperationActivity);
webhookOperationActivity, configFetchActivity, refreshSchemaActivity);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.annotation.Factory;
Expand All @@ -35,26 +37,34 @@ public class ApiClientBeanFactory {
private static final int JWT_TTL_MINUTES = 5;

@Singleton
public AirbyteApiClient airbyteApiClient(
@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new AirbyteApiClient(
new io.airbyte.api.client.invoker.generated.ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
// time we send a request.
if (!airbyteApiAuthHeaderName.isBlank()) {
builder.setHeader(airbyteApiAuthHeaderName, internalApiAuthToken.get());
}
}));
public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new io.airbyte.api.client.invoker.generated.ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
.setBasePath("/api")
.setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
.setRequestInterceptor(builder -> {
builder.setHeader("User-Agent", "WorkerApp");
// internalApiAuthToken is in BeanProvider because we want to create a new token each
// time we send a request.
if (!airbyteApiAuthHeaderName.isBlank()) {
builder.setHeader(airbyteApiAuthHeaderName, internalApiAuthToken.get());
}
});
}

@Singleton
public AirbyteApiClient airbyteApiClient(ApiClient apiClient) {
return new AirbyteApiClient(apiClient);
}

@Singleton
public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -19,6 +21,12 @@
@ActivityInterface
public interface ConfigFetchActivity {

@ActivityMethod
Optional<UUID> getSourceId(UUID connectionId);

@ActivityMethod
Optional<Status> getStatus(UUID connectionId);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,26 @@ public GetMaxAttemptOutput getMaxAttempt() {
return new GetMaxAttemptOutput(syncJobMaxAttempts);
}

@Override
public Optional<UUID> getSourceId(UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getSourceId());
} catch (JsonValidationException | ConfigNotFoundException | IOException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
return Optional.empty();
}
}

@Override
public Optional<Status> getStatus(UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getStatus());
} catch (JsonValidationException | ConfigNotFoundException | IOException e) {
log.info("Encountered an error fetching the connection's status: ", e);
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,16 @@

package io.airbyte.workers.temporal.sync;

import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.util.UUID;

@ActivityInterface
public interface RefreshSchemaActivity {

@ActivityMethod
boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException;
boolean shouldRefreshSchema(UUID sourceCatalogId);

public void refreshSchema(UUID sourceCatalogId) throws JsonValidationException, ConfigNotFoundException, IOException, ApiException;
public void refreshSchema(UUID sourceCatalogId, UUID connectionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,74 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RefreshSchemaActivityImpl implements RefreshSchemaActivity {

private final Optional<ConfigRepository> configRepository;

private final SourceApi sourceApi;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, SourceApi sourceApi) {
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository,
SourceApi sourceApi,
EnvVariableFeatureFlags envVariableFeatureFlags) {
this.configRepository = configRepository;
this.sourceApi = sourceApi;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

@Override
@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
public boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException {
public boolean shouldRefreshSchema(UUID sourceCatalogId) {
// if job persistence is unavailable, default to skipping the schema refresh
if (configRepository.isEmpty()) {
if (configRepository.isEmpty() || !envVariableFeatureFlags.autoDetectSchema()) {
return false;
}

return !schemaRefreshRanRecently(sourceCatalogId);
}

@Override
public void refreshSchema(UUID sourceCatalogId) throws ApiException {
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true);
sourceApi.discoverSchemaForSource(requestBody);
}
public void refreshSchema(UUID sourceCatalogId, UUID connectionId) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return;
}

private boolean schemaRefreshRanRecently(UUID sourceCatalogId) throws IOException {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId);

if (mostRecentFetchEvent.isEmpty()) {
return false;
try {
sourceApi.discoverSchemaForSource(requestBody);
} catch (final Exception e) {
// catching this exception because we don't want to block replication due to a failed schema refresh
log.error("Attempted schema refresh, but failed with error: ", e);
}
}

return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
private boolean schemaRefreshRanRecently(UUID sourceCatalogId) {
try {
Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId);
if (mostRecentFetchEvent.isEmpty()) {
return false;
}
return mostRecentFetchEvent.get().getCreatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond();
} catch (IOException e) {
// catching this exception because we don't want to block replication due to a failed schema refresh
log.info("Encountered an error fetching most recent actor catalog fetch event: ", e);
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.OperatorWebhookInput;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.util.Map;
import java.util.Optional;
Expand All @@ -42,6 +47,8 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private static final int CURRENT_VERSION = 2;
private static final String NORMALIZATION_SUMMARY_CHECK_TAG = "normalization_summary_check";
private static final int NORMALIZATION_SUMMARY_CHECK_CURRENT_VERSION = 1;
private static final String AUTO_DETECT_SCHEMA_TAG = "auto_detect_schema";
private static final int AUTO_DETECT_SCHEMA_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "longRunActivityOptions")
private ReplicationActivity replicationActivity;
Expand All @@ -55,6 +62,10 @@ public class SyncWorkflowImpl implements SyncWorkflow {
private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WebhookOperationActivity webhookOperationActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -72,6 +83,28 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,

final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION);
final String taskQueue = Workflow.getInfo().getTaskQueue();

final int autoDetectSchemaVersion =
Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION);

if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) {
final Optional<UUID> sourceId = configFetchActivity.getSourceId(connectionId);

if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) {
LOGGER.info("Refreshing source schema...");
refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId);
}

final Optional<Status> status = configFetchActivity.getStatus(connectionId);
if (!status.isEmpty() && Status.INACTIVE == status.get()) {
LOGGER.info("Connection is disabled. Cancelling run.");
final StandardSyncOutput output =
new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats()));
return output;
}
}

StandardSyncOutput syncOutput =
replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
Expand All @@ -29,18 +30,21 @@
class RefreshSchemaActivityTest {

static private ConfigRepository mConfigRepository;

static private SourceApi mSourceApi;
static private EnvVariableFeatureFlags mEnvVariableFeatureFlags;

static private RefreshSchemaActivityImpl refreshSchemaActivity;

static private final UUID SOURCE_ID = UUID.randomUUID();

@BeforeEach
void setUp() {
mSourceApi = mock(SourceApi.class);
mConfigRepository = mock(ConfigRepository.class);
mEnvVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);
mSourceApi = mock(SourceApi.class);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi);
when(mEnvVariableFeatureFlags.autoDetectSchema()).thenReturn(true);
refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi, mEnvVariableFeatureFlags);
}

@Test
Expand Down Expand Up @@ -68,9 +72,10 @@ void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException
@Test
void testRefreshSchema() throws ApiException {
UUID sourceId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId);
UUID connectionId = UUID.randomUUID();
refreshSchemaActivity.refreshSchema(sourceId, connectionId);
SourceDiscoverSchemaRequestBody requestBody =
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true);
new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true).connectionId(connectionId);
verify(mSourceApi, times(1)).discoverSchemaForSource(requestBody);
}

Expand Down
Loading

0 comments on commit 2b045a9

Please sign in to comment.