diff --git a/docs/reference/modules/http.asciidoc b/docs/reference/modules/http.asciidoc index 0bedafeafef86..320be3925f806 100644 --- a/docs/reference/modules/http.asciidoc +++ b/docs/reference/modules/http.asciidoc @@ -190,3 +190,14 @@ Defaults to `network.tcp.receive_buffer_size`. `http.client_stats.enabled`:: (<>) Enable or disable collection of HTTP client stats. Defaults to `true`. + +`http.client_stats.closed_channels.max_count`:: +(<>) +When `http.client_stats.enabled` is `true`, sets the maximum number of closed +HTTP channels for which {es} reports statistics. Defaults to `10000`. + +`http.client_stats.closed_channels.max_age`:: +(<>) +When `http.client_stats.enabled` is `true`, sets the maximum length of time +after closing a HTTP channel that {es} will report that channel's statistics. +Defaults to `5m`. diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index aef3e0a9114e7..f52b2de33e318 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -16,6 +16,7 @@ import org.elasticsearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; +import java.net.SocketAddress; public class Netty4HttpChannel implements HttpChannel { @@ -34,12 +35,20 @@ public void sendResponse(HttpResponse response, ActionListener listener) { @Override public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.localAddress(); + return castAddressOrNull(channel.localAddress()); } @Override public InetSocketAddress getRemoteAddress() { - return (InetSocketAddress) channel.remoteAddress(); + return castAddressOrNull(channel.remoteAddress()); + } + + private static InetSocketAddress castAddressOrNull(SocketAddress socketAddress) { + if (socketAddress instanceof InetSocketAddress) { + return (InetSocketAddress) socketAddress; + } else { + return null; + } } @Override diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index cf223cd383030..f03b49cda379d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -275,6 +275,8 @@ public void apply(Settings value, Settings current, Settings previous) { HttpTransportSettings.SETTING_HTTP_TRACE_LOG_INCLUDE, HttpTransportSettings.SETTING_HTTP_TRACE_LOG_EXCLUDE, HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED, + HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_AGE, + HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT, HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING, HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java index 1a3c2345e9280..54912a7d94a1d 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java +++ b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java @@ -14,13 +14,21 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; +import java.net.InetSocketAddress; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.function.LongPredicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.http.HttpStats.ClientStats.NOT_CLOSED; /** * Tracks a collection of {@link org.elasticsearch.http.HttpStats.ClientStats} for current and recently-closed HTTP connections. @@ -29,94 +37,88 @@ public class HttpClientStatsTracker { private static final Logger logger = LogManager.getLogger(); - private static final long PRUNE_THROTTLE_INTERVAL = TimeUnit.SECONDS.toMillis(60); - private static final long MAX_CLIENT_STATS_AGE = TimeUnit.MINUTES.toMillis(5); - - private final Map httpChannelStats = new ConcurrentHashMap<>(); private final ThreadPool threadPool; - private volatile long lastClientStatsPruneTime; + private final Map httpChannelStats = new ConcurrentHashMap<>(); + private final Semaphore closedChannelPermits; + private final ConcurrentLinkedQueue closedChannelStats = new ConcurrentLinkedQueue<>(); + private final long maxClosedChannelAgeMillis; + private volatile boolean clientStatsEnabled; HttpClientStatsTracker(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.threadPool = threadPool; - clientStatsEnabled = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED.get(settings); + this.closedChannelPermits = new Semaphore(HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT.get(settings)); + this.maxClosedChannelAgeMillis = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_AGE.get(settings).millis(); + this.clientStatsEnabled = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED, this::enableClientStats); } - /** - * Prunes client stats of entries that have been disconnected for more than {@link #MAX_CLIENT_STATS_AGE} (i.e. 5 minutes). - * - * @param throttled When true, executes the prune process only if more than {@link #PRUNE_THROTTLE_INTERVAL} (i.e. 60 seconds) has - * elapsed since the last execution. - */ - private void pruneClientStats(boolean throttled) { - if (clientStatsEnabled && throttled == false || - (threadPool.relativeTimeInMillis() - lastClientStatsPruneTime > PRUNE_THROTTLE_INTERVAL)) { - long nowMillis = threadPool.absoluteTimeInMillis(); - for (var statsEntry : httpChannelStats.entrySet()) { - long closedTimeMillis = statsEntry.getValue().closedTimeMillis; - if (closedTimeMillis > 0 && (nowMillis - closedTimeMillis > MAX_CLIENT_STATS_AGE)) { - httpChannelStats.remove(statsEntry.getKey()); - } - } - lastClientStatsPruneTime = threadPool.relativeTimeInMillis(); - } - } - /** * Enables or disables collection of HTTP client stats. */ private void enableClientStats(boolean enabled) { this.clientStatsEnabled = enabled; + if (enabled == false) { - // when disabling, immediately clear client stats + // stop tracking stats for open channels httpChannelStats.clear(); + + // remove all stats for closed channels (NB best effort attempt, we might be concurrently adding some too, but they'll be pruned + // the next time the stats are retrieved) + pruneStaleClosedChannelStats(l -> false); } } /** * Register the given channel with this tracker. - * - * @return the corresponding newly-created stats object, or {@code null} if disabled. */ - HttpStats.ClientStats addClientStats(final HttpChannel httpChannel) { - if (clientStatsEnabled) { - final HttpStats.ClientStats clientStats; - if (httpChannel != null) { - clientStats = new HttpStats.ClientStats(threadPool.absoluteTimeInMillis()); - httpChannelStats.put(getChannelKey(httpChannel), clientStats); - httpChannel.addCloseListener(ActionListener.wrap(() -> { - try { - HttpStats.ClientStats disconnectedClientStats = - httpChannelStats.get(getChannelKey(httpChannel)); - if (disconnectedClientStats != null) { - disconnectedClientStats.closedTimeMillis = threadPool.absoluteTimeInMillis(); - } - } catch (Exception e) { - assert false : e; // the listener code above should never throw - logger.warn("error removing HTTP channel listener", e); - } - })); - } else { - clientStats = null; - } - pruneClientStats(true); - return clientStats; - } else { - return null; + void addClientStats(final HttpChannel httpChannel) { + if (clientStatsEnabled == false) { + return; } + + if (httpChannel == null) { + return; + } + + httpChannelStats.putIfAbsent(httpChannel, new ClientStatsBuilder( + System.identityHashCode(httpChannel), + formatAddress(httpChannel.getRemoteAddress()), + threadPool.absoluteTimeInMillis())); + httpChannel.addCloseListener(ActionListener.wrap(() -> { + try { + final ClientStatsBuilder disconnectedClientStats = httpChannelStats.remove(httpChannel); + if (disconnectedClientStats != null) { + addClosedChannelStats(disconnectedClientStats.build(threadPool.absoluteTimeInMillis())); + } + } catch (Exception e) { + assert false : e; // the listener code above should never throw + logger.warn("error removing HTTP channel listener", e); + } + })); } - private static String getFirstValueForHeader(final HttpRequest request, final String header) { - for (Map.Entry> entry : request.getHeaders().entrySet()) { - if (entry.getKey().equalsIgnoreCase(header)) { - if (entry.getValue().size() > 0) { - return entry.getValue().get(0); + private void addClosedChannelStats(HttpStats.ClientStats clientStats) { + if (clientStatsEnabled == false) { + return; + } + + if (closedChannelPermits.tryAcquire() == false) { + // no room in the list, push out the oldest entry + synchronized (closedChannelStats) { + final HttpStats.ClientStats oldest = closedChannelStats.poll(); + if (oldest == null && closedChannelPermits.tryAcquire() == false) { + // The list is currently empty but no permits are available (and no prune is in progress). This can theoretically + // happen, e.g. if all the permits are held by other threads that haven't got around to adding their stats yet. + // Stupendously unlikely, and those other threads have fresher data anyway, so let's just give up. + return; } } } - return null; + + // we either acquired a permit or removed an item, so there's now room for our stats in the list + closedChannelStats.add(clientStats); } /** @@ -124,62 +126,133 @@ private static String getFirstValueForHeader(final HttpRequest request, final St */ void updateClientStats(final HttpRequest httpRequest, final HttpChannel httpChannel) { if (clientStatsEnabled && httpChannel != null) { - HttpStats.ClientStats clientStats = httpChannelStats.get(getChannelKey(httpChannel)); - if (clientStats == null) { - // will always return a non-null value when httpChannel is non-null - clientStats = addClientStats(httpChannel); + final ClientStatsBuilder clientStats = httpChannelStats.get(httpChannel); + if (clientStats != null) { + clientStats.update(httpRequest, httpChannel, threadPool.absoluteTimeInMillis()); + } + } + } + + /** + * @return a list of the stats for the channels that are currently being tracked. + */ + List getClientStats() { + if (clientStatsEnabled) { + final long currentTimeMillis = threadPool.absoluteTimeInMillis(); + final LongPredicate keepTimePredicate = closeTimeMillis -> currentTimeMillis - closeTimeMillis <= maxClosedChannelAgeMillis; + pruneStaleClosedChannelStats(keepTimePredicate); + return Stream.concat( + closedChannelStats.stream().filter(c -> keepTimePredicate.test(c.closedTimeMillis)), + httpChannelStats.values().stream().map(c -> c.build(NOT_CLOSED)) + ).collect(Collectors.toList()); + } else { + // prune even if disabled since we don't prevent concurrently adding entries while being disabled + httpChannelStats.clear(); + pruneStaleClosedChannelStats(l -> false); + return Collections.emptyList(); + } + } + + private void pruneStaleClosedChannelStats(LongPredicate keepTimePredicate) { + synchronized (closedChannelStats) { + while (true) { + final HttpStats.ClientStats nextStats = closedChannelStats.peek(); + if (nextStats == null) { + return; + } + + if (keepTimePredicate.test(nextStats.closedTimeMillis)) { + // the list elements are pretty much in the order in which the channels were closed so keep all the remaining items + return; + } + + final HttpStats.ClientStats removed = closedChannelStats.poll(); + assert removed == nextStats; // synchronized (closedChannelStats) means nobody else did a poll() since the peek() + closedChannelPermits.release(); } + } + } + + @Nullable + private static String formatAddress(@Nullable InetSocketAddress localAddress) { + return localAddress == null ? null : NetworkAddress.format(localAddress); + } + + private static class ClientStatsBuilder { + final int id; + final long openedTimeMillis; + + String agent; + String localAddress; + String remoteAddress; + String lastUri; + String forwardedFor; + String opaqueId; + long lastRequestTimeMillis = -1L; + long requestCount; + long requestSizeBytes; - if (clientStats.agent == null) { + ClientStatsBuilder(int id, @Nullable String remoteAddress, long openedTimeMillis) { + this.id = id; + this.remoteAddress = remoteAddress; + this.openedTimeMillis = openedTimeMillis; + } + + synchronized void update(HttpRequest httpRequest, HttpChannel httpChannel, long currentTimeMillis) { + if (agent == null) { final String elasticProductOrigin = getFirstValueForHeader(httpRequest, "x-elastic-product-origin"); if (elasticProductOrigin != null) { - clientStats.agent = elasticProductOrigin; + agent = elasticProductOrigin; } else { - final String userAgent = getFirstValueForHeader(httpRequest, "User-Agent"); - if (userAgent != null) { - clientStats.agent = userAgent; - } + agent = getFirstValueForHeader(httpRequest, "User-Agent"); } } - if (clientStats.localAddress == null) { - clientStats.localAddress = - httpChannel.getLocalAddress() == null ? null : NetworkAddress.format(httpChannel.getLocalAddress()); - clientStats.remoteAddress = - httpChannel.getRemoteAddress() == null ? null : NetworkAddress.format(httpChannel.getRemoteAddress()); + if (localAddress == null) { + localAddress = formatAddress(httpChannel.getLocalAddress()); } - if (clientStats.forwardedFor == null) { - final String forwardedFor = getFirstValueForHeader(httpRequest, "x-forwarded-for"); - if (forwardedFor != null) { - clientStats.forwardedFor = forwardedFor; - } + if (remoteAddress == null) { + remoteAddress = formatAddress(httpChannel.getRemoteAddress()); } - if (clientStats.opaqueId == null) { - final String opaqueId = getFirstValueForHeader(httpRequest, "x-opaque-id"); - if (opaqueId != null) { - clientStats.opaqueId = opaqueId; - } + if (forwardedFor == null) { + forwardedFor = getFirstValueForHeader(httpRequest, "x-forwarded-for"); + } + if (opaqueId == null) { + opaqueId = getFirstValueForHeader(httpRequest, "x-opaque-id"); } - clientStats.lastRequestTimeMillis = threadPool.absoluteTimeInMillis(); - clientStats.lastUri = httpRequest.uri(); - clientStats.requestCount.increment(); - clientStats.requestSizeBytes.add(httpRequest.content().length()); + + lastRequestTimeMillis = currentTimeMillis; + lastUri = httpRequest.uri(); + requestCount += 1; + requestSizeBytes += httpRequest.content().length(); } - } - /** - * @return a list of the stats for the channels that are currently being tracked. - */ - List getClientStats() { - pruneClientStats(false); - return new ArrayList<>(httpChannelStats.values()); - } + private static String getFirstValueForHeader(final HttpRequest request, final String header) { + for (Map.Entry> entry : request.getHeaders().entrySet()) { + if (entry.getKey().equalsIgnoreCase(header)) { + if (entry.getValue().size() > 0) { + return entry.getValue().get(0); + } + } + } + return null; + } - /** - * Returns a key suitable for use in a hash table for the specified HttpChannel - */ - private static int getChannelKey(HttpChannel channel) { - // always use an identity-based hash code rather than one based on object state - return System.identityHashCode(channel); + synchronized HttpStats.ClientStats build(long closedTimeMillis) { + return new HttpStats.ClientStats( + id, + agent, + localAddress, + remoteAddress, + lastUri, + forwardedFor, + opaqueId, + openedTimeMillis, + closedTimeMillis, + lastRequestTimeMillis, + requestCount, + requestSizeBytes + ); + } } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpStats.java b/server/src/main/java/org/elasticsearch/http/HttpStats.java index ccf47c73dce76..de5b640664983 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpStats.java +++ b/server/src/main/java/org/elasticsearch/http/HttpStats.java @@ -16,7 +16,6 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.LongAdder; public class HttpStats implements Writeable, ToXContentFragment { @@ -93,28 +92,36 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } public static class ClientStats implements Writeable, ToXContentFragment { - final int id; - String agent; - String localAddress; - String remoteAddress; - String lastUri; - String forwardedFor; - String opaqueId; - long openedTimeMillis; - long closedTimeMillis = -1; - volatile long lastRequestTimeMillis = -1; - final LongAdder requestCount = new LongAdder(); - final LongAdder requestSizeBytes = new LongAdder(); - - ClientStats(long openedTimeMillis) { - this.id = System.identityHashCode(this); - this.openedTimeMillis = openedTimeMillis; - } + public static final long NOT_CLOSED = -1L; - // visible for testing - public ClientStats(String agent, String localAddress, String remoteAddress, String lastUri, String forwardedFor, String opaqueId, - long openedTimeMillis, long closedTimeMillis, long lastRequestTimeMillis, long requestCount, long requestSizeBytes) { - this.id = System.identityHashCode(this); + final int id; + final String agent; + final String localAddress; + final String remoteAddress; + final String lastUri; + final String forwardedFor; + final String opaqueId; + final long openedTimeMillis; + final long closedTimeMillis; + final long lastRequestTimeMillis; + final long requestCount; + final long requestSizeBytes; + + public ClientStats( + int id, + String agent, + String localAddress, + String remoteAddress, + String lastUri, + String forwardedFor, + String opaqueId, + long openedTimeMillis, + long closedTimeMillis, + long lastRequestTimeMillis, + long requestCount, + long requestSizeBytes + ) { + this.id = id; this.agent = agent; this.localAddress = localAddress; this.remoteAddress = remoteAddress; @@ -124,8 +131,8 @@ public ClientStats(String agent, String localAddress, String remoteAddress, Stri this.openedTimeMillis = openedTimeMillis; this.closedTimeMillis = closedTimeMillis; this.lastRequestTimeMillis = lastRequestTimeMillis; - this.requestCount.add(requestCount); - this.requestSizeBytes.add(requestSizeBytes); + this.requestCount = requestCount; + this.requestSizeBytes = requestSizeBytes; } ClientStats(StreamInput in) throws IOException { @@ -139,8 +146,8 @@ public ClientStats(String agent, String localAddress, String remoteAddress, Stri this.openedTimeMillis = in.readLong(); this.closedTimeMillis = in.readLong(); this.lastRequestTimeMillis = in.readLong(); - this.requestCount.add(in.readLong()); - this.requestSizeBytes.add(in.readLong()); + this.requestCount = in.readLong(); + this.requestSizeBytes = in.readLong(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -165,12 +172,12 @@ public ClientStats(String agent, String localAddress, String remoteAddress, Stri builder.field(Fields.CLIENT_OPAQUE_ID, opaqueId); } builder.field(Fields.CLIENT_OPENED_TIME_MILLIS, openedTimeMillis); - if (closedTimeMillis != -1) { + if (closedTimeMillis != NOT_CLOSED) { builder.field(Fields.CLIENT_CLOSED_TIME_MILLIS, closedTimeMillis); } builder.field(Fields.CLIENT_LAST_REQUEST_TIME_MILLIS, lastRequestTimeMillis); - builder.field(Fields.CLIENT_REQUEST_COUNT, requestCount.longValue()); - builder.field(Fields.CLIENT_REQUEST_SIZE_BYTES, requestSizeBytes.longValue()); + builder.field(Fields.CLIENT_REQUEST_COUNT, requestCount); + builder.field(Fields.CLIENT_REQUEST_SIZE_BYTES, requestSizeBytes); builder.endObject(); return builder; } @@ -187,8 +194,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(openedTimeMillis); out.writeLong(closedTimeMillis); out.writeLong(lastRequestTimeMillis); - out.writeLong(requestCount.longValue()); - out.writeLong(requestSizeBytes.longValue()); + out.writeLong(requestCount); + out.writeLong(requestSizeBytes); } } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java b/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java index a7469b72ac1fd..4147c3138e37d 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java @@ -114,6 +114,10 @@ public final class HttpTransportSettings { public static final Setting SETTING_HTTP_CLIENT_STATS_ENABLED = boolSetting("http.client_stats.enabled", true, Property.Dynamic, Property.NodeScope); + public static final Setting SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT = + intSetting("http.client_stats.closed_channels.max_count", 10000, Property.NodeScope); + public static final Setting SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_AGE = + Setting.timeSetting("http.client_stats.closed_channels.max_age", TimeValue.timeValueMinutes(5), Property.NodeScope); private HttpTransportSettings() { } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 91a1fc1b5ed8b..87ff0faa0937a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -522,6 +522,7 @@ public static NodeStats createNodeStats() { List clientStats = new ArrayList<>(numClients); for (int k = 0; k < numClients; k++) { var cs = new HttpStats.ClientStats( + randomInt(), randomAlphaOfLength(6), randomAlphaOfLength(6), randomAlphaOfLength(6), diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index ad2498665fb14..f336bd0a4a033 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -462,6 +462,7 @@ protected void stopInternal() { .withPath("/internal/stats_test") .withHeaders(Map.of(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId))) .build(); + transport.serverAcceptedChannel(fakeRestRequest.getHttpChannel()); transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); HttpStats httpStats = transport.stats(); @@ -478,6 +479,7 @@ protected void stopInternal() { .withPath("/internal/stats_test2") .withHeaders(Map.of(Task.X_OPAQUE_ID.toUpperCase(Locale.ROOT), Collections.singletonList(opaqueId))) .build(); + transport.serverAcceptedChannel(fakeRestRequest.getHttpChannel()); transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); httpStats = transport.stats(); assertThat(httpStats.getClientStats().size(), equalTo(2)); @@ -532,6 +534,7 @@ public void testDisablingHttpClientStats() { .withPath("/internal/stats_test") .withHeaders(Map.of(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId))) .build(); + transport.serverAcceptedChannel(fakeRestRequest.getHttpChannel()); transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); // HTTP client stats should default to enabled @@ -556,6 +559,7 @@ public void testDisablingHttpClientStats() { .withPath("/internal/stats_test") .withHeaders(Map.of(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId))) .build(); + transport.serverAcceptedChannel(fakeRestRequest.getHttpChannel()); transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); httpStats = transport.stats(); assertThat(httpStats.getClientStats().size(), equalTo(0)); @@ -573,6 +577,7 @@ public void testDisablingHttpClientStats() { .withPath("/internal/stats_test") .withHeaders(Map.of(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId))) .build(); + transport.serverAcceptedChannel(fakeRestRequest.getHttpChannel()); transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); httpStats = transport.stats(); assertThat(httpStats.getClientStats().size(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java b/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java new file mode 100644 index 0000000000000..94ab314dca19b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java @@ -0,0 +1,448 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_AGE; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class HttpClientStatsTrackerTests extends ESTestCase { + + public void testCollectsNoStatsIfDisabled() { + final Settings settings = Settings.builder().put(SETTING_HTTP_CLIENT_STATS_ENABLED.getKey(), false).build(); + final HttpClientStatsTracker httpClientStatsTracker = new HttpClientStatsTracker( + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new FakeTimeThreadPool()); + + final HttpChannel httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + assertThat(httpClientStatsTracker.getClientStats(), empty()); + + httpClientStatsTracker.updateClientStats(randomHttpRequest(), httpChannel); + assertThat(httpClientStatsTracker.getClientStats(), empty()); + + httpChannel.close(); + assertThat(httpClientStatsTracker.getClientStats(), empty()); + + httpClientStatsTracker.updateClientStats(randomHttpRequest(), randomHttpChannel()); + assertThat(httpClientStatsTracker.getClientStats(), empty()); + } + + public void testStatsCollection() { + final Settings.Builder settings = Settings.builder(); + if (randomBoolean()) { + settings.put(SETTING_HTTP_CLIENT_STATS_ENABLED.getKey(), true); + } + final long maxAgeMillis; + if (randomBoolean()) { + maxAgeMillis = randomLongBetween(1L, 1000000L); + settings.put(SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_AGE.getKey(), TimeValue.timeValueMillis(maxAgeMillis)); + } else { + maxAgeMillis = SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_AGE.get(Settings.EMPTY).millis(); + } + + final FakeTimeThreadPool threadPool = new FakeTimeThreadPool(); + + final HttpClientStatsTracker httpClientStatsTracker = new HttpClientStatsTracker( + settings.build(), + new ClusterSettings(settings.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool); + + threadPool.setRandomTime(); + final long openTimeMillis = threadPool.absoluteTimeInMillis(); + long requestLength = 0L; + final HttpChannel httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + { + final List clientsStats = httpClientStatsTracker.getClientStats(); + assertThat(clientsStats, hasSize(1)); + final HttpStats.ClientStats clientStats = clientsStats.get(0); + assertThat(clientStats.remoteAddress, equalTo(NetworkAddress.format(httpChannel.getRemoteAddress()))); + assertNull(clientStats.lastUri); + assertThat(clientStats.requestCount, equalTo(0L)); + assertThat(clientStats.requestSizeBytes, equalTo(requestLength)); + assertThat(clientStats.closedTimeMillis, equalTo(-1L)); + assertThat(clientStats.openedTimeMillis, equalTo(openTimeMillis)); + + assertNull(clientStats.agent); + assertNull(clientStats.forwardedFor); + assertNull(clientStats.opaqueId); + } + + threadPool.setRandomTime(); + final HttpRequest httpRequest1 = randomHttpRequest(); + httpClientStatsTracker.updateClientStats(httpRequest1, httpChannel); + { + final List clientsStats = httpClientStatsTracker.getClientStats(); + assertThat(clientsStats, hasSize(1)); + + final HttpStats.ClientStats clientStats = clientsStats.get(0); + assertThat(clientStats.remoteAddress, equalTo(NetworkAddress.format(httpChannel.getRemoteAddress()))); + assertThat(clientStats.lastUri, equalTo(httpRequest1.uri())); + assertThat(clientStats.requestCount, equalTo(1L)); + requestLength += httpRequest1.content().length(); + assertThat(clientStats.requestSizeBytes, equalTo(requestLength)); + assertThat(clientStats.closedTimeMillis, equalTo(-1L)); + assertThat(clientStats.openedTimeMillis, equalTo(openTimeMillis)); + + final Map relevantHeaders = getRelevantHeaders(httpRequest1); + assertThat(clientStats.agent, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders.get("x-elastic-product-origin"))) + .or(() -> Optional.ofNullable(relevantHeaders.get("user-agent"))) + .orElse(null))); + assertThat(clientStats.forwardedFor, equalTo(relevantHeaders.get("x-forwarded-for"))); + assertThat(clientStats.opaqueId, equalTo(relevantHeaders.get("x-opaque-id"))); + } + + threadPool.setRandomTime(); + final HttpRequest httpRequest2 = randomHttpRequest(); + httpClientStatsTracker.updateClientStats(httpRequest2, httpChannel); + { + final List clientsStats = httpClientStatsTracker.getClientStats(); + assertThat(clientsStats, hasSize(1)); + + final HttpStats.ClientStats clientStats = clientsStats.get(0); + assertThat(clientStats.remoteAddress, equalTo(NetworkAddress.format(httpChannel.getRemoteAddress()))); + assertThat(clientStats.lastUri, equalTo(httpRequest2.uri())); + assertThat(clientStats.requestCount, equalTo(2L)); + requestLength += httpRequest2.content().length(); + assertThat(clientStats.requestSizeBytes, equalTo(requestLength)); + assertThat(clientStats.closedTimeMillis, equalTo(-1L)); + assertThat(clientStats.openedTimeMillis, equalTo(openTimeMillis)); + + final Map relevantHeaders1 = getRelevantHeaders(httpRequest1); + final Map relevantHeaders2 = getRelevantHeaders(httpRequest2); + assertThat(clientStats.agent, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders1.get("x-elastic-product-origin"))) + .or(() -> Optional.ofNullable(relevantHeaders1.get("user-agent"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("x-elastic-product-origin"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("user-agent"))) + .orElse(null))); + assertThat(clientStats.forwardedFor, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders1.get("x-forwarded-for"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("x-forwarded-for"))) + .orElse(null))); + assertThat(clientStats.opaqueId, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders1.get("x-opaque-id"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("x-opaque-id"))) + .orElse(null))); + } + + threadPool.setRandomTime(); + final long closeTimeMillis = threadPool.absoluteTimeInMillis(); + httpChannel.close(); + { + final List clientsStats = httpClientStatsTracker.getClientStats(); + assertThat(clientsStats, hasSize(1)); + + final HttpStats.ClientStats clientStats = clientsStats.get(0); + assertThat(clientStats.remoteAddress, equalTo(NetworkAddress.format(httpChannel.getRemoteAddress()))); + assertThat(clientStats.lastUri, equalTo(httpRequest2.uri())); + assertThat(clientStats.requestCount, equalTo(2L)); + assertThat(clientStats.requestSizeBytes, equalTo(requestLength)); + assertThat(clientStats.closedTimeMillis, equalTo(closeTimeMillis)); + assertThat(clientStats.openedTimeMillis, equalTo(openTimeMillis)); + + final Map relevantHeaders1 = getRelevantHeaders(httpRequest1); + final Map relevantHeaders2 = getRelevantHeaders(httpRequest2); + assertThat(clientStats.agent, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders1.get("x-elastic-product-origin"))) + .or(() -> Optional.ofNullable(relevantHeaders1.get("user-agent"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("x-elastic-product-origin"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("user-agent"))) + .orElse(null))); + assertThat(clientStats.forwardedFor, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders1.get("x-forwarded-for"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("x-forwarded-for"))) + .orElse(null))); + assertThat(clientStats.opaqueId, equalTo(Optional.empty() + .or(() -> Optional.ofNullable(relevantHeaders1.get("x-opaque-id"))) + .or(() -> Optional.ofNullable(relevantHeaders2.get("x-opaque-id"))) + .orElse(null))); + } + + threadPool.setCurrentTimeInMillis( + threadPool.relativeTimeInMillis() + + maxAgeMillis + + 1 + + (randomBoolean() ? 0L : randomLongBetween(0L, 1L << 60))); + assertThat(httpClientStatsTracker.getClientStats(), empty()); + } + + public void testLimitsNumberOfClosedClients() throws InterruptedException { + + final Settings settings; + final int closedClientLimit; + { + final Settings.Builder builder = Settings.builder(); + if (usually()) { + closedClientLimit = scaledRandomIntBetween(1, 10000); + builder.put(SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT.getKey(), closedClientLimit); + } else { + closedClientLimit = SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT.get(Settings.EMPTY); + } + settings = builder.build(); + } + + final FakeTimeThreadPool threadPool = new FakeTimeThreadPool(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final HttpClientStatsTracker httpClientStatsTracker = new HttpClientStatsTracker(settings, clusterSettings, threadPool); + + final Thread[] clientThreads = new Thread[between(1, 5)]; + final CyclicBarrier startBarrier = new CyclicBarrier(clientThreads.length + 1); + final Semaphore operationPermits = new Semaphore(closedClientLimit * 2); + + // If we get stats while a channel is concurrently closed then the iteration through the list of closed channels may see more + // stats than expected, even though it was never in a state that had too many channels, so we block closing while retrieving stats + final ReadWriteLock closeLock = new ReentrantReadWriteLock(true); + final Consumer closeUnderLock = httpChannel -> { + closeLock.readLock().lock(); + httpChannel.close(); + closeLock.readLock().unlock(); + }; + + for (int i = 0; i < clientThreads.length; i++) { + clientThreads[i] = new Thread(() -> { + try { + startBarrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError("unexpected", e); + } + + HttpChannel httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + while (operationPermits.tryAcquire()) { + if (usually()) { + closeUnderLock.accept(httpChannel); + httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + } + + httpClientStatsTracker.updateClientStats(randomHttpRequest(), httpChannel); + } + closeUnderLock.accept(httpChannel); + + }, "client-thread-" + i); + clientThreads[i].start(); + } + + final AtomicBoolean keepGoing = new AtomicBoolean(true); + final Thread statsThread = new Thread(() -> { + try { + startBarrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError("unexpected", e); + } + + while (keepGoing.get()) { + closeLock.writeLock().lock(); + final List clientStats = httpClientStatsTracker.getClientStats(); + closeLock.writeLock().unlock(); + assertThat( + clientStats.stream().filter(c -> c.closedTimeMillis >= 0L).count(), + lessThanOrEqualTo((long) closedClientLimit)); + } + + }, "stats-thread"); + statsThread.start(); + + for (Thread clientThread : clientThreads) { + clientThread.join(); + } + keepGoing.set(false); + statsThread.join(); + } + + public void testClearsStatsIfDisabledConcurrently() throws InterruptedException { + final FakeTimeThreadPool threadPool = new FakeTimeThreadPool(); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final HttpClientStatsTracker httpClientStatsTracker = new HttpClientStatsTracker(Settings.EMPTY, clusterSettings, threadPool); + + final Thread[] clientThreads = new Thread[between(1, 5)]; + final CyclicBarrier startBarrier = new CyclicBarrier(clientThreads.length + 1); + final boolean expectPruning = randomBoolean(); + final Semaphore operationPermits = new Semaphore(between( + 1, + expectPruning + ? SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT.get(Settings.EMPTY) - 1 + : SETTING_HTTP_CLIENT_STATS_MAX_CLOSED_CHANNEL_COUNT.get(Settings.EMPTY) * 2)); + for (int i = 0; i < clientThreads.length; i++) { + clientThreads[i] = new Thread(() -> { + try { + startBarrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError("unexpected", e); + } + + HttpChannel httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + while (operationPermits.tryAcquire()) { + if (randomBoolean()) { + httpChannel.close(); + httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + } + + httpClientStatsTracker.updateClientStats(randomHttpRequest(), httpChannel); + } + httpChannel.close(); + + }, "client-thread-" + i); + clientThreads[i].start(); + } + + try { + startBarrier.await(10, TimeUnit.SECONDS); + } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new AssertionError("unexpected", e); + } + clusterSettings.applySettings( + Settings.builder().put(SETTING_HTTP_CLIENT_STATS_ENABLED.getKey(), false).build()); + + try { + assertThat(httpClientStatsTracker.getClientStats(), empty()); + + // starts collecting stats again + clusterSettings.applySettings( + Settings.builder().put(SETTING_HTTP_CLIENT_STATS_ENABLED.getKey(), true).build()); + + final HttpChannel httpChannel = randomHttpChannel(); + httpClientStatsTracker.addClientStats(httpChannel); + if (expectPruning == false && randomBoolean()) { + // won't be pruned, the clock is not moving and we don't open enough channels to hit the limit + httpChannel.close(); + } + assertTrue(httpClientStatsTracker.getClientStats().stream() + .anyMatch(cs -> cs.remoteAddress.equals(NetworkAddress.format(httpChannel.getRemoteAddress())))); + } finally { + for (Thread clientThread : clientThreads) { + clientThread.join(); + } + } + } + + private Map getRelevantHeaders(HttpRequest httpRequest) { + final Map headers = new HashMap<>(4); + final String[] relevantHeaderNames = new String[]{"user-agent", "x-elastic-product-origin", "x-forwarded-for", "x-opaque-id"}; + for (Map.Entry> header : httpRequest.getHeaders().entrySet()) { + if (header.getValue().size() > 0) { + for (String relevantHeaderName : relevantHeaderNames) { + if (header.getKey().equalsIgnoreCase(relevantHeaderName)) { + headers.putIfAbsent(relevantHeaderName, header.getValue().get(0)); + } + } + } + } + return headers; + } + + private HttpRequest randomHttpRequest() { + final Map> headers = new HashMap<>(); + putRandomHeader("user-agent", headers); + putRandomHeader("x-elastic-product-origin", headers); + putRandomHeader("x-forwarded-for", headers); + putRandomHeader("x-opaque-id", headers); + return new FakeRestRequest.FakeHttpRequest( + randomFrom(RestRequest.Method.values()), + randomAlphaOfLength(10), + new BytesArray(randomByteArrayOfLength(between(0, 20))), + headers); + } + + private static void putRandomHeader(String key, Map> headers) { + if (randomBoolean()) { + headers.put(randomizeCase(key), randomList(1, 5, () -> randomAlphaOfLengthBetween(5, 15))); + } + } + + private static String randomizeCase(String s) { + final char[] chars = s.toCharArray(); + for (int i = 0; i < chars.length; i++) { + chars[i] = randomizeCase(chars[i]); + } + return new String(chars); + } + + private static char randomizeCase(char c) { + switch (between(1, 3)) { + case 1: + return Character.toUpperCase(c); + case 2: + return Character.toLowerCase(c); + default: + return c; + } + } + + private HttpChannel randomHttpChannel() { + return new FakeRestRequest.FakeHttpChannel(new InetSocketAddress(randomIp(randomBoolean()), randomIntBetween(1, 65535))); + } + + private static class FakeTimeThreadPool extends ThreadPool { + + private long currentTimeInMillis; + private final long absoluteTimeOffset = randomLong(); + + FakeTimeThreadPool() { + super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build()); + stopCachedTimeThread(); + setRandomTime(); + } + + @Override + public long relativeTimeInMillis() { + return currentTimeInMillis; + } + + @Override + public long absoluteTimeInMillis() { + return currentTimeInMillis + absoluteTimeOffset; + } + + void setCurrentTimeInMillis(long currentTimeInMillis) { + this.currentTimeInMillis = currentTimeInMillis; + } + + void setRandomTime() { + // absolute time needs to be nonnegative + currentTimeInMillis = randomNonNegativeLong() - absoluteTimeOffset; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 67331431e0c04..0b1bf2ee67dbc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -126,12 +126,12 @@ public Exception getInboundException() { } } - private static class FakeHttpChannel implements HttpChannel { + public static class FakeHttpChannel implements HttpChannel { private final InetSocketAddress remoteAddress; private final ListenableActionFuture closeFuture = new ListenableActionFuture<>(); - private FakeHttpChannel(InetSocketAddress remoteAddress) { + public FakeHttpChannel(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress; }