Skip to content

Commit

Permalink
Suppress error reporting after seeking but before a valid First or Fu…
Browse files Browse the repository at this point in the history
…ll record is encountered.

Fix a spelling mistake.
  • Loading branch information
mikewiacek authored and cmumford committed Dec 9, 2015
1 parent b9afa1f commit ce45404
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
14 changes: 13 additions & 1 deletion db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
eof_(false),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
initial_offset_(initial_offset),
resyncing_(initial_offset > 0) {
}

Reader::~Reader() {
Expand Down Expand Up @@ -74,6 +75,17 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment);
if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}

switch (record_type) {
case kFullType:
if (in_fragmented_record) {
Expand Down
5 changes: 5 additions & 0 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class Reader {
// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;

// True if we are resynchronizing after a seek (initial_offset_ > 0). In
// particular, a run of kMiddleType and kLastType records can be silently
// skipped in this mode
bool resyncing_;

// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
Expand Down
33 changes: 27 additions & 6 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class LogTest {
virtual Status Skip(uint64_t n) {
if (n > contents_.size()) {
contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
return Status::NotFound("in-memory file skipped past end");
}

contents_.remove_prefix(n);
Expand All @@ -105,7 +105,7 @@ class LogTest {
ReportCollector report_;
bool reading_;
Writer* writer_;
Reader reader_;
Reader* reader_;

// Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[];
Expand All @@ -114,12 +114,13 @@ class LogTest {
public:
LogTest() : reading_(false),
writer_(new Writer(&dest_)),
reader_(&source_, &report_, true/*checksum*/,
0/*initial_offset*/) {
reader_(new Reader(&source_, &report_, true/*checksum*/,
0/*initial_offset*/)) {
}

~LogTest() {
delete writer_;
delete reader_;
}

void ReopenForAppend() {
Expand All @@ -143,7 +144,7 @@ class LogTest {
}
std::string scratch;
Slice record;
if (reader_.ReadRecord(&record, &scratch)) {
if (reader_->ReadRecord(&record, &scratch)) {
return record.ToString();
} else {
return "EOF";
Expand Down Expand Up @@ -198,6 +199,11 @@ class LogTest {
}
}

void StartReadingAt(uint64_t initial_offset) {
delete reader_;
reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
}

void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
reading_ = true;
Expand Down Expand Up @@ -227,7 +233,6 @@ class LogTest {
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
delete offset_reader;
}

};

size_t LogTest::initial_offset_record_sizes_[] =
Expand Down Expand Up @@ -463,6 +468,22 @@ TEST(LogTest, PartialLastIsIgnored) {
ASSERT_EQ(0, DroppedBytes());
}

TEST(LogTest, SkipIntoMultiRecord) {
// Consider a fragmented record:
// first(R1), middle(R1), last(R1), first(R2)
// If initial_offset points to a record after first(R1) but before first(R2)
// incomplete fragment errors are not actual errors, and must be suppressed
// until a new first or full record is encountered.
Write(BigString("foo", 3*kBlockSize));
Write("correct");
StartReadingAt(kBlockSize);

ASSERT_EQ("correct", Read());
ASSERT_EQ("", ReportMessage());
ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("EOF", Read());
}

TEST(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2)
Expand Down

0 comments on commit ce45404

Please sign in to comment.