Skip to content

Commit

Permalink
Fix flaky test in ExportPartitionWorkerTest (#4300)
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
  • Loading branch information
dinujoh authored Mar 19, 2024
1 parent 8ceb736 commit 6f4816d
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -79,7 +82,7 @@ public void setup() {
"test.collection|0|1|java.lang.Long",
"test.collection|000000000000000000000000|000000000000000000000001|org.bson.types.ObjectId"
})
public void testProcessPartitionSuccess(final String partitionKey) throws Exception {
public void testProcessPartitionSuccess(final String partitionKey) {
when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE)).thenReturn(Optional.of(dataQueryPartition));

final ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -106,21 +109,29 @@ public void testProcessPartitionSuccess(final String partitionKey) throws Except
lenient().when(sourceCoordinator.acquireAvailablePartition(DataQueryPartition.PARTITION_TYPE))
.thenReturn(Optional.of(dataQueryPartition));

executorService.submit(() -> {
final Future<?> future = executorService.submit(() -> {
try (MockedStatic<MongoDBConnection> mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) {
mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class)))
.thenReturn(mongoClient);
exportPartitionWorker.run();
}
});

Thread.sleep(100);
executorService.shutdownNow();
// Then dependencies are called
verify(mongoClient).getDatabase(eq("test"));
await()
.atMost(Duration.ofSeconds(2))
.untilAsserted(() -> verify(mongoClient).getDatabase(eq("test")));

future.cancel(true);

await()
.atMost(Duration.ofSeconds(2))
.untilAsserted(() -> verify(mockPartitionCheckpoint, times(2)).checkpoint(2));

verify(mongoClient, times(1)).close();
verify(mongoDatabase).getCollection(eq("collection"));
verify(mockRecordBufferWriter).writeToBuffer(eq(mockAcknowledgementSet), any());
verify(successItemsCounter, times(2)).increment();
verify(failureItemsCounter, never()).increment();
executorService.shutdownNow();
}
}

0 comments on commit 6f4816d

Please sign in to comment.