Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Add property for the timeout of sweeper polling for workflows #3633

Merged
merged 1 commit into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Changes to configurations:
| workflow.executor.service.max.threads | conductor.app.executorServiceMaxThreadCount | 50 |
| decider.sweep.frequency.seconds | conductor.app.sweepFrequency | 30s |
| workflow.sweeper.thread.count | conductor.app.sweeperThreadCount | 5 |
| - | conductor.app.sweeperWorkflowPollTimeout | 2000ms |
| workflow.event.processor.thread.count | conductor.app.eventProcessorThreadCount | 2 |
| workflow.event.message.indexing.enabled | conductor.app.eventMessageIndexingEnabled | true |
| workflow.event.execution.indexing.enabled | conductor.app.eventExecutionIndexingEnabled | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ConductorProperties {
/** The number of threads to use to do background sweep on active workflows. */
private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2;

/** The timeout (in milliseconds) for the polling of workflows to be swept. */
private Duration sweeperWorkflowPollTimeout = Duration.ofMillis(2000);

/** The number of threads to configure the threadpool in the event processor. */
private int eventProcessorThreadCount = 2;

Expand Down Expand Up @@ -250,6 +253,14 @@ public void setSweeperThreadCount(int sweeperThreadCount) {
this.sweeperThreadCount = sweeperThreadCount;
}

public Duration getSweeperWorkflowPollTimeout() {
return sweeperWorkflowPollTimeout;
}

public void setSweeperWorkflowPollTimeout(Duration sweeperWorkflowPollTimeout) {
this.sweeperWorkflowPollTimeout = sweeperWorkflowPollTimeout;
}

public int getEventProcessorThreadCount() {
return eventProcessorThreadCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class WorkflowReconciler extends LifecycleAwareComponent {
private final WorkflowSweeper workflowSweeper;
private final QueueDAO queueDAO;
private final int sweeperThreadCount;
private final int sweeperWorkflowPollTimeout;

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowReconciler.class);

Expand All @@ -50,6 +51,8 @@ public WorkflowReconciler(
this.workflowSweeper = workflowSweeper;
this.queueDAO = queueDAO;
this.sweeperThreadCount = properties.getSweeperThreadCount();
this.sweeperWorkflowPollTimeout =
(int) properties.getSweeperWorkflowPollTimeout().toMillis();
LOGGER.info(
"WorkflowReconciler initialized with {} sweeper threads",
properties.getSweeperThreadCount());
Expand All @@ -63,7 +66,8 @@ public void pollAndSweep() {
if (!isRunning()) {
LOGGER.debug("Component stopped, skip workflow sweep");
} else {
List<String> workflowIds = queueDAO.pop(DECIDER_QUEUE, sweeperThreadCount, 2000);
List<String> workflowIds =
queueDAO.pop(DECIDER_QUEUE, sweeperThreadCount, sweeperWorkflowPollTimeout);
if (workflowIds != null) {
// wait for all workflow ids to be "swept"
CompletableFuture.allOf(
Expand Down