diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8f41e6f5f1e1f..9c93d2bf25608 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -588,7 +588,7 @@ public enum SearcherScope { * Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range. * The caller has to close the returned snapshot after finishing the reading. */ - public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException; + public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException; public abstract TranslogStats getTranslogStats(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 99dfb908711e3..81fe7f60bc5d6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -344,7 +344,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); final long localCheckpoint = localCheckpointTracker.getCheckpoint(); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFrom(localCheckpoint + 1)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() > localCheckpoint) { @@ -480,8 +480,8 @@ public void syncTranslog() throws IOException { } @Override - public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { - return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo); + public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { + return getTranslog().newSnapshotFromMinSeqNo(minSeqNo); } /** @@ -493,7 +493,7 @@ public Translog.Snapshot readHistoryOperations(String source, MapperService mapp if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); } else { - return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE); + return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } } @@ -2483,7 +2483,7 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe } else { final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index de0131ff5de27..23d7bce70fb04 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1605,7 +1605,7 @@ public Closeable acquireRetentionLockForPeerRecovery() { */ public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException { // TODO: Remove this method after primary-replica resync use soft-deletes - return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE); + return getEngine().newSnapshotFromMinSeqNo(minSeqNo); } /** diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index e1c2b1e3b9ef2..39a077aa7822a 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -411,9 +411,7 @@ public int totalOperationsByMinGen(long minGeneration) { public int estimateTotalOperationsFromMinSeq(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE) - .mapToInt(BaseTranslogReader::totalOperations) - .sum(); + return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum(); } } @@ -602,23 +600,11 @@ public Operation readOperation(Location location) throws IOException { return null; } - /** - * Returns a snapshot with operations having a sequence number equal to or greater than minSeqNo. - */ - public Snapshot newSnapshotFrom(long minSeqNo) throws IOException { - return getSnapshotBetween(minSeqNo, Long.MAX_VALUE); - } - - /** - * Returns a snapshot with operations having a sequence number equal to or greater than minSeqNo and - * equal to or lesser than maxSeqNo. - */ - public Snapshot getSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException { + public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo) - .map(BaseTranslogReader::newSnapshot) - .toArray(TranslogSnapshot[]::new); + TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot) + .toArray(TranslogSnapshot[]::new); return newMultiSnapshot(snapshots); } } @@ -644,14 +630,14 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti } } - private Stream readersBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) { - assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() ; - + private Stream readersAboveMinSeqNo(long minSeqNo) { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : + "callers of readersAboveMinSeqNo must hold a lock: readLock [" + + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]"; return Stream.concat(readers.stream(), Stream.of(current)) .filter(reader -> { - final Checkpoint checkpoint = reader.getCheckpoint(); - return checkpoint.maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || - checkpoint.minSeqNo <= maxSeqNo && checkpoint.maxSeqNo >= minSeqNo; + final long maxSeqNo = reader.getCheckpoint().maxSeqNo; + return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo; }); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index c92370fec04ec..cf6e753684676 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -998,7 +998,7 @@ protected void doRun() throws Exception { // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); - try (Translog.Snapshot snapshot = translog.newSnapshotFrom(committedLocalCheckpointAtView + 1L)) { + try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) { Translog.Operation op; while ((op = snapshot.next()) != null) { expectedOps.remove(op); @@ -2814,7 +2814,7 @@ public void testMinSeqNoBasedAPI() throws IOException { } assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); int readFromSnapshot = 0; - try (Translog.Snapshot snapshot = translog.newSnapshotFrom(seqNo)) { + try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); Translog.Operation op; while ((op = snapshot.next()) != null) { @@ -2831,38 +2831,6 @@ public void testMinSeqNoBasedAPI() throws IOException { } } - public void testGetSnapshotBetween() throws IOException { - final int numOperations = randomIntBetween(2, 8196); - final List sequenceNumbers = IntStream.range(0, numOperations).boxed().collect(Collectors.toList()); - Collections.shuffle(sequenceNumbers, random()); - for (Integer sequenceNumber : sequenceNumbers) { - translog.add(new Translog.NoOp(sequenceNumber, 0, "test")); - if (rarely()) { - translog.rollGeneration(); - } - } - translog.rollGeneration(); - - final int iters = randomIntBetween(8, 32); - for (int iter = 0; iter < iters; iter++) { - int min = randomIntBetween(0, numOperations - 1); - int max = randomIntBetween(min, numOperations); - try (Translog.Snapshot snapshot = translog.getSnapshotBetween(min, max)) { - final List operations = new ArrayList<>(); - for (Translog.Operation operation = snapshot.next(); operation != null; operation = snapshot.next()) { - if (operation.seqNo() >= min && operation.seqNo() <= max) { - operations.add(operation); - } - } - operations.sort(Comparator.comparingLong(Translog.Operation::seqNo)); - Iterator iterator = operations.iterator(); - for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) { - assertThat(iterator.next().seqNo(), equalTo(expectedSeqNo)); - } - } - } - } - public void testSimpleCommit() throws IOException { final int operations = randomIntBetween(1, 4096); long seqNo = 0;