Commit 1df7f3c

Merge branch 'main' into stef_apiLinkParent_p1

shainaraskas committed Aug 19, 2024
2 parents 3243a24 + aa959e6
Showing 110 changed files with 4,123 additions and 650 deletions.
8 changes: 6 additions & 2 deletions .github/CODEOWNERS
@@ -27,8 +27,12 @@ libs/logstash-bridge @elastic/logstash
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/KibanaOwnedReservedRoleDescriptors.java @elastic/kibana-security

# APM Data index templates, etc.
-x-pack/plugin/apm-data/src/main/resources @elastic/apm-server
-x-pack/plugin/apm-data/src/yamlRestTest/resources @elastic/apm-server
+x-pack/plugin/apm-data/src/main/resources @elastic/obs-ds-intake-services
+x-pack/plugin/apm-data/src/yamlRestTest/resources @elastic/obs-ds-intake-services

+# OTel
+x-pack/plugin/otel-data/src/main/resources @elastic/obs-ds-intake-services
+x-pack/plugin/otel-data/src/yamlRestTest/resources @elastic/obs-ds-intake-services

# Delivery
gradle @elastic/es-delivery
5 changes: 5 additions & 0 deletions docs/changelog/111091.yaml
@@ -0,0 +1,5 @@
pr: 111091
summary: "X-pack/plugin/otel: introduce x-pack-otel plugin"
area: Data streams
type: feature
issues: []
11 changes: 6 additions & 5 deletions docs/reference/release-notes/8.15.0.asciidoc
@@ -16,6 +16,12 @@ after it is killed up to four times in 24 hours. (issue: {es-issue}110530[#110530])
* Pipeline aggregations under `time_series` and `categorize_text` aggregations are never
returned (issue: {es-issue}111679[#111679])

+* Elasticsearch will not start on Windows machines if
+[`bootstrap.memory_lock` is set to `true`](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration-memory.html#bootstrap-memory_lock).
+Either downgrade to an earlier version, upgrade to 8.15.1, or else follow the
+recommendation in the manual to entirely disable swap instead of using the
+memory lock feature (issue: {es-issue}111847[#111847])

[[breaking-8.15.0]]
[float]
=== Breaking changes
@@ -32,11 +38,6 @@ Rollup::
Search::
* Change `skip_unavailable` remote cluster setting default value to true {es-pull}105792[#105792]

-[[known-issues-8.15.0]]
-[float]
-=== Known issues
-* Elasticsearch will not start on Windows machines when the recommended [bootstrap.memory_lock: true](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration-memory.html#bootstrap-memory_lock) setting is configured due to [native access refactoring](https://github.com/elastic/elasticsearch/pull/111866). The workaround for 8.15.0 is to downgrade to the previous version. This issue will be fixed in 8.15.1.

[[bug-8.15.0]]
[float]
=== Bug fixes
3 changes: 3 additions & 0 deletions modules/analysis-common/build.gradle
@@ -36,3 +36,6 @@ tasks.named("yamlRestTestV7CompatTransform").configure { task ->
  task.skipTest("search.query/50_queries_with_synonyms/Test common terms query with stacked tokens", "#42654 - `common` query throws an exception")
}

+artifacts {
+  restTests(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test"))
+}
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -137,9 +137,6 @@ tests:
- class: org.elasticsearch.xpack.restart.CoreFullClusterRestartIT
  method: testSnapshotRestore {cluster=UPGRADED}
  issue: https://github.com/elastic/elasticsearch/issues/111799
-- class: org.elasticsearch.indices.breaker.HierarchyCircuitBreakerTelemetryTests
-  method: testCircuitBreakerTripCountMetric
-  issue: https://github.com/elastic/elasticsearch/issues/111778
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
  method: test {comparison.RangeVersion SYNC}
  issue: https://github.com/elastic/elasticsearch/issues/111814
HierarchyCircuitBreakerTelemetryTests.java renamed to HierarchyCircuitBreakerTelemetryIT.java
@@ -6,25 +6,23 @@
 * Side Public License, v 1.
 */

-package org.elasticsearch.indices.breaker;
+package org.elasticsearch.indices.memory.breaker;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.indices.breaker.CircuitBreakerMetrics;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.telemetry.Measurement;
-import org.elasticsearch.telemetry.RecordingInstruments;
-import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
-import org.elasticsearch.telemetry.metric.LongCounter;
-import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matchers;
+import org.junit.After;

-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -41,54 +39,11 @@
import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, supportsDedicatedMasters = true)
-public class HierarchyCircuitBreakerTelemetryTests extends ESIntegTestCase {
+public class HierarchyCircuitBreakerTelemetryIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(TestCircuitBreakerTelemetryPlugin.class);
-    }
-
-    public static class TestCircuitBreakerTelemetryPlugin extends TestTelemetryPlugin {
-        protected final MeterRegistry meter = new RecordingMeterRegistry() {
-            private final LongCounter tripCount = new RecordingInstruments.RecordingLongCounter(
-                CircuitBreakerMetrics.ES_BREAKER_TRIP_COUNT_TOTAL,
-                recorder
-            ) {
-                @Override
-                public void incrementBy(long inc) {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public void incrementBy(long inc, Map<String, Object> attributes) {
-                    throw new UnsupportedOperationException();
-                }
-            };
-
-            @Override
-            protected LongCounter buildLongCounter(String name, String description, String unit) {
-                if (name.equals(tripCount.getName())) {
-                    return tripCount;
-                }
-                throw new IllegalArgumentException("Unknown counter metric name [" + name + "]");
-            }
-
-            @Override
-            public LongCounter registerLongCounter(String name, String description, String unit) {
-                assertCircuitBreakerName(name);
-                return super.registerLongCounter(name, description, unit);
-            }
-
-            @Override
-            public LongCounter getLongCounter(String name) {
-                assertCircuitBreakerName(name);
-                return super.getLongCounter(name);
-            }
-
-            private void assertCircuitBreakerName(final String name) {
-                assertThat(name, Matchers.oneOf(CircuitBreakerMetrics.ES_BREAKER_TRIP_COUNT_TOTAL));
-            }
-        };
+        return List.of(TestTelemetryPlugin.class);
    }

    public void testCircuitBreakerTripCountMetric() {
@@ -142,37 +97,29 @@ public void testCircuitBreakerTripCountMetric() {
fail("Expected exception not thrown");
}

private List<Measurement> getMeasurements(String dataNodeName) {
final TestTelemetryPlugin dataNodeTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
.filterPlugins(TestCircuitBreakerTelemetryPlugin.class)
@After
public void resetClusterSetting() {
final var circuitBreakerSettings = Settings.builder()
.putNull(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())
.putNull(FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey())
.putNull(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())
.putNull(REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey())
.putNull(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())
.putNull(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey())
.putNull(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())
.putNull(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey());
updateClusterSettings(circuitBreakerSettings);
}

private List<Measurement> getMeasurements(String nodeName) {
final TestTelemetryPlugin telemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
.filterPlugins(TestTelemetryPlugin.class)
.toList()
.get(0);
return Measurement.combine(
Stream.of(dataNodeTelemetryPlugin.getLongCounterMeasurement(CircuitBreakerMetrics.ES_BREAKER_TRIP_COUNT_TOTAL).stream())
Stream.of(telemetryPlugin.getLongCounterMeasurement(CircuitBreakerMetrics.ES_BREAKER_TRIP_COUNT_TOTAL).stream())
.flatMap(Function.identity())
.toList()
);
}

// Make sure circuit breaker telemetry on trip count reports the same values as circuit breaker stats
private void assertCircuitBreakerTripCount(
final HierarchyCircuitBreakerService circuitBreakerService,
final String circuitBreakerName,
int firstBytesEstimate,
int secondBytesEstimate,
long expectedTripCountValue
) {
try {
circuitBreakerService.getBreaker(circuitBreakerName).addEstimateBytesAndMaybeBreak(firstBytesEstimate, randomAlphaOfLength(5));
circuitBreakerService.getBreaker(circuitBreakerName).addEstimateBytesAndMaybeBreak(secondBytesEstimate, randomAlphaOfLength(5));
} catch (final CircuitBreakingException cbex) {
final CircuitBreakerStats circuitBreakerStats = Arrays.stream(circuitBreakerService.stats().getAllStats())
.filter(stats -> circuitBreakerName.equals(stats.getName()))
.findAny()
.get();
assertThat(circuitBreakerService.getBreaker(circuitBreakerName).getTrippedCount(), Matchers.equalTo(expectedTripCountValue));
assertThat(circuitBreakerStats.getTrippedCount(), Matchers.equalTo(expectedTripCountValue));
}
}

}
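The simplification above works because the stock TestTelemetryPlugin already records every counter registered through its meter registry, so the bespoke TestCircuitBreakerTelemetryPlugin wrapper was redundant. A minimal sketch of reading the recorded trip-count measurements back out, assuming the internal-cluster test framework and only the method names visible in this diff (the class name, the startNode() usage, and the final assertion are illustrative):

package org.elasticsearch.indices.memory.breaker;

import org.elasticsearch.indices.breaker.CircuitBreakerMetrics;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.List;

public class TripCountMetricSketchIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return List.of(TestTelemetryPlugin.class); // the stock plugin records all registered metrics
    }

    public void testTripCountMeasurementsAreReadable() {
        final String nodeName = internalCluster().startNode();
        // ... exercise the cluster here; the list below stays empty until a breaker actually trips ...
        final TestTelemetryPlugin plugin = internalCluster().getInstance(PluginsService.class, nodeName)
            .filterPlugins(TestTelemetryPlugin.class)
            .toList()
            .get(0);
        final List<Measurement> measurements = plugin.getLongCounterMeasurement(CircuitBreakerMetrics.ES_BREAKER_TRIP_COUNT_TOTAL);
        assertNotNull(measurements);
    }
}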
BlobStoreCorruptionIT.java (new file)
@@ -0,0 +1,186 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.tests.mockfile.ExtrasFS;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsIntegritySuppressor;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class BlobStoreCorruptionIT extends AbstractSnapshotIntegTestCase {

    private static final Logger logger = LogManager.getLogger(BlobStoreCorruptionIT.class);

    @Before
    public void suppressConsistencyCheck() {
        disableRepoConsistencyCheck("testing corruption detection involves breaking the repo");
    }

    public void testCorruptionDetection() throws Exception {
        final var repositoryName = randomIdentifier();
        final var indexName = randomIdentifier();
        final var snapshotName = randomIdentifier();
        final var repositoryRootPath = randomRepoPath();

        createRepository(repositoryName, FsRepository.TYPE, repositoryRootPath);
        createIndexWithRandomDocs(indexName, between(1, 100));
        flushAndRefresh(indexName);
        createSnapshot(repositoryName, snapshotName, List.of(indexName));

        final var corruptedFile = corruptRandomFile(repositoryRootPath);
        final var corruptedFileType = RepositoryFileType.getRepositoryFileType(repositoryRootPath, corruptedFile);
        final var corruptionDetectors = new ArrayList<CheckedConsumer<ActionListener<Exception>, ?>>();

        // detect corruption by listing the snapshots
        if (corruptedFileType == RepositoryFileType.SNAPSHOT_INFO) {
            corruptionDetectors.add(exceptionListener -> {
                logger.info("--> listing snapshots");
                client().admin()
                    .cluster()
                    .prepareGetSnapshots(TEST_REQUEST_TIMEOUT, repositoryName)
                    .execute(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
            });
        }

        // detect corruption by taking another snapshot
        if (corruptedFileType == RepositoryFileType.SHARD_GENERATION) {
            corruptionDetectors.add(exceptionListener -> {
                logger.info("--> taking another snapshot");
                client().admin()
                    .cluster()
                    .prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, randomIdentifier())
                    .setWaitForCompletion(true)
                    .execute(exceptionListener.map(createSnapshotResponse -> {
                        assertNotEquals(SnapshotState.SUCCESS, createSnapshotResponse.getSnapshotInfo().state());
                        return new ElasticsearchException("create-snapshot failed as expected");
                    }));
            });
        }

        // detect corruption by restoring the snapshot
        switch (corruptedFileType) {
            case SNAPSHOT_INFO, GLOBAL_METADATA, INDEX_METADATA -> corruptionDetectors.add(exceptionListener -> {
                logger.info("--> restoring snapshot");
                client().admin()
                    .cluster()
                    .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
                    .setRestoreGlobalState(corruptedFileType == RepositoryFileType.GLOBAL_METADATA || randomBoolean())
                    .setWaitForCompletion(true)
                    .execute(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
            });
            case SHARD_SNAPSHOT_INFO, SHARD_DATA -> corruptionDetectors.add(exceptionListener -> {
                logger.info("--> restoring snapshot and checking for failed shards");
                SubscribableListener
                    // if shard-level data is corrupted then the overall restore succeeds but the shard recoveries fail
                    .<AcknowledgedResponse>newForked(l -> client().admin().indices().prepareDelete(indexName).execute(l))
                    .andThenAccept(ElasticsearchAssertions::assertAcked)

                    .<RestoreSnapshotResponse>andThen(
                        l -> client().admin()
                            .cluster()
                            .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repositoryName, snapshotName)
                            .setRestoreGlobalState(randomBoolean())
                            .setWaitForCompletion(true)
                            .execute(l)
                    )

                    .addListener(exceptionListener.map(restoreSnapshotResponse -> {
                        assertNotEquals(0, restoreSnapshotResponse.getRestoreInfo().failedShards());
                        return new ElasticsearchException("post-restore recoveries failed as expected");
                    }));
            });
        }

        try (var ignored = new BlobStoreIndexShardSnapshotsIntegritySuppressor()) {
            final var exception = safeAwait(randomFrom(corruptionDetectors));
            logger.info(Strings.format("--> corrupted [%s] and caught exception", corruptedFile), exception);
        }
    }

    private static Path corruptRandomFile(Path repositoryRootPath) throws IOException {
        final var corruptedFileType = getRandomCorruptibleFileType();
        final var corruptedFile = getRandomFileToCorrupt(repositoryRootPath, corruptedFileType);
        if (randomBoolean()) {
            logger.info("--> deleting [{}]", corruptedFile);
            Files.delete(corruptedFile);
        } else {
            corruptFileContents(corruptedFile);
        }
        return corruptedFile;
    }

    private static void corruptFileContents(Path fileToCorrupt) throws IOException {
        final var oldFileContents = Files.readAllBytes(fileToCorrupt);
        logger.info("--> contents of [{}] before corruption: [{}]", fileToCorrupt, Base64.getEncoder().encodeToString(oldFileContents));
        final byte[] newFileContents = new byte[randomBoolean() ? oldFileContents.length : between(0, oldFileContents.length)];
        System.arraycopy(oldFileContents, 0, newFileContents, 0, newFileContents.length);
        if (newFileContents.length == oldFileContents.length) {
            final var corruptionPosition = between(0, newFileContents.length - 1);
            newFileContents[corruptionPosition] = randomValueOtherThan(oldFileContents[corruptionPosition], ESTestCase::randomByte);
            logger.info(
                "--> updating byte at position [{}] from [{}] to [{}]",
                corruptionPosition,
                oldFileContents[corruptionPosition],
                newFileContents[corruptionPosition]
            );
        } else {
            logger.info("--> truncating file from length [{}] to length [{}]", oldFileContents.length, newFileContents.length);
        }
        Files.write(fileToCorrupt, newFileContents);
        logger.info("--> contents of [{}] after corruption: [{}]", fileToCorrupt, Base64.getEncoder().encodeToString(newFileContents));
    }

    private static RepositoryFileType getRandomCorruptibleFileType() {
        return randomValueOtherThanMany(
            // these blob types do not have reliable corruption detection, so we must skip them
            t -> t == RepositoryFileType.ROOT_INDEX_N || t == RepositoryFileType.ROOT_INDEX_LATEST,
            () -> randomFrom(RepositoryFileType.values())
        );
    }

    private static Path getRandomFileToCorrupt(Path repositoryRootPath, RepositoryFileType corruptedFileType) throws IOException {
        final var corruptibleFiles = new ArrayList<Path>();
        Files.walkFileTree(repositoryRootPath, new SimpleFileVisitor<>() {
            @Override
            public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) throws IOException {
                if (ExtrasFS.isExtra(filePath.getFileName().toString()) == false
                    && RepositoryFileType.getRepositoryFileType(repositoryRootPath, filePath) == corruptedFileType) {
                    corruptibleFiles.add(filePath);
                }
                return super.visitFile(filePath, attrs);
            }
        });
        return randomFrom(corruptibleFiles);
    }

}
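For reference, the corruption step in corruptFileContents reduces to two primitive mutations: flip a single byte in place (same length) or truncate the file to a random prefix. Here is the same idea as a self-contained sketch in plain java.nio, independent of the Elasticsearch test framework; the file path is illustrative and ThreadLocalRandom stands in for the framework's random source:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;

public class CorruptionSketch {
    public static void main(String[] args) throws IOException {
        final ThreadLocalRandom random = ThreadLocalRandom.current();
        final Path file = Path.of("example.blob"); // illustrative path
        Files.write(file, new byte[] { 1, 2, 3, 4, 5 });

        final byte[] contents = Files.readAllBytes(file);
        if (random.nextBoolean()) {
            // in-place corruption: XOR with a nonzero value guarantees the byte changes
            final int position = random.nextInt(contents.length);
            contents[position] ^= (byte) random.nextInt(1, 256);
            Files.write(file, contents);
        } else {
            // truncation: keep a random (possibly empty) prefix of the original bytes
            final byte[] truncated = new byte[random.nextInt(contents.length + 1)];
            System.arraycopy(contents, 0, truncated, 0, truncated.length);
            Files.write(file, truncated);
        }
    }
}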