Skip to content

Commit

Permalink
Reduce logging in DEBUG for MasterService:run (#14795) (#14864) (#14880)
Browse files Browse the repository at this point in the history
* Reduce logging in DEBUG for MasteService:run by introducing short and long summary in Taskbatcher

Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
(cherry picked from commit b35690c)

Signed-off-by: Sumit Bansal <sumit.asr@gmail.com>
(cherry picked from commit 7e7e775)
  • Loading branch information
sumitasr committed Jul 22, 2024
1 parent 717a16d commit 1d634b4
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -221,10 +222,10 @@ protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
}

@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator));
}

class UpdateTask extends BatchedTask {
Expand Down Expand Up @@ -297,26 +298,33 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String summary = taskInputs.summary;
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
return;
}

logger.debug("executing cluster state update for [{}]", summary);
if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
Expand All @@ -328,25 +336,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
summary,
shortSummary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
Expand All @@ -357,7 +365,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(summary, publicationStartTime, newClusterState, e);
handleException(shortSummary, publicationStartTime, newClusterState, e);
}
}
}
Expand Down Expand Up @@ -452,8 +460,8 @@ private void handleException(String summary, long startTimeMillis, ClusterState
// TODO: do we want to call updateTask.onFailure here?
}

private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary);
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
return new TaskOutputs(
taskInputs,
Expand Down Expand Up @@ -897,7 +905,7 @@ public void onTimeout() {
}
}

private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult;
try {
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
Expand All @@ -913,7 +921,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
previousClusterState.version(),
previousClusterState.stateUUID(),
taskInputs.summary,
taskSummary,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()
Expand Down Expand Up @@ -955,14 +963,19 @@ private List<Batcher.UpdateTask> getNonFailedTasks(TaskInputs taskInputs, Cluste
* Represents a set of tasks to be processed together with their executor
*/
private class TaskInputs {
final String summary;

final List<Batcher.UpdateTask> updateTasks;
final ClusterStateTaskExecutor<Object> executor;
final Function<Boolean, String> taskSummaryGenerator;

TaskInputs(ClusterStateTaskExecutor<Object> executor, List<Batcher.UpdateTask> updateTasks, String summary) {
this.summary = summary;
TaskInputs(
ClusterStateTaskExecutor<Object> executor,
List<Batcher.UpdateTask> updateTasks,
final Function<Boolean, String> taskSummaryGenerator
) {
this.executor = executor;
this.updateTasks = updateTasks;
this.taskSummaryGenerator = taskSummaryGenerator;
}

boolean runOnlyWhenClusterManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ void runIfNotProcessed(BatchedTask updateTask) {
// to give other tasks with different batching key a chance to execute.
if (updateTask.processed.get() == false) {
final List<BatchedTask> toExecute = new ArrayList<>();
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
// While removing task, need to remove task first from taskMap and then remove identity from identityMap.
// Changing this order might lead to duplicate task during submission.
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
Expand All @@ -187,30 +186,41 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}

if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, tasksSummary);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
}

/**
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator);

/**
* Represents a runnable task that supports batching.
Expand Down
Loading

0 comments on commit 1d634b4

Please sign in to comment.