Skip to content

Commit

Permalink
Bmoric/restore temporal cleaning cron (airbytehq#17258)
Browse files Browse the repository at this point in the history
* Revert "Revert "Bmoric/temporal cleaning cron (airbytehq#16414)" (airbytehq#17192)"

This reverts commit 7516679.

* Generate an artifact

* Change missing property from value to property
  • Loading branch information
benmoriceau authored and jhammarstedt committed Oct 31, 2022
1 parent 6eaa195 commit eacd834
Show file tree
Hide file tree
Showing 64 changed files with 763 additions and 159 deletions.
44 changes: 44 additions & 0 deletions airbyte-commons-temporal/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import org.jsonschema2pojo.SourceType

plugins {
id "java-library"
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'io.temporal:temporal-serviceclient:1.8.1'

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor


implementation project(':airbyte-config:config-models')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'io.temporal:temporal-testing:1.8.1'
// Needed to be able to mock final class
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.persistence.job.models'

useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.temporal.client.WorkflowClient;
import jakarta.inject.Singleton;
import java.util.UUID;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Singleton
@Slf4j
public class ConnectionManagerUtils {

void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) {
log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId);
try {
client.newUntypedWorkflowStub(workflowId).terminate(reason);
} catch (final Exception e) {
log.warn(
"Could not terminate temporal workflow due to the following error; "
+ "this may be because there is currently no running workflow for this connection.",
e);
}
}

public void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

public String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}

public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);
WorkflowClient.start(connectionManagerWorkflow::run, input);

return connectionManagerWorkflow;
}

public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
return client.newWorkflowStub(ConnectionManagerWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@AllArgsConstructor
@NoArgsConstructor
@Slf4j
@Singleton
@Requires(property = "airbyte.worker.plane",
notEquals = "DATA_PLANE")
public class TemporalClient {

@Inject
private WorkflowClient client;
@Inject
private WorkflowServiceStubs service;
@Inject
private ConnectionManagerUtils connectionManagerUtils;

private final Set<String> workflowNames = new HashSet<>();

public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executionStatus) {
final Set<UUID> workflowExecutionInfos = fetchClosedWorkflowsByStatus(executionStatus);

final Set<UUID> nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos);

nonRunningWorkflow.forEach(connectionId -> {
connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in "
+ "unreachable state before starting a new workflow for this connection");
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId);
});
}

Set<UUID> fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) {
ByteString token;
ListClosedWorkflowExecutionsRequest workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();

final Set<UUID> workflowExecutionInfos = new HashSet<>();
do {
final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest);
final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build();
workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType ||
workflowExecutionInfo.getStatus() == executionStatus)
.flatMap((workflowExecutionInfo -> extractConnectionIdFromWorkflowId(workflowExecutionInfo.getExecution().getWorkflowId()).stream()))
.collect(Collectors.toSet()));
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();

} while (token != null && token.size() > 0);

return workflowExecutionInfos;
}

@VisibleForTesting
Set<UUID> filterOutRunningWorkspaceId(final Set<UUID> workflowIds) {
refreshRunningWorkflow();

final Set<UUID> runningWorkflowByUUID =
workflowNames.stream().flatMap(name -> extractConnectionIdFromWorkflowId(name).stream()).collect(Collectors.toSet());

return workflowIds.stream().filter(workflowId -> !runningWorkflowByUUID.contains(workflowId)).collect(Collectors.toSet());
}

@VisibleForTesting
void refreshRunningWorkflow() {
workflowNames.clear();
ByteString token;
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final Set<String> workflowExecutionInfos = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.map((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId()))
.collect(Collectors.toSet());
workflowNames.addAll(workflowExecutionInfos);
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();

} while (token != null && token.size() > 0);
}

Optional<UUID> extractConnectionIdFromWorkflowId(final String workflowId) {
if (!workflowId.startsWith("connection_manager_")) {
return Optional.empty();
}
return Optional.ofNullable(StringUtils.removeStart(workflowId, "connection_manager_"))
.map(
stringUUID -> UUID.fromString(stringUUID));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

public enum TemporalJobType {
GET_SPEC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.tally.Scope;
Expand Down Expand Up @@ -74,7 +74,8 @@ public TemporalUtils(@Property(name = "temporal.cloud.client.cert") final String
@Value("${temporal.cloud.host}") final String temporalCloudHost,
@Value("${temporal.cloud.namespace}") final String temporalCloudNamespace,
@Value("${temporal.host}") final String temporalHost,
@Value("${temporal.retention}") final Integer temporalRetentionInDays) {
@Property(name = "${temporal.retention}",
defaultValue = "30") final Integer temporalRetentionInDays) {
this.temporalCloudClientCert = temporalCloudClientCert;
this.temporalCloudClientKey = temporalCloudClientKey;
this.temporalCloudEnabled = temporalCloudEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.config;

import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.micronaut.context.annotation.Factory;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import jakarta.inject.Singleton;

/**
* Micronaut bean factory for Temporal-related singletons.
*/
@Factory
public class TemporalBeanFactory {

@Singleton
public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) {
return temporalUtils.createTemporalService();
}

@Singleton
public WorkflowClient workflowClient(
final TemporalUtils temporalUtils,
final WorkflowServiceStubs temporalService) {
return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;
package io.airbyte.commons.temporal.scheduling;

import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;
package io.airbyte.commons.temporal.scheduling;

import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -39,7 +39,7 @@ public class ConnectionUpdaterInput {
private WorkflowState workflowState;
private boolean resetConnection;
@Builder.Default
private boolean fromJobResetFailure = false;
private final boolean fromJobResetFailure = false;

@Builder.Default
private boolean skipScheduling = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state;
package io.airbyte.commons.temporal.scheduling.state;

import io.airbyte.config.FailureReason;
import java.util.HashSet;
Expand All @@ -20,7 +20,7 @@ public class WorkflowInternalState {
private Integer attemptNumber = null;

// StandardSyncOutput standardSyncOutput = null;
private final Set<FailureReason> failures = new HashSet<>();
private Set<FailureReason> failures = new HashSet<>();
private Boolean partialSuccess = null;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state;
package io.airbyte.commons.temporal.scheduling.state;

import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener;
import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
import java.util.UUID;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.state.listener;
package io.airbyte.commons.temporal.scheduling.state.listener;

import java.util.LinkedList;
import java.util.Queue;
Expand Down
Loading

0 comments on commit eacd834

Please sign in to comment.