diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 18889cba35..935fc45bd4 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -14,6 +14,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -354,13 +355,43 @@ public void removeTaskFromQueue(String taskType, String taskId) { public int getQueueSizeForTask(String taskType) { Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank"); - Map taskTypeToQueueSizeMap = + Integer queueSize = getForEntity( - "tasks/queue/sizes", new Object[] {"taskType", taskType}, queueSizeMap); - if (taskTypeToQueueSizeMap.containsKey(taskType)) { - return taskTypeToQueueSizeMap.get(taskType); + "tasks/queue/size", + new Object[] {"taskType", taskType}, + new GenericType() {}); + return queueSize != null ? queueSize : 0; + } + + public int getQueueSizeForTask( + String taskType, String domain, String isolationGroupId, String executionNamespace) { + Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank"); + + List params = new LinkedList<>(); + params.add("taskType"); + params.add(taskType); + + if (StringUtils.isNotBlank(domain)) { + params.add("domain"); + params.add(domain); } - return 0; + + if (StringUtils.isNotBlank(isolationGroupId)) { + params.add("isolationGroupId"); + params.add(isolationGroupId); + } + + if (StringUtils.isNotBlank(executionNamespace)) { + params.add("executionNamespace"); + params.add(executionNamespace); + } + + Integer queueSize = + getForEntity( + "tasks/queue/size", + params.toArray(new Object[0]), + new GenericType() {}); + return queueSize != null ? queueSize : 0; } /** diff --git a/core/src/main/java/com/netflix/conductor/core/utils/QueueUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/QueueUtils.java index b381ec4685..bca477390f 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/QueueUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/QueueUtils.java @@ -40,14 +40,13 @@ public static String getQueueName(Task task) { } /** - * @param taskType - * @param domain - * @param isolationGroup - * @param executionNameSpace - * @return //domain:taskType@eexecutionNameSpace-isolationGroup + * Creates a queue name string using taskType, domain, + * isolationGroupId and executionNamespace. + * + * @return domain:taskType@eexecutionNameSpace-isolationGroupId. */ public static String getQueueName( - String taskType, String domain, String isolationGroup, String executionNameSpace) { + String taskType, String domain, String isolationGroupId, String executionNamespace) { String queueName; if (domain == null) { @@ -56,12 +55,12 @@ public static String getQueueName( queueName = domain + DOMAIN_SEPARATOR + taskType; } - if (executionNameSpace != null) { - queueName = queueName + EXECUTION_NAME_SPACE_SEPARATOR + executionNameSpace; + if (executionNamespace != null) { + queueName = queueName + EXECUTION_NAME_SPACE_SEPARATOR + executionNamespace; } - if (isolationGroup != null) { - queueName = queueName + ISOLATION_SEPARATOR + isolationGroup; + if (isolationGroupId != null) { + queueName = queueName + ISOLATION_SEPARATOR + isolationGroupId; } return queueName; } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index b4d0e354a9..965570b345 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -293,11 +293,15 @@ public boolean ackTaskReceived(Task task) { public Map getTaskQueueSizes(List taskDefNames) { Map sizes = new HashMap<>(); for (String taskDefName : taskDefNames) { - sizes.put(taskDefName, queueDAO.getSize(taskDefName)); + sizes.put(taskDefName, getTaskQueueSize(taskDefName)); } return sizes; } + public Integer getTaskQueueSize(String queueName) { + return queueDAO.getSize(queueName); + } + public void removeTaskFromQueue(String taskId) { Task task = getTask(taskId); if (task == null) { diff --git a/core/src/main/java/com/netflix/conductor/service/TaskService.java b/core/src/main/java/com/netflix/conductor/service/TaskService.java index 03e1371aa6..7f4f3d0a67 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskService.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskService.java @@ -164,6 +164,15 @@ void removeTaskFromQueue( */ Map getTaskQueueSizes(List taskTypes); + /** + * Get the queue size for a Task Type. The input can optionally include domain, + * isolationGroupId and executionNamespace. + * + * @return + */ + Integer getTaskQueueSize( + String taskType, String domain, String isolationGroupId, String executionNamespace); + /** * Get the details about each queue. * diff --git a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java index fdcb503410..333b2e3469 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java @@ -34,6 +34,7 @@ import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; import com.netflix.conductor.common.utils.ExternalPayloadStorage; +import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; @@ -254,6 +255,15 @@ public Map getTaskQueueSizes(List taskTypes) { return executionService.getTaskQueueSizes(taskTypes); } + @Override + public Integer getTaskQueueSize( + String taskType, String domain, String isolationGroupId, String executionNamespace) { + String queueName = + QueueUtils.getQueueName(taskType, domain, isolationGroupId, executionNamespace); + + return executionService.getTaskQueueSize(queueName); + } + /** * Get the details about each queue. * diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java index 251eb59370..1b1fe6b7a3 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java @@ -105,12 +105,24 @@ public ResponseEntity getTask(@PathVariable("taskId") String taskId) { } @GetMapping("/queue/sizes") - @Operation(summary = "Get Task type queue sizes") + @Operation(summary = "Deprecated. Please use /tasks/queue/size endpoint") + @Deprecated public Map size( @RequestParam(value = "taskType", required = false) List taskTypes) { return taskService.getTaskQueueSizes(taskTypes); } + @GetMapping("/queue/size") + @Operation(summary = "Get queue size for a task type.") + public Integer taskDepth( + @RequestParam("taskType") String taskType, + @RequestParam(value = "domain", required = false) String domain, + @RequestParam(value = "isolationGroupId", required = false) String isolationGroupId, + @RequestParam(value = "executionNamespace", required = false) + String executionNamespace) { + return taskService.getTaskQueueSize(taskType, domain, executionNamespace, isolationGroupId); + } + @GetMapping("/queue/all/verbose") @Operation(summary = "Get the details about each queue") public Map>> allVerbose() {