diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index f14dd1f1..2abf8a79 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -22,6 +22,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.dao.DatasourceDao; @@ -30,6 +31,8 @@ @Log4j2 public class DatasourceUpdateService { + private static final int SLEEP_TIME_IN_MILLIS = 5000; // 5 seconds + private static final int MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS = 10 * 60 * 60 * 1000; // 10 hours private final ClusterService clusterService; private final ClusterSettings clusterSettings; private final DatasourceDao datasourceDao; @@ -86,10 +89,36 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable geoIpDataDao.putGeoIpData(indexName, header, reader.iterator(), renewLock); } + waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS); Instant endTime = Instant.now(); updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); } + /** + * We wait until all shards are ready to serve search requests before updating datasource metadata to + * point to a new index so that there won't be latency degradation during GeoIP data update + * + * @param indexName the indexName + */ + @VisibleForTesting + protected void waitUntilAllShardsStarted(final String indexName, final int timeout) { + Instant start = Instant.now(); + try { + while (Instant.now().toEpochMilli() - start.toEpochMilli() < timeout) { + if (clusterService.state().routingTable().allShards(indexName).stream().allMatch(shard -> shard.started())) { + return; + } + Thread.sleep(SLEEP_TIME_IN_MILLIS); + } + throw new OpenSearchException( + "index[{}] replication did not complete after {} millis", + MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS + ); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + /** * Return header fields of geo data with given url of a manifest file * diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index e59e2c35..7bf2961f 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -96,6 +96,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase { protected Ip2GeoLockService ip2GeoLockService; @Mock protected Ip2GeoProcessorDao ip2GeoProcessorDao; + @Mock + protected RoutingTable routingTable; protected IngestMetadata ingestMetadata; protected NoOpNodeClient client; protected VerifyingClient verifyingClient; @@ -119,7 +121,7 @@ public void prepareIp2GeoTestCase() { when(clusterService.state()).thenReturn(clusterState); when(clusterState.metadata()).thenReturn(metadata); when(clusterState.getMetadata()).thenReturn(metadata); - when(clusterState.routingTable()).thenReturn(RoutingTable.EMPTY_ROUTING_TABLE); + when(clusterState.routingTable()).thenReturn(routingTable); when(ip2GeoExecutor.forDatasourceUpdate()).thenReturn(OpenSearchExecutors.newDirectExecutorService()); when(ingestService.getClusterService()).thenReturn(clusterService); when(threadPool.generic()).thenReturn(OpenSearchExecutors.newDirectExecutorService()); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 80a3beac..5e08400b 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -6,6 +6,7 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; @@ -28,6 +29,7 @@ import org.apache.commons.csv.CSVParser; import org.junit.Before; import org.opensearch.OpenSearchException; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.SuppressForbidden; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -134,6 +136,9 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() { File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); when(geoIpDataDao.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + ShardRouting shardRouting = mock(ShardRouting.class); + when(shardRouting.started()).thenReturn(true); + when(routingTable.allShards(anyString())).thenReturn(Arrays.asList(shardRouting)); Datasource datasource = new Datasource(); datasource.setState(DatasourceState.AVAILABLE); @@ -158,6 +163,34 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() { verify(geoIpDataDao).putGeoIpData(eq(datasource.currentIndexName()), isA(String[].class), any(Iterator.class), any(Runnable.class)); } + public void testWaitUntilAllShardsStarted_whenTimedOut_thenThrowException() { + String indexName = GeospatialTestHelper.randomLowerCaseString(); + ShardRouting shardRouting = mock(ShardRouting.class); + when(shardRouting.started()).thenReturn(false); + when(routingTable.allShards(indexName)).thenReturn(Arrays.asList(shardRouting)); + + // Run + Exception e = expectThrows(OpenSearchException.class, () -> datasourceUpdateService.waitUntilAllShardsStarted(indexName, 10)); + + // Verify + assertTrue(e.getMessage().contains("did not complete")); + } + + @SneakyThrows + public void testWaitUntilAllShardsStarted_whenInterrupted_thenThrowException() { + String indexName = GeospatialTestHelper.randomLowerCaseString(); + ShardRouting shardRouting = mock(ShardRouting.class); + when(shardRouting.started()).thenReturn(false); + when(routingTable.allShards(indexName)).thenReturn(Arrays.asList(shardRouting)); + + // Run + Thread.currentThread().interrupt(); + Exception e = expectThrows(RuntimeException.class, () -> datasourceUpdateService.waitUntilAllShardsStarted(indexName, 10)); + + // Verify + assertEquals(InterruptedException.class, e.getCause().getClass()); + } + @SneakyThrows public void testGetHeaderFields_whenValidInput_thenReturnCorrectValue() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());