Skip to content

Commit

Permalink
Use explicit constructor injection for activity implementations (#17152)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Sep 27, 2022
1 parent a088206 commit 8945f16
Show file tree
Hide file tree
Showing 19 changed files with 316 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import java.nio.file.Path;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;

Expand All @@ -37,25 +36,32 @@
pattern = "(?i)^(?!data_plane).*")
public class CheckConnectionActivityImpl implements CheckConnectionActivity {

@Inject
@Named("checkWorkerConfigs")
private WorkerConfigs workerConfigs;
@Inject
@Named("checkProcessFactory")
private ProcessFactory processFactory;
@Inject
private SecretsHydrator secretsHydrator;
@Inject
@Named("workspaceRoot")
private Path workspaceRoot;
@Inject
private WorkerEnvironment workerEnvironment;
@Inject
private LogConfigs logConfigs;
@Inject
private AirbyteApiClient airbyteApiClient;
@Value("${airbyte.version}")
private String airbyteVersion;
private final WorkerConfigs workerConfigs;
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final AirbyteApiClient airbyteApiClient;
private final String airbyteVersion;

public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConfigs workerConfigs,
@Named("checkProcessFactory") final ProcessFactory processFactory,
final SecretsHydrator secretsHydrator,
@Named("workspaceRoot") final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteApiClient airbyteApiClient,
@Value("${airbyte.version}") final String airbyteVersion) {
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
this.workspaceRoot = workspaceRoot;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.airbyteApiClient = airbyteApiClient;
this.secretsHydrator = secretsHydrator;
this.airbyteVersion = airbyteVersion;
}

@Override
public ConnectorJobOutput runWithJobOutput(final CheckConnectionInput args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import java.nio.file.Path;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -40,25 +39,32 @@
@Slf4j
public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity {

@Inject
@Named("discoverWorkerConfigs")
private WorkerConfigs workerConfigs;
@Inject
@Named("discoverProcessFactory")
private ProcessFactory processFactory;
@Inject
private SecretsHydrator secretsHydrator;
@Inject
@Named("workspaceRoot")
private Path workspaceRoot;
@Inject
private WorkerEnvironment workerEnvironment;
@Inject
private LogConfigs logConfigs;
@Inject
private AirbyteApiClient airbyteApiClient;;
@Value("${airbyte.version}")
private String airbyteVersion;
private final WorkerConfigs workerConfigs;
private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final AirbyteApiClient airbyteApiClient;;
private final String airbyteVersion;

public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs,
@Named("discoverProcessFactory") final ProcessFactory processFactory,
final SecretsHydrator secretsHydrator,
@Named("workspaceRoot") final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteApiClient airbyteApiClient,
@Value("${airbyte.version}") final String airbyteVersion) {
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.airbyteApiClient = airbyteApiClient;
this.airbyteVersion = airbyteVersion;
}

@Override
public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static io.airbyte.persistence.job.models.Job.REPLICATION_TYPES;
import static java.time.temporal.ChronoUnit.DAYS;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
Expand All @@ -27,26 +26,33 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
public class AutoDisableConnectionActivityImpl implements AutoDisableConnectionActivity {

@Inject
private ConfigRepository configRepository;
@Inject
private JobPersistence jobPersistence;
@Inject
private FeatureFlags featureFlags;
@Value("${airbyte.worker.job.failed.max-days}")
private Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
@Value("${airbyte.worker.job.failed.max-jobs}")
private Integer maxFailedJobsInARowBeforeConnectionDisable;
@Inject
private JobNotifier jobNotifier;
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final FeatureFlags featureFlags;
private final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
private final Integer maxFailedJobsInARowBeforeConnectionDisable;
private final JobNotifier jobNotifier;

public AutoDisableConnectionActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final FeatureFlags featureFlags,
@Value("${airbyte.worker.job.failed.max-days}") final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable,
@Value("${airbyte.worker.job.failed.max-jobs}") final Integer maxFailedJobsInARowBeforeConnectionDisable,
final JobNotifier jobNotifier) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.featureFlags = featureFlags;
this.maxDaysOfOnlyFailedJobsBeforeConnectionDisable = maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
this.maxFailedJobsInARowBeforeConnectionDisable = maxFailedJobsInARowBeforeConnectionDisable;
this.jobNotifier = jobNotifier;
}

// Given a connection id and current timestamp, this activity will set a connection to INACTIVE if
// either:
Expand Down Expand Up @@ -198,34 +204,4 @@ private void disableConnection(final StandardSync standardSync, final Job lastJo
jobNotifier.notifyJobByEmail(null, CONNECTION_DISABLED_NOTIFICATION, lastJob);
}

@VisibleForTesting
void setConfigRepository(final ConfigRepository configRepository) {
this.configRepository = configRepository;
}

@VisibleForTesting
void setJobPersistence(final JobPersistence jobPersistence) {
this.jobPersistence = jobPersistence;
}

@VisibleForTesting
void setFeatureFlags(final FeatureFlags featureFlags) {
this.featureFlags = featureFlags;
}

@VisibleForTesting
void setMaxDaysOfOnlyFailedJobsBeforeConnectionDisable(final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable) {
this.maxDaysOfOnlyFailedJobsBeforeConnectionDisable = maxDaysOfOnlyFailedJobsBeforeConnectionDisable;
}

@VisibleForTesting
void setMaxFailedJobsInARowBeforeConnectionDisable(final Integer maxFailedJobsInARowBeforeConnectionDisable) {
this.maxFailedJobsInARowBeforeConnectionDisable = maxFailedJobsInARowBeforeConnectionDisable;
}

@VisibleForTesting
void setJobNotifier(final JobNotifier jobNotifier) {
this.jobNotifier = jobNotifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.workers.temporal.scheduling.activities;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.ScheduleType;
Expand All @@ -27,7 +26,6 @@
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -43,15 +41,20 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
private final static long MS_PER_SECOND = 1000L;
private final static long MIN_CRON_INTERVAL_SECONDS = 60;

@Inject
private ConfigRepository configRepository;
@Inject
private JobPersistence jobPersistence;
@Value("${airbyte.worker.sync.max-attempts}")
private Integer syncJobMaxAttempts;
@Inject
@Named("currentSecondsSupplier")
private Supplier<Long> currentSecondsSupplier;
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
}

@Override
public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) {
Expand Down Expand Up @@ -158,24 +161,4 @@ public GetMaxAttemptOutput getMaxAttempt() {
return new GetMaxAttemptOutput(syncJobMaxAttempts);
}

@VisibleForTesting
void setConfigRepository(final ConfigRepository configRepository) {
this.configRepository = configRepository;
}

@VisibleForTesting
void setJobPersistence(final JobPersistence jobPersistence) {
this.jobPersistence = jobPersistence;
}

@VisibleForTesting
void setSyncJobMaxAttempts(final Integer syncJobMaxAttempts) {
this.syncJobMaxAttempts = syncJobMaxAttempts;
}

@VisibleForTesting
void setCurrentSecondsSupplier(final Supplier<Long> currentSecondsSupplier) {
this.currentSecondsSupplier = currentSecondsSupplier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@
import io.airbyte.workers.temporal.exception.RetryableException;
import io.micronaut.context.annotation.Requires;
import java.io.IOException;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity {

@Inject
private ConnectionHelper connectionHelper;
private final ConnectionHelper connectionHelper;

public ConnectionDeletionActivityImpl(final ConnectionHelper connectionHelper) {
this.connectionHelper = connectionHelper;
}

@Override
public void deleteConnection(final ConnectionDeletionInput input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@
import io.airbyte.workers.temporal.exception.RetryableException;
import io.micronaut.context.annotation.Requires;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
public class GenerateInputActivityImpl implements GenerateInputActivity {

@Inject
private JobPersistence jobPersistence;
private final JobPersistence jobPersistence;

public GenerateInputActivityImpl(final JobPersistence jobPersistence) {
this.jobPersistence = jobPersistence;
}

@Override
public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
Expand Down
Loading

0 comments on commit 8945f16

Please sign in to comment.