Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into bump-cache-stats-ap…
Browse files Browse the repository at this point in the history
…i-versions
  • Loading branch information
Peter Alfonsi committed Apr 30, 2024
2 parents 28a8aa4 + 1219c56 commit 19fbd54
Show file tree
Hide file tree
Showing 20 changed files with 1,518 additions and 69 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Gate new stats logic behind FeatureFlags.PLUGGABLE_CACHE ([#13238](https://github.com/opensearch-project/OpenSearch/pull/13238))
- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- [Batch Ingestion] Add `batch_size` to `_bulk` API. ([#12457](https://github.com/opensearch-project/OpenSearch/issues/12457))
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
Expand Down Expand Up @@ -59,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.netflix.nebula.ospackage-base` from 11.8.1 to 11.9.0 ([#13440](https://github.com/opensearch-project/OpenSearch/pull/13440))
- Bump `org.bouncycastle:bc-fips` from 1.0.2.4 to 1.0.2.5 ([#13446](https://github.com/opensearch-project/OpenSearch/pull/13446))
- Bump `lycheeverse/lychee-action` from 1.9.3 to 1.10.0 ([#13447](https://github.com/opensearch-project/OpenSearch/pull/13447))
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ plugins {
id 'opensearch.docker-support'
id 'opensearch.global-build-info'
id "com.diffplug.spotless" version "6.25.0" apply false
id "org.gradle.test-retry" version "1.5.8" apply false
id "org.gradle.test-retry" version "1.5.9" apply false
id "test-report-aggregation"
id 'jacoco-report-aggregation'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,90 @@ teardown:
index: test_index
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}}

---
"Test bulk API with batch enabled happy case":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
batch_size: 2
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'
- '{"index": {"_index": "test_index", "_id": "test_id3"}}'
- '{"text": "text3"}'
- '{"index": {"_index": "test_index", "_id": "test_id4"}}'
- '{"text": "text4"}'
- '{"index": {"_index": "test_index", "_id": "test_id5", "pipeline": "pipeline2"}}'
- '{"text": "text5"}'
- '{"index": {"_index": "test_index", "_id": "test_id6", "pipeline": "pipeline2"}}'
- '{"text": "text6"}'

- match: { errors: false }

- do:
get:
index: test_index
id: test_id5
- match: { _source: {"text": "text5", "field2": "value2"}}

- do:
get:
index: test_index
id: test_id3
- match: { _source: { "text": "text3", "field1": "value1" } }

---
"Test bulk API with batch_size missing":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'

- match: { errors: false }

- do:
get:
index: test_index
id: test_id1
- match: { _source: { "text": "text1", "field1": "value1" } }

- do:
get:
index: test_index
id: test_id2
- match: { _source: { "text": "text2", "field1": "value1" } }

---
"Test bulk API with invalid batch_size":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
catch: bad_request
bulk:
refresh: true
batch_size: -1
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
"require_alias": {
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
"batch_size": {
"type": "int",
"description": "Sets the batch size"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,10 @@ public void testCacheClearanceAfterIndexClosure() throws Exception {
String index = "index";
setupIndex(client, index);

// assert there are no entries in the cache for index
assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes());
// assert there are no entries in the cache from other indices in the node
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
// create first cache entry in index
createCacheEntry(client, index, "hello");
assertCacheState(client, index, 0, 1);
Expand All @@ -1136,7 +1140,7 @@ public void testCacheClearanceAfterIndexClosure() throws Exception {
// sleep until cache cleaner would have cleaned up the stale key from index
assertBusy(() -> {
// cache cleaner should have cleaned up the stale keys from index
assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0);
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

Expand All @@ -1155,6 +1159,10 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
String index = "index";
setupIndex(client, index);

// assert there are no entries in the cache for index
assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes());
// assert there are no entries in the cache from other indices in the node
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
// create first cache entry in index
createCacheEntry(client, index, "hello");
assertCacheState(client, index, 0, 1);
Expand All @@ -1173,13 +1181,13 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
// sleep until cache cleaner would have cleaned up the stale key from index
assertBusy(() -> {
// cache cleaner should have cleaned up the stale keys from index
assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0);
assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes());
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 300;
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10)
Expand All @@ -1194,37 +1202,41 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
setupIndex(client, index1);
setupIndex(client, index2);

// assert cache is empty for index1
assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes());
// create first cache entry in index1
createCacheEntry(client, index1, "hello");
assertCacheState(client, index1, 0, 1);
long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1 > 0);
long memorySizeForIndex1With1Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1With1Entries > 0);

// create second cache entry in index1
createCacheEntry(client, index1, "there");
assertCacheState(client, index1, 0, 2);
long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1);
long memorySizeForIndex1With2Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes();
assertTrue(memorySizeForIndex1With2Entries > memorySizeForIndex1With1Entries);

// assert cache is empty for index2
assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes());
// create first cache entry in index2
createCacheEntry(client, index2, "hello");
assertCacheState(client, index2, 0, 1);
assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0);

// force refresh index 1 so that it creates 2 stale keys
flushAndRefresh(index1);
// create another cache entry in index 1, this should not be cleaned up.
// force refresh both index1 and index2
flushAndRefresh(index1, index2);
// create another cache entry in index 1 same as memorySizeForIndex1With1Entries, this should not be cleaned up.
createCacheEntry(client, index1, "hello");
// record the size of this entry
long memorySizeOfLatestEntryForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes() - finalMemorySizeForIndex1;
// force refresh index 2 so that it creates 1 stale key
flushAndRefresh(index2);
// sleep until cache cleaner would have cleaned up the stale key from index 2
// sleep until cache cleaner would have cleaned up the stale key from index2
assertBusy(() -> {
// cache cleaner should have cleaned up the stale key from index 2
// cache cleaner should have cleaned up the stale key from index2 and hence cache should be empty
assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes());
// cache cleaner should have only cleaned up the stale entities
assertEquals(memorySizeOfLatestEntryForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes());
// cache cleaner should have only cleaned up the stale entities for index1
long currentMemorySizeInBytesForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes();
// assert the memory size of index1 to only contain 1 entry added after flushAndRefresh
assertEquals(memorySizeForIndex1With1Entries, currentMemorySizeInBytesForIndex1);
// cache for index1 should not be empty since there was an item cached after flushAndRefresh
assertTrue(currentMemorySizeInBytesForIndex1 > 0);
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

Expand Down
30 changes: 29 additions & 1 deletion server/src/main/java/org/opensearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.Version;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.CompositeIndicesRequest;
Expand Down Expand Up @@ -80,7 +81,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class);

private static final int REQUEST_OVERHEAD = 50;

/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
Expand All @@ -96,6 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private String globalRouting;
private String globalIndex;
private Boolean globalRequireAlias;
private int batchSize = 1;

private long sizeInBytes = 0;

Expand All @@ -107,6 +108,9 @@ public BulkRequest(StreamInput in) throws IOException {
requests.addAll(in.readList(i -> DocWriteRequest.readDocumentRequest(null, i)));
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = in.readTimeValue();
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
batchSize = in.readInt();
}
}

public BulkRequest(@Nullable String globalIndex) {
Expand Down Expand Up @@ -346,6 +350,27 @@ public final BulkRequest timeout(TimeValue timeout) {
return this;
}

/**
* Set batch size
* @param size batch size from input
* @return {@link BulkRequest}
*/
public BulkRequest batchSize(int size) {
if (size < 1) {
throw new IllegalArgumentException("batch_size must be greater than 0");
}
this.batchSize = size;
return this;
}

/**
* Get batch size
* @return batch size
*/
public int batchSize() {
return this.batchSize;
}

/**
* Note for internal callers (NOT high level rest client),
* the global parameter setting is ignored when used with:
Expand Down Expand Up @@ -453,6 +478,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(requests, DocWriteRequest::writeDocumentRequest);
refreshPolicy.writeTo(out);
out.writeTimeValue(timeout);
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeInt(batchSize);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,8 @@ public boolean isForceExecution() {
}
},
bulkRequestModifier::markItemAsDropped,
executorName
executorName,
original
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public void before() {
current.incrementAndGet();
}

/**
* Invoke before the given operation begins in multiple items at the same time.
* @param n number of items
*/
public void beforeN(int n) {
current.addAndGet(n);
}

/**
* Invoked upon completion (success or failure) of the given operation
* @param currentTime elapsed time of the operation
Expand All @@ -46,13 +54,35 @@ public void after(long currentTime) {
time.inc(currentTime);
}

/**
* Invoked upon completion (success or failure) of the given operation for multiple items.
* @param n number of items completed
* @param currentTime elapsed time of the operation
*/
public void afterN(int n, long currentTime) {
current.addAndGet(-n);
for (int i = 0; i < n; ++i) {
time.inc(currentTime);
}
}

/**
* Invoked upon failure of the operation.
*/
public void failed() {
failed.inc();
}

/**
* Invoked upon failure of the operation on multiple items.
* @param n number of items on operation.
*/
public void failedN(int n) {
for (int i = 0; i < n; ++i) {
failed.inc();
}
}

public void add(OperationMetrics other) {
// Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
failed.inc(other.failed.count());
Expand Down
Loading

0 comments on commit 19fbd54

Please sign in to comment.