diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java index 7650643..92a0297 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java @@ -45,6 +45,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.unit.TimeValue; +import org.opensearch.data.core.OpenSearchOperations; import org.opensearch.index.query.MoreLikeThisQueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.reindex.BulkByScrollResponse; @@ -81,7 +82,7 @@ * OpenSearchRestTemplate * @since 0.1 */ -public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate { +public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations { private static final Log LOGGER = LogFactory.getLog(OpenSearchRestTemplate.class); @@ -485,6 +486,13 @@ public Boolean closePointInTime(String pit) { return false; } + @Override + public List listPointInTime() { + return execute(client -> client.getAllPits(RequestOptions.DEFAULT)) + .getPitInfos().stream().map(pit -> new PitInfo(pit.getPitId(), pit.getCreationTime(), null)) + .toList(); + } + public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) { SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index); return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT)); diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java index 9df8585..ba071c3 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchTemplate.java @@ -37,6 +37,7 @@ import org.opensearch.client.opensearch.core.pit.DeletePitRequest; import org.opensearch.client.opensearch.core.search.SearchResult; import org.opensearch.client.transport.Version; +import org.opensearch.data.core.OpenSearchOperations; import org.springframework.data.elasticsearch.BulkFailureException; import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation; import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate; @@ -76,7 +77,7 @@ * @author Haibo Liu * @since 4.4 */ -public class OpenSearchTemplate extends AbstractElasticsearchTemplate { +public class OpenSearchTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations { private static final Log LOGGER = LogFactory.getLog(OpenSearchTemplate.class); @@ -633,6 +634,14 @@ public Boolean closePointInTime(String pit) { return !response.pits().isEmpty(); } + @Override + public List listPointInTime() { + return execute(client -> client.listAllPit()).pits() + .stream() + .map(pit -> new PitInfo(pit.pitId(), pit.creationTime(), pit.keepAlive() == null ? null : Duration.ofMillis(pit.keepAlive()))) + .toList(); + } + // endregion // region script methods @@ -737,5 +746,4 @@ protected List checkForBulkOperationFailure(BulkRespon } // endregion - } diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java b/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java new file mode 100644 index 0000000..8a13afa --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java @@ -0,0 +1,25 @@ +package org.opensearch.data.core; + +import java.time.Duration; +import java.util.List; +import org.springframework.data.elasticsearch.core.ElasticsearchOperations; + +/** + * The extension over {@link ElasticsearchOperations} with OpenSearch specific operations. + */ +public interface OpenSearchOperations extends ElasticsearchOperations { + /** + * Return all active point in time searches + * @return all active point in time searches + */ + List listPointInTime(); + + /** + * Describes the point in time entry + * + * @param id the point in time id + * @param keepAlive the new keep alive value to be sent with the query + */ + record PitInfo(String id, long creationTime, Duration keepAlive) { + } +} diff --git a/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java b/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java index a73e0ac..b184427 100755 --- a/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java +++ b/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -40,6 +40,8 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,6 +54,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.data.client.EnabledIfOpenSearchVersion; import org.opensearch.data.client.orhlc.NativeSearchQueryBuilder; +import org.opensearch.data.core.OpenSearchOperations; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.dao.InvalidDataAccessApiUsageException; @@ -130,7 +133,7 @@ public abstract class ElasticsearchIntegrationTests { private static final String MULTI_INDEX_2_NAME = MULTI_INDEX_PREFIX + "-2"; private static final String MULTI_INDEX_3_NAME = MULTI_INDEX_PREFIX + "-3"; - @Autowired protected ElasticsearchOperations operations; + @Autowired protected OpenSearchOperations operations; private IndexOperations indexOperations; @Autowired protected IndexNameProvider indexNameProvider; @@ -2747,16 +2750,23 @@ public void testPointInTimeKeepAliveExpired() throws InterruptedException { SearchHits results = operations.search(query,SampleEntity.class); assertThat(results.getSearchHits().size()).isEqualTo(2); - // There may be a better way to do it, but Opensearch by default waits for up-to a minute to clear expired pits - Thread.sleep(120000); final Query searchAfterQuery = getBuilderWithMatchAllQuery() // .withSort(Sort.by(Sort.Order.desc("message"))) // .withPointInTime(qpit) .withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage()))) .build(); - assertThatExceptionOfType(ResourceNotFoundException.class).isThrownBy( - ()-> operations.search(searchAfterQuery, SampleEntity.class) - ); + + final long started = System.nanoTime(); + while ((System.nanoTime() - started) < TimeUnit.SECONDS.toNanos(120)) { + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); + if (operations.listPointInTime().isEmpty()) { + break; + } + } + + assertThatExceptionOfType(ResourceNotFoundException.class) + .isThrownBy(()-> operations.search(searchAfterQuery,SampleEntity.class)); + Boolean pitResult = operations.closePointInTime(pit); Assertions.assertTrue(pitResult); }