Skip to content

Commit

Permalink
GH-4819 ordered dual union iteration support
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Dec 20, 2023
1 parent 8580bfe commit c24319e
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@
import java.util.NoSuchElementException;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.order.StatementOrder;
import org.eclipse.rdf4j.model.Value;

/**
* Provides a bag union of the two provided iterations.
*/
public class DualUnionIteration<E> implements CloseableIteration<E> {

private final StatementOrder statementOrder;
private final Comparator<Value> cmp;
private final Comparator<E> cmp;
private CloseableIteration<? extends E> iteration1;
private CloseableIteration<? extends E> iteration2;
private E nextElementIteration1;
private E nextElementIteration2;
private E nextElement;
/**
* Flag indicating whether this iteration has been closed.
Expand All @@ -37,20 +36,15 @@ private DualUnionIteration(CloseableIteration<? extends E> iteration1,
CloseableIteration<? extends E> iteration2) {
this.iteration1 = iteration1;
this.iteration2 = iteration2;
this.statementOrder = null;
this.cmp = null;
}

@Experimental
public DualUnionIteration(StatementOrder statementOrder, Comparator<Value> cmp,
public DualUnionIteration(Comparator<E> cmp,
CloseableIteration<? extends E> iteration1, CloseableIteration<? extends E> iteration2) {
this.iteration1 = iteration1;
this.iteration2 = iteration2;
this.statementOrder = statementOrder;
this.cmp = cmp;

// TODO
throw new UnsupportedOperationException("Not implemented yet");
}

public static <E> CloseableIteration<? extends E> getWildcardInstance(
Expand All @@ -66,25 +60,16 @@ public static <E> CloseableIteration<? extends E> getWildcardInstance(
}

@Experimental
public static <E> CloseableIteration<? extends E> getWildcardInstance(StatementOrder order,
Comparator<Value> cmp,
public static <E> CloseableIteration<? extends E> getWildcardInstance(Comparator<E> cmp,
CloseableIteration<? extends E> leftIteration, CloseableIteration<? extends E> rightIteration) {

if (rightIteration instanceof EmptyIteration) {
return leftIteration;
} else if (leftIteration instanceof EmptyIteration) {
return rightIteration;
} else {
if (!rightIteration.hasNext()) {
rightIteration.close();
return leftIteration;
}
if (!leftIteration.hasNext()) {
leftIteration.close();
return rightIteration;
}
return new DualUnionIteration<>(order, cmp, leftIteration, rightIteration);
}
// if (rightIteration instanceof EmptyIteration) {
// return leftIteration;
// } else if (leftIteration instanceof EmptyIteration) {
// return rightIteration;
// } else {
return new DualUnionIteration<>(cmp, leftIteration, rightIteration);
// }
}

public static <E> CloseableIteration<E> getInstance(CloseableIteration<E> leftIteration,
Expand All @@ -99,32 +84,6 @@ public static <E> CloseableIteration<E> getInstance(CloseableIteration<E> leftIt
}
}

public E getNextElement() {
if (iteration1 == null && iteration2 != null) {
if (iteration2.hasNext()) {
return iteration2.next();
} else {
iteration2.close();
iteration2 = null;
}
} else if (iteration1 != null) {
if (iteration1.hasNext()) {
return iteration1.next();
} else if (iteration2.hasNext()) {
iteration1.close();
iteration1 = null;
return iteration2.next();
} else {
iteration1.close();
iteration1 = null;
iteration2.close();
iteration2 = null;
}
}

return null;
}

@Override
public final boolean hasNext() {
if (closed) {
Expand Down Expand Up @@ -156,7 +115,11 @@ public final E next() {
*/
private E lookAhead() {
if (nextElement == null) {
nextElement = getNextElement();
if (cmp == null) {
lookaheadWithoutOrder();
} else {
lookaheadWithOrder();
}

if (nextElement == null) {
close();
Expand All @@ -165,6 +128,73 @@ private E lookAhead() {
return nextElement;
}

private void lookaheadWithOrder() {
assert cmp != null;
if (nextElementIteration1 == null && iteration1 != null) {
if (iteration1.hasNext()) {
nextElementIteration1 = iteration1.next();
} else {
iteration1.close();
iteration1 = null;
}
}

if (nextElementIteration2 == null && iteration2 != null) {
if (iteration2.hasNext()) {
nextElementIteration2 = iteration2.next();
} else {
iteration2.close();
iteration2 = null;
}
}

if (nextElementIteration1 != null && nextElementIteration2 != null) {
int compare = cmp.compare(nextElementIteration1, nextElementIteration2);

if (compare <= 0) {
nextElement = nextElementIteration1;
nextElementIteration1 = null;
} else {
nextElement = nextElementIteration2;
nextElementIteration2 = null;
}
} else {
if (nextElementIteration1 != null) {
nextElement = nextElementIteration1;
nextElementIteration1 = null;
} else if (nextElementIteration2 != null) {
nextElement = nextElementIteration2;
nextElementIteration2 = null;
}
}
}

private void lookaheadWithoutOrder() {
assert cmp == null;

if (iteration1 == null && iteration2 != null) {
if (iteration2.hasNext()) {
nextElement = iteration2.next();
} else {
iteration2.close();
iteration2 = null;
}
} else if (iteration1 != null) {
if (iteration1.hasNext()) {
nextElement = iteration1.next();
} else if (iteration2.hasNext()) {
iteration1.close();
iteration1 = null;
nextElement = iteration2.next();
} else {
iteration1.close();
iteration1 = null;
iteration2.close();
iteration2 = null;
}
}
}

/**
* Throws an {@link UnsupportedOperationException}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ public Comparator<Statement> getComparator(Comparator<Value> comparator) {

throw new IllegalStateException("Unknown StatementOrder: " + this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ private void calculateNext() {

if (bufferIterator.hasNext()) {
next = bufferIterator.next();
assert resetPossible == 1;
} else {
if (!mark && resetPossible > -1) {
resetPossible--;
Expand All @@ -62,7 +61,7 @@ private void calculateNext() {
}

if (mark && next != null) {
assert resetPossible == 1;
assert resetPossible > 0;
buffer.add(next);
}

Expand Down Expand Up @@ -107,6 +106,10 @@ public E peek() {
return next;
}

/**
* Mark the current position so that the iterator can be reset to the current state. This will cause elements to be
* stored in memory until one of {@link #reset()}, {@link #unmark()} or {@link #mark()} is called
*/
public void mark() {
if (closed) {
throw new IllegalStateException("The iteration has been closed.");
Expand All @@ -120,38 +123,51 @@ public void mark() {

}

/**
* Reset the iterator to the marked position. Resetting an iterator multiple times will always reset to the same
* position. If the iterator was not marked, this will throw an exception.
*/
public void reset() {
if (closed) {
throw new IllegalStateException("The iteration has been closed.");
}
if (buffer == null) {
throw new IllegalStateException("Mark never set");
}
if (resetPossible < 0) {
throw new IllegalStateException("Reset not possible");
}

if (mark && bufferIterator.hasNext()) {
while (bufferIterator.hasNext()) {
buffer.add(bufferIterator.next());
}
}

mark = false;
if (resetPossible < 0) {
throw new IllegalStateException("Reset not possible");
} else if (resetPossible == 0) {
if (resetPossible == 0) {
assert !mark;
buffer.add(next);
next = null;
bufferIterator = buffer.iterator();
} else if (resetPossible == 1) {
} else if (resetPossible > 0) {
next = null;
bufferIterator = buffer.iterator();
}

mark = false;
resetPossible = 1;
}

/**
* @return true if the iterator is marked
*/
boolean isMarked() {
return !closed && mark;
}

/**
* @return true if {@link #reset()} can be called on this iterator
*/
boolean isResettable() {
return !closed && (mark || resetPossible >= 0);
}
Expand All @@ -160,11 +176,16 @@ boolean isResettable() {
public void close() {
this.closed = true;
iterator.close();
buffer = null;
}

// What will happen if we are iterating over the buffer at this point, then unmark is called followed by mark?
/**
* Unmark the iterator. This will cause the iterator to stop buffering elements. If the iterator was recently reset
* and there are still elements in the buffer, then these elements will still be returned by next().
*/
public void unmark() {
mark = false;
resetPossible = -1;
buffer = null;
}
}
Loading

0 comments on commit c24319e

Please sign in to comment.