Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert translog changes introduced for CCR #31947

Merged
merged 1 commit into from
Jul 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public enum SearcherScope {
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
* The caller has to close the returned snapshot after finishing the reading.
*/
public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException;
public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException;

public abstract TranslogStats getTranslogStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFrom(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
Expand Down Expand Up @@ -480,8 +480,8 @@ public void syncTranslog() throws IOException {
}

@Override
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo);
public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
Expand All @@ -493,7 +493,7 @@ public Translog.Snapshot readHistoryOperations(String source, MapperService mapp
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE);
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}
}

Expand Down Expand Up @@ -2483,7 +2483,7 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
} else {
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ public Closeable acquireRetentionLockForPeerRecovery() {
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
// TODO: Remove this method after primary-replica resync use soft-deletes
return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
return getEngine().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
Expand Down
34 changes: 10 additions & 24 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,7 @@ public int totalOperationsByMinGen(long minGeneration) {
public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE)
.mapToInt(BaseTranslogReader::totalOperations)
.sum();
return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
}
}

Expand Down Expand Up @@ -602,23 +600,11 @@ public Operation readOperation(Location location) throws IOException {
return null;
}

/**
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code>.
*/
public Snapshot newSnapshotFrom(long minSeqNo) throws IOException {
return getSnapshotBetween(minSeqNo, Long.MAX_VALUE);
}

/**
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code> and
* equal to or lesser than <code>maxSeqNo</code>.
*/
public Snapshot getSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo)
.map(BaseTranslogReader::newSnapshot)
.toArray(TranslogSnapshot[]::new);
TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot)
.toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
}
}
Expand All @@ -644,14 +630,14 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
}
}

private Stream<? extends BaseTranslogReader> readersBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() ;

private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
return Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> {
final Checkpoint checkpoint = reader.getCheckpoint();
return checkpoint.maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ||
checkpoint.minSeqNo <= maxSeqNo && checkpoint.maxSeqNo >= minSeqNo;
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ protected void doRun() throws Exception {
// these are what we expect the snapshot to return (and potentially some more).
Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(committedLocalCheckpointAtView + 1L)) {
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
expectedOps.remove(op);
Expand Down Expand Up @@ -2814,7 +2814,7 @@ public void testMinSeqNoBasedAPI() throws IOException {
}
assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps));
int readFromSnapshot = 0;
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(seqNo)) {
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) {
assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps));
Translog.Operation op;
while ((op = snapshot.next()) != null) {
Expand All @@ -2831,38 +2831,6 @@ public void testMinSeqNoBasedAPI() throws IOException {
}
}

public void testGetSnapshotBetween() throws IOException {
final int numOperations = randomIntBetween(2, 8196);
final List<Integer> sequenceNumbers = IntStream.range(0, numOperations).boxed().collect(Collectors.toList());
Collections.shuffle(sequenceNumbers, random());
for (Integer sequenceNumber : sequenceNumbers) {
translog.add(new Translog.NoOp(sequenceNumber, 0, "test"));
if (rarely()) {
translog.rollGeneration();
}
}
translog.rollGeneration();

final int iters = randomIntBetween(8, 32);
for (int iter = 0; iter < iters; iter++) {
int min = randomIntBetween(0, numOperations - 1);
int max = randomIntBetween(min, numOperations);
try (Translog.Snapshot snapshot = translog.getSnapshotBetween(min, max)) {
final List<Translog.Operation> operations = new ArrayList<>();
for (Translog.Operation operation = snapshot.next(); operation != null; operation = snapshot.next()) {
if (operation.seqNo() >= min && operation.seqNo() <= max) {
operations.add(operation);
}
}
operations.sort(Comparator.comparingLong(Translog.Operation::seqNo));
Iterator<Translog.Operation> iterator = operations.iterator();
for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) {
assertThat(iterator.next().seqNo(), equalTo(expectedSeqNo));
}
}
}
}

public void testSimpleCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
Expand Down