Skip to content

Commit

Permalink
Add autoformat (#10808)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau authored Mar 2, 2022
1 parent 5aecbc3 commit f18d304
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.scheduler.models;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.scheduler.persistence;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.JobTypeResourceLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.scheduler.persistence.job_factory;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.scheduler.persistence.job_tracker;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,21 @@ public boolean hasExited() {
try {
exitValue();
return true;
} catch (IllegalThreadStateException e) {
} catch (final IllegalThreadStateException e) {
return false;
}
}

@Override
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
public boolean waitFor(final long timeout, final TimeUnit unit) throws InterruptedException {
// implementation copied from Process.java since this isn't a real Process
long remainingNanos = unit.toNanos(timeout);
if (hasExited())
return true;
if (timeout <= 0)
return false;

long deadline = System.nanoTime() + remainingNanos;
final long deadline = System.nanoTime() + remainingNanos;
do {
Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1, 100));
if (hasExited())
Expand All @@ -198,7 +198,7 @@ public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException

@Override
public int waitFor() throws InterruptedException {
boolean exited = waitFor(10, TimeUnit.DAYS);
final boolean exited = waitFor(10, TimeUnit.DAYS);

if (exited) {
return exitValue();
Expand Down Expand Up @@ -317,7 +317,7 @@ public void create(final Map<String, String> allLabels,
.waitUntilCondition(p -> {
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
}, 5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private static void waitForInitPodToRun(final KubernetesClient client, final Pod
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName());
try {
pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
LOGGER.error("Init pod not found after 5 minutes");
LOGGER.error("Pod search executed in namespace {} for pod name {} resulted in: {}",
podDefinition.getMetadata().getNamespace(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,13 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) {
do {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} while (!isWorkflowRunning(getConnectionManagerName(connectionId)));
} catch (InterruptedException e) {}
} catch (final InterruptedException e) {}

return null;
}).get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
} catch (final InterruptedException | ExecutionException e) {
log.error("Failed to create a new connection manager workflow", e);
} catch (TimeoutException e) {
} catch (final TimeoutException e) {
log.error("Can't create a new connection manager workflow due to timeout", e);
}
}
Expand Down Expand Up @@ -471,11 +471,11 @@ <T> TemporalResponse<T> execute(final JobRunConfig jobRunConfig, final Supplier<
*/
public boolean isWorkflowRunning(final String workflowName) {
try {
ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName);
final ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName);
connectionManagerWorkflow.getState();

return true;
} catch (Exception e) {
} catch (final Exception e) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo
private static final String HUMAN_READABLE_WORKFLOW_EXECUTION_TTL =
DurationFormatUtils.formatDurationWords(WORKFLOW_EXECUTION_TTL.toMillis(), true, true);

public static void configureTemporalNamespace(WorkflowServiceStubs temporalService) {
public static void configureTemporalNamespace(final WorkflowServiceStubs temporalService) {
final var client = temporalService.blockingStub();
final var describeNamespaceRequest = DescribeNamespaceRequest.newBuilder().setNamespace(DEFAULT_NAMESPACE).build();
final var currentRetentionGrpcDuration = client.describeNamespace(describeNamespaceRequest).getConfig().getWorkflowExecutionRetentionTtl();
Expand Down Expand Up @@ -217,7 +217,7 @@ protected static Set<String> getNamespaces(final WorkflowServiceStubs temporalSe
* Runs the code within the supplier while heartbeating in the backgroud. Also makes sure to shut
* down the heartbeat server after the fact.
*/
public static <T> T withBackgroundHeartbeat(Callable<T> callable) {
public static <T> T withBackgroundHeartbeat(final Callable<T> callable) {
final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

try {
Expand All @@ -229,7 +229,7 @@ public static <T> T withBackgroundHeartbeat(Callable<T> callable) {
} catch (final ActivityCompletionException e) {
LOGGER.warn("Job either timed out or was cancelled.");
throw new RuntimeException(e);
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
LOGGER.info("Stopping temporal heartbeating...");
Expand Down Expand Up @@ -258,7 +258,7 @@ public static <T> T withBackgroundHeartbeat(final AtomicReference<Runnable> canc
} catch (final ActivityCompletionException e) {
LOGGER.warn("Job either timed out or was cancelled.");
throw new RuntimeException(e);
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
LOGGER.info("Stopping temporal heartbeating...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
}
}

private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput connectionUpdaterInput) {
private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterInput connectionUpdaterInput) {
return Workflow.newCancellationScope(() -> {
connectionId = connectionUpdaterInput.getConnectionId();

Expand All @@ -122,7 +122,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co
// resetConnection flag to the next run so that that run can execute the actual reset
workflowState.setResetConnection(connectionUpdaterInput.isResetConnection());

Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());
final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());

Workflow.await(timeToWait,
() -> skipScheduling() || connectionUpdaterInput.isFromFailure());
Expand Down Expand Up @@ -189,7 +189,7 @@ private CancellationScope generateSyncWorkflowRunnable(ConnectionUpdaterInput co
});
}

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) {
private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
Expand All @@ -199,7 +199,7 @@ private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput,
resetNewConnectionInput(connectionUpdaterInput);
}

private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, StandardSyncOutput standardSyncOutput) {
private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
Expand Down Expand Up @@ -288,17 +288,17 @@ public WorkflowState getState() {

@Override
public JobInformation getJobInformation() {
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
return new JobInformation(
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId);
}

@Override
public QuarantinedInformation getQuarantinedInformation() {
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
return new QuarantinedInformation(
connectionId,
jobId == null ? NON_RUNNING_JOB_ID : jobId,
Expand Down Expand Up @@ -335,10 +335,10 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn
*
* We aimed to use this method for call of the temporal activity.
*/
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(Function<INPUT, OUTPUT> mapper, INPUT input) {
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INPUT, OUTPUT> mapper, final INPUT input) {
try {
return mapper.apply(input);
} catch (Exception e) {
} catch (final Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setQuarantined(true);
workflowState.setRetryFailedActivity(false);
Expand All @@ -353,7 +353,7 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(Function<INPUT, OU
/**
* Similar to runMandatoryActivityWithOutput but for methods that don't return
*/
private <INPUT> void runMandatoryActivity(Consumer<INPUT> consumer, INPUT input) {
private <INPUT> void runMandatoryActivity(final Consumer<INPUT> consumer, final INPUT input) {
runMandatoryActivityWithOutput((inputInternal) -> {
consumer.accept(inputInternal);
return null;
Expand All @@ -369,7 +369,7 @@ private <INPUT> void runMandatoryActivity(Consumer<INPUT> consumer, INPUT input)
*
* Wait time is infinite If the workflow is manual or disabled since we never want to schedule this.
*/
private Duration getTimeToWait(UUID connectionId) {
private Duration getTimeToWait(final UUID connectionId) {
// Scheduling
final ScheduleRetrieverInput scheduleRetrieverInput = new ScheduleRetrieverInput(connectionId);

Expand Down Expand Up @@ -402,7 +402,7 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu
/**
* Create a new attempt for a given jobId
*/
private Integer createAttemptId(long jobId) {
private Integer createAttemptId(final long jobId) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
Expand All @@ -417,8 +417,8 @@ private Integer createAttemptId(long jobId) {
* job and will generate a different output if the job is a sync or a reset.
*/
private GeneratedJobInput getJobInput() {
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptId,
jobId,
Expand Down Expand Up @@ -456,8 +456,8 @@ private void reportJobStarting() {
* since the latter is a long running workflow, in the future, using a different Node pool would
* make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999
*/
private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
int taskQueueChangeVersion =
private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs) {
final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);

String taskQueue = TemporalJobType.SYNC.name();
Expand Down Expand Up @@ -487,8 +487,8 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
*
* @return True if the job failed, false otherwise
*/
private boolean getFailStatus(StandardSyncOutput standardSyncOutput) {
StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();
private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
final StandardSyncSummary standardSyncSummary = standardSyncOutput.getStandardSyncSummary();

if (standardSyncSummary != null && standardSyncSummary.getStatus() == ReplicationStatus.FAILED) {
workflowInternalState.getFailures().addAll(standardSyncOutput.getFailures());
Expand All @@ -513,12 +513,12 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
/**
* Set a job as cancel and continue to the next job if and continue as a reset if needed
*/
private void reportCancelledAndContinueWith(boolean isReset, ConnectionUpdaterInput connectionUpdaterInput) {
private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
workflowState.setContinueAsReset(isReset);
Long jobId = workflowInternalState.getJobId();
Integer attemptId = workflowInternalState.getAttemptId();
Set<FailureReason> failures = workflowInternalState.getFailures();
Boolean partialSuccess = workflowInternalState.getPartialSuccess();
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final Set<FailureReason> failures = workflowInternalState.getFailures();
final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public LauncherWorker(final UUID connectionId,
}

@Override
public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException {
public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException {
final AtomicBoolean isCanceled = new AtomicBoolean(false);
final AtomicReference<Runnable> cancellationCallback = new AtomicReference<>(null);
return TemporalUtils.withBackgroundHeartbeat(cancellationCallback, () -> {
Expand Down Expand Up @@ -139,7 +139,7 @@ public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException {
resourceRequirements,
fileMap,
portMap);
} catch (KubernetesClientException e) {
} catch (final KubernetesClientException e) {
throw new WorkerException(
"Failed to create pod " + podName + ", pre-existing pod exists which didn't advance out of the NOT_STARTED state.");
}
Expand All @@ -159,12 +159,12 @@ public OUTPUT run(INPUT input, Path jobRoot) throws WorkerException {
final var output = process.getOutput();

return output.map(s -> Jsons.deserialize(s, outputClass)).orElse(null);
} catch (Exception e) {
} catch (final Exception e) {
if (cancelled.get()) {
try {
log.info("Destroying process due to cancellation.");
process.destroy();
} catch (Exception e2) {
} catch (final Exception e2) {
log.error("Failed to destroy process on cancellation.", e2);
}
throw new WorkerException("Launcher " + application + " was cancelled.", e);
Expand Down

0 comments on commit f18d304

Please sign in to comment.