Skip to content

Commit

Permalink
Add metrics for retries by S3RetryingInputStream (elastic#105600)
Browse files Browse the repository at this point in the history
This PR exposes retries in S3RetryingInputStream as metrics for easier
observability. At the class API level, retries can happen either when
opening an input stream or when reading from an input stream. Retrying a
read from an input stream can also internally retry re-opening the input
stream. All these retries are counted under the retries for reading,
since the higher-level API usage is a read rather than an open.

The list of new metrics is:

* `es.repositories.s3.input_stream.retry.event.total` - Number of times a
retry cycle has been triggered.
* `es.repositories.s3.input_stream.retry.success.total` - Number of times
a retry cycle has been successfully completed. This should match the
above metric in numbers; otherwise it indicates there are threads stuck
in infinite retries.
* `es.repositories.s3.input_stream.retry.attempts.histogram` - Number of
attempts needed to complete a retry cycle successfully.

Relates:
elastic#103300 (comment)
Relates: ES-7666
  • Loading branch information
ywangd authored Feb 23, 2024
1 parent 21b64ba commit 10ec23a
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.xcontent.NamedXContentRegistry;

Expand Down Expand Up @@ -108,8 +107,7 @@ public AzureRepository(
bigArrays,
recoverySettings,
buildBasePath(metadata),
buildLocation(metadata),
RepositoriesMetrics.NOOP
buildLocation(metadata)
);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -77,8 +76,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
bigArrays,
recoverySettings,
buildBasePath(metadata),
buildLocation(metadata),
RepositoriesMetrics.NOOP
buildLocation(metadata)
);
this.storageService = storageService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
Expand Down Expand Up @@ -460,9 +459,9 @@ protected S3Repository createRepository(
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings,
RepositoriesMetrics repositoriesMetrics
S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {

@Override
public BlobStore blobStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.fixtures.minio.MinioTestContainer;
Expand Down Expand Up @@ -145,7 +144,7 @@ public long absoluteTimeInMillis() {
ClusterServiceUtils.createClusterService(threadpool),
BigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings()),
RepositoriesMetrics.NOOP
S3RepositoriesMetrics.NOOP
)
) {
repository.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -84,7 +83,7 @@ class S3BlobStore implements BlobStore {

private final ThreadPool threadPool;
private final Executor snapshotExecutor;
private final RepositoriesMetrics repositoriesMetrics;
private final S3RepositoriesMetrics s3RepositoriesMetrics;

private final StatsCollectors statsCollectors = new StatsCollectors();

Expand All @@ -98,7 +97,7 @@ class S3BlobStore implements BlobStore {
RepositoryMetadata repositoryMetadata,
BigArrays bigArrays,
ThreadPool threadPool,
RepositoriesMetrics repositoriesMetrics
S3RepositoriesMetrics s3RepositoriesMetrics
) {
this.service = service;
this.bigArrays = bigArrays;
Expand All @@ -110,7 +109,7 @@ class S3BlobStore implements BlobStore {
this.repositoryMetadata = repositoryMetadata;
this.threadPool = threadPool;
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
this.repositoriesMetrics = repositoriesMetrics;
this.s3RepositoriesMetrics = s3RepositoriesMetrics;
}

RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
Expand Down Expand Up @@ -174,19 +173,19 @@ public final void collectMetrics(Request<?> request, Response<?> response) {
.map(List::size)
.orElse(0);

repositoriesMetrics.operationCounter().incrementBy(1, attributes);
s3RepositoriesMetrics.common().operationCounter().incrementBy(1, attributes);
if (numberOfAwsErrors == requestCount) {
repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1, attributes);
s3RepositoriesMetrics.common().unsuccessfulOperationCounter().incrementBy(1, attributes);
}

repositoriesMetrics.requestCounter().incrementBy(requestCount, attributes);
s3RepositoriesMetrics.common().requestCounter().incrementBy(requestCount, attributes);
if (exceptionCount > 0) {
repositoriesMetrics.exceptionCounter().incrementBy(exceptionCount, attributes);
repositoriesMetrics.exceptionHistogram().record(exceptionCount, attributes);
s3RepositoriesMetrics.common().exceptionCounter().incrementBy(exceptionCount, attributes);
s3RepositoriesMetrics.common().exceptionHistogram().record(exceptionCount, attributes);
}
if (throttleCount > 0) {
repositoriesMetrics.throttleCounter().incrementBy(throttleCount, attributes);
repositoriesMetrics.throttleHistogram().record(throttleCount, attributes);
s3RepositoriesMetrics.common().throttleCounter().incrementBy(throttleCount, attributes);
s3RepositoriesMetrics.common().throttleHistogram().record(throttleCount, attributes);
}
maybeRecordHttpRequestTime(request);
}
Expand All @@ -207,7 +206,7 @@ private void maybeRecordHttpRequestTime(Request<?> request) {
if (totalTimeInMicros == 0) {
logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request);
} else {
repositoriesMetrics.httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes);
s3RepositoriesMetrics.common().httpRequestTimeInMicroHistogram().record(totalTimeInMicros, attributes);
}
}

Expand Down Expand Up @@ -293,6 +292,14 @@ public long bufferSizeInBytes() {
return bufferSize.getBytes();
}

public RepositoryMetadata getRepositoryMetadata() {
return repositoryMetadata;
}

public S3RepositoriesMetrics getS3RepositoriesMetrics() {
return s3RepositoriesMetrics;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories.s3;

import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;

/**
 * Holder for the metrics emitted by the S3 repository implementation: the common
 * repository metrics shared by all blob-store repositories plus the S3-specific
 * input-stream retry metrics recorded by {@code S3RetryingInputStream}.
 *
 * @param common                the common repository metrics (operation/request/exception/throttle counters)
 * @param retryStartedCounter   incremented once when a retry cycle begins (first retryable failure)
 * @param retryCompletedCounter incremented once when a retry cycle completes successfully;
 *                              should track {@code retryStartedCounter} — a lasting gap indicates
 *                              threads stuck in infinite retries
 * @param retryHistogram        records the number of attempts needed to complete a retry cycle
 */
public record S3RepositoriesMetrics(
    RepositoriesMetrics common,
    LongCounter retryStartedCounter,
    LongCounter retryCompletedCounter,
    LongHistogram retryHistogram
) {

    public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total";
    public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total";
    public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram";

    // No-op instance for contexts (e.g. tests, non-metered repositories) that do not record metrics.
    // Declared final: a mutable public static field could be reassigned by any caller.
    public static final S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP);

    /**
     * Convenience constructor that registers the three S3 retry instruments against the
     * meter registry carried by the given common metrics.
     */
    public S3RepositoriesMetrics(RepositoriesMetrics common) {
        this(
            common,
            common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"),
            common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"),
            common.meterRegistry()
                .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit")
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
Expand Down Expand Up @@ -195,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository {

private final Executor snapshotExecutor;

private final S3RepositoriesMetrics s3RepositoriesMetrics;

/**
* Constructs an s3 backed repository
*/
Expand All @@ -205,7 +206,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final BigArrays bigArrays,
final RecoverySettings recoverySettings,
final RepositoriesMetrics repositoriesMetrics
final S3RepositoriesMetrics s3RepositoriesMetrics
) {
super(
metadata,
Expand All @@ -214,10 +215,10 @@ class S3Repository extends MeteredBlobStoreRepository {
bigArrays,
recoverySettings,
buildBasePath(metadata),
buildLocation(metadata),
repositoriesMetrics
buildLocation(metadata)
);
this.service = service;
this.s3RepositoriesMetrics = s3RepositoriesMetrics;
this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT);

// Parse and validate the user's S3 Storage Class setting
Expand Down Expand Up @@ -408,7 +409,7 @@ protected S3BlobStore createBlobStore() {
metadata,
bigArrays,
threadPool,
repositoriesMetrics
s3RepositoriesMetrics
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ protected S3Repository createRepository(
final ClusterService clusterService,
final BigArrays bigArrays,
final RecoverySettings recoverySettings,
final RepositoriesMetrics repositoriesMetrics
final S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, repositoriesMetrics);
return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics);
}

@Override
Expand All @@ -101,11 +101,12 @@ public Map<String, Repository.Factory> getRepositories(
final ClusterService clusterService,
final BigArrays bigArrays,
final RecoverySettings recoverySettings,
RepositoriesMetrics repositoriesMetrics
final RepositoriesMetrics repositoriesMetrics
) {
final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics);
return Collections.singletonMap(
S3Repository.TYPE,
metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, repositoriesMetrics)
metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.repositories.s3.S3BlobStore.configureRequestForMetrics;
Expand Down Expand Up @@ -80,7 +81,7 @@ class S3RetryingInputStream extends InputStream {
this.end = end;
final int initialAttempt = attempt;
openStreamWithRetry();
maybeLogForSuccessAfterRetries(initialAttempt, "opened");
maybeLogAndRecordMetricsForSuccess(initialAttempt, "open");
}

private void openStreamWithRetry() throws IOException {
Expand All @@ -105,6 +106,9 @@ private void openStreamWithRetry() throws IOException {
);
}

if (attempt == 1) {
blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("open"));
}
final long delayInMillis = maybeLogAndComputeRetryDelay("opening", e);
delayBeforeRetry(delayInMillis);
}
Expand Down Expand Up @@ -142,9 +146,12 @@ public int read() throws IOException {
} else {
currentOffset += 1;
}
maybeLogForSuccessAfterRetries(initialAttempt, "read");
maybeLogAndRecordMetricsForSuccess(initialAttempt, "read");
return result;
} catch (IOException e) {
if (attempt == initialAttempt) {
blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read"));
}
reopenStreamOrFail(e);
}
}
Expand All @@ -162,9 +169,12 @@ public int read(byte[] b, int off, int len) throws IOException {
} else {
currentOffset += bytesRead;
}
maybeLogForSuccessAfterRetries(initialAttempt, "read");
maybeLogAndRecordMetricsForSuccess(initialAttempt, "read");
return bytesRead;
} catch (IOException e) {
if (attempt == initialAttempt) {
blobStore.getS3RepositoriesMetrics().retryStartedCounter().incrementBy(1, metricAttributes("read"));
}
reopenStreamOrFail(e);
}
}
Expand Down Expand Up @@ -246,16 +256,20 @@ private void logForRetry(Level level, String action, Exception e) {
);
}

private void maybeLogForSuccessAfterRetries(int initialAttempt, String action) {
private void maybeLogAndRecordMetricsForSuccess(int initialAttempt, String action) {
if (attempt > initialAttempt) {
final int numberOfRetries = attempt - initialAttempt;
logger.info(
"successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries",
action,
blobStore.bucket(),
blobKey,
purpose.getKey(),
attempt - initialAttempt
numberOfRetries
);
final Map<String, Object> attributes = metricAttributes(action);
blobStore.getS3RepositoriesMetrics().retryCompletedCounter().incrementBy(1, attributes);
blobStore.getS3RepositoriesMetrics().retryHistogram().record(numberOfRetries, attributes);
}
}

Expand Down Expand Up @@ -294,6 +308,21 @@ protected long getRetryDelayInMillis() {
return 10L << (Math.min(attempt - 1, 10));
}

/**
 * Builds the fixed attribute set attached to every retry metric emitted for this
 * stream: repository type and name, the S3 operation (always GET_OBJECT here),
 * the operation purpose, and the triggering action ("open" or "read").
 */
private Map<String, Object> metricAttributes(String action) {
    return Map.ofEntries(
        Map.entry("repo_type", S3Repository.TYPE),
        Map.entry("repo_name", blobStore.getRepositoryMetadata().name()),
        Map.entry("operation", Operation.GET_OBJECT.getKey()),
        Map.entry("purpose", purpose.getKey()),
        Map.entry("action", action)
    );
}

@Override
public void close() throws IOException {
maybeAbort(currentStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -264,9 +263,9 @@ protected S3Repository createRepository(
ClusterService clusterService,
BigArrays bigArrays,
RecoverySettings recoverySettings,
RepositoriesMetrics repositoriesMetrics
S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, repositoriesMetrics) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Expand Down
Loading

0 comments on commit 10ec23a

Please sign in to comment.