From 8945f1684facf1ecd0232934b607c8a735bc0907 Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Tue, 27 Sep 2022 08:44:55 -0400 Subject: [PATCH] Use explicit constructor injection for activity implementations (#17152) --- .../CheckConnectionActivityImpl.java | 46 +++++++----- .../catalog/DiscoverCatalogActivityImpl.java | 46 +++++++----- .../AutoDisableConnectionActivityImpl.java | 64 +++++----------- .../activities/ConfigFetchActivityImpl.java | 45 ++++-------- .../ConnectionDeletionActivityImpl.java | 12 ++- .../activities/GenerateInputActivityImpl.java | 12 ++- ...obCreationAndStatusUpdateActivityImpl.java | 62 +++++++++------- .../activities/RecordMetricActivityImpl.java | 12 ++- .../RouteToSyncTaskQueueActivityImpl.java | 8 +- .../activities/StreamResetActivityImpl.java | 12 ++- .../WorkflowConfigActivityImpl.java | 15 ++-- .../temporal/spec/SpecActivityImpl.java | 41 ++++++----- .../sync/DbtTransformationActivityImpl.java | 67 +++++++++-------- .../sync/NormalizationActivityImpl.java | 73 +++++++++++-------- .../sync/PersistStateActivityImpl.java | 16 ++-- .../sync/ReplicationActivityImpl.java | 72 ++++++++++-------- .../AutoDisableConnectionActivityTest.java | 9 +-- .../activities/ConfigFetchActivityTest.java | 22 +++--- .../WorkflowConfigActivityImplTest.java | 3 +- 19 files changed, 316 insertions(+), 321 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 0053aa0d2fe2..46f1c779c901 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -28,7 +28,6 @@ import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; import java.nio.file.Path; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; @@ -37,25 +36,32 @@ pattern = "(?i)^(?!data_plane).*") public class CheckConnectionActivityImpl implements CheckConnectionActivity { - @Inject - @Named("checkWorkerConfigs") - private WorkerConfigs workerConfigs; - @Inject - @Named("checkProcessFactory") - private ProcessFactory processFactory; - @Inject - private SecretsHydrator secretsHydrator; - @Inject - @Named("workspaceRoot") - private Path workspaceRoot; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Inject - private AirbyteApiClient airbyteApiClient; - @Value("${airbyte.version}") - private String airbyteVersion; + private final WorkerConfigs workerConfigs; + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final AirbyteApiClient airbyteApiClient; + private final String airbyteVersion; + + public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConfigs workerConfigs, + @Named("checkProcessFactory") final ProcessFactory processFactory, + final SecretsHydrator secretsHydrator, + @Named("workspaceRoot") final Path workspaceRoot, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + final AirbyteApiClient airbyteApiClient, + @Value("${airbyte.version}") final String airbyteVersion) { + this.workerConfigs = workerConfigs; + this.processFactory = processFactory; + this.workspaceRoot = workspaceRoot; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.airbyteApiClient = airbyteApiClient; + this.secretsHydrator = secretsHydrator; + this.airbyteVersion = airbyteVersion; + } @Override public ConnectorJobOutput runWithJobOutput(final CheckConnectionInput args) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 5d201d95d44b..ba21d75c3c0f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -29,7 +29,6 @@ import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; import java.nio.file.Path; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; @@ -40,25 +39,32 @@ @Slf4j public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { - @Inject - @Named("discoverWorkerConfigs") - private WorkerConfigs workerConfigs; - @Inject - @Named("discoverProcessFactory") - private ProcessFactory processFactory; - @Inject - private SecretsHydrator secretsHydrator; - @Inject - @Named("workspaceRoot") - private Path workspaceRoot; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Inject - private AirbyteApiClient airbyteApiClient;; - @Value("${airbyte.version}") - private String airbyteVersion; + private final WorkerConfigs workerConfigs; + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final AirbyteApiClient airbyteApiClient;; + private final String airbyteVersion; + + public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, + @Named("discoverProcessFactory") final ProcessFactory processFactory, + final SecretsHydrator secretsHydrator, + @Named("workspaceRoot") final Path workspaceRoot, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + final AirbyteApiClient airbyteApiClient, + @Value("${airbyte.version}") final String airbyteVersion) { + this.workerConfigs = workerConfigs; + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.airbyteApiClient = airbyteApiClient; + this.airbyteVersion = airbyteVersion; + } @Override public ConnectorJobOutput run(final JobRunConfig jobRunConfig, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java index 9c62a2478640..6f74f3ee4edc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityImpl.java @@ -9,7 +9,6 @@ import static io.airbyte.persistence.job.models.Job.REPLICATION_TYPES; import static java.time.temporal.ChronoUnit.DAYS; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.Status; @@ -27,7 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; -import javax.inject.Inject; import javax.inject.Singleton; @Singleton @@ -35,18 +33,26 @@ pattern = "(?i)^(?!data_plane).*") public class AutoDisableConnectionActivityImpl implements AutoDisableConnectionActivity { - @Inject - private ConfigRepository configRepository; - @Inject - private JobPersistence jobPersistence; - @Inject - private FeatureFlags featureFlags; - @Value("${airbyte.worker.job.failed.max-days}") - private Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable; - @Value("${airbyte.worker.job.failed.max-jobs}") - private Integer maxFailedJobsInARowBeforeConnectionDisable; - @Inject - private JobNotifier jobNotifier; + private final ConfigRepository configRepository; + private final JobPersistence jobPersistence; + private final FeatureFlags featureFlags; + private final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable; + private final Integer maxFailedJobsInARowBeforeConnectionDisable; + private final JobNotifier jobNotifier; + + public AutoDisableConnectionActivityImpl(final ConfigRepository configRepository, + final JobPersistence jobPersistence, + final FeatureFlags featureFlags, + @Value("${airbyte.worker.job.failed.max-days}") final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable, + @Value("${airbyte.worker.job.failed.max-jobs}") final Integer maxFailedJobsInARowBeforeConnectionDisable, + final JobNotifier jobNotifier) { + this.configRepository = configRepository; + this.jobPersistence = jobPersistence; + this.featureFlags = featureFlags; + this.maxDaysOfOnlyFailedJobsBeforeConnectionDisable = maxDaysOfOnlyFailedJobsBeforeConnectionDisable; + this.maxFailedJobsInARowBeforeConnectionDisable = maxFailedJobsInARowBeforeConnectionDisable; + this.jobNotifier = jobNotifier; + } // Given a connection id and current timestamp, this activity will set a connection to INACTIVE if // either: @@ -198,34 +204,4 @@ private void disableConnection(final StandardSync standardSync, final Job lastJo jobNotifier.notifyJobByEmail(null, CONNECTION_DISABLED_NOTIFICATION, lastJob); } - @VisibleForTesting - void setConfigRepository(final ConfigRepository configRepository) { - this.configRepository = configRepository; - } - - @VisibleForTesting - void setJobPersistence(final JobPersistence jobPersistence) { - this.jobPersistence = jobPersistence; - } - - @VisibleForTesting - void setFeatureFlags(final FeatureFlags featureFlags) { - this.featureFlags = featureFlags; - } - - @VisibleForTesting - void setMaxDaysOfOnlyFailedJobsBeforeConnectionDisable(final Integer maxDaysOfOnlyFailedJobsBeforeConnectionDisable) { - this.maxDaysOfOnlyFailedJobsBeforeConnectionDisable = maxDaysOfOnlyFailedJobsBeforeConnectionDisable; - } - - @VisibleForTesting - void setMaxFailedJobsInARowBeforeConnectionDisable(final Integer maxFailedJobsInARowBeforeConnectionDisable) { - this.maxFailedJobsInARowBeforeConnectionDisable = maxFailedJobsInARowBeforeConnectionDisable; - } - - @VisibleForTesting - void setJobNotifier(final JobNotifier jobNotifier) { - this.jobNotifier = jobNotifier; - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index a470d1198392..d40807c7f9ca 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -4,7 +4,6 @@ package io.airbyte.workers.temporal.scheduling.activities; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.config.Cron; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.ScheduleType; @@ -27,7 +26,6 @@ import java.util.TimeZone; import java.util.UUID; import java.util.function.Supplier; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; @@ -43,15 +41,20 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity { private final static long MS_PER_SECOND = 1000L; private final static long MIN_CRON_INTERVAL_SECONDS = 60; - @Inject - private ConfigRepository configRepository; - @Inject - private JobPersistence jobPersistence; - @Value("${airbyte.worker.sync.max-attempts}") - private Integer syncJobMaxAttempts; - @Inject - @Named("currentSecondsSupplier") - private Supplier currentSecondsSupplier; + private final ConfigRepository configRepository; + private final JobPersistence jobPersistence; + private final Integer syncJobMaxAttempts; + private final Supplier currentSecondsSupplier; + + public ConfigFetchActivityImpl(final ConfigRepository configRepository, + final JobPersistence jobPersistence, + @Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts, + @Named("currentSecondsSupplier") final Supplier currentSecondsSupplier) { + this.configRepository = configRepository; + this.jobPersistence = jobPersistence; + this.syncJobMaxAttempts = syncJobMaxAttempts; + this.currentSecondsSupplier = currentSecondsSupplier; + } @Override public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) { @@ -158,24 +161,4 @@ public GetMaxAttemptOutput getMaxAttempt() { return new GetMaxAttemptOutput(syncJobMaxAttempts); } - @VisibleForTesting - void setConfigRepository(final ConfigRepository configRepository) { - this.configRepository = configRepository; - } - - @VisibleForTesting - void setJobPersistence(final JobPersistence jobPersistence) { - this.jobPersistence = jobPersistence; - } - - @VisibleForTesting - void setSyncJobMaxAttempts(final Integer syncJobMaxAttempts) { - this.syncJobMaxAttempts = syncJobMaxAttempts; - } - - @VisibleForTesting - void setCurrentSecondsSupplier(final Supplier currentSecondsSupplier) { - this.currentSecondsSupplier = currentSecondsSupplier; - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java index 753e8b32992c..66b26d81f4f9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConnectionDeletionActivityImpl.java @@ -10,20 +10,18 @@ import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import java.io.IOException; -import javax.inject.Inject; import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; -@AllArgsConstructor -@NoArgsConstructor @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") public class ConnectionDeletionActivityImpl implements ConnectionDeletionActivity { - @Inject - private ConnectionHelper connectionHelper; + private final ConnectionHelper connectionHelper; + + public ConnectionDeletionActivityImpl(final ConnectionHelper connectionHelper) { + this.connectionHelper = connectionHelper; + } @Override public void deleteConnection(final ConnectionDeletionInput input) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 5b41c150828a..6456e43433ba 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -19,20 +19,18 @@ import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import java.util.List; -import javax.inject.Inject; import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; -@AllArgsConstructor -@NoArgsConstructor @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") public class GenerateInputActivityImpl implements GenerateInputActivity { - @Inject - private JobPersistence jobPersistence; + private final JobPersistence jobPersistence; + + public GenerateInputActivityImpl(final JobPersistence jobPersistence) { + this.jobPersistence = jobPersistence; + } @Override public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index e5a0fd5b3988..864a45ebc2a3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -51,42 +51,50 @@ import java.util.List; import java.util.Optional; import java.util.UUID; -import javax.inject.Inject; import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -@AllArgsConstructor -@NoArgsConstructor @Slf4j @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndStatusUpdateActivity { - @Inject - private SyncJobFactory jobFactory; - @Inject - private JobPersistence jobPersistence; - @Inject - private TemporalWorkerRunFactory temporalWorkerRunFactory; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Inject - private JobNotifier jobNotifier; - @Inject - private JobTracker jobTracker; - @Inject - private ConfigRepository configRepository; - @Inject - private JobCreator jobCreator; - @Inject - private StreamResetPersistence streamResetPersistence; - @Inject - private JobErrorReporter jobErrorReporter; + private final SyncJobFactory jobFactory; + private final JobPersistence jobPersistence; + private final TemporalWorkerRunFactory temporalWorkerRunFactory; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final JobNotifier jobNotifier; + private final JobTracker jobTracker; + private final ConfigRepository configRepository; + private final JobCreator jobCreator; + private final StreamResetPersistence streamResetPersistence; + private final JobErrorReporter jobErrorReporter; + + public JobCreationAndStatusUpdateActivityImpl(final SyncJobFactory jobFactory, + final JobPersistence jobPersistence, + final TemporalWorkerRunFactory temporalWorkerRunFactory, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + final JobNotifier jobNotifier, + final JobTracker jobTracker, + final ConfigRepository configRepository, + final JobCreator jobCreator, + final StreamResetPersistence streamResetPersistence, + final JobErrorReporter jobErrorReporter) { + this.jobFactory = jobFactory; + this.jobPersistence = jobPersistence; + this.temporalWorkerRunFactory = temporalWorkerRunFactory; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.jobNotifier = jobNotifier; + this.jobTracker = jobTracker; + this.configRepository = configRepository; + this.jobCreator = jobCreator; + this.streamResetPersistence = streamResetPersistence; + this.jobErrorReporter = jobErrorReporter; + } @Override public JobCreationOutput createNewJob(final JobCreationInput input) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java index 1ec253ecacd5..3318e36afe75 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java @@ -13,26 +13,24 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.inject.Inject; import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; /** * Implementation of the {@link RecordMetricActivity} that is managed by the application framework * and therefore has access to other singletons managed by the framework. */ -@AllArgsConstructor -@NoArgsConstructor @Slf4j @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") public class RecordMetricActivityImpl implements RecordMetricActivity { - @Inject - private MetricClient metricClient; + private final MetricClient metricClient; + + public RecordMetricActivityImpl(final MetricClient metricClient) { + this.metricClient = metricClient; + } /** * Records a workflow counter for the specified metric. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java index 6bb3a3babca5..8f58f9dff9fd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java @@ -5,14 +5,16 @@ package io.airbyte.workers.temporal.scheduling.activities; import io.airbyte.workers.temporal.sync.RouterService; -import javax.inject.Inject; import javax.inject.Singleton; @Singleton public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity { - @Inject - private RouterService routerService; + private final RouterService routerService; + + public RouteToSyncTaskQueueActivityImpl(final RouterService routerService) { + this.routerService = routerService; + } @Override public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java index effe398f2fb7..64bd3a3ee520 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/StreamResetActivityImpl.java @@ -6,22 +6,20 @@ import io.airbyte.workers.temporal.StreamResetRecordsHelper; import io.micronaut.context.annotation.Requires; -import javax.inject.Inject; import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -@AllArgsConstructor -@NoArgsConstructor @Slf4j @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") public class StreamResetActivityImpl implements StreamResetActivity { - @Inject - private StreamResetRecordsHelper streamResetRecordsHelper; + private final StreamResetRecordsHelper streamResetRecordsHelper; + + public StreamResetActivityImpl(final StreamResetRecordsHelper streamResetRecordsHelper) { + this.streamResetRecordsHelper = streamResetRecordsHelper; + } @Override public void deleteStreamResetRecordsForJob(final DeleteStreamResetRecordsForJobInput input) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java index dc0a3989c69c..ad42e0f5e78b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImpl.java @@ -4,7 +4,6 @@ package io.airbyte.workers.temporal.scheduling.activities; -import com.google.common.annotations.VisibleForTesting; import io.micronaut.context.annotation.Property; import io.micronaut.context.annotation.Requires; import java.time.Duration; @@ -21,18 +20,16 @@ pattern = "(?i)^(?!data_plane).*") public class WorkflowConfigActivityImpl implements WorkflowConfigActivity { - @Property(name = "airbyte.workflow.failure.restart-delay", - defaultValue = "600") - private Long workflowRestartDelaySeconds; + private final Long workflowRestartDelaySeconds; + + public WorkflowConfigActivityImpl(@Property(name = "airbyte.workflow.failure.restart-delay", + defaultValue = "600") final Long workflowRestartDelaySeconds) { + this.workflowRestartDelaySeconds = workflowRestartDelaySeconds; + } @Override public Duration getWorkflowRestartDelaySeconds() { return Duration.ofSeconds(workflowRestartDelaySeconds); } - @VisibleForTesting - void setWorkflowRestartDelaySeconds(final Long workflowRestartDelaySeconds) { - this.workflowRestartDelaySeconds = workflowRestartDelaySeconds; - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index c85be7287610..ab2f69177767 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -26,7 +26,6 @@ import io.temporal.activity.ActivityExecutionContext; import java.nio.file.Path; import java.util.function.Supplier; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; @@ -35,23 +34,29 @@ pattern = "(?i)^(?!data_plane).*") public class SpecActivityImpl implements SpecActivity { - @Inject - @Named("specWorkerConfigs") - private WorkerConfigs workerConfigs; - @Inject - @Named("specProcessFactory") - private ProcessFactory processFactory; - @Inject - @Named("workspaceRoot") - private Path workspaceRoot; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Inject - private AirbyteApiClient airbyteApiClient; - @Value("${airbyte.version}") - private String airbyteVersion; + private final WorkerConfigs workerConfigs; + private final ProcessFactory processFactory; + private final Path workspaceRoot; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final AirbyteApiClient airbyteApiClient; + private final String airbyteVersion; + + public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerConfigs, + @Named("specProcessFactory") final ProcessFactory processFactory, + @Named("workspaceRoot") final Path workspaceRoot, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + final AirbyteApiClient airbyteApiClient, + @Value("${airbyte.version}") final String airbyteVersion) { + this.workerConfigs = workerConfigs; + this.processFactory = processFactory; + this.workspaceRoot = workspaceRoot; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.airbyteApiClient = airbyteApiClient; + this.airbyteVersion = airbyteVersion; + } @Override public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index f56c5fb566af..332b2c879751 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -35,41 +35,50 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; @Singleton public class DbtTransformationActivityImpl implements DbtTransformationActivity { - @Inject - @Named("containerOrchestratorConfig") - private Optional containerOrchestratorConfig; - @Inject - @Named("defaultWorkerConfigs") - private WorkerConfigs workerConfigs; - @Inject - @Named("defaultProcessFactory") - private ProcessFactory processFactory; - @Inject - private SecretsHydrator secretsHydrator; - @Inject - @Named("workspaceRoot") - private Path workspaceRoot; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Value("${airbyte.version}") - private String airbyteVersion; - @Value("${micronaut.server.port}") - private Integer serverPort; - @Inject - private AirbyteConfigValidator airbyteConfigValidator; - @Inject - private TemporalUtils temporalUtils; - @Inject - private AirbyteApiClient airbyteApiClient; + private final Optional containerOrchestratorConfig; + private final WorkerConfigs workerConfigs; + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final String airbyteVersion; + private final Integer serverPort; + private final AirbyteConfigValidator airbyteConfigValidator; + private final TemporalUtils temporalUtils; + private final AirbyteApiClient airbyteApiClient; + + public DbtTransformationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, + @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, + @Named("defaultProcessFactory") final ProcessFactory processFactory, + final SecretsHydrator secretsHydrator, + @Named("workspaceRoot") final Path workspaceRoot, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + @Value("${airbyte.version}") final String airbyteVersion, + @Value("${micronaut.server.port}") final Integer serverPort, + final AirbyteConfigValidator airbyteConfigValidator, + final TemporalUtils temporalUtils, + final AirbyteApiClient airbyteApiClient) { + this.containerOrchestratorConfig = containerOrchestratorConfig; + this.workerConfigs = workerConfigs; + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.airbyteVersion = airbyteVersion; + this.serverPort = serverPort; + this.airbyteConfigValidator = airbyteConfigValidator; + this.temporalUtils = temporalUtils; + this.airbyteApiClient = airbyteApiClient; + } @Override public Void run(final JobRunConfig jobRunConfig, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index a233089b30e0..bb086fc81952 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -37,44 +37,53 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; @Singleton public class NormalizationActivityImpl implements NormalizationActivity { - @Inject - @Named("containerOrchestratorConfig") - private Optional containerOrchestratorConfig; - @Inject - @Named("defaultWorkerConfigs") - private WorkerConfigs workerConfigs; - @Inject - @Named("defaultProcessFactory") - private ProcessFactory processFactory; - @Inject - private SecretsHydrator secretsHydrator; - @Inject - @Named("workspaceRoot") - private Path workspaceRoot; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Value("${airbyte.version}") - private String airbyteVersion; - @Value("${micronaut.server.port}") - private Integer serverPort; - @Inject - private AirbyteConfigValidator airbyteConfigValidator; - @Inject - private TemporalUtils temporalUtils; - @Inject - @Named("normalizationResourceRequirements") - private ResourceRequirements normalizationResourceRequirements; - @Inject - private AirbyteApiClient airbyteApiClient; + private final Optional containerOrchestratorConfig; + private final WorkerConfigs workerConfigs; + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final String airbyteVersion; + private final Integer serverPort; + private final AirbyteConfigValidator airbyteConfigValidator; + private final TemporalUtils temporalUtils; + private final ResourceRequirements normalizationResourceRequirements; + private final AirbyteApiClient airbyteApiClient; + + public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, + @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, + @Named("defaultProcessFactory") final ProcessFactory processFactory, + final SecretsHydrator secretsHydrator, + @Named("workspaceRoot") final Path workspaceRoot, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + @Value("${airbyte.version}") final String airbyteVersion, + @Value("${micronaut.server.port}") final Integer serverPort, + final AirbyteConfigValidator airbyteConfigValidator, + final TemporalUtils temporalUtils, + @Named("normalizationResourceRequirements") final ResourceRequirements normalizationResourceRequirements, + final AirbyteApiClient airbyteApiClient) { + this.containerOrchestratorConfig = containerOrchestratorConfig; + this.workerConfigs = workerConfigs; + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.airbyteVersion = airbyteVersion; + this.serverPort = serverPort; + this.airbyteConfigValidator = airbyteConfigValidator; + this.temporalUtils = temporalUtils; + this.normalizationResourceRequirements = normalizationResourceRequirements; + this.airbyteApiClient = airbyteApiClient; + } @Override public NormalizationSummary normalize(final JobRunConfig jobRunConfig, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java index 1983d1e1ebdf..ba0b317d7875 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/PersistStateActivityImpl.java @@ -26,20 +26,18 @@ import java.util.List; import java.util.Optional; import java.util.UUID; -import javax.inject.Inject; import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; -@AllArgsConstructor -@NoArgsConstructor @Singleton public class PersistStateActivityImpl implements PersistStateActivity { - @Inject - private AirbyteApiClient airbyteApiClient; - @Inject - private FeatureFlags featureFlags; + private final AirbyteApiClient airbyteApiClient; + private final FeatureFlags featureFlags; + + public PersistStateActivityImpl(final AirbyteApiClient airbyteApiClient, final FeatureFlags featureFlags) { + this.airbyteApiClient = airbyteApiClient; + this.featureFlags = featureFlags; + } @Override public boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput, final ConfiguredAirbyteCatalog configuredCatalog) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 20e05c14f662..88e58a26b0d4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -52,7 +52,6 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; -import javax.inject.Inject; import javax.inject.Named; import javax.inject.Singleton; import org.slf4j.Logger; @@ -63,36 +62,47 @@ public class ReplicationActivityImpl implements ReplicationActivity { private static final Logger LOGGER = LoggerFactory.getLogger(ReplicationActivityImpl.class); - @Inject - @Named("containerOrchestratorConfig") - private Optional containerOrchestratorConfig; - @Inject - @Named("replicationWorkerConfigs") - private WorkerConfigs workerConfigs; - @Inject - @Named("replicationProcessFactory") - private ProcessFactory processFactory; - @Inject - private SecretsHydrator secretsHydrator; - @Inject - @Named("workspaceRoot") - private Path workspaceRoot; - @Inject - private WorkerEnvironment workerEnvironment; - @Inject - private LogConfigs logConfigs; - @Value("${airbyte.version}") - private String airbyteVersion; - @Inject - private FeatureFlags featureFlags; - @Value("${micronaut.server.port}") - private Integer serverPort; - @Inject - private AirbyteConfigValidator airbyteConfigValidator; - @Inject - private TemporalUtils temporalUtils; - @Inject - private AirbyteApiClient airbyteApiClient; + private final Optional containerOrchestratorConfig; + private final WorkerConfigs workerConfigs; + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + private final WorkerEnvironment workerEnvironment; + private final LogConfigs logConfigs; + private final String airbyteVersion; + private final FeatureFlags featureFlags; + private final Integer serverPort; + private final AirbyteConfigValidator airbyteConfigValidator; + private final TemporalUtils temporalUtils; + private final AirbyteApiClient airbyteApiClient; + + public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, + @Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs, + @Named("replicationProcessFactory") final ProcessFactory processFactory, + final SecretsHydrator secretsHydrator, + @Named("workspaceRoot") final Path workspaceRoot, + final WorkerEnvironment workerEnvironment, + final LogConfigs logConfigs, + @Value("${airbyte.version}") final String airbyteVersion, + final FeatureFlags featureFlags, + @Value("${micronaut.server.port}") final Integer serverPort, + final AirbyteConfigValidator airbyteConfigValidator, + final TemporalUtils temporalUtils, + final AirbyteApiClient airbyteApiClient) { + this.containerOrchestratorConfig = containerOrchestratorConfig; + this.workerConfigs = workerConfigs; + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + this.workerEnvironment = workerEnvironment; + this.logConfigs = logConfigs; + this.airbyteVersion = airbyteVersion; + this.featureFlags = featureFlags; + this.serverPort = serverPort; + this.airbyteConfigValidator = airbyteConfigValidator; + this.temporalUtils = temporalUtils; + this.airbyteApiClient = airbyteApiClient; + } @Override public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java index dc0ed067ffdc..22e92b1d27ca 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/AutoDisableConnectionActivityTest.java @@ -83,13 +83,8 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio Mockito.when(mJobPersistence.getLastReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob)); Mockito.when(mJobPersistence.getFirstReplicationJob(CONNECTION_ID)).thenReturn(Optional.of(mJob)); - autoDisableActivity = new AutoDisableConnectionActivityImpl(); - autoDisableActivity.setConfigRepository(mConfigRepository); - autoDisableActivity.setJobPersistence(mJobPersistence); - autoDisableActivity.setFeatureFlags(mFeatureFlags); - autoDisableActivity.setMaxDaysOfOnlyFailedJobsBeforeConnectionDisable(MAX_DAYS_OF_ONLY_FAILED_JOBS); - autoDisableActivity.setMaxFailedJobsInARowBeforeConnectionDisable(MAX_FAILURE_JOBS_IN_A_ROW); - autoDisableActivity.setJobNotifier(mJobNotifier); + autoDisableActivity = new AutoDisableConnectionActivityImpl(mConfigRepository, mJobPersistence, mFeatureFlags, MAX_DAYS_OF_ONLY_FAILED_JOBS, + MAX_FAILURE_JOBS_IN_A_ROW, mJobNotifier); } // test warnings diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java index 61c1719f8ef4..69aa56cd40b3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java @@ -92,11 +92,8 @@ class ConfigFetchActivityTest { @BeforeEach void setup() { - configFetchActivity = new ConfigFetchActivityImpl(); - configFetchActivity.setConfigRepository(mConfigRepository); - configFetchActivity.setJobPersistence(mJobPersistence); - configFetchActivity.setSyncJobMaxAttempts(SYNC_JOB_MAX_ATTEMPTS); - configFetchActivity.setCurrentSecondsSupplier(() -> Instant.now().getEpochSecond()); + configFetchActivity = + new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond()); } @Nested @@ -164,7 +161,7 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx @Test @DisplayName("Test we will wait the required amount of time with legacy config") void testWait() throws IOException, JsonValidationException, ConfigNotFoundException { - configFetchActivity.setCurrentSecondsSupplier(() -> 60L * 3); + configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3); Mockito.when(mJob.getStartedAtInSecond()) .thenReturn(Optional.of(60L)); @@ -186,7 +183,7 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep @Test @DisplayName("Test we will not wait if we are late in the legacy schedule schema") void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException { - configFetchActivity.setCurrentSecondsSupplier(() -> 60L * 10); + configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10); Mockito.when(mJob.getStartedAtInSecond()) .thenReturn(Optional.of(60L)); @@ -241,7 +238,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException @Test @DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run") void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException { - configFetchActivity.setCurrentSecondsSupplier(() -> 60L * 3); + configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3); Mockito.when(mJob.getStartedAtInSecond()) .thenReturn(Optional.of(60L)); @@ -269,7 +266,8 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException mockRightNow.set(Calendar.SECOND, 0); mockRightNow.set(Calendar.MILLISECOND, 0); - configFetchActivity.setCurrentSecondsSupplier(() -> mockRightNow.getTimeInMillis() / 1000L); + configFetchActivity = + new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L); Mockito.when(mJobPersistence.getLastReplicationJob(connectionId)) .thenReturn(Optional.of(mJob)); @@ -293,7 +291,9 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti mockRightNow.set(Calendar.MINUTE, 0); mockRightNow.set(Calendar.SECOND, 0); mockRightNow.set(Calendar.MILLISECOND, 0); - configFetchActivity.setCurrentSecondsSupplier(() -> mockRightNow.getTimeInMillis() / 1000L); + + configFetchActivity = + new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L); Mockito.when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); Mockito.when(mJobPersistence.getLastReplicationJob(connectionId)) @@ -317,7 +317,7 @@ class TestGetMaxAttempt { @DisplayName("Test that we are using to right service to get the maximum amount of attempt") void testGetMaxAttempt() { final int maxAttempt = 15031990; - configFetchActivity.setSyncJobMaxAttempts(maxAttempt); + configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond()); Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt()) .isEqualTo(maxAttempt); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImplTest.java index faece206fd3a..234c1ff57019 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/WorkflowConfigActivityImplTest.java @@ -15,8 +15,7 @@ class WorkflowConfigActivityImplTest { @Test void testFetchingWorkflowRestartDelayInSeconds() { final Long workflowRestartDelaySeconds = 30L; - final WorkflowConfigActivityImpl activity = new WorkflowConfigActivityImpl(); - activity.setWorkflowRestartDelaySeconds(workflowRestartDelaySeconds); + final WorkflowConfigActivityImpl activity = new WorkflowConfigActivityImpl(workflowRestartDelaySeconds); Assertions.assertEquals(workflowRestartDelaySeconds, activity.getWorkflowRestartDelaySeconds().getSeconds()); }