Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

New api to query queue size based on task type and domain #2972

Merged
merged 2 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -354,13 +355,43 @@ public void removeTaskFromQueue(String taskType, String taskId) {
public int getQueueSizeForTask(String taskType) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");

Map<String, Integer> taskTypeToQueueSizeMap =
Integer queueSize =
getForEntity(
"tasks/queue/sizes", new Object[] {"taskType", taskType}, queueSizeMap);
if (taskTypeToQueueSizeMap.containsKey(taskType)) {
return taskTypeToQueueSizeMap.get(taskType);
"tasks/queue/size",
new Object[] {"taskType", taskType},
new GenericType<Integer>() {});
return queueSize != null ? queueSize : 0;
}

public int getQueueSizeForTask(
String taskType, String domain, String isolationGroupId, String executionNamespace) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank");

List<Object> params = new LinkedList<>();
params.add("taskType");
params.add(taskType);

if (StringUtils.isNotBlank(domain)) {
params.add("domain");
params.add(domain);
}
return 0;

if (StringUtils.isNotBlank(isolationGroupId)) {
params.add("isolationGroupId");
params.add(isolationGroupId);
}

if (StringUtils.isNotBlank(executionNamespace)) {
params.add("executionNamespace");
params.add(executionNamespace);
}

Integer queueSize =
getForEntity(
"tasks/queue/size",
params.toArray(new Object[0]),
new GenericType<Integer>() {});
return queueSize != null ? queueSize : 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,13 @@ public static String getQueueName(Task task) {
}

/**
* @param taskType
* @param domain
* @param isolationGroup
* @param executionNameSpace
* @return //domain:taskType@eexecutionNameSpace-isolationGroup
* Creates a queue name string using <code>taskType</code>, <code>domain</code>, <code>
* isolationGroupId</code> and <code>executionNamespace</code>.
*
* @return domain:taskType@eexecutionNameSpace-isolationGroupId.
*/
public static String getQueueName(
String taskType, String domain, String isolationGroup, String executionNameSpace) {
String taskType, String domain, String isolationGroupId, String executionNamespace) {

String queueName;
if (domain == null) {
Expand All @@ -56,12 +55,12 @@ public static String getQueueName(
queueName = domain + DOMAIN_SEPARATOR + taskType;
}

if (executionNameSpace != null) {
queueName = queueName + EXECUTION_NAME_SPACE_SEPARATOR + executionNameSpace;
if (executionNamespace != null) {
queueName = queueName + EXECUTION_NAME_SPACE_SEPARATOR + executionNamespace;
}

if (isolationGroup != null) {
queueName = queueName + ISOLATION_SEPARATOR + isolationGroup;
if (isolationGroupId != null) {
queueName = queueName + ISOLATION_SEPARATOR + isolationGroupId;
}
return queueName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,15 @@ public boolean ackTaskReceived(Task task) {
public Map<String, Integer> getTaskQueueSizes(List<String> taskDefNames) {
Map<String, Integer> sizes = new HashMap<>();
for (String taskDefName : taskDefNames) {
sizes.put(taskDefName, queueDAO.getSize(taskDefName));
sizes.put(taskDefName, getTaskQueueSize(taskDefName));
}
return sizes;
}

public Integer getTaskQueueSize(String queueName) {
return queueDAO.getSize(queueName);
}

public void removeTaskFromQueue(String taskId) {
Task task = getTask(taskId);
if (task == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ void removeTaskFromQueue(
*/
Map<String, Integer> getTaskQueueSizes(List<String> taskTypes);

/**
* Get the queue size for a Task Type. The input can optionally include <code>domain</code>,
* <code>isolationGroupId</code> and <code>executionNamespace</code>.
*
* @return
*/
Integer getTaskQueueSize(
String taskType, String domain, String isolationGroupId, String executionNamespace);

/**
* Get the details about each queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;

Expand Down Expand Up @@ -254,6 +255,15 @@ public Map<String, Integer> getTaskQueueSizes(List<String> taskTypes) {
return executionService.getTaskQueueSizes(taskTypes);
}

@Override
public Integer getTaskQueueSize(
String taskType, String domain, String isolationGroupId, String executionNamespace) {
String queueName =
QueueUtils.getQueueName(taskType, domain, isolationGroupId, executionNamespace);

return executionService.getTaskQueueSize(queueName);
}

/**
* Get the details about each queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,24 @@ public ResponseEntity<Task> getTask(@PathVariable("taskId") String taskId) {
}

@GetMapping("/queue/sizes")
@Operation(summary = "Get Task type queue sizes")
@Operation(summary = "Deprecated. Please use /tasks/queue/size endpoint")
@Deprecated
public Map<String, Integer> size(
@RequestParam(value = "taskType", required = false) List<String> taskTypes) {
return taskService.getTaskQueueSizes(taskTypes);
}

@GetMapping("/queue/size")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the API above in line 107 - @GetMapping("/queue/sizes") be modified to support this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If i were to modify it, i would make it like the new api that's added. /queue/sizes accepts a list and is unbounded. We can deprecate /queue/sizes and remove it in an upcoming release.

@Operation(summary = "Get queue size for a task type.")
public Integer taskDepth(
@RequestParam("taskType") String taskType,
@RequestParam(value = "domain", required = false) String domain,
@RequestParam(value = "isolationGroupId", required = false) String isolationGroupId,
@RequestParam(value = "executionNamespace", required = false)
String executionNamespace) {
return taskService.getTaskQueueSize(taskType, domain, executionNamespace, isolationGroupId);
}

@GetMapping("/queue/all/verbose")
@Operation(summary = "Get the details about each queue")
public Map<String, Map<String, Map<String, Long>>> allVerbose() {
Expand Down