From fbe93d4743e37033c211fff4fbf07fa71b5b66d9 Mon Sep 17 00:00:00 2001 From: Sruti Parthiban Date: Thu, 4 Aug 2022 22:05:23 -0700 Subject: [PATCH] Task consumer Integration (#2293) * Integrate task consumers to capture task resource information during unregister. Add consumer that logs topN expensive search tasks Signed-off-by: sruti1312 --- .../src/docker/config/log4j2.properties | 10 ++ distribution/src/config/log4j2.properties | 37 ++++++ .../ImportLog4jPropertiesTaskTests.java | 2 +- .../test/resources/config/log4j2.properties | 35 ++++++ .../action/search/SearchShardTask.java | 21 ++++ .../common/settings/ClusterSettings.java | 4 +- .../search/internal/ShardSearchRequest.java | 16 ++- .../search/query/QuerySearchRequest.java | 5 +- .../org/opensearch/tasks/TaskManager.java | 43 +++++++ .../SearchShardTaskDetailsLogMessage.java | 67 +++++++++++ .../tasks/consumer/TopNSearchTasksLogger.java | 100 +++++++++++++++++ .../tasks/consumer/package-info.java | 12 ++ .../transport/TransportService.java | 15 ++- .../node/tasks/TaskManagerTestCase.java | 10 +- .../index/IndexingSlowLogTests.java | 2 +- .../opensearch/index/SearchSlowLogTests.java | 2 +- ...SearchShardTaskDetailsLogMessageTests.java | 61 ++++++++++ .../consumer/TopNSearchTasksLoggerTests.java | 105 ++++++++++++++++++ ...enSearchIndexLevelReplicationTestCase.java | 2 + .../test/transport/MockTransportService.java | 10 +- 20 files changed, 545 insertions(+), 14 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/package-info.java create mode 100644 server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java create mode 100644 server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java diff --git a/distribution/docker/src/docker/config/log4j2.properties b/distribution/docker/src/docker/config/log4j2.properties index a8c54137c7fd2..761478a9fdc6e 100644 --- a/distribution/docker/src/docker/config/log4j2.properties +++ b/distribution/docker/src/docker/config/log4j2.properties @@ -53,3 +53,13 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.additivity = false + +appender.task_detailslog_rolling.type = Console +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog + +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.additivity = false diff --git a/distribution/src/config/log4j2.properties b/distribution/src/config/log4j2.properties index 4820396c79eb7..bb27aaf2e22e6 100644 --- a/distribution/src/config/log4j2.properties +++ b/distribution/src/config/log4j2.properties @@ -195,3 +195,40 @@ logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old logger.index_indexing_slowlog.additivity = false + +######## Task details log JSON #################### +appender.task_detailslog_rolling.type = RollingFile +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.json +appender.task_detailslog_rolling.filePermissions = rw-r----- +appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog +appender.task_detailslog_rolling.layout.opensearchmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata + +appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.json.gz +appender.task_detailslog_rolling.policies.type = Policies +appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling.policies.size.size = 1GB +appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling.strategy.max = 4 +################################################# +######## Task details log - old style pattern #### +appender.task_detailslog_rolling_old.type = RollingFile +appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old +appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.log +appender.task_detailslog_rolling_old.filePermissions = rw-r----- +appender.task_detailslog_rolling_old.layout.type = PatternLayout +appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n + +appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.log.gz +appender.task_detailslog_rolling_old.policies.type = Policies +appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling_old.policies.size.size = 1GB +appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling_old.strategy.max = 4 +################################################# +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old +logger.task_detailslog_rolling.additivity = false diff --git a/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java b/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java index 7f67e08c66b9e..96544d3297ad4 100644 --- a/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java +++ b/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java @@ -67,7 +67,7 @@ public void testImportLog4jPropertiesTask() throws IOException { Properties properties = new Properties(); properties.load(Files.newInputStream(taskInput.getOpenSearchConfig().resolve(ImportLog4jPropertiesTask.LOG4J_PROPERTIES))); assertThat(properties, is(notNullValue())); - assertThat(properties.entrySet(), hasSize(137)); + assertThat(properties.entrySet(), hasSize(165)); assertThat(properties.get("appender.rolling.layout.type"), equalTo("OpenSearchJsonLayout")); assertThat( properties.get("appender.deprecation_rolling.fileName"), diff --git a/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties b/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties index b9ad71121165a..4b92d3fc62376 100644 --- a/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties +++ b/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties @@ -176,3 +176,38 @@ logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old logger.index_indexing_slowlog.additivity = false + +######## Task details log JSON #################### +appender.task_detailslog_rolling.type = RollingFile +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.json +appender.task_detailslog_rolling.layout.type = ESJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog +appender.task_detailslog_rolling.layout.esmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata + +appender.task_detailslog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.json.gz +appender.task_detailslog_rolling.policies.type = Policies +appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling.policies.size.size = 1GB +appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling.strategy.max = 4 +################################################# +######## Task details log - old style pattern #### +appender.task_detailslog_rolling_old.type = RollingFile +appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old +appender.task_detailslog_rolling_old.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.log +appender.task_detailslog_rolling_old.layout.type = PatternLayout +appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n + +appender.task_detailslog_rolling_old.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.log.gz +appender.task_detailslog_rolling_old.policies.type = Policies +appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling_old.policies.size.size = 1GB +appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling_old.strategy.max = 4 +################################################# +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old +logger.task_detailslog_rolling.additivity = false diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index 57831896db714..c9d0d6e2d3d47 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -32,12 +32,14 @@ package org.opensearch.action.search; +import org.opensearch.common.MemoizedSupplier; import org.opensearch.search.fetch.ShardFetchSearchRequest; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.TaskId; import java.util.Map; +import java.util.function.Supplier; /** * Task storing information about a currently running search shard request. @@ -46,9 +48,28 @@ * @opensearch.internal */ public class SearchShardTask extends CancellableTask { + // generating metadata in a lazy way since source can be quite big + private final MemoizedSupplier metadataSupplier; public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, () -> ""); + } + + public SearchShardTask( + long id, + String type, + String action, + String description, + TaskId parentTaskId, + Map headers, + Supplier metadataSupplier + ) { super(id, type, action, description, parentTaskId, headers); + this.metadataSupplier = new MemoizedSupplier<>(metadataSupplier); + } + + public String getTaskMetadata() { + return metadataSupplier.get(); } @Override diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fe322992cd3e5..971fb518ff1da 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -41,6 +41,7 @@ import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; +import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; @@ -577,7 +578,8 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, IndexingPressure.MAX_INDEXING_BYTES, - TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED + TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, + TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED ) ) ); diff --git a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java index a4d7b2bed516c..828c2f8c78d69 100644 --- a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java @@ -51,6 +51,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContent; import org.opensearch.index.Index; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; @@ -71,6 +72,7 @@ import org.opensearch.transport.TransportRequest; import java.io.IOException; +import java.util.Collections; import java.util.Arrays; import java.util.Map; import java.util.function.Function; @@ -85,6 +87,8 @@ * @opensearch.internal */ public class ShardSearchRequest extends TransportRequest implements IndicesRequest { + public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + private final String clusterAlias; private final ShardId shardId; private final int numberOfShards; @@ -501,7 +505,7 @@ public String getClusterAlias() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier); } @Override @@ -510,6 +514,16 @@ public String getDescription() { return "shardId[" + shardId() + "]"; } + public String getMetadataSupplier() { + StringBuilder sb = new StringBuilder(); + if (source != null) { + sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]"); + } else { + sb.append("source[]"); + } + return sb.toString(); + } + public Rewriteable getRewriteable() { return new RequestRewritable(this); } diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java index ae2f9e8fab989..ca74942decb50 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java @@ -123,7 +123,7 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier); } public String getDescription() { @@ -137,4 +137,7 @@ public String getDescription() { return sb.toString(); } + public String getMetadataSupplier() { + return shardSearchRequest().getMetadataSupplier(); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index 01b6b8ea603bf..334cde81dfb6a 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -51,6 +51,8 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; @@ -58,6 +60,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpChannel; @@ -75,6 +78,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -92,6 +96,15 @@ public class TaskManager implements ClusterStateApplier { private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + public static final String TASK_RESOURCE_CONSUMERS_ATTRIBUTES = "task_resource_consumers.enabled"; + + public static final Setting TASK_RESOURCE_CONSUMERS_ENABLED = Setting.boolSetting( + TASK_RESOURCE_CONSUMERS_ATTRIBUTES, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Rest headers that are copied to the task */ @@ -116,10 +129,26 @@ public class TaskManager implements ClusterStateApplier { private final Map channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); private final SetOnce cancellationService = new SetOnce<>(); + private volatile boolean taskResourceConsumersEnabled; + private final Set> taskResourceConsumer; + + public static TaskManager createTaskManagerWithClusterSettings( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { + final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders); + clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled); + return taskManager; + } + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { this.threadPool = threadPool; this.taskHeaders = new ArrayList<>(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); + this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings); + this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings)); } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -135,6 +164,10 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService taskResou this.taskResourceTrackingService.set(taskResourceTrackingService); } + public void setTaskResourceConsumersEnabled(boolean taskResourceConsumersEnabled) { + this.taskResourceConsumersEnabled = taskResourceConsumersEnabled; + } + /** * Registers a task without parent task */ @@ -240,6 +273,16 @@ public Task unregister(Task task) { // Decrement the task's self-thread as part of unregistration. task.decrementResourceTrackingThreads(); + if (taskResourceConsumersEnabled) { + for (Consumer taskConsumer : taskResourceConsumer) { + try { + taskConsumer.accept(task); + } catch (Exception e) { + logger.error("error encountered when updating the consumer", e); + } + } + } + if (task instanceof CancellableTask) { CancellableTaskHolder holder = cancellableTasks.remove(task.getId()); if (holder != null) { diff --git a/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java b/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java new file mode 100644 index 0000000000000..1755db3ab4ae8 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.logging.OpenSearchLogMessage; + +import java.util.HashMap; +import java.util.Map; + +/** + * Search shard task information that will be extracted from Task and converted into + * format that will be logged + * + * @opensearch.internal + */ +public final class SearchShardTaskDetailsLogMessage extends OpenSearchLogMessage { + SearchShardTaskDetailsLogMessage(SearchShardTask task) { + super(prepareMap(task), message(task)); + } + + private static Map prepareMap(SearchShardTask task) { + Map messageFields = new HashMap<>(); + messageFields.put("taskId", task.getId()); + messageFields.put("type", task.getType()); + messageFields.put("action", task.getAction()); + messageFields.put("description", task.getDescription()); + messageFields.put("start_time_millis", task.getStartTime()); + messageFields.put("parentTaskId", task.getParentTaskId()); + messageFields.put("resource_stats", task.getResourceStats()); + messageFields.put("metadata", task.getTaskMetadata()); + return messageFields; + } + + // Message will be used in plaintext logs + private static String message(SearchShardTask task) { + StringBuilder sb = new StringBuilder(); + sb.append("taskId:[") + .append(task.getId()) + .append("], ") + .append("type:[") + .append(task.getType()) + .append("], ") + .append("action:[") + .append(task.getAction()) + .append("], ") + .append("description:[") + .append(task.getDescription()) + .append("], ") + .append("start_time_millis:[") + .append(task.getStartTime()) + .append("], ") + .append("resource_stats:[") + .append(task.getResourceStats()) + .append("], ") + .append("metadata:[") + .append(task.getTaskMetadata()) + .append("]"); + return sb.toString(); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java new file mode 100644 index 0000000000000..dd7e200d7f4b2 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.Task; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.function.Consumer; + +/** + * A simple listener that logs resource information of high memory consuming search tasks + * + * @opensearch.internal + */ +public class TopNSearchTasksLogger implements Consumer { + public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog"; + public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size"; + public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency"; + + private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search"); + + // number of memory expensive search tasks that are logged + private static final Setting LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting( + LOG_TOP_QUERIES_SIZE, + 10, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // frequency in which memory expensive search tasks are logged + private static final Setting LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting( + LOG_TOP_QUERIES_FREQUENCY, + TimeValue.timeValueSeconds(60L), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final int topQueriesSize; + private final long topQueriesLogFrequencyInNanos; + private final Queue> topQueries; + private long lastReportedTimeInNanos = System.nanoTime(); + + public TopNSearchTasksLogger(Settings settings) { + this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings); + this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos(); + this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1)); + } + + /** + * Called when task is unregistered and task has resource stats present. + */ + @Override + public void accept(Task task) { + if (task instanceof SearchShardTask) { + recordSearchTask((SearchShardTask) task); + } + } + + private synchronized void recordSearchTask(SearchShardTask searchTask) { + final long memory_in_bytes = searchTask.getTotalResourceUtilization(ResourceStats.MEMORY); + if (System.nanoTime() - lastReportedTimeInNanos >= topQueriesLogFrequencyInNanos) { + publishTopNEvents(); + lastReportedTimeInNanos = System.nanoTime(); + } + if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) { + // evict the element + topQueries.poll(); + } + if (topQueries.size() < topQueriesSize) { + topQueries.offer(new Tuple<>(memory_in_bytes, searchTask)); + } + } + + private void publishTopNEvents() { + logTopResourceConsumingQueries(); + topQueries.clear(); + } + + private void logTopResourceConsumingQueries() { + for (Tuple topQuery : topQueries) { + SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2())); + } + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/package-info.java b/server/src/main/java/org/opensearch/tasks/consumer/package-info.java new file mode 100644 index 0000000000000..40219a1cead5b --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Support for adding consumers to consume task related information. + */ +package org.opensearch.tasks.consumer; diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 1a280f2475e5d..aaba06196bc59 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -210,7 +210,7 @@ public TransportService( setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(settings, threadPool, taskHeaders); + taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); @@ -246,8 +246,17 @@ public TaskManager getTaskManager() { return taskManager; } - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { - return new TaskManager(settings, threadPool, taskHeaders); + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { + if (clusterSettings != null) { + return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders); + } else { + return new TaskManager(settings, threadPool, taskHeaders); + } } /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index fd6f5d17a3a80..68cf69e30f8a6 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -53,6 +53,7 @@ import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.BoundTransportAddress; import org.opensearch.common.util.PageCacheRecycler; @@ -218,11 +219,16 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { Collections.emptySet() ) { @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); } } }; diff --git a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java index 38c8491d79150..75a346e444b73 100644 --- a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java @@ -86,8 +86,8 @@ public static void init() throws IllegalAccessException { @AfterClass public static void cleanup() { - appender.stop(); Loggers.removeAppender(testLogger1, appender); + appender.stop(); } public void testLevelPrecedence() { diff --git a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java index ae159092a4833..ea146ec20b16a 100644 --- a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java @@ -84,9 +84,9 @@ public static void init() throws IllegalAccessException { @AfterClass public static void cleanup() { - appender.stop(); Loggers.removeAppender(queryLog, appender); Loggers.removeAppender(fetchLog, appender); + appender.stop(); } @Override diff --git a/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java b/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java new file mode 100644 index 0000000000000..641fdef4891bd --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class SearchShardTaskDetailsLogMessageTests extends OpenSearchSingleNodeTestCase { + public void testTaskDetailsLogHasJsonFields() { + SearchShardTask task = new SearchShardTask( + 0, + "n/a", + "n/a", + "test", + null, + Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"), + () -> "test_metadata" + ); + SearchShardTaskDetailsLogMessage p = new SearchShardTaskDetailsLogMessage(task); + + assertThat(p.getValueFor("taskId"), equalTo("0")); + assertThat(p.getValueFor("type"), equalTo("n/a")); + assertThat(p.getValueFor("action"), equalTo("n/a")); + assertThat(p.getValueFor("description"), equalTo("test")); + assertThat(p.getValueFor("parentTaskId"), equalTo(null)); + // when no resource information present + assertThat(p.getValueFor("resource_stats"), equalTo("{}")); + assertThat(p.getValueFor("metadata"), equalTo("test_metadata")); + + task.startThreadResourceTracking( + 0, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 0L), + new ResourceUsageMetric(ResourceStats.CPU, 0L) + ); + task.updateThreadResourceStats( + 0, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 100), + new ResourceUsageMetric(ResourceStats.CPU, 100) + ); + assertThat( + p.getValueFor("resource_stats"), + equalTo("{0=[{cpu_time_in_nanos=100, memory_in_bytes=100}, stats_type=worker_stats, is_active=true, threadId=0]}") + ); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java new file mode 100644 index 0000000000000..a8fd3623ef09d --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.settings.Settings; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Collections; + +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY; +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE; + +public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase { + static MockAppender appender; + static Logger searchLogger = LogManager.getLogger(TopNSearchTasksLogger.TASK_DETAILS_LOG_PREFIX + ".search"); + + private TopNSearchTasksLogger topNSearchTasksLogger; + + @BeforeClass + public static void init() throws IllegalAccessException { + appender = new MockAppender("trace_appender"); + appender.start(); + Loggers.addAppender(searchLogger, appender); + } + + @AfterClass + public static void cleanup() { + Loggers.removeAppender(searchLogger, appender); + appender.stop(); + } + + public void testLoggerWithTasks() { + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + generateTasks(5); + LogEvent logEvent = appender.getLastEventAndReset(); + assertNotNull(logEvent); + assertEquals(logEvent.getLevel(), Level.INFO); + assertTrue(logEvent.getMessage().getFormattedMessage().contains("cpu_time_in_nanos=300, memory_in_bytes=300")); + } + + public void testLoggerWithoutTasks() { + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + + assertNull(appender.getLastEventAndReset()); + } + + public void testLoggerWithHighFrequency() { + // setting the frequency to a really large value and confirming that nothing gets written to log file. + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + generateTasks(5); + generateTasks(2); + + assertNull(appender.getLastEventAndReset()); + } + + // generate search tasks and updates the topN search tasks logger consumer. + public void generateTasks(int numberOfTasks) { + for (int i = 0; i < numberOfTasks; i++) { + Task task = new SearchShardTask( + i, + "n/a", + "n/a", + "test", + null, + Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"), + () -> "n/a" + ); + task.startThreadResourceTracking( + i, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 0L), + new ResourceUsageMetric(ResourceStats.CPU, 0L) + ); + task.updateThreadResourceStats( + i, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, i * 100L), + new ResourceUsageMetric(ResourceStats.CPU, i * 100L) + ); + topNSearchTasksLogger.accept(task); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index a588909085160..249ffcfd0bf6e 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -76,6 +76,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; @@ -201,6 +202,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; private volatile ReplicationTargets replicationTargets; + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer( new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 9b9baebd540c3..c80b120ad0148 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -261,11 +261,16 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra } @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); } } @@ -530,7 +535,6 @@ public void clearCallback() { /** * Adds a new handling behavior that is used when the defined request is received. - * */ public void addRequestHandlingBehavior( String actionName,