Added tests for SegmentReplicationSourceService
This includes refactoring CopyStateTests for code reuse, and also fixes the CopyStateTests tests, which were previously failing.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
kartg committed Jun 1, 2022
1 parent 1a2c70b commit caca533
Showing 3 changed files with 198 additions and 36 deletions.
SegmentReplicationSourceService.java
@@ -20,7 +20,6 @@
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.TransportChannel;
 import org.opensearch.transport.TransportRequestHandler;
-import org.opensearch.transport.TransportResponse;
 import org.opensearch.transport.TransportService;

 import java.io.IOException;
@@ -95,7 +94,8 @@ public void messageReceived(GetSegmentFilesRequest request, TransportChannel cha
             if (isInCopyStateMap(request.getCheckpoint())) {
                 // TODO send files
             } else {
-                channel.sendResponse(TransportResponse.Empty.INSTANCE);
+                // Return an empty list of files
+                channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList()));
             }
         }
     }
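Note on the change above: the GET_SEGMENT_FILES handler now always responds with a GetSegmentFilesResponse instead of TransportResponse.Empty, so the response handler in the new tests can deserialize it uniformly. A minimal sketch of such a response type follows, assuming it simply wraps a list of StoreFileMetadata; the field name, constructors, and serialization calls are illustrative, not copied from the commit:

import java.io.IOException;
import java.util.List;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.transport.TransportResponse;

// Sketch only: shows the general shape of a "list of segment files" transport response.
public class GetSegmentFilesResponse extends TransportResponse {

    final List<StoreFileMetadata> files;

    public GetSegmentFilesResponse(List<StoreFileMetadata> files) {
        this.files = files;
    }

    // Deserializing constructor used by the client-side read(StreamInput) handler
    public GetSegmentFilesResponse(StreamInput in) throws IOException {
        this.files = in.readList(StoreFileMetadata::new);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeList(files);
    }
}

An empty list serializes and deserializes cleanly, which is what testGetSegmentFiles_EmptyResponse below asserts.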
SegmentReplicationSourceServiceTests.java (new file)
@@ -0,0 +1,161 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.indices.replication;

import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyStateTests;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase {

    private ShardId testShardId;
    private ReplicationCheckpoint testCheckpoint;
    private IndicesService mockIndicesService;
    private IndexService mockIndexService;
    private IndexShard mockIndexShard;
    private TestThreadPool testThreadPool;
    private CapturingTransport transport;
    private TransportService transportService;
    private DiscoveryNode localNode;
    private SegmentReplicationSourceService segmentReplicationSourceService;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        // setup mocks
        mockIndexShard = CopyStateTests.createMockIndexShard();
        testShardId = mockIndexShard.shardId();
        mockIndicesService = mock(IndicesService.class);
        mockIndexService = mock(IndexService.class);
        when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService);
        when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard);

        // This mirrors the creation of the ReplicationCheckpoint inside CopyState
        testCheckpoint = new ReplicationCheckpoint(
            testShardId,
            mockIndexShard.getOperationPrimaryTerm(),
            0L,
            mockIndexShard.getProcessedLocalCheckpoint(),
            0L
        );
        testThreadPool = new TestThreadPool("test", Settings.EMPTY);
        transport = new CapturingTransport();
        localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
        transportService = transport.createTransportService(
            Settings.EMPTY,
            testThreadPool,
            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
            boundAddress -> localNode,
            null,
            Collections.emptySet()
        );
        transportService.start();
        transportService.acceptIncomingRequests();
        segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService);
    }

    @Override
    public void tearDown() throws Exception {
        ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
        testThreadPool = null;
        super.tearDown();
    }

    public void testGetSegmentFiles_EmptyResponse() {
        final GetSegmentFilesRequest request = new GetSegmentFilesRequest(
            1,
            "allocationId",
            localNode,
            Collections.emptyList(),
            testCheckpoint
        );
        transportService.sendRequest(
            localNode,
            SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES,
            request,
            new TransportResponseHandler<GetSegmentFilesResponse>() {
                @Override
                public void handleResponse(GetSegmentFilesResponse response) {
                    assertEquals(0, response.files.size());
                }

                @Override
                public void handleException(TransportException e) {
                    fail("unexpected exception: " + e);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public GetSegmentFilesResponse read(StreamInput in) throws IOException {
                    return new GetSegmentFilesResponse(in);
                }
            }
        );
    }

    public void testCheckpointInfo() {
        final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint);
        transportService.sendRequest(
            localNode,
            SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO,
            request,
            new TransportResponseHandler<CheckpointInfoResponse>() {
                @Override
                public void handleResponse(CheckpointInfoResponse response) {
                    assertEquals(testCheckpoint, response.getCheckpoint());
                    assertNotNull(response.getInfosBytes());
                    // CopyStateTests sets up one pending delete file and one committed segments file
                    assertEquals(1, response.getPendingDeleteFiles().size());
                    assertEquals(1, response.getSnapshot().size());
                }

                @Override
                public void handleException(TransportException e) {
                    fail("unexpected exception: " + e);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public CheckpointInfoResponse read(StreamInput in) throws IOException {
                    return new CheckpointInfoResponse(in);
                }
            }
        );
    }

}
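The setUp() comment above says testCheckpoint mirrors the ReplicationCheckpoint that CopyState builds from the primary shard. A rough sketch of that construction, assuming the generation and version come from the shard's SegmentInfos snapshot; the helper class and exact accessors here are illustrative, not the production CopyState code:

import java.io.IOException;

import org.apache.lucene.index.SegmentInfos;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

final class CheckpointSketch {
    // Hypothetical helper: builds a checkpoint the same way the test's setUp() expects.
    static ReplicationCheckpoint checkpointFor(IndexShard shard) throws IOException {
        try (GatedCloseable<SegmentInfos> snapshot = shard.getSegmentInfosSnapshot()) {
            SegmentInfos segmentInfos = snapshot.get();
            return new ReplicationCheckpoint(
                shard.shardId(),                     // shard id asserted by the tests
                shard.getOperationPrimaryTerm(),     // primary term of the source shard
                segmentInfos.getGeneration(),        // segments generation
                shard.getProcessedLocalCheckpoint(), // processed local checkpoint
                segmentInfos.getVersion()            // segment infos version
            );
        }
    }
}

The test passes 0L for the generation and version slots, matching what the freshly created, never-committed SegmentInfos in the mocked shard is expected to report.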
CopyStateTests.java
@@ -29,51 +29,52 @@

 public class CopyStateTests extends IndexShardTestCase {

+    private static final long EXPECTED_LONG_VALUE = 1L;
+    private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0);
+    private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST);
+    private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST);
+
+    private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot(
+        Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE, PENDING_DELETE_FILE.name(), PENDING_DELETE_FILE),
+        null,
+        0
+    );
+
+    private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot(
+        Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE),
+        null,
+        0
+    );
+
     public void testCopyStateCreation() throws IOException {
-        // dummy objects setup
-        final long expectedLongValue = 1L;
-        final ShardId testShardId = new ShardId("testIndex", "testUUID", 0);
-        final StoreFileMetadata segmentsFile = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST);
-        final StoreFileMetadata pendingDeleteFile = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST);
-        final Store.MetadataSnapshot commitMetadataSnapshot = new Store.MetadataSnapshot(
-            Map.of("segmentsFile", segmentsFile, "pendingDeleteFile", pendingDeleteFile),
-            null,
-            0
-        );
-        final Store.MetadataSnapshot segmentInfosMetadataSnapshot = new Store.MetadataSnapshot(
-            Map.of("segmentsFile", segmentsFile),
-            null,
-            0
-        );
+        CopyState copyState = new CopyState(createMockIndexShard());
+        ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
+        assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
+        // version was never set so this should be zero
+        assertEquals(0, checkpoint.getSegmentInfosVersion());
+        assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm());
+
+        Set<StoreFileMetadata> pendingDeleteFiles = copyState.getPendingDeleteFiles();
+        assertEquals(1, pendingDeleteFiles.size());
+        assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE));
+    }

-        // Mock objects setup
+    public static IndexShard createMockIndexShard() throws IOException {
         IndexShard mockShard = mock(IndexShard.class);
-        when(mockShard.shardId()).thenReturn(testShardId);
-        when(mockShard.getOperationPrimaryTerm()).thenReturn(expectedLongValue);
-        when(mockShard.getProcessedLocalCheckpoint()).thenReturn(expectedLongValue);
+        when(mockShard.shardId()).thenReturn(TEST_SHARD_ID);
+        when(mockShard.getOperationPrimaryTerm()).thenReturn(EXPECTED_LONG_VALUE);
+        when(mockShard.getProcessedLocalCheckpoint()).thenReturn(EXPECTED_LONG_VALUE);

         Store mockStore = mock(Store.class);
         when(mockShard.store()).thenReturn(mockStore);

         SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
         when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {}));
-
-        when(mockStore.getMetadata(testSegmentInfos)).thenReturn(segmentInfosMetadataSnapshot);
+        when(mockStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT);

         IndexCommit mockIndexCommit = mock(IndexCommit.class);
         when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {}));
-        when(mockStore.getMetadata(mockIndexCommit)).thenReturn(commitMetadataSnapshot);
-
-        // unit test
-        CopyState copyState = new CopyState(mockShard);
-        ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
-        assertEquals(testShardId, checkpoint.getShardId());
-        // version was never set so this should be zero
-        assertEquals(0, checkpoint.getSegmentInfosVersion());
-        assertEquals(expectedLongValue, checkpoint.getPrimaryTerm());
-
-        Set<StoreFileMetadata> pendingDeleteFiles = copyState.getPendingDeleteFiles();
-        assertEquals(1, pendingDeleteFiles.size());
-        assertTrue(pendingDeleteFiles.contains(pendingDeleteFile));
+        when(mockStore.getMetadata(mockIndexCommit)).thenReturn(COMMIT_SNAPSHOT);
+        return mockShard;
     }
 }
