Merge branch 'main' into making-BulkRequest-implement-RefCounted
masseyke committed Dec 7, 2023
2 parents 2a2829d + 715b1bf commit 5a9f08f
Showing 9 changed files with 157 additions and 24 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102435.yaml
@@ -0,0 +1,5 @@
pr: 102435
summary: S3 first byte latency metric
area: Search
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions docs/reference/release-notes.asciidoc
@@ -8,6 +8,7 @@ This section summarizes the changes in each release.

* <<release-notes-8.13.0>>
* <<release-notes-8.12.0>>
* <<release-notes-8.11.2>>
* <<release-notes-8.11.1>>
* <<release-notes-8.11.0>>
* <<release-notes-8.10.4>>
@@ -58,6 +59,7 @@ This section summarizes the changes in each release.

include::release-notes/8.13.0.asciidoc[]
include::release-notes/8.12.0.asciidoc[]
include::release-notes/8.11.2.asciidoc[]
include::release-notes/8.11.1.asciidoc[]
include::release-notes/8.11.0.asciidoc[]
include::release-notes/8.10.4.asciidoc[]
83 changes: 83 additions & 0 deletions docs/reference/release-notes/8.11.2.asciidoc
@@ -0,0 +1,83 @@
[[release-notes-8.11.2]]
== {es} version 8.11.2

Also see <<breaking-changes-8.11,Breaking changes in 8.11>>.

[[known-issues-8.11.2]]
[float]
=== Known issues
include::8.10.3.asciidoc[tag=no-preventive-gc-issue]

[[bug-8.11.2]]
[float]
=== Bug fixes

Allocation::
* Improve failure handling in `ContinuousComputation` {es-pull}102281[#102281]

Application::
* Default `run_ml_inference` should be true {es-pull}102151[#102151]
* [Query Rules] Fix bug where combining the same metadata with text/numeric values leads to error {es-pull}102891[#102891] (issue: {es-issue}102827[#102827])

Cluster Coordination::
* Synchronize Coordinator#onClusterStateApplied {es-pull}100986[#100986] (issue: {es-issue}99023[#99023])

Data streams::
* [Usage API] Count all the data streams that have lifecycle {es-pull}102259[#102259]

ES|QL::
* ES|QL: Fix drop of renamed grouping {es-pull}102282[#102282] (issue: {es-issue}102121[#102121])
* ES|QL: Fix layout management for Project {es-pull}102399[#102399] (issue: {es-issue}102120[#102120])
* Fix DISSECT with empty patterns {es-pull}102580[#102580] (issue: {es-issue}102577[#102577])
* Fix leaking blocks in TopN {es-pull}102715[#102715] (issue: {es-issue}102646[#102646])
* Fix leaking blocks in `BlockUtils` {es-pull}102716[#102716]
* Fix memory tracking in TopN.Row {es-pull}102831[#102831] (issues: {es-issue}100640[#100640], {es-issue}102784[#102784], {es-issue}102790[#102790], {es-issue}102683[#102683])

ILM+SLM::
* [ILM] Fix downsample to skip already downsampled indices {es-pull}102250[#102250] (issue: {es-issue}102249[#102249])

Infra/Circuit Breakers::
* Add more logging to the real memory circuit breaker and lower minimum interval {es-pull}102396[#102396]

Ingest Node::
* Better processor stat merge {es-pull}102821[#102821]

Machine Learning::
* Ensure datafeed previews with no start or end time don't search the cold or frozen tiers {es-pull}102492[#102492]
* Recreate the Elasticsearch private temporary directory if it doesn't exist when an ML job is opened {es-pull}102599[#102599]

Mapping::
* Fix dense_vector cluster stats indexed_vector_dim_min/max values {es-pull}102467[#102467] (issue: {es-issue}102416[#102416])

Search::
* Allow mismatched sort-by field types if there are no docs to sort {es-pull}102779[#102779]

Security::
* Fix double-completion in `SecurityUsageTransportAction` {es-pull}102114[#102114] (issue: {es-issue}102111[#102111])

Snapshot/Restore::
* Set region for the STS client via privileged calls in AWS SDK {es-pull}102230[#102230] (issue: {es-issue}102173[#102173])
* Simplify `BlobStoreRepository` idle check {es-pull}102057[#102057] (issue: {es-issue}101948[#101948])

Transform::
* Ensure transform updates only modify the expected transform task {es-pull}102934[#102934] (issue: {es-issue}102933[#102933])
* Exclude stack traces from transform audit messages and health {es-pull}102240[#102240]

[[enhancement-8.11.2]]
[float]
=== Enhancements

Machine Learning::
* Add inference counts by model to the machine learning usage stats {es-pull}101915[#101915]

Security::
* Upgrade xmlsec to 2.3.4 {es-pull}102220[#102220]

[[upgrade-8.11.2]]
[float]
=== Upgrades

Snapshot/Restore::
* Upgrade reactor netty http version {es-pull}102311[#102311]


@@ -37,6 +37,7 @@
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.elasticsearch.repositories.RepositoriesModule.HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_EXCEPTIONS_COUNT;
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_EXCEPTIONS_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_OPERATIONS_COUNT;
@@ -110,6 +111,7 @@ public void testMetricsWithErrors() throws IOException {
assertThat(getLongCounterValue(plugin, METRIC_THROTTLES_COUNT, Operation.PUT_OBJECT), equalTo(2L * batch));
assertThat(getLongHistogramValue(plugin, METRIC_EXCEPTIONS_HISTOGRAM, Operation.PUT_OBJECT), equalTo(batch));
assertThat(getLongHistogramValue(plugin, METRIC_THROTTLES_HISTOGRAM, Operation.PUT_OBJECT), equalTo(2L * batch));
assertThat(getNumberOfMeasurements(plugin, HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM, Operation.PUT_OBJECT), equalTo(batch));
}

// Get not found
@@ -129,6 +131,7 @@ public void testMetricsWithErrors() throws IOException {
assertThat(getLongCounterValue(plugin, METRIC_THROTTLES_COUNT, Operation.GET_OBJECT), equalTo(batch));
assertThat(getLongHistogramValue(plugin, METRIC_EXCEPTIONS_HISTOGRAM, Operation.GET_OBJECT), equalTo(batch));
assertThat(getLongHistogramValue(plugin, METRIC_THROTTLES_HISTOGRAM, Operation.GET_OBJECT), equalTo(batch));
assertThat(getNumberOfMeasurements(plugin, HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM, Operation.GET_OBJECT), equalTo(batch));
}

// List retry exhausted
@@ -148,6 +151,7 @@ public void testMetricsWithErrors() throws IOException {
assertThat(getLongCounterValue(plugin, METRIC_THROTTLES_COUNT, Operation.LIST_OBJECTS), equalTo(5L * batch));
assertThat(getLongHistogramValue(plugin, METRIC_EXCEPTIONS_HISTOGRAM, Operation.LIST_OBJECTS), equalTo(batch));
assertThat(getLongHistogramValue(plugin, METRIC_THROTTLES_HISTOGRAM, Operation.LIST_OBJECTS), equalTo(5L * batch));
assertThat(getNumberOfMeasurements(plugin, HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM, Operation.LIST_OBJECTS), equalTo(batch));
}

// Delete to clean up
@@ -159,6 +163,7 @@ public void testMetricsWithErrors() throws IOException {
assertThat(getLongCounterValue(plugin, METRIC_THROTTLES_COUNT, Operation.DELETE_OBJECTS), equalTo(0L));
assertThat(getLongHistogramValue(plugin, METRIC_EXCEPTIONS_HISTOGRAM, Operation.DELETE_OBJECTS), equalTo(0L));
assertThat(getLongHistogramValue(plugin, METRIC_THROTTLES_HISTOGRAM, Operation.DELETE_OBJECTS), equalTo(0L));
assertThat(getNumberOfMeasurements(plugin, HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM, Operation.DELETE_OBJECTS), equalTo(1L));
}

private void addErrorStatus(RestStatus... statuses) {
@@ -174,6 +179,11 @@ private long getLongCounterValue(TestTelemetryPlugin plugin, String instrumentName, Operation operation) {
.orElse(0L);
}

private long getNumberOfMeasurements(TestTelemetryPlugin plugin, String instrumentName, Operation operation) {
final List<Measurement> measurements = plugin.getLongHistogramMeasurement(instrumentName);
return measurements.stream().filter(m -> operation.getKey().equals(m.attributes().get("operation"))).count();
}

private long getLongHistogramValue(TestTelemetryPlugin plugin, String instrumentName, Operation operation) {
final List<Measurement> measurements = Measurement.combine(plugin.getLongHistogramMeasurement(instrumentName));
return measurements.stream()
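For context on the two helpers above: getLongHistogramValue combines the recorded measurements and sums their values, while getNumberOfMeasurements counts the raw samples, one per record() call. A minimal self-contained sketch of the counting logic, with a hypothetical Measurement record standing in for the telemetry type:

import java.util.List;
import java.util.Map;

public class MeasurementCountSketch {
    // Hypothetical stand-in for the telemetry Measurement: a recorded value plus its attributes.
    record Measurement(long value, Map<String, Object> attributes) {}

    // Mirrors getNumberOfMeasurements above: count samples whose "operation" attribute matches.
    static long countFor(List<Measurement> measurements, String operationKey) {
        return measurements.stream().filter(m -> operationKey.equals(m.attributes().get("operation"))).count();
    }

    public static void main(String[] args) {
        List<Measurement> samples = List.of(
            new Measurement(1_200, Map.of("operation", "GetObject")),
            new Measurement(900, Map.of("operation", "PutObject")),
            new Measurement(1_500, Map.of("operation", "GetObject"))
        );
        System.out.println(countFor(samples, "GetObject")); // prints 2
    }
}

Counting samples rather than summing them is what lets the test assert that exactly one histogram value was recorded per request batch.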
@@ -48,11 +48,13 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.repositories.RepositoriesModule.HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_EXCEPTIONS_COUNT;
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_EXCEPTIONS_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_OPERATIONS_COUNT;
@@ -97,6 +99,7 @@ class S3BlobStore implements BlobStore {
private final LongCounter unsuccessfulOperationCounter;
private final LongHistogram exceptionHistogram;
private final LongHistogram throttleHistogram;
private final LongHistogram httpRequestTimeInMicroHistogram;

private final StatsCollectors statsCollectors = new StatsCollectors();

@@ -134,6 +137,7 @@ class S3BlobStore implements BlobStore {
this.unsuccessfulOperationCounter = this.meterRegistry.getLongCounter(METRIC_UNSUCCESSFUL_OPERATIONS_COUNT);
this.exceptionHistogram = this.meterRegistry.getLongHistogram(METRIC_EXCEPTIONS_HISTOGRAM);
this.throttleHistogram = this.meterRegistry.getLongHistogram(METRIC_THROTTLES_HISTOGRAM);
this.httpRequestTimeInMicroHistogram = this.meterRegistry.getLongHistogram(HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM);
s3RequestRetryStats = new S3RequestRetryStats(getMaxRetries());
threadPool.scheduleWithFixedDelay(() -> {
var priorRetryStats = s3RequestRetryStats;
@@ -224,6 +228,7 @@ public final void collectMetrics(Request<?> request, Response<?> response) {
throttleCounter.incrementBy(throttleCount, attributes);
throttleHistogram.record(throttleCount, attributes);
}
httpRequestTimeInMicroHistogram.record(getHttpRequestTimeInMicros(request), attributes);
}

private boolean assertConsistencyBetweenHttpRequestAndOperation(Request<?> request, Operation operation) {
@@ -262,6 +267,32 @@ private static long getCountForMetric(TimingInfo info, AWSRequestMetrics.Field field) {
}
}

/**
* Used for APM-style metrics to measure statistics about performance. This is not for billing.
*/
private static long getHttpRequestTimeInMicros(Request<?> request) {
List<TimingInfo> requestTimesIncludingRetries = request.getAWSRequestMetrics()
    .getTimingInfo()
    .getAllSubMeasurements(AWSRequestMetrics.Field.HttpRequestTime.name());

// Here we sum the timings of the individual sub-measurements, in microseconds, with the goal of deriving the TTFB
// (time to first byte). We track the time in micros for later use with an APM-style counter (exposed as a long),
// rather than using the default double exposed by getTimeTakenMillisIfKnown().
long totalTimeInMicros = 0;
for (TimingInfo timingInfo : requestTimesIncludingRetries) {
var endTimeInNanos = timingInfo.getEndTimeNanoIfKnown();
if (endTimeInNanos != null) {
totalTimeInMicros += TimeUnit.NANOSECONDS.toMicros(endTimeInNanos - timingInfo.getStartTimeNano());
}
}
if (totalTimeInMicros == 0) {
logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request);
return 0L;
}
return totalTimeInMicros;
}

@Override
public String toString() {
return bucket;
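For context, getHttpRequestTimeInMicros above reduces to simple arithmetic: sum the nano-duration of every completed HttpRequestTime sub-measurement (one per attempt, so retries are included) and convert the total to microseconds. A standalone sketch of that computation, with a hypothetical Timing record standing in for the AWS SDK's TimingInfo:

import java.util.List;
import java.util.concurrent.TimeUnit;

public class HttpRequestTimeSketch {
    // Hypothetical stand-in for com.amazonaws.util.TimingInfo: the end time may be unknown (null).
    record Timing(long startNanos, Long endNanosIfKnown) {}

    // Sum each completed sub-measurement and convert nanos to micros, as in getHttpRequestTimeInMicros.
    static long totalTimeInMicros(List<Timing> attempts) {
        long totalMicros = 0;
        for (Timing attempt : attempts) {
            if (attempt.endNanosIfKnown() != null) {
                totalMicros += TimeUnit.NANOSECONDS.toMicros(attempt.endNanosIfKnown() - attempt.startNanos());
            }
        }
        return totalMicros;
    }

    public static void main(String[] args) {
        // One request that took two attempts: 1.5 ms and 2.5 ms, 4000 micros in total.
        List<Timing> attempts = List.of(new Timing(0L, 1_500_000L), new Timing(0L, 2_500_000L));
        System.out.println(totalTimeInMicros(attempts)); // prints 4000
    }
}

Returning zero when no sub-measurement completed (plus the warning above) keeps the histogram well-defined even for requests whose timing the SDK failed to track.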
@@ -42,7 +42,7 @@ public final class RepositoriesModule {
public static final String METRIC_UNSUCCESSFUL_OPERATIONS_COUNT = "es.repositories.operations.unsuccessful.count";
public static final String METRIC_EXCEPTIONS_HISTOGRAM = "es.repositories.exceptions.histogram";
public static final String METRIC_THROTTLES_HISTOGRAM = "es.repositories.throttles.histogram";

public static final String HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM = "es.repositories.requests.http_request_time.histogram";
private final RepositoriesService repositoriesService;

public RepositoriesModule(
@@ -55,6 +55,7 @@ public RepositoriesModule(
RecoverySettings recoverySettings,
TelemetryProvider telemetryProvider
) {
// TODO: refactor APM metrics into their own class, passed in as a dependency (see BlobCacheMetrics for an example).
telemetryProvider.getMeterRegistry().registerLongCounter(METRIC_REQUESTS_COUNT, "repository request counter", "unit");
telemetryProvider.getMeterRegistry().registerLongCounter(METRIC_EXCEPTIONS_COUNT, "repository request exception counter", "unit");
telemetryProvider.getMeterRegistry().registerLongCounter(METRIC_THROTTLES_COUNT, "repository operation counter", "unit");
@@ -66,6 +67,13 @@ public RepositoriesModule(
.registerLongHistogram(METRIC_EXCEPTIONS_HISTOGRAM, "repository request exception histogram", "unit");
telemetryProvider.getMeterRegistry()
.registerLongHistogram(METRIC_THROTTLES_HISTOGRAM, "repository request throttle histogram", "unit");
telemetryProvider.getMeterRegistry()
.registerLongHistogram(
HTTP_REQUEST_TIME_IN_MICROS_HISTOGRAM,
"HttpRequestTime in microseconds expressed as as a histogram",
"micros"
);

Map<String, Repository.Factory> factories = new HashMap<>();
factories.put(
FsRepository.TYPE,
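The registration above follows a register-once, look-up-by-name pattern: RepositoriesModule registers each instrument at startup, and consumers such as S3BlobStore later fetch the same instrument by name and record into it. A minimal sketch of that pattern with a hypothetical in-memory registry (not the Elasticsearch telemetry API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MeterRegistrySketch {
    // Hypothetical stand-in for the telemetry MeterRegistry: instrument name -> recorded values.
    static class Registry {
        private final Map<String, List<Long>> histograms = new HashMap<>();

        void registerLongHistogram(String name, String description, String unit) {
            histograms.putIfAbsent(name, new ArrayList<>());
        }

        void record(String name, long value) {
            histograms.get(name).add(value); // the instrument must have been registered first
        }

        List<Long> values(String name) {
            return histograms.get(name);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        // Register once at startup (mirrors RepositoriesModule)...
        String metric = "es.repositories.requests.http_request_time.histogram";
        registry.registerLongHistogram(metric, "HttpRequestTime in microseconds expressed as a histogram", "micros");
        // ...then record per request (mirrors S3BlobStore#collectMetrics).
        registry.record(metric, 4_000L);
        System.out.println(registry.values(metric)); // [4000]
    }
}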
@@ -18,13 +18,13 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
@@ -126,32 +126,21 @@ protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
perRepositoryListener -> SubscribableListener

// Get repository data
.<RepositoryData>newForked(
l -> repository.getRepositoryData(
EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO use retentionExecutor, see #101445?
l
)
)
.<RepositoryData>newForked(l -> repository.getRepositoryData(retentionExecutor, l))

// Collect snapshot details by policy, and get any missing details by reading SnapshotInfo
.<SnapshotDetailsByPolicy>andThen(
retentionExecutor,
threadContext,
(l, repositoryData) -> getSnapshotDetailsByPolicy(repository, repositoryData, l)
(l, repositoryData) -> getSnapshotDetailsByPolicy(retentionExecutor, repository, repositoryData, l)
)

// Compute snapshots to delete for each (relevant) policy
.<Void>andThen(
retentionExecutor,
threadContext,
(l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> {
resultsBuilder.addResult(
repositoryName,
getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
);
return null;
})
)
.<Void>andThen((l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> {
resultsBuilder.addResult(
repositoryName,
getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
);
return null;
}))

// And notify this repository's listener on completion
.addListener(perRepositoryListener.delegateResponse((l, e) -> {
@@ -184,6 +173,7 @@ <T> Stream<T> flatMap(BiFunction<String, Map<SnapshotId, RepositoryData.SnapshotDetails>, Stream<T>> fn) {

// Exposed for testing
static void getSnapshotDetailsByPolicy(
Executor executor,
Repository repository,
RepositoryData repositoryData,
ActionListener<SnapshotDetailsByPolicy> listener
@@ -218,7 +208,7 @@ static void getSnapshotDetailsByPolicy(
snapshotInfo.snapshotId(),
RepositoryData.SnapshotDetails.fromSnapshotInfo(snapshotInfo)
),
listener.map(ignored -> snapshotDetailsByPolicy)
new ThreadedActionListener<>(executor, listener.map(ignored -> snapshotDetailsByPolicy))
)
);
}
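The ThreadedActionListener wrapper introduced above makes the wrapped listener complete on the supplied executor instead of the thread that delivered the SnapshotInfo. A minimal analogue using plain JDK types (a sketch, not the Elasticsearch ActionListener API):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ThreadedListenerSketch {
    // Wraps a listener so the response is delivered on the given executor,
    // mirroring new ThreadedActionListener<>(executor, listener).
    static <T> Consumer<T> threaded(ExecutorService executor, Consumer<T> listener) {
        return response -> executor.execute(() -> listener.accept(response));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService retentionExecutor = Executors.newSingleThreadExecutor();
        Consumer<String> listener = response -> System.out.println(Thread.currentThread().getName() + " got " + response);
        threaded(retentionExecutor, listener).accept("snapshot-details-by-policy");
        retentionExecutor.shutdown();
        retentionExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

This matches the surrounding change: repository data is now fetched on the retention executor, and listener completion is forked onto that executor as well rather than running on the calling thread.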
@@ -180,7 +180,7 @@ record SeenSnapshotInfo(SnapshotId snapshotId, String policyId) {}
.<RepositoryData>newForked(l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l))

.<SLMGetExpiredSnapshotsAction.SnapshotDetailsByPolicy>andThen(
(l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(repository, rd, l)
(l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(EsExecutors.DIRECT_EXECUTOR_SERVICE, repository, rd, l)
)

.andThen((l, snapshotDetailsByPolicy) -> {
@@ -218,6 +218,10 @@ setup:

---
"Test start deployment fails while model download in progress":
- skip:
version: all
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/103153"

- do:
ml.put_trained_model:
model_id: .elser_model_2
