From f18d304a8652b4c89f6d56733d7298f747d720a2 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 2 Mar 2022 15:34:45 -0800 Subject: [PATCH] Add autoformat (#10808) --- .../io/airbyte/scheduler/models/JobTest.java | 5 +- .../ResourceRequirementsUtilsTest.java | 2 +- .../DefaultSyncJobFactoryTest.java | 2 +- .../job_tracker/TrackingMetadataTest.java | 2 +- .../process/AsyncOrchestratorPodProcess.java | 10 ++-- .../workers/process/KubePodProcess.java | 2 +- .../workers/temporal/TemporalClient.java | 10 ++-- .../workers/temporal/TemporalUtils.java | 8 ++-- .../ConnectionManagerWorkflowImpl.java | 48 +++++++++---------- .../workers/temporal/sync/LauncherWorker.java | 8 ++-- 10 files changed, 50 insertions(+), 47 deletions(-) diff --git a/airbyte-scheduler/models/src/test/java/io/airbyte/scheduler/models/JobTest.java b/airbyte-scheduler/models/src/test/java/io/airbyte/scheduler/models/JobTest.java index 5277d419f42d..0a0e5f3e8c70 100644 --- a/airbyte-scheduler/models/src/test/java/io/airbyte/scheduler/models/JobTest.java +++ b/airbyte-scheduler/models/src/test/java/io/airbyte/scheduler/models/JobTest.java @@ -4,7 +4,10 @@ package io.airbyte.scheduler.models; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; import java.util.List; diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java index b60348d4b420..e1351fb8560d 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java @@ -4,7 +4,7 @@ package io.airbyte.scheduler.persistence; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.JobTypeResourceLimit; diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java index fc800a211319..0cce6145107a 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java @@ -4,7 +4,7 @@ package io.airbyte.scheduler.persistence.job_factory; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java index bfc21b47b6c0..21b8bd068d42 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadataTest.java @@ -4,7 +4,7 @@ package io.airbyte.scheduler.persistence.job_tracker; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java index 8068a26eead0..0a8db3769885 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -171,13 +171,13 @@ public boolean hasExited() { try { exitValue(); return true; - } catch (IllegalThreadStateException e) { + } catch (final IllegalThreadStateException e) { return false; } } @Override - public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException { + public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException { // implementation copied from Process.java since this isn't a real Process long remainingNanos = unit.toNanos(timeout); if (hasExited()) @@ -185,7 +185,7 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException if (timeout <= 0) return false; - long deadline = System.nanoTime() + remainingNanos; + final long deadline = System.nanoTime() + remainingNanos; do { Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1, 100)); if (hasExited()) @@ -198,7 +198,7 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException @Override public int waitFor() throws InterruptedException { - boolean exited = waitFor(10, TimeUnit.DAYS); + final boolean exited = waitFor(10, TimeUnit.DAYS); if (exited) { return exitValue(); @@ -317,7 +317,7 @@ public void create(final Map allLabels, .waitUntilCondition(p -> { return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null; }, 5, TimeUnit.MINUTES); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new RuntimeException(e); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 2ddba00dfa4a..339277295132 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -309,7 +309,7 @@ private static void waitForInitPodToRun(final KubernetesClient client, final Pod client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()); try { pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOGGER.error("Init pod not found after 5 minutes"); LOGGER.error("Pod search executed in namespace {} for pod name {} resulted in: {}", podDefinition.getMetadata().getNamespace(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 91cff4612cae..73d07732ee5c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -230,13 +230,13 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) { do { Thread.sleep(DELAY_BETWEEN_QUERY_MS); } while (!isWorkflowRunning(getConnectionManagerName(connectionId))); - } catch (InterruptedException e) {} + } catch (final InterruptedException e) {} return null; }).get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException | ExecutionException e) { log.error("Failed to create a new connection manager workflow", e); - } catch (TimeoutException e) { + } catch (final TimeoutException e) { log.error("Can't create a new connection manager workflow due to timeout", e); } } @@ -471,11 +471,11 @@ TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier< */ public boolean isWorkflowRunning(final String workflowName) { try { - ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName); + final ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName); connectionManagerWorkflow.getState(); return true; - } catch (Exception e) { + } catch (final Exception e) { return false; } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index a23405cad924..f6c412b3fdda 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -75,7 +75,7 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo private static final String HUMAN_READABLE_WORKFLOW_EXECUTION_TTL = DurationFormatUtils.formatDurationWords(WORKFLOW_EXECUTION_TTL.toMillis(), true, true); - public static void configureTemporalNamespace(WorkflowServiceStubs temporalService) { + public static void configureTemporalNamespace(final WorkflowServiceStubs temporalService) { final var client = temporalService.blockingStub(); final var describeNamespaceRequest = DescribeNamespaceRequest.newBuilder().setNamespace(DEFAULT_NAMESPACE).build(); final var currentRetentionGrpcDuration = client.describeNamespace(describeNamespaceRequest).getConfig().getWorkflowExecutionRetentionTtl(); @@ -217,7 +217,7 @@ protected static Set getNamespaces(final WorkflowServiceStubs temporalSe * Runs the code within the supplier while heartbeating in the backgroud. Also makes sure to shut * down the heartbeat server after the fact. */ - public static T withBackgroundHeartbeat(Callable callable) { + public static T withBackgroundHeartbeat(final Callable callable) { final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); try { @@ -229,7 +229,7 @@ public static T withBackgroundHeartbeat(Callable callable) { } catch (final ActivityCompletionException e) { LOGGER.warn("Job either timed out or was cancelled."); throw new RuntimeException(e); - } catch (Exception e) { + } catch (final Exception e) { throw new RuntimeException(e); } finally { LOGGER.info("Stopping temporal heartbeating..."); @@ -258,7 +258,7 @@ public static T withBackgroundHeartbeat(final AtomicReference canc } catch (final ActivityCompletionException e) { LOGGER.warn("Job either timed out or was cancelled."); throw new RuntimeException(e); - } catch (Exception e) { + } catch (final Exception e) { throw new RuntimeException(e); } finally { LOGGER.info("Stopping temporal heartbeating..."); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 89786888ddb3..fb06441408c7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -108,7 +108,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr } } - private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput connectionUpdaterInput) { + private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput) { return Workflow.newCancellationScope(() -> { connectionId = connectionUpdaterInput.getConnectionId(); @@ -122,7 +122,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co // resetConnection flag to the next run so that that run can execute the actual reset workflowState.setResetConnection(connectionUpdaterInput.isResetConnection()); - Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); + final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); Workflow.await(timeToWait, () -> skipScheduling() || connectionUpdaterInput.isFromFailure()); @@ -189,7 +189,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co }); } - private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) { + private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) { workflowState.setSuccess(true); runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput( workflowInternalState.getJobId(), @@ -199,7 +199,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, resetNewConnectionInput(connectionUpdaterInput); } - private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) { + private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) { runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput( workflowInternalState.getJobId(), workflowInternalState.getAttemptId(), @@ -288,8 +288,8 @@ public WorkflowState getState() { @Override public JobInformation getJobInformation() { - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); return new JobInformation( jobId == null ? NON_RUNNING_JOB_ID : jobId, attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId); @@ -297,8 +297,8 @@ public JobInformation getJobInformation() { @Override public QuarantinedInformation getQuarantinedInformation() { - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); return new QuarantinedInformation( connectionId, jobId == null ? NON_RUNNING_JOB_ID : jobId, @@ -335,10 +335,10 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn * * We aimed to use this method for call of the temporal activity. */ - private OUTPUT runMandatoryActivityWithOutput(Function mapper, INPUT input) { + private OUTPUT runMandatoryActivityWithOutput(final Function mapper, final INPUT input) { try { return mapper.apply(input); - } catch (Exception e) { + } catch (final Exception e) { log.error("Failed to run an activity for the connection " + connectionId, e); workflowState.setQuarantined(true); workflowState.setRetryFailedActivity(false); @@ -353,7 +353,7 @@ private OUTPUT runMandatoryActivityWithOutput(Function void runMandatoryActivity(Consumer consumer, INPUT input) { + private void runMandatoryActivity(final Consumer consumer, final INPUT input) { runMandatoryActivityWithOutput((inputInternal) -> { consumer.accept(inputInternal); return null; @@ -369,7 +369,7 @@ private void runMandatoryActivity(Consumer consumer, INPUT input) * * Wait time is infinite If the workflow is manual or disabled since we never want to schedule this. */ - private Duration getTimeToWait(UUID connectionId) { + private Duration getTimeToWait(final UUID connectionId) { // Scheduling final ScheduleRetrieverInput scheduleRetrieverInput = new ScheduleRetrieverInput(connectionId); @@ -402,7 +402,7 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu /** * Create a new attempt for a given jobId */ - private Integer createAttemptId(long jobId) { + private Integer createAttemptId(final long jobId) { final AttemptCreationOutput attemptCreationOutput = runMandatoryActivityWithOutput( jobCreationAndStatusUpdateActivity::createNewAttempt, @@ -417,8 +417,8 @@ private Integer createAttemptId(long jobId) { * job and will generate a different output if the job is a sync or a reset. */ private GeneratedJobInput getJobInput() { - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); final SyncInput getSyncInputActivitySyncInput = new SyncInput( attemptId, jobId, @@ -456,8 +456,8 @@ private void reportJobStarting() { * since the latter is a long running workflow, in the future, using a different Node pool would * make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999 */ - private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) { - int taskQueueChangeVersion = + private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs) { + final int taskQueueChangeVersion = Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION); String taskQueue = TemporalJobType.SYNC.name(); @@ -487,8 +487,8 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) { * * @return True if the job failed, false otherwise */ - private boolean getFailStatus(StandardSyncOutput standardSyncOutput) { - StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary(); + private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) { + final StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary(); if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) { workflowInternalState.getFailures().addAll(standardSyncOutput.getFailures()); @@ -513,12 +513,12 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() { /** * Set a job as cancel and continue to the next job if and continue as a reset if needed */ - private void reportCancelledAndContinueWith(boolean isReset, ConnectionUpdaterInput connectionUpdaterInput) { + private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) { workflowState.setContinueAsReset(isReset); - Long jobId = workflowInternalState.getJobId(); - Integer attemptId = workflowInternalState.getAttemptId(); - Set failures = workflowInternalState.getFailures(); - Boolean partialSuccess = workflowInternalState.getPartialSuccess(); + final Long jobId = workflowInternalState.getJobId(); + final Integer attemptId = workflowInternalState.getAttemptId(); + final Set failures = workflowInternalState.getFailures(); + final Boolean partialSuccess = workflowInternalState.getPartialSuccess(); runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled, new JobCancelledInput( jobId, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index f4f0d815325c..16770e4873f6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -77,7 +77,7 @@ public LauncherWorker(final UUID connectionId, } @Override - public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException { + public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException { final AtomicBoolean isCanceled = new AtomicBoolean(false); final AtomicReference cancellationCallback = new AtomicReference<>(null); return TemporalUtils.withBackgroundHeartbeat(cancellationCallback, () -> { @@ -139,7 +139,7 @@ public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException { resourceRequirements, fileMap, portMap); - } catch (KubernetesClientException e) { + } catch (final KubernetesClientException e) { throw new WorkerException( "Failed to create pod " + podName + ", pre-existing pod exists which didn't advance out of the NOT_STARTED state."); } @@ -159,12 +159,12 @@ public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException { final var output = process.getOutput(); return output.map(s -> Jsons.deserialize(s, outputClass)).orElse(null); - } catch (Exception e) { + } catch (final Exception e) { if (cancelled.get()) { try { log.info("Destroying process due to cancellation."); process.destroy(); - } catch (Exception e2) { + } catch (final Exception e2) { log.error("Failed to destroy process on cancellation.", e2); } throw new WorkerException("Launcher " + application + " was cancelled.", e);