diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java index d94b10cd5c54d..0d9fcad984cbb 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java @@ -196,7 +196,7 @@ public void benchmark() { case "double" -> { DoubleVector values = op.getOutput().getBlock(1).asVector(); for (int p = 0; p < values.getPositionCount(); p++) { - sum += values.getDouble(p); + sum += (long) values.getDouble(p); } } case "keyword" -> { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index 375da5f71b043..c06a8580d845b 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -7,17 +7,43 @@ */ package org.elasticsearch.repositories.s3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.MultipartUpload; + +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.blobstore.OptionalBytesReference; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase { @@ -67,4 +93,99 @@ protected void createRepository(String repoName) { .get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); } + + public void testCompareAndExchangeCleanup() throws IOException { + final var timeOffsetMillis = new AtomicLong(); + final var threadpool = new TestThreadPool(getTestName()) { + @Override + public long absoluteTimeInMillis() { + return super.absoluteTimeInMillis() + timeOffsetMillis.get(); + } + }; + // construct our own repo instance so we can inject a threadpool that allows to control the passage of time + try ( + var repository = new S3Repository( + node().injector().getInstance(RepositoriesService.class).repository(TEST_REPO_NAME).getMetadata(), + xContentRegistry(), + node().injector().getInstance(PluginsService.class).filterPlugins(S3RepositoryPlugin.class).get(0).getService(), + ClusterServiceUtils.createClusterService(threadpool), + BigArrays.NON_RECYCLING_INSTANCE, + new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings()) + ) + ) { + repository.start(); + + final var blobStore = (S3BlobStore) repository.blobStore(); + final var blobContainer = (S3BlobContainer) blobStore.blobContainer(repository.basePath().add(getTestName())); + + try (var clientReference = blobStore.clientReference()) { + final var client = clientReference.client(); + final var bucketName = S3Repository.BUCKET_SETTING.get(repository.getMetadata().settings()); + final var registerBlobPath = blobContainer.buildKey("key"); + + class TestHarness { + boolean tryCompareAndSet(BytesReference expected, BytesReference updated) { + return PlainActionFuture.get( + future -> blobContainer.compareAndSetRegister("key", expected, updated, future), + 10, + TimeUnit.SECONDS + ); + } + + BytesReference readRegister() { + return PlainActionFuture.get( + future -> blobContainer.getRegister("key", future.map(OptionalBytesReference::bytesReference)), + 10, + TimeUnit.SECONDS + ); + } + + List listMultipartUploads() { + return client.listMultipartUploads(new ListMultipartUploadsRequest(bucketName).withPrefix(registerBlobPath)) + .getMultipartUploads(); + } + } + + var testHarness = new TestHarness(); + + final var bytes1 = new BytesArray(new byte[] { (byte) 1 }); + final var bytes2 = new BytesArray(new byte[] { (byte) 2 }); + assertTrue(testHarness.tryCompareAndSet(BytesArray.EMPTY, bytes1)); + + // show we're looking at the right blob + assertEquals(bytes1, testHarness.readRegister()); + assertArrayEquals( + bytes1.array(), + client.getObject(new GetObjectRequest(bucketName, registerBlobPath)).getObjectContent().readAllBytes() + ); + + // a fresh ongoing upload blocks other CAS attempts + client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, registerBlobPath)); + assertThat(testHarness.listMultipartUploads(), hasSize(1)); + + assertFalse(testHarness.tryCompareAndSet(bytes1, bytes2)); + final var multipartUploads = testHarness.listMultipartUploads(); + assertThat(multipartUploads, hasSize(1)); + + // repo clock may not be exactly aligned with ours, but it should be close + final var age = blobStore.getThreadPool().absoluteTimeInMillis() - multipartUploads.get(0) + .getInitiated() + .toInstant() + .toEpochMilli(); + final var ageRangeMillis = TimeValue.timeValueMinutes(1).millis(); + assertThat(age, allOf(greaterThanOrEqualTo(-ageRangeMillis), lessThanOrEqualTo(ageRangeMillis))); + + // if the upload exceeds the TTL then CAS attempts will abort it + timeOffsetMillis.addAndGet(blobStore.getCompareAndExchangeTimeToLive().millis() - Math.min(0, age)); + assertTrue(testHarness.tryCompareAndSet(bytes1, bytes2)); + assertThat(testHarness.listMultipartUploads(), hasSize(0)); + assertEquals(bytes2, testHarness.readRegister()); + } finally { + blobContainer.delete(); + } + } finally { + ThreadPool.terminate(threadpool, 10, TimeUnit.SECONDS); + } + } + } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 6cc7af7bb736b..58c6586ccd044 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -61,8 +61,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -476,7 +478,8 @@ private ListObjectsRequest listObjectsRequest(String pathPrefix) { .withRequestMetricCollector(blobStore.listMetricCollector); } - private String buildKey(String blobName) { + // exposed for tests + String buildKey(String blobName) { return keyPath + blobName; } @@ -665,6 +668,22 @@ private int getUploadIndex(String targetUploadId, List multipar for (MultipartUpload multipartUpload : multipartUploads) { final var observedUploadId = multipartUpload.getUploadId(); if (observedUploadId.equals(targetUploadId)) { + final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis(); + final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli(); + final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis(); + if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) { + logger.warn( + """ + compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \ + which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""", + bucket, + blobKey, + multipartUpload.getInitiated(), + multipartUpload.getInitiated().toInstant().toEpochMilli(), + currentTimeMillis, + expectedAgeRangeMillis + ); + } found = true; } else if (observedUploadId.compareTo(targetUploadId) < 0) { uploadIndex += 1; @@ -674,12 +693,47 @@ private int getUploadIndex(String targetUploadId, List multipar return found ? uploadIndex : -1; } - void run(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { + /** + * @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation + */ + private boolean hasPreexistingUploads() { + final var uploads = listMultipartUploads(); + if (uploads.isEmpty()) { + return false; + } + + final var expiryDate = Date.from( + Instant.ofEpochMilli( + blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis() + ) + ); + if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) { + return true; + } + + // there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare) + for (final var upload : uploads) { + logger.warn( + "cleaning up stale compare-and-swap upload [{}] initiated at [{}]", + upload.getUploadId(), + upload.getInitiated() + ); + safeAbortMultipartUpload(upload.getUploadId()); + } + return false; + } + + void run(BytesReference expected, BytesReference updated, ActionListener listener) throws Exception { BlobContainerUtils.ensureValidRegisterContent(updated); - if (listMultipartUploads().isEmpty() == false) { - // TODO What if the previous writer crashed? We should consider the age of any ongoing uploads before bailing out like this. + if (hasPreexistingUploads()) { + + // This is a small optimization to improve the liveness properties of this algorithm. + // + // We can safely proceed even if there are other uploads in progress, but that would add to the potential for collisions and + // delays. Thus in this case we prefer avoid disturbing the ongoing attempts and just fail up front. + listener.onResponse(OptionalBytesReference.MISSING); return; } @@ -711,13 +765,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener { if (isComplete.compareAndSet(false, true)) { - try { - abortMultipartUploadIfExists(uploadId); - } catch (Exception e) { - // cleanup is a best-effort thing, we can't do anything better than log and fall through here - logger.error("unexpected error cleaning up upload [" + uploadId + "] of [" + blobKey + "]", e); - assert false : e; - } + safeAbortMultipartUpload(uploadId); } }; @@ -761,9 +809,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener abortMultipartUploadIfExists(currentUploadId)) - ); + .execute(ActionRunnable.run(listeners.acquire(), () -> safeAbortMultipartUpload(currentUploadId))); } } } finally { @@ -784,6 +830,15 @@ void run(BytesReference expected, BytesReference updated, ActionListener request) { }; } + public TimeValue getCompareAndExchangeTimeToLive() { + return service.compareAndExchangeTimeToLive; + } + // metrics collector that ignores null responses that we interpret as the request not reaching the S3 endpoint due to a network // issue private abstract static class IgnoreNoResponseMetricsCollector extends RequestMetricCollector { diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java index b9644a54bffae..6968e307a403b 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoryPlugin.java @@ -106,13 +106,13 @@ public Collection createComponents( AllocationService allocationService, IndicesService indicesService ) { - service.set(s3Service(environment)); + service.set(s3Service(environment, clusterService.getSettings())); this.service.get().refreshAndClearCache(S3ClientSettings.load(settings)); return List.of(service); } - S3Service s3Service(Environment environment) { - return new S3Service(environment); + S3Service s3Service(Environment environment, Settings nodeSettings) { + return new S3Service(environment, nodeSettings); } @Override diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index 2788bac6600b9..ddacb24be7118 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -27,11 +27,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import java.io.Closeable; @@ -51,6 +54,12 @@ class S3Service implements Closeable { private static final Logger LOGGER = LogManager.getLogger(S3Service.class); + private static final Setting REPOSITORY_S3_CAS_TTL_SETTING = Setting.timeSetting( + "repository_s3.compare_and_exchange.time_to_live", + StoreHeartbeatService.HEARTBEAT_FREQUENCY, + Setting.Property.NodeScope + ); + private volatile Map clientsCache = emptyMap(); /** @@ -69,13 +78,16 @@ class S3Service implements Closeable { final CustomWebIdentityTokenCredentialsProvider webIdentityTokenCredentialsProvider; - S3Service(Environment environment) { + final TimeValue compareAndExchangeTimeToLive; + + S3Service(Environment environment, Settings nodeSettings) { webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider( environment, System::getenv, System::getProperty, Clock.systemUTC() ); + compareAndExchangeTimeToLive = REPOSITORY_S3_CAS_TTL_SETTING.get(nodeSettings); } /** diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java index 6f78da1e9da0f..cc3cddda24917 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/RepositoryCredentialsTests.java @@ -272,8 +272,8 @@ protected void assertSnapshotOrGenericThread() { } @Override - S3Service s3Service(Environment environment) { - return new ProxyS3Service(environment); + S3Service s3Service(Environment environment, Settings nodeSettings) { + return new ProxyS3Service(environment, nodeSettings); } public static final class ClientAndCredentials extends AmazonS3Wrapper { @@ -289,8 +289,8 @@ public static final class ProxyS3Service extends S3Service { private static final Logger logger = LogManager.getLogger(ProxyS3Service.class); - ProxyS3Service(Environment environment) { - super(environment); + ProxyS3Service(Environment environment, Settings nodeSettings) { + super(environment, nodeSettings); } @Override diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index 39ab6475d09b4..2fb4a6a16923f 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -76,7 +76,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes @Before public void setUp() throws Exception { - service = new S3Service(Mockito.mock(Environment.class)); + service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY); super.setUp(); } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java index 8a0901463736a..8bff849ca26c2 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ClientSettingsTests.java @@ -177,7 +177,7 @@ public void testRegionCanBeSet() throws IOException { ); assertThat(settings.get("default").region, is("")); assertThat(settings.get("other").region, is(region)); - try (S3Service s3Service = new S3Service(Mockito.mock(Environment.class))) { + try (S3Service s3Service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY)) { AmazonS3Client other = (AmazonS3Client) s3Service.buildClient(settings.get("other")); assertThat(other.getSignerRegionOverride(), is(region)); } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java index ba5f7202881df..aecccba2330f4 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryTests.java @@ -45,7 +45,7 @@ public void shutdown() { private static class DummyS3Service extends S3Service { DummyS3Service(Environment environment) { - super(environment); + super(environment, Settings.EMPTY); } @Override diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ServiceTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ServiceTests.java index f9a12dad3df84..bbdeea6d87631 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ServiceTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3ServiceTests.java @@ -18,7 +18,7 @@ public class S3ServiceTests extends ESTestCase { public void testCachedClientsAreReleased() throws IOException { - final S3Service s3Service = new S3Service(Mockito.mock(Environment.class)); + final S3Service s3Service = new S3Service(Mockito.mock(Environment.class), Settings.EMPTY); final Settings settings = Settings.builder().put("endpoint", "http://first").build(); final RepositoryMetadata metadata1 = new RepositoryMetadata("first", "s3", settings); final RepositoryMetadata metadata2 = new RepositoryMetadata("second", "s3", settings); diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java b/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java index 44d521581dfab..b45e22acf5721 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java @@ -28,7 +28,8 @@ public interface ChunkedToXContent { /** * Create an iterator of {@link ToXContent} chunks for a REST response. Each chunk is serialized with the same {@link XContentBuilder} * and {@link ToXContent.Params}, which is also the same as the {@link ToXContent.Params} passed as the {@code params} argument. For - * best results, all chunks should be {@code O(1)} size. See also {@link ChunkedToXContentHelper} for some handy utilities. + * best results, all chunks should be {@code O(1)} size. The last chunk in the iterator must always yield at least one byte of output. + * See also {@link ChunkedToXContentHelper} for some handy utilities. *

* Note that chunked response bodies cannot send deprecation warning headers once transmission has started, so implementations must * check for deprecated feature use before returning. @@ -40,8 +41,8 @@ public interface ChunkedToXContent { /** * Create an iterator of {@link ToXContent} chunks for a response to the {@link RestApiVersion#V_7} API. Each chunk is serialized with * the same {@link XContentBuilder} and {@link ToXContent.Params}, which is also the same as the {@link ToXContent.Params} passed as the - * {@code params} argument. For best results, all chunks should be {@code O(1)} size. See also {@link ChunkedToXContentHelper} for some - * handy utilities. + * {@code params} argument. For best results, all chunks should be {@code O(1)} size. The last chunk in the iterator must always yield + * at least one byte of output. See also {@link ChunkedToXContentHelper} for some handy utilities. *

* Similar to {@link #toXContentChunked} but for the {@link RestApiVersion#V_7} API. By default this method delegates to {@link * #toXContentChunked}. diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 7966ca8bdfaca..78e529eef2d98 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -137,7 +137,7 @@ public String getResponseContentTypeString() { /** * Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a - * consumer of a {@link Writer}. + * consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte. */ static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator> chunkIterator) { return new ChunkedRestResponseBody() { diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java index 65f2453708294..5c9887a8e0916 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.Table; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.ByteSizeValue; @@ -92,35 +91,29 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann int lastHeader = headers.size() - 1; List rowOrder = getRowOrder(table, request); - if (verbose == false && rowOrder.isEmpty()) { - return new RestResponse(RestStatus.OK, RestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY); - } - return RestResponse.chunked( RestStatus.OK, ChunkedRestResponseBody.fromTextChunks( RestResponse.TEXT_CONTENT_TYPE, Iterators.concat( // optional header - Iterators.single(writer -> { - if (verbose) { - for (int col = 0; col < headers.size(); col++) { - DisplayHeader header = headers.get(col); - boolean isLastColumn = col == lastHeader; - pad( - new Table.Cell(header.display, table.findHeaderByName(header.name)), - width[col], - request, - writer, - isLastColumn - ); - if (isLastColumn == false) { - writer.append(" "); - } + verbose ? Iterators.single(writer -> { + for (int col = 0; col < headers.size(); col++) { + DisplayHeader header = headers.get(col); + boolean isLastColumn = col == lastHeader; + pad( + new Table.Cell(header.display, table.findHeaderByName(header.name)), + width[col], + request, + writer, + isLastColumn + ); + if (isLastColumn == false) { + writer.append(" "); } - writer.append("\n"); } - }), + writer.append("\n"); + }) : Collections.emptyIterator(), // body Iterators.map(rowOrder.iterator(), row -> writer -> { for (int col = 0; col < headers.size(); col++) { diff --git a/server/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java b/server/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java index d40e9f94b3ef0..68671f73372ba 100644 --- a/server/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/fielddata/AbstractFieldDataImplTestCase.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.fielddata; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.IndexSearcher; @@ -60,6 +61,13 @@ protected long minRamBytesUsed() { return 1; } + protected IndexSearcher newIndexSearcher(IndexReader indexReader) { + // IndexReader can't randomly wrapped with these field data tests. + // Sometimes ParallelCompositeReader is used and its getCoreCacheHelper() method sometimes returns null, + // and IndicesFieldDataCache can't handle this. + return newSearcher(indexReader, false); + } + public void testDeletedDocs() throws Exception { add2SingleValuedDocumentsAndDeleteOneOfThem(); IndexFieldData indexFieldData = getForField("value"); @@ -99,7 +107,7 @@ public void testSingleValueAllSet() throws Exception { assertValues(bytesValues, 1, one()); assertValues(bytesValues, 2, three()); - IndexSearcher searcher = newSearcher(readerContext.reader()); + IndexSearcher searcher = newIndexSearcher(readerContext.reader()); TopFieldDocs topDocs; SortField sortField = indexFieldData.sortField(null, MultiValueMode.MIN, null, false); topDocs = searcher.search(new MatchAllDocsQuery(), 10, new Sort(sortField)); @@ -117,6 +125,9 @@ public void testSingleValueAllSet() throws Exception { assertThat(topDocs.scoreDocs[0].doc, equalTo(2)); assertThat(topDocs.scoreDocs[1].doc, equalTo(0)); assertThat(topDocs.scoreDocs[2].doc, equalTo(1)); + // No need to close the index reader here, because it gets closed on test teardown. + // (This test uses refreshReader(...) which sets topLevelReader in super class and + // that gets closed. } } @@ -176,7 +187,7 @@ public void testMultiValueAllSet() throws Exception { assertValues(bytesValues, 1, one()); assertValues(bytesValues, 2, three()); - IndexSearcher searcher = newSearcher(DirectoryReader.open(writer)); + IndexSearcher searcher = newIndexSearcher(DirectoryReader.open(writer)); SortField sortField = indexFieldData.sortField(null, MultiValueMode.MIN, null, false); TopFieldDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10, new Sort(sortField)); assertThat(topDocs.totalHits.value, equalTo(3L)); @@ -191,6 +202,7 @@ public void testMultiValueAllSet() throws Exception { assertThat(topDocs.scoreDocs[0].doc, equalTo(0)); assertThat(topDocs.scoreDocs[1].doc, equalTo(2)); assertThat(topDocs.scoreDocs[2].doc, equalTo(1)); + searcher.getIndexReader().close(); } } @@ -240,7 +252,7 @@ public void testSortMultiValuesFields() throws Exception { fillExtendedMvSet(); IndexFieldData indexFieldData = getForField("value"); - IndexSearcher searcher = newSearcher(DirectoryReader.open(writer)); + IndexSearcher searcher = newIndexSearcher(DirectoryReader.open(writer)); SortField sortField = indexFieldData.sortField(null, MultiValueMode.MIN, null, false); TopFieldDocs topDocs = searcher.search(new MatchAllDocsQuery(), 10, new Sort(sortField)); assertThat(topDocs.totalHits.value, equalTo(8L)); @@ -282,6 +294,7 @@ public void testSortMultiValuesFields() throws Exception { assertThat(((FieldDoc) topDocs.scoreDocs[6]).fields[0], equalTo(null)); assertThat(topDocs.scoreDocs[7].doc, equalTo(5)); assertThat(((FieldDoc) topDocs.scoreDocs[7]).fields[0], equalTo(null)); + searcher.getIndexReader().close(); } protected abstract void fillExtendedMvSet() throws Exception; diff --git a/server/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java b/server/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java index c32f3bd9acbc7..c54e980c5e21b 100644 --- a/server/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/fielddata/AbstractStringFieldDataTestCase.java @@ -254,7 +254,7 @@ public void testActualMissingValue(boolean reverse) throws IOException { final IndexFieldData indexFieldData = getForField("value"); final String missingValue = values[1]; - IndexSearcher searcher = newSearcher(DirectoryReader.open(writer)); + IndexSearcher searcher = newIndexSearcher(DirectoryReader.open(writer)); SortField sortField = indexFieldData.sortField(missingValue, MultiValueMode.MIN, null, reverse); TopFieldDocs topDocs = searcher.search( new MatchAllDocsQuery(), @@ -312,7 +312,7 @@ public void testSortMissing(boolean first, boolean reverse) throws IOException { } } final IndexFieldData indexFieldData = getForField("value"); - IndexSearcher searcher = newSearcher(DirectoryReader.open(writer)); + IndexSearcher searcher = newIndexSearcher(DirectoryReader.open(writer)); SortField sortField = indexFieldData.sortField(first ? "_first" : "_last", MultiValueMode.MIN, null, reverse); TopFieldDocs topDocs = searcher.search( new MatchAllDocsQuery(), @@ -387,7 +387,7 @@ public void testNestedSorting(MultiValueMode sortMode) throws IOException { } DirectoryReader directoryReader = DirectoryReader.open(writer); directoryReader = ElasticsearchDirectoryReader.wrap(directoryReader, new ShardId(indexService.index(), 0)); - IndexSearcher searcher = newSearcher(directoryReader); + IndexSearcher searcher = newIndexSearcher(directoryReader); IndexFieldData fieldData = getForField("text"); final Object missingValue = switch (randomInt(4)) { case 0 -> "_first"; diff --git a/server/src/test/java/org/elasticsearch/index/fielddata/NoOrdinalsStringFieldDataTests.java b/server/src/test/java/org/elasticsearch/index/fielddata/NoOrdinalsStringFieldDataTests.java index 630eb7a2e12e3..209698261d1cf 100644 --- a/server/src/test/java/org/elasticsearch/index/fielddata/NoOrdinalsStringFieldDataTests.java +++ b/server/src/test/java/org/elasticsearch/index/fielddata/NoOrdinalsStringFieldDataTests.java @@ -10,7 +10,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.SortField; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested; @@ -23,11 +22,10 @@ /** Returns an implementation based on paged bytes which doesn't implement WithOrdinals in order to visit different paths in the code, * eg. BytesRefFieldComparatorSource makes decisions based on whether the field data implements WithOrdinals. */ -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98720") public class NoOrdinalsStringFieldDataTests extends PagedBytesStringFieldDataTests { public static IndexFieldData hideOrdinals(final IndexFieldData in) { - return new IndexFieldData() { + return new IndexFieldData<>() { @Override public String getFieldName() { return in.getFieldName(); diff --git a/server/src/test/java/org/elasticsearch/index/fielddata/PagedBytesStringFieldDataTests.java b/server/src/test/java/org/elasticsearch/index/fielddata/PagedBytesStringFieldDataTests.java index e88419250c601..2035769ef4fbc 100644 --- a/server/src/test/java/org/elasticsearch/index/fielddata/PagedBytesStringFieldDataTests.java +++ b/server/src/test/java/org/elasticsearch/index/fielddata/PagedBytesStringFieldDataTests.java @@ -8,9 +8,6 @@ package org.elasticsearch.index.fielddata; -import org.apache.lucene.tests.util.LuceneTestCase; - -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98720") public class PagedBytesStringFieldDataTests extends AbstractStringFieldDataTestCase { @Override diff --git a/server/src/test/java/org/elasticsearch/index/fielddata/SortedSetDVStringFieldDataTests.java b/server/src/test/java/org/elasticsearch/index/fielddata/SortedSetDVStringFieldDataTests.java index 8d8c714d7b3f5..237a93a1f5be2 100644 --- a/server/src/test/java/org/elasticsearch/index/fielddata/SortedSetDVStringFieldDataTests.java +++ b/server/src/test/java/org/elasticsearch/index/fielddata/SortedSetDVStringFieldDataTests.java @@ -8,9 +8,6 @@ package org.elasticsearch.index.fielddata; -import org.apache.lucene.tests.util.LuceneTestCase; - -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98720") public class SortedSetDVStringFieldDataTests extends AbstractStringFieldDataTestCase { @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index b71967efc7bb4..03e61d5cb3037 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1513,7 +1513,7 @@ public MatchingDirectoryReader(DirectoryReader in, Query query) throws IOExcepti @Override public LeafReader wrap(LeafReader leaf) { try { - final IndexSearcher searcher = newSearcher(leaf); + final IndexSearcher searcher = newSearcher(leaf, false); searcher.setQueryCache(null); final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); final Scorer scorer = weight.scorer(leaf.getContext()); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 68b9fdd958405..13b7407783ba7 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -40,6 +40,8 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeTestCase { + protected final String TEST_REPO_NAME = "test-repo"; + @Override protected Settings nodeSettings() { return Settings.builder().put(super.nodeSettings()).setSecureSettings(credentials()).build(); @@ -52,14 +54,14 @@ protected Settings nodeSettings() { @Override public void setUp() throws Exception { super.setUp(); - createRepository("test-repo"); + createRepository(TEST_REPO_NAME); deleteAndAssertEmpty(getRepository().basePath()); } @Override public void tearDown() throws Exception { deleteAndAssertEmpty(getRepository().basePath()); - clusterAdmin().prepareDeleteRepository("test-repo").get(); + clusterAdmin().prepareDeleteRepository(TEST_REPO_NAME).get(); super.tearDown(); } @@ -93,7 +95,7 @@ public void testCreateSnapshot() { final String snapshotName = "test-snap-" + System.currentTimeMillis(); logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", snapshotName) + CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REPO_NAME, snapshotName) .setWaitForCompletion(true) .setIndices("test-idx-*", "-test-idx-3") .get(); @@ -104,11 +106,11 @@ public void testCreateSnapshot() { ); assertThat( - clusterAdmin().prepareGetSnapshots("test-repo").setSnapshots(snapshotName).get().getSnapshots().get(0).state(), + clusterAdmin().prepareGetSnapshots(TEST_REPO_NAME).setSnapshots(snapshotName).get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS) ); - assertTrue(clusterAdmin().prepareDeleteSnapshot("test-repo", snapshotName).get().isAcknowledged()); + assertTrue(clusterAdmin().prepareDeleteSnapshot(TEST_REPO_NAME, snapshotName).get().isAcknowledged()); } public void testListChildren() throws Exception { @@ -160,7 +162,7 @@ public void testCleanup() throws Exception { final String snapshotName = "test-snap-" + System.currentTimeMillis(); logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", snapshotName) + CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REPO_NAME, snapshotName) .setWaitForCompletion(true) .setIndices("test-idx-*", "-test-idx-3") .get(); @@ -171,11 +173,11 @@ public void testCleanup() throws Exception { ); assertThat( - clusterAdmin().prepareGetSnapshots("test-repo").setSnapshots(snapshotName).get().getSnapshots().get(0).state(), + clusterAdmin().prepareGetSnapshots(TEST_REPO_NAME).setSnapshots(snapshotName).get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS) ); - final BlobStoreRepository repo = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); + final BlobStoreRepository repo = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository(TEST_REPO_NAME); final Executor genericExec = repo.threadPool().executor(ThreadPool.Names.GENERIC); logger.info("--> creating a dangling index folder"); @@ -183,7 +185,7 @@ public void testCleanup() throws Exception { createDanglingIndex(repo, genericExec); logger.info("--> deleting a snapshot to trigger repository cleanup"); - clusterAdmin().prepareDeleteSnapshot("test-repo", snapshotName).get(); + clusterAdmin().prepareDeleteSnapshot(TEST_REPO_NAME, snapshotName).get(); BlobStoreTestUtil.assertConsistency(repo); @@ -191,7 +193,7 @@ public void testCleanup() throws Exception { createDanglingIndex(repo, genericExec); logger.info("--> Execute repository cleanup"); - final CleanupRepositoryResponse response = clusterAdmin().prepareCleanupRepository("test-repo").get(); + final CleanupRepositoryResponse response = clusterAdmin().prepareCleanupRepository(TEST_REPO_NAME).get(); assertCleanupResponse(response, 3L, 1L); } @@ -243,6 +245,6 @@ private Set listChildren(BlobPath path) { } protected BlobStoreRepository getRepository() { - return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); + return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository(TEST_REPO_NAME); } } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/bucket/range/HistoBackedRangeAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/bucket/range/HistoBackedRangeAggregatorTests.java index dff8af6f918ac..27fa350d11763 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/bucket/range/HistoBackedRangeAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/bucket/range/HistoBackedRangeAggregatorTests.java @@ -45,7 +45,6 @@ public class HistoBackedRangeAggregatorTests extends AggregatorTestCase { private static final String HISTO_FIELD_NAME = "histo_field"; private static final String RAW_FIELD_NAME = "raw_field"; - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98739") @SuppressWarnings("rawtypes") public void testPercentilesAccuracy() throws Exception { long absError = 0L; @@ -108,7 +107,6 @@ public void testPercentilesAccuracy() throws Exception { assertThat((double) absError / docCount, lessThan(0.1)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98741") @SuppressWarnings("rawtypes") public void testMediumRangesAccuracy() throws Exception { List ranges = Arrays.asList(