Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
use spring events to initialize workers and added tests
Browse files Browse the repository at this point in the history
1. Add annotation support for pollingInterval
2. Allow configuration using spring
3. Tests
  • Loading branch information
v1r3n committed Feb 20, 2023
1 parent 719677d commit 5bdb4ac
Show file tree
Hide file tree
Showing 16 changed files with 535 additions and 46 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ allprojects {
}
}

// all client and their related modules are published with Java 8 compatibility
// all client and their related modules are published with Java 11 compatibility
["annotations", "common", "client", "client-spring", "grpc", "grpc-client"].each {
project(":conductor-$it") {
compileJava {
options.release = 8
options.release = 11
}
}
}
Expand Down
1 change: 1 addition & 0 deletions client-spring/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ dependencies {

implementation project(':conductor-common')
api project(':conductor-client')
api project(':conductor-java-sdk')

implementation "com.netflix.eureka:eureka-client:${revEurekaClient}"
implementation 'org.springframework.boot:spring-boot-starter'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
import com.netflix.discovery.EurekaClient;

@Configuration(proxyBeanMethods = false)
Expand All @@ -44,6 +45,12 @@ public TaskClient taskClient(ClientProperties clientProperties) {
return taskClient;
}

@ConditionalOnMissingBean
@Bean
public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) {
return new AnnotatedWorkerExecutor(taskClient);
}

@ConditionalOnMissingBean
@Bean(initMethod = "init", destroyMethod = "shutdown")
public TaskRunnerConfigurer taskRunnerConfigurer(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration;

@Component
public class ConductorWorkerAutoConfiguration
implements ApplicationListener<ContextRefreshedEvent> {

@Autowired private TaskClient taskClient;

@Override
public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) {
ApplicationContext applicationContext = refreshedEvent.getApplicationContext();
Environment environment = applicationContext.getEnvironment();
WorkerConfiguration configuration = new SpringWorkerConfiguration(environment);
AnnotatedWorkerExecutor annotatedWorkerExecutor =
new AnnotatedWorkerExecutor(taskClient, configuration);

Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
beans.values()
.forEach(
bean -> {
annotatedWorkerExecutor.addBean(bean);
});
annotatedWorkerExecutor.startPolling();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import org.springframework.core.env.Environment;

import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration;

public class SpringWorkerConfiguration extends WorkerConfiguration {

private final Environment environment;

public SpringWorkerConfiguration(Environment environment) {
this.environment = environment;
}

@Override
public int getPollingInterval(String taskName) {
String key = "conductor.worker." + taskName + ".pollingInterval";
return environment.getProperty(key, Integer.class, 0);
}

@Override
public int getThreadCount(String taskName) {
String key = "conductor.worker." + taskName + ".threadCount";
return environment.getProperty(key, Integer.class, 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import java.util.Date;

import org.springframework.stereotype.Component;

import com.netflix.conductor.sdk.workflow.executor.task.TaskContext;
import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

@Component
public class Workers {

@WorkerTask(value = "hello", threadCount = 3)
public String helloWorld(@InputParam("name") String name) {
TaskContext context = TaskContext.get();
System.out.println(new Date() + ":: Poll count: " + context.getPollCount());
if (context.getPollCount() < 5) {
context.addLog("Not ready yet, poll count is only " + context.getPollCount());
context.setCallbackAfter(1);
}

return "Hello, " + name;
}

@WorkerTask(value = "hello_again", pollingInterval = 333)
public String helloAgain(@InputParam("name") String name) {
TaskContext context = TaskContext.get();
System.out.println(new Date() + ":: Poll count: " + context.getPollCount());
if (context.getPollCount() < 5) {
context.addLog("Not ready yet, poll count is only " + context.getPollCount());
context.setCallbackAfter(1);
}

return "Hello (again), " + name;
}
}
2 changes: 2 additions & 0 deletions client-spring/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
conductor.client.rootUri=http://localhost:8080/api/
conductor.worker.hello.threadCount=100
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public WorkflowExecutor(
Workflow workflow = workflowClient.getWorkflow(workflowId, true);
if (workflow.getStatus().isTerminal()) {
future.complete(workflow);
runningWorkflowFutures.remove(workflowId);
}
}
},
Expand Down Expand Up @@ -140,6 +141,7 @@ public WorkflowExecutor(
Workflow workflow = workflowClient.getWorkflow(workflowId, true);
if (workflow.getStatus().isTerminal()) {
future.complete(workflow);
runningWorkflowFutures.remove(workflowId);
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;

import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
Expand All @@ -44,6 +41,9 @@ public class AnnotatedWorker implements Worker {

private int pollingInterval = 100;

private Set<TaskResult.Status> failedStatuses =
Set.of(TaskResult.Status.FAILED, TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);

public AnnotatedWorker(String name, Method workerMethod, Object obj) {
this.name = name;
this.workerMethod = workerMethod;
Expand All @@ -58,11 +58,40 @@ public String getTaskDefName() {

@Override
public TaskResult execute(Task task) {
TaskResult result;
TaskResult result = null;
try {
TaskContext context = TaskContext.set(task);
Object[] parameters = getInvocationParameters(task);
Object invocationResult = workerMethod.invoke(obj, parameters);
result = setValue(invocationResult, task);
result = setValue(invocationResult, context.getTaskResult());
if (!failedStatuses.contains(result.getStatus())
&& result.getCallbackAfterSeconds() > 0) {
result.setStatus(TaskResult.Status.IN_PROGRESS);
}
} catch (InvocationTargetException invocationTargetException) {
if (result == null) {
result = new TaskResult(task);
}
Throwable e = invocationTargetException.getCause();
e.printStackTrace();
if (e instanceof NonRetryableException) {
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
} else {
result.setStatus(TaskResult.Status.FAILED);
}

result.setReasonForIncompletion(e.getMessage());
StringBuilder stackTrace = new StringBuilder();
for (StackTraceElement stackTraceElement : e.getStackTrace()) {
String className = stackTraceElement.getClassName();
if (className.startsWith("jdk.")
|| className.startsWith(AnnotatedWorker.class.getName())) {
break;
}
stackTrace.append(stackTraceElement);
stackTrace.append("\n");
}
result.log(stackTrace.toString());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -141,43 +170,43 @@ private static InputParam findInputParamAnnotation(Annotation[] paramAnnotation)
.orElse(null);
}

private TaskResult setValue(Object invocationResult, Task task) {
private TaskResult setValue(Object invocationResult, TaskResult result) {

if (invocationResult == null) {
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}

OutputParam opAnnotation =
workerMethod.getAnnotatedReturnType().getAnnotation(OutputParam.class);
if (opAnnotation != null) {

String name = opAnnotation.value();
task.getOutputData().put(name, invocationResult);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put(name, invocationResult);
result.setStatus(TaskResult.Status.COMPLETED);
return result;

} else if (invocationResult instanceof TaskResult) {

return (TaskResult) invocationResult;

} else if (invocationResult instanceof Map) {
Map resultAsMap = (Map) invocationResult;
task.getOutputData().putAll(resultAsMap);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().putAll(resultAsMap);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
} else if (invocationResult instanceof String
|| invocationResult instanceof Number
|| invocationResult instanceof Boolean) {
task.getOutputData().put("result", invocationResult);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put("result", invocationResult);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
} else if (invocationResult instanceof List) {

List resultAsList = om.convertValue(invocationResult, List.class);
task.getOutputData().put("result", resultAsList);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put("result", resultAsList);
result.setStatus(TaskResult.Status.COMPLETED);
return result;

} else if (invocationResult instanceof DynamicForkInput) {
DynamicForkInput forkInput = (DynamicForkInput) invocationResult;
Expand All @@ -186,25 +215,28 @@ private TaskResult setValue(Object invocationResult, Task task) {
for (com.netflix.conductor.sdk.workflow.def.tasks.Task<?> sdkTask : tasks) {
workflowTasks.addAll(sdkTask.getWorkflowDefTasks());
}
task.getOutputData().put(DynamicFork.FORK_TASK_PARAM, workflowTasks);
task.getOutputData().put(DynamicFork.FORK_TASK_INPUT_PARAM, forkInput.getInputs());
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put(DynamicFork.FORK_TASK_PARAM, workflowTasks);
result.getOutputData().put(DynamicFork.FORK_TASK_INPUT_PARAM, forkInput.getInputs());
result.setStatus(TaskResult.Status.COMPLETED);
return result;

} else {
Map resultAsMap = om.convertValue(invocationResult, Map.class);
task.getOutputData().putAll(resultAsMap);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().putAll(resultAsMap);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}
}

public void setPollingInterval(int pollingInterval) {
System.out.println(
"Setting the polling interval for " + getTaskDefName() + ", to " + pollingInterval);
this.pollingInterval = pollingInterval;
}

@Override
public int getPollingInterval() {
System.out.println("Sending the polling interval to " + pollingInterval);
return pollingInterval;
}
}
Loading

0 comments on commit 5bdb4ac

Please sign in to comment.