Bmoric/temporal cleaning cron (airbytehq#16414)
* WIP Convert airbyte-workers to Micronaut framework

* Rebase cleanup

* Fix broken tests

* Simplify code

* Support control vs data plane configuration

* make WORKFLOW_PROXY_CACHE non-static to avoid caching mocks in unit tests

* Formatting

* Pairing on Worker Micronaut (airbytehq#16364)

* add RouteToSyncTaskQueue activity

* use new route activity in connection manager workflow

* format

* call router service for task queue

* Revert temporal proxy changes

* Formatting

* Fix default value

* register new route activity in test

* fix SyncWorkflowTest now that it isn't doing any routing

* Update dependencies

* More dependency updates

* Update dependencies

* Add the injected temporal client to airbyte-cron

* Update cron to make it work

* Remove useless Bean factory

* PR comments

* Improve conditional bean check

* Match existing Optional functionality

* Add notEquals check

* Add missing env var to Helm chart

* Fix typo

* Mark LogConfigs as Singleton

* Env vars for log/state storage type

* Remove use of Optional in bean declarations

* Fix typo in config property name

* Support Temporal Cloud namespace

* Change to @value

* Use correct value for conditional check

* Upgrade Micronaut

* Fix merge conflict

* Formatting

* Add missing env var

* Use sync task queue environment variable

* Handle sync task queue as set

* format and force http

* Handle case where sync task queue is empty

* Add correct path to config property

* Remove unused import

* Remove conflict

* Remove unused parameter

* Formatting

* Use pattern for condition process factory beans

* Cleanup

* PR feedback

* Revert hack for testing

* Fix temporal restart by status (airbytehq#16447)

* Update application.yml

* Re add worker dep

* Add missing env var

* PR comments

* Bmoric/move temporal client (airbytehq#16778)

* tmp

* tmp

* View diff

* Move part of the temporal client

* tmp

* copy Temporal Utils test

* Uniq Temporal Utils

* Uniq Temporal Workflow Utils

* Move CancellationHandler

* Move commons-temporal to being shared between platform, connector and CLI

* Rm worker dependency from cron

* Fix build

* Update with right value for cron.

* Fix dep conflict

Co-authored-by: jdpgrailsdev <jpearlin1@gmail.com>
Co-authored-by: pmossman <parker@airbyte.io>
Co-authored-by: Jonathan Pearlin <jonathan@airbyte.io>
4 people authored and jhammarstedt committed Oct 31, 2022
1 parent b483e24 commit 5e4c282
Showing 64 changed files with 759 additions and 158 deletions.
42 changes: 42 additions & 0 deletions airbyte-commons-temporal/build.gradle
@@ -0,0 +1,42 @@
import org.jsonschema2pojo.SourceType

plugins {
id "java-library"
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'io.temporal:temporal-serviceclient:1.8.1'

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor


implementation project(':airbyte-config:config-models')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'io.temporal:temporal-testing:1.8.1'
// Needed to be able to mock final class
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.persistence.job.models'

useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal;
+package io.airbyte.commons.temporal;

 import io.temporal.activity.ActivityExecutionContext;
 import io.temporal.client.ActivityCompletionException;
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal;

import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
import io.temporal.client.WorkflowClient;
import java.util.UUID;
import javax.inject.Singleton;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Singleton
@Slf4j
public class ConnectionManagerUtils {

void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) {
log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId);
try {
client.newUntypedWorkflowStub(workflowId).terminate(reason);
} catch (final Exception e) {
log.warn(
"Could not terminate temporal workflow due to the following error; "
+ "this may be because there is currently no running workflow for this connection.",
e);
}
}

public void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason);
}

public String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}

public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId);
WorkflowClient.start(connectionManagerWorkflow::run, input);

return connectionManagerWorkflow;
}

public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) {
return client.newWorkflowStub(ConnectionManagerWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)));
}

}
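
For readers following the naming convention: `getConnectionManagerName` builds workflow ids by prefixing the connection UUID with `connection_manager_`, and `TemporalClient.extractConnectionIdFromWorkflowId` reverses it. A minimal standalone sketch of that round trip follows. The class `WorkflowIdSketch` and its method names are hypothetical and not part of this commit; unlike the committed code, the sketch assumes a malformed UUID suffix should yield an empty result rather than an exception.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical standalone helper for illustration; not part of the commit.
final class WorkflowIdSketch {

  static final String PREFIX = "connection_manager_";

  // Mirrors the "connection_manager_" + connectionId convention.
  static String toWorkflowId(final UUID connectionId) {
    return PREFIX + connectionId;
  }

  // Recovers the connection id, or empty when the id does not follow the convention.
  static Optional<UUID> toConnectionId(final String workflowId) {
    if (workflowId == null || !workflowId.startsWith(PREFIX)) {
      return Optional.empty();
    }
    try {
      return Optional.of(UUID.fromString(workflowId.substring(PREFIX.length())));
    } catch (final IllegalArgumentException e) {
      // Assumption: a malformed suffix is treated as "not a connection manager workflow".
      return Optional.empty();
    }
  }

}
```

Returning `Optional` keeps callers free to `flatMap` over a stream of workflow ids and silently skip ids that belong to other workflow types.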
@@ -0,0 +1,135 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@AllArgsConstructor
@NoArgsConstructor
@Slf4j
@Singleton
@Requires(property = "airbyte.worker.plane",
notEquals = "DATA_PLANE")
public class TemporalClient {

@Inject
private WorkflowClient client;
@Inject
private WorkflowServiceStubs service;
@Inject
private ConnectionManagerUtils connectionManagerUtils;

private final Set<String> workflowNames = new HashSet<>();

public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executionStatus) {
final Set<UUID> workflowExecutionInfos = fetchClosedWorkflowsByStatus(executionStatus);

final Set<UUID> nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos);

nonRunningWorkflow.forEach(connectionId -> {
connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in "
+ "unreachable state before starting a new workflow for this connection");
connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId);
});
}

Set<UUID> fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) {
ByteString token;
ListClosedWorkflowExecutionsRequest workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();

final Set<UUID> workflowExecutionInfos = new HashSet<>();
do {
final ListClosedWorkflowExecutionsResponse listClosedWorkflowExecutionsResponse =
service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest);
final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build();
workflowExecutionInfos.addAll(listClosedWorkflowExecutionsResponse.getExecutionsList().stream()
// Compare protobuf messages with equals(); == compares references and never matches a freshly built WorkflowType.
.filter(workflowExecutionInfo -> connectionManagerWorkflowType.equals(workflowExecutionInfo.getType()) &&
workflowExecutionInfo.getStatus() == executionStatus)
.flatMap((workflowExecutionInfo -> extractConnectionIdFromWorkflowId(workflowExecutionInfo.getExecution().getWorkflowId()).stream()))
.collect(Collectors.toSet()));
token = listClosedWorkflowExecutionsResponse.getNextPageToken();

workflowExecutionsRequest =
ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();

} while (token != null && token.size() > 0);

return workflowExecutionInfos;
}

@VisibleForTesting
Set<UUID> filterOutRunningWorkspaceId(final Set<UUID> workflowIds) {
refreshRunningWorkflow();

final Set<UUID> runningWorkflowByUUID =
workflowNames.stream().flatMap(name -> extractConnectionIdFromWorkflowId(name).stream()).collect(Collectors.toSet());

return workflowIds.stream().filter(workflowId -> !runningWorkflowByUUID.contains(workflowId)).collect(Collectors.toSet());
}

@VisibleForTesting
void refreshRunningWorkflow() {
workflowNames.clear();
ByteString token;
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsResponse =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final Set<String> workflowExecutionInfos = listOpenWorkflowExecutionsResponse.getExecutionsList().stream()
.map((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId()))
.collect(Collectors.toSet());
workflowNames.addAll(workflowExecutionInfos);
token = listOpenWorkflowExecutionsResponse.getNextPageToken();

openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.build();

} while (token != null && token.size() > 0);
}

Optional<UUID> extractConnectionIdFromWorkflowId(final String workflowId) {
if (!workflowId.startsWith("connection_manager_")) {
return Optional.empty();
}
return Optional.ofNullable(StringUtils.removeStart(workflowId, "connection_manager_"))
.map(UUID::fromString);
}

}
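
The two list loops in `TemporalClient` share one shape: request a page, collect its workflow ids, and repeat while the returned page token is non-empty; the closed set is then reduced by whatever is currently open. A dependency-free sketch of that pattern is below. `PaginationSketch`, `Page`, and the string tokens are hypothetical stand-ins for the Temporal request/response types and `ByteString` tokens, so treat it as an illustration rather than the commit's implementation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-ins for the Temporal list APIs; not part of the commit.
final class PaginationSketch {

  /** One page of workflow ids plus the token for the next page ("" when exhausted). */
  static final class Page {

    final List<String> workflowIds;
    final String nextPageToken;

    Page(final List<String> workflowIds, final String nextPageToken) {
      this.workflowIds = workflowIds;
      this.nextPageToken = nextPageToken;
    }

  }

  /** Drains every page, starting from the empty token, until an empty token comes back. */
  static Set<String> fetchAll(final Function<String, Page> fetchPage) {
    final Set<String> ids = new HashSet<>();
    String token = "";
    do {
      final Page page = fetchPage.apply(token);
      ids.addAll(page.workflowIds);
      token = page.nextPageToken;
    } while (token != null && !token.isEmpty());
    return ids;
  }

  /** Keeps only the closed ids that have no open execution. */
  static Set<String> withoutRunning(final Set<String> closed, final Set<String> open) {
    final Set<String> result = new HashSet<>(closed);
    result.removeAll(open);
    return result;
  }

}
```

Passing the page fetcher as a `Function` lets a test substitute an in-memory map for the service stub, which is one way to exercise the loop without a running Temporal server.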
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal;
+package io.airbyte.commons.temporal;

 public enum TemporalJobType {
 GET_SPEC,
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal;
+package io.airbyte.commons.temporal;

 import com.uber.m3.tally.RootScopeBuilder;
 import com.uber.m3.tally.Scope;
@@ -2,11 +2,11 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal;
+package io.airbyte.commons.temporal;

 import com.google.common.annotations.VisibleForTesting;
+import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput;
 import io.airbyte.persistence.job.models.JobRunConfig;
-import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput;
 import io.temporal.client.WorkflowClient;
 import io.temporal.client.WorkflowClientOptions;
 import io.temporal.client.WorkflowOptions;
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.config;

import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.micronaut.context.annotation.Factory;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import javax.inject.Singleton;

/**
* Micronaut bean factory for Temporal-related singletons.
*/
@Factory
public class TemporalBeanFactory {

@Singleton
public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) {
return temporalUtils.createTemporalService();
}

@Singleton
public WorkflowClient workflowClient(
final TemporalUtils temporalUtils,
final WorkflowServiceStubs temporalService) {
return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace());
}

}
@@ -2,9 +2,9 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal.scheduling;
+package io.airbyte.commons.temporal.scheduling;

-import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
+import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
 import io.temporal.workflow.QueryMethod;
 import io.temporal.workflow.SignalMethod;
 import io.temporal.workflow.WorkflowInterface;
@@ -2,9 +2,9 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal.scheduling;
+package io.airbyte.commons.temporal.scheduling;

-import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
+import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
 import java.util.UUID;
 import javax.annotation.Nullable;
 import lombok.AllArgsConstructor;
@@ -39,7 +39,7 @@ public class ConnectionUpdaterInput {
 private WorkflowState workflowState;
 private boolean resetConnection;
 @Builder.Default
-private boolean fromJobResetFailure = false;
+private final boolean fromJobResetFailure = false;

 @Builder.Default
 private boolean skipScheduling = false;
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal.scheduling.state;
+package io.airbyte.commons.temporal.scheduling.state;

 import io.airbyte.config.FailureReason;
 import java.util.HashSet;
@@ -20,7 +20,7 @@ public class WorkflowInternalState {
 private Integer attemptNumber = null;

 // StandardSyncOutput standardSyncOutput = null;
-private final Set<FailureReason> failures = new HashSet<>();
+private Set<FailureReason> failures = new HashSet<>();
 private Boolean partialSuccess = null;

 }
@@ -2,11 +2,11 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal.scheduling.state;
+package io.airbyte.commons.temporal.scheduling.state;

-import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener;
-import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
-import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
+import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener;
+import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
+import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
 import java.util.UUID;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal.scheduling.state.listener;
+package io.airbyte.commons.temporal.scheduling.state.listener;

 import java.util.LinkedList;
 import java.util.Queue;
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.workers.temporal.scheduling.state.listener;
+package io.airbyte.commons.temporal.scheduling.state.listener;

 import java.util.LinkedList;
 import java.util.Optional;