Skip to content

Commit

Permalink
Add tests and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Sep 2, 2024
1 parent b45ad8c commit a1a6b82
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,10 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT).get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ public interface PersistedState extends Closeable {
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getUploadStats();
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,14 +896,14 @@ public DiscoveryStats stats() {
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null) {
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
}
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getDownloadStats());
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());

Check warning on line 906 in server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java#L905-L906

Added lines #L905 - L906 were not covered by tests
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getDownloadStats() {
return remoteClusterStateService.getDownloadStats();
public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();

Check warning on line 182 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L182

Added line #L182 was not covered by tests
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();

Check warning on line 186 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L186

Added line #L186 was not covered by tests
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
Expand Down Expand Up @@ -235,7 +239,8 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
boolean applyFullState = false;
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
Expand All @@ -248,7 +253,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
Expand All @@ -262,7 +266,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}

if (applyFullState == true) {
remoteClusterStateService.fullDownloadState();
logger.debug(
() -> new ParameterizedMessage(

Check warning on line 270 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L270

Added line #L270 was not covered by tests
"Downloading full cluster state for term {}, version {}, stateUUID {}",
Expand All @@ -282,7 +285,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
lastSeenClusterState.set(clusterState);
return response;
} else {
remoteClusterStateService.diffDownloadState();
logger.debug(
() -> new ParameterizedMessage(

Check warning on line 289 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L289

Added line #L289 was not covered by tests
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
Expand All @@ -303,9 +305,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
return response;

Check warning on line 305 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L303-L305

Added lines #L303 - L305 were not covered by tests
}
} catch (Exception e) {
remoteClusterStateService.readMetadataFailed();
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();

Check warning on line 309 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L309

Added line #L309 was not covered by tests
} else {
remoteClusterStateService.diffDownloadFailed();
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
// Note: These stats are not published yet, will come in future
return null;
}
Expand Down Expand Up @@ -745,7 +745,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return remoteClusterStateService.getUploadStats();

Check warning on line 749 in server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java#L749

Added line #L749 was not covered by tests
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,8 +1364,8 @@ public ClusterState getClusterStateForManifest(
clusterState = ClusterState.builder(state).metadata(mb).build();
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDownloadSucceeded();
remoteStateStats.stateDownloadTook(durationMillis);
remoteStateStats.stateFullDownloadSucceeded();
remoteStateStats.stateFullDownloadTook(durationMillis);

return clusterState;
}
Expand Down Expand Up @@ -1456,8 +1456,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
.build();

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDownloadSucceeded();
remoteStateStats.stateDownloadTook(durationMillis);
remoteStateStats.stateDiffDownloadSucceeded();
remoteStateStats.stateDiffDownloadTook(durationMillis);

return clusterState;
}
Expand Down Expand Up @@ -1658,27 +1658,27 @@ public void writeMetadataFailed() {
remoteStateStats.stateUploadFailed();

Check warning on line 1658 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1658

Added line #L1658 was not covered by tests
}

public void readMetadataFailed() {
remoteStateStats.stateDownloadFailed();
public RemotePersistenceStats getRemoteStateStats() {
return remoteStateStats;
}

public void fullDownloadState() {
remoteStateStats.fullDownloadState();
public PersistedStateStats getUploadStats() {
return remoteStateStats.getUploadStats();
}

public void diffDownloadState() {
remoteStateStats.diffDownloadState();
public PersistedStateStats getFullDownloadStats() {
return remoteStateStats.getRemoteFullDownloadStats();
}

public RemotePersistenceStats getRemoteStateStats() {
return remoteStateStats;
public PersistedStateStats getDiffDownloadStats() {
return remoteStateStats.getRemoteDiffDownloadStats();

Check warning on line 1674 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1674

Added line #L1674 was not covered by tests
}

public PersistedStateStats getUploadStats() {
return remoteStateStats.getUploadStats();
public void fullDownloadFailed() {
remoteStateStats.stateFullDownloadFailed();
}

Check warning on line 1679 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1678-L1679

Added lines #L1678 - L1679 were not covered by tests

public PersistedStateStats getDownloadStats() {
return remoteStateStats.getDownloadStats();
public void diffDownloadFailed() {
remoteStateStats.stateDiffDownloadFailed();
}

Check warning on line 1683 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L1682-L1683

Added lines #L1682 - L1683 were not covered by tests
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
public class RemotePersistenceStats {

RemoteUploadStats remoteUploadStats;
RemoteDownloadStats remoteDownloadStats;
PersistedStateStats remoteDiffDownloadStats;
PersistedStateStats remoteFullDownloadStats;

final String FULL_DOWNLOAD_STATS = "remote_full_download";
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";

public RemotePersistenceStats() {
remoteUploadStats = new RemoteUploadStats();
remoteDownloadStats = new RemoteDownloadStats();
remoteDiffDownloadStats = new PersistedStateStats(DIFF_DOWNLOAD_STATS);
remoteFullDownloadStats = new PersistedStateStats(FULL_DOWNLOAD_STATS);
}

public void cleanUpAttemptFailed() {
Expand Down Expand Up @@ -61,32 +66,40 @@ public void stateUploadFailed() {
remoteUploadStats.stateFailed();
}

Check warning on line 67 in server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java#L66-L67

Added lines #L66 - L67 were not covered by tests

public void stateDownloadSucceeded() {
remoteDownloadStats.stateSucceeded();
public void stateFullDownloadSucceeded() {
remoteFullDownloadStats.stateSucceeded();
}

public void stateDownloadTook(long durationMillis) {
remoteDownloadStats.stateTook(durationMillis);
public void stateDiffDownloadSucceeded() {
remoteDiffDownloadStats.stateSucceeded();
}

public void stateDownloadFailed() {
remoteDownloadStats.stateFailed();
public void stateFullDownloadTook(long durationMillis) {
remoteFullDownloadStats.stateTook(durationMillis);
}

public PersistedStateStats getUploadStats() {
return remoteUploadStats;
public void stateDiffDownloadTook(long durationMillis) {
remoteDiffDownloadStats.stateTook(durationMillis);
}

public void stateFullDownloadFailed() {
remoteFullDownloadStats.stateFailed();
}

Check warning on line 87 in server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java#L86-L87

Added lines #L86 - L87 were not covered by tests

public PersistedStateStats getDownloadStats() {
return remoteDownloadStats;
public void stateDiffDownloadFailed() {
remoteDiffDownloadStats.stateFailed();
}

Check warning on line 91 in server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java#L90-L91

Added lines #L90 - L91 were not covered by tests

public PersistedStateStats getUploadStats() {
return remoteUploadStats;
}

public void diffDownloadState() {
remoteDownloadStats.diffDownloadState();
public PersistedStateStats getRemoteDiffDownloadStats() {
return remoteDiffDownloadStats;

Check warning on line 98 in server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java#L98

Added line #L98 was not covered by tests
}

public void fullDownloadState() {
remoteDownloadStats.fullDownloadState();
public PersistedStateStats getRemoteFullDownloadStats() {
return remoteFullDownloadStats;
}

}
Loading

0 comments on commit a1a6b82

Please sign in to comment.