Skip to content

Commit

Permalink
Automatically close scroll context when returning streamed results.
Browse files Browse the repository at this point in the history
Original Pull Request #1746
Closes #1745
  • Loading branch information
sothawo authored Mar 27, 2021
1 parent 3500dad commit 13ab2b9
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ static <T> SearchHitsIterator<T> streamResults(int maxCount, SearchScrollHits<T>
private volatile Iterator<SearchHit<T>> currentScrollHits = searchHits.iterator();
private volatile boolean continueScroll = currentScrollHits.hasNext();
private volatile ScrollState scrollState = new ScrollState(searchHits.getScrollId());
private volatile boolean isClosed = false;

@Override
public void close() {
clearScrollConsumer.accept(scrollState.getScrollIds());
if (!isClosed) {
clearScrollConsumer.accept(scrollState.getScrollIds());
isClosed = true;
}
}

@Override
Expand All @@ -96,18 +100,24 @@ public TotalHitsRelation getTotalHitsRelation() {
@Override
public boolean hasNext() {

if (!continueScroll || (maxCount > 0 && currentCount.get() >= maxCount)) {
return false;
boolean hasNext = false;

if (!isClosed && continueScroll && (maxCount <= 0 || currentCount.get() < maxCount)) {

if (!currentScrollHits.hasNext()) {
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
currentScrollHits = nextPage.iterator();
scrollState.updateScrollId(nextPage.getScrollId());
continueScroll = currentScrollHits.hasNext();
}
hasNext = currentScrollHits.hasNext();
}

if (!currentScrollHits.hasNext()) {
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
currentScrollHits = nextPage.iterator();
scrollState.updateScrollId(nextPage.getScrollId());
continueScroll = currentScrollHits.hasNext();
if (!hasNext) {
close();
}

return currentScrollHits.hasNext();
return hasNext;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.data.util.StreamUtils;

Expand All @@ -39,6 +40,8 @@ public void shouldCallClearScrollOnIteratorClose() {
// given
List<SearchHit<String>> hits = new ArrayList<>();
hits.add(getOneSearchHit());
hits.add(getOneSearchHit());
hits.add(getOneSearchHit());

SearchScrollHits<String> searchHits = newSearchScrollHits(hits, "1234");

Expand All @@ -51,16 +54,35 @@ public void shouldCallClearScrollOnIteratorClose() {
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
scrollIds -> clearScrollCalled.set(true));

while (iterator.hasNext()) {
iterator.next();
}
iterator.next();
iterator.close();

// then
assertThat(clearScrollCalled).isTrue();

}

@Test // #1745
@DisplayName("should call clearScroll when no more data is available")
void shouldCallClearScrollWhenNoMoreDataIsAvailable() {

List<SearchHit<String>> hits = new ArrayList<>();
hits.add(getOneSearchHit());
SearchScrollHits<String> searchHits = newSearchScrollHits(hits, "1234");
AtomicBoolean clearScrollCalled = new AtomicBoolean(false);

SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
0, //
searchHits, //
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
scrollIds -> clearScrollCalled.set(true));

while (iterator.hasNext()) {
iterator.next();
}

assertThat(clearScrollCalled).isTrue();
}
private SearchHit<String> getOneSearchHit() {
return new SearchHit<String>(null, null, null, 0, null, null, null, null, null, null, "one");
}
Expand Down

0 comments on commit 13ab2b9

Please sign in to comment.