Skip to content

Commit

Permalink
KAFKA-13073: Fix MockLog snapshot implementation (#11032)
Browse files Browse the repository at this point in the history
Fix a simulation test failure by:

1. Relaxing the valiation of the snapshot id against the log start
offset when the state machine attempts to create new snapshot. It
is safe to just ignore the request instead of throwing an exception
when the snapshot id is less that the log start offset.

2. Fixing the MockLog implementation so that it uses startOffset both
externally and internally.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
jsancio authored Jul 14, 2021
1 parent 0785aa3 commit 8134adc
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 54 deletions.
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,15 @@ final class KafkaMetadataLog private (
}

override def createNewSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
val highWatermarkOffset = highWatermark.offset
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
)
if (snapshotId.offset < startOffset) {
info(s"Cannot create a snapshot with an id ($snapshotId) less than the log start offset ($startOffset)")
return Optional.empty()
}

if (snapshotId.offset < startOffset) {
val highWatermarkOffset = highWatermark.offset
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
s"Cannot create a snapshot with an id ($snapshotId) greater than the high-watermark ($highWatermarkOffset)"
)
}

Expand Down
5 changes: 1 addition & 4 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ final class KafkaMetadataLogTest {
// Simulate log cleanup that advances the LSO
log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, SegmentDeletion)

assertThrows(
classOf[IllegalArgumentException],
() => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch))
)
assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch)))
}

@Test
Expand Down
1 change: 1 addition & 0 deletions raft/config/kraft-log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err

log4j.logger.org.apache.kafka.raft=INFO
log4j.logger.org.apache.kafka.snapshot=INFO
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,17 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
*
* See {@link RawSnapshotWriter} for details on how to use this object. The caller of
* this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a
* snapshot already exists then return an {@link Optional#empty()}.
* snapshot already exists or it is less than log start offset then return an
* {@link Optional#empty()}.
*
* Snapshots created using this method will be validated against the existing snapshots
* and the replicated log.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot if it doesn't already exists
* @return a writable snapshot if it doesn't already exists and greater than the log start
* offset
* @throws IllegalArgumentException if validate is true and end offset is greater than the
* high-watermark
* @throws IllegalArgumentException if validate is true and end offset is less than the log
* start offset
*/
Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId);

Expand Down
58 changes: 37 additions & 21 deletions raft/src/test/java/org/apache/kafka/raft/MockLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -59,14 +61,20 @@ public class MockLog implements ReplicatedLog {
private final NavigableMap<OffsetAndEpoch, MockRawSnapshotReader> snapshots = new TreeMap<>();
private final TopicPartition topicPartition;
private final Uuid topicId;
private final Logger logger;

private long nextId = ID_GENERATOR.getAndIncrement();
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0, Optional.empty());
private long lastFlushedOffset = 0;

public MockLog(TopicPartition topicPartition, Uuid topicId) {
public MockLog(
TopicPartition topicPartition,
Uuid topicId,
LogContext logContext
) {
this.topicPartition = topicPartition;
this.topicId = topicId;
this.logger = logContext.logger(MockLog.class);
}

@Override
Expand Down Expand Up @@ -218,17 +226,25 @@ private Optional<LogEntry> firstEntry() {

@Override
public LogOffsetMetadata endOffset() {
long nextOffset = lastEntry().map(entry -> entry.offset + 1).orElse(logStartOffset());
long nextOffset = lastEntry()
.map(entry -> entry.offset + 1)
.orElse(
latestSnapshotId()
.map(id -> id.offset)
.orElse(0L)
);
return new LogOffsetMetadata(nextOffset, Optional.of(new MockOffsetMetadata(nextId)));
}

@Override
public long startOffset() {
return firstEntry().map(entry -> entry.offset).orElse(logStartOffset());
}

private long logStartOffset() {
return earliestSnapshotId().map(id -> id.offset).orElse(0L);
return firstEntry()
.map(entry -> entry.offset)
.orElse(
earliestSnapshotId()
.map(id -> id.offset)
.orElse(0L)
);
}

private List<LogEntry> buildEntries(RecordBatch batch, Function<Record, Long> offsetSupplier) {
Expand Down Expand Up @@ -420,6 +436,16 @@ public void initializeLeaderEpoch(int epoch) {

@Override
public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId) {
if (snapshotId.offset < startOffset()) {
logger.info(
"Cannot create a snapshot with an id ({}) less than the log start offset ({})",
snapshotId,
startOffset()
);

return Optional.empty();
}

long highWatermarkOffset = highWatermark().offset;
if (snapshotId.offset > highWatermarkOffset) {
throw new IllegalArgumentException(
Expand All @@ -431,16 +457,6 @@ public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId)
);
}

if (snapshotId.offset < logStartOffset()) {
throw new IllegalArgumentException(
String.format(
"Cannot create a snapshot with and id (%s) less than the log start offset (%s)",
snapshotId,
logStartOffset()
)
);
}

ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset, snapshotId.epoch);
if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -490,12 +506,12 @@ public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}

@Override
public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
if (logStartOffset() > snapshotId.offset) {
if (startOffset() > snapshotId.offset) {
throw new OffsetOutOfRangeException(
String.format(
"New log start (%s) is less than the curent log start offset (%s)",
snapshotId,
logStartOffset()
startOffset()
)
);
}
Expand Down
8 changes: 3 additions & 5 deletions raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class MockLogTest {

@BeforeEach
public void setup() {
log = new MockLog(topicPartition, topicId);
log = new MockLog(topicPartition, topicId, new LogContext());
}

@AfterEach
Expand Down Expand Up @@ -510,10 +511,7 @@ public void testCreateSnapshotBeforeLogStartOffset() {
assertTrue(log.deleteBeforeSnapshot(snapshotId));
assertEquals(snapshotId.offset, log.startOffset());

assertThrows(
IllegalArgumentException.class,
() -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch))
);
assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public static final class Builder {
private final MockTime time = new MockTime();
private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
private final Random random = Mockito.spy(new Random(1));
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID);
private final LogContext logContext = new LogContext();
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
private final Set<Integer> voters;
private final OptionalInt localId;

Expand Down Expand Up @@ -223,7 +224,6 @@ Builder withClusterId(Uuid clusterId) {
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters);
LogContext logContext = new LogContext();
MockListener listener = new MockListener(localId);
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
.collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft;

import net.jqwik.api.AfterFailureMode;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.Tag;
Expand Down Expand Up @@ -103,7 +104,7 @@ public class RaftEventSimulationTest {
private static final int FETCH_MAX_WAIT_MS = 100;
private static final int LINGER_MS = 0;

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectInitialLeader(
@ForAll int seed,
@ForAll @IntRange(min = 1, max = 5) int numVoters,
Expand All @@ -122,7 +123,7 @@ void canElectInitialLeader(
scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
}

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectNewLeaderAfterOldLeaderFailure(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
Expand Down Expand Up @@ -162,7 +163,7 @@ void canElectNewLeaderAfterOldLeaderFailure(
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
}

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRecoverAfterAllNodesKilled(
@ForAll int seed,
@ForAll @IntRange(min = 1, max = 5) int numVoters,
Expand Down Expand Up @@ -195,7 +196,7 @@ void canRecoverAfterAllNodesKilled(
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
}

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canElectNewLeaderAfterOldLeaderPartitionedAway(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
Expand Down Expand Up @@ -227,7 +228,7 @@ void canElectNewLeaderAfterOldLeaderPartitionedAway(
scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes));
}

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canMakeProgressIfMajorityIsReachable(
@ForAll int seed,
@ForAll @IntRange(min = 0, max = 3) int numObservers
Expand Down Expand Up @@ -272,7 +273,7 @@ void canMakeProgressIfMajorityIsReachable(
scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
}

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canMakeProgressAfterBackToBackLeaderFailures(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
Expand Down Expand Up @@ -305,7 +306,7 @@ void canMakeProgressAfterBackToBackLeaderFailures(
scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark));
}

@Property(tries = 100)
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
void canRecoverFromSingleNodeCommittedDataLoss(
@ForAll int seed,
@ForAll @IntRange(min = 3, max = 5) int numVoters,
Expand Down Expand Up @@ -498,7 +499,15 @@ void runUntil(Supplier<Boolean> exitCondition) {

private static class PersistentState {
final MockQuorumStateStore store = new MockQuorumStateStore();
final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID);
final MockLog log;

PersistentState(int nodeId) {
log = new MockLog(
METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID,
new LogContext(String.format("[Node %s] ", nodeId))
);
}
}

private static class Cluster {
Expand All @@ -516,11 +525,11 @@ private Cluster(int numVoters, int numObservers, Random random) {
int nodeId = 0;
for (; nodeId < numVoters; nodeId++) {
voters.add(nodeId);
nodes.put(nodeId, new PersistentState());
nodes.put(nodeId, new PersistentState(nodeId));
}

for (; nodeId < numVoters + numObservers; nodeId++) {
nodes.put(nodeId, new PersistentState());
nodes.put(nodeId, new PersistentState(nodeId));
}
}

Expand Down Expand Up @@ -674,7 +683,7 @@ void startAll() {

void killAndDeletePersistentState(int nodeId) {
kill(nodeId);
nodes.put(nodeId, new PersistentState());
nodes.put(nodeId, new PersistentState(nodeId));
}

private static RaftConfig.AddressSpec nodeAddress(int id) {
Expand Down

0 comments on commit 8134adc

Please sign in to comment.