Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose deleted keys through iteration. #1084

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/comparator_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class KVIter : public Iterator {
}

virtual Slice key() const override { return iter_->first; }
virtual bool key_is_deleted() const override { return false; }
virtual Slice value() const override { return iter_->second; }
virtual Status status() const override { return Status::OK(); }

Expand Down
6 changes: 4 additions & 2 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4021,7 +4021,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_options.iterate_upper_bound,
read_options.prefix_same_as_start, read_options.pin_data);
read_options.prefix_same_as_start, read_options.pin_data,
read_options.skip_deleted_keys);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
Expand Down Expand Up @@ -4133,7 +4134,8 @@ Status DBImpl::NewIterators(
env_, *cfd->ioptions(), cfd->user_comparator(), iter,
kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, nullptr, false, read_options.pin_data));
sv->version_number, nullptr, false, read_options.pin_data,
read_options.skip_deleted_keys));
}
#endif
} else {
Expand Down
131 changes: 111 additions & 20 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class DBIter: public Iterator {
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false)
bool prefix_same_as_start = false,
bool skip_deleted_keys = true)
: arena_mode_(arena_mode),
env_(env),
logger_(ioptions.info_log),
Expand All @@ -118,7 +119,8 @@ class DBIter: public Iterator {
version_number_(version_number),
iterate_upper_bound_(iterate_upper_bound),
prefix_same_as_start_(prefix_same_as_start),
iter_pinned_(false) {
iter_pinned_(false),
skip_deleted_keys_(skip_deleted_keys) {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = ioptions.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations;
Expand All @@ -140,10 +142,17 @@ class DBIter: public Iterator {
}
}
virtual bool Valid() const override { return valid_; }

virtual Slice key() const override {
assert(valid_);
return saved_key_.GetKey();
}

virtual bool key_is_deleted() const override {
assert(valid_);
return saved_key_.IsDeleted();
}

virtual Slice value() const override {
assert(valid_);
return (direction_ == kForward && !current_entry_is_merged_) ?
Expand Down Expand Up @@ -207,6 +216,46 @@ class DBIter: public Iterator {
virtual void SeekToFirst() override;
virtual void SeekToLast() override;

private:

// Class that wraps the IterKey and keeps track of delete status. This is
// implemented as a separate class to force users to consider the deletion
// status as a first class citizen when iterating the db.
class IterKeyWithDeletionStatus {
public:
IterKeyWithDeletionStatus()
: is_deleted_(false) {}

Slice GetKey() const { return key_.GetKey(); }
bool IsDeleted() const { return is_deleted_; }
bool IsKeyPinned() const { return key_.IsKeyPinned(); }

void Clear() { is_deleted_ = false; key_.Clear(); }

void SetInternalKey(const Slice& user_key, SequenceNumber s) {
is_deleted_ = false;
key_.SetInternalKey(user_key, s);
}

void SetInternalKey(const ParsedInternalKey& parsed_key) {
is_deleted_ = false;
key_.SetInternalKey(parsed_key);
}

Slice SetKey(const Slice& key, bool copy, bool is_deleted) {
is_deleted_ = is_deleted;
return key_.SetKey(key, copy);
}

void SetDeleted(bool is_deleted) {
is_deleted_ = is_deleted;
}

private:
IterKey key_;
bool is_deleted_;
};

private:
void ReverseToBackward();
void PrevInternal();
Expand Down Expand Up @@ -239,7 +288,7 @@ class DBIter: public Iterator {
SequenceNumber const sequence_;

Status status_;
IterKey saved_key_;
IterKeyWithDeletionStatus saved_key_;
std::string saved_value_;
Direction direction_;
bool valid_;
Expand All @@ -251,6 +300,7 @@ class DBIter: public Iterator {
IterKey prefix_start_;
bool prefix_same_as_start_;
bool iter_pinned_;
const bool skip_deleted_keys_;
// List of operands for merge operator.
MergeContext merge_context_;
LocalStatistics local_stats_;
Expand Down Expand Up @@ -353,20 +403,28 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(ikey.user_key,
!iter_->IsKeyPinned() /* copy */);
!iter_->IsKeyPinned() /* copy */,
true /* is_deleted */);
if (!skip_deleted_keys_) {
valid_ = true;
return;
}

skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
valid_ = true;
saved_key_.SetKey(ikey.user_key,
!iter_->IsKeyPinned() /* copy */);
!iter_->IsKeyPinned() /* copy */,
false /* is_deleted */);
return;
case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value
saved_key_.SetKey(ikey.user_key,
!iter_->IsKeyPinned() /* copy */);
!iter_->IsKeyPinned() /* copy */,
false /* is_deleted */);
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
Expand Down Expand Up @@ -529,7 +587,8 @@ void DBIter::PrevInternal() {

while (iter_->Valid()) {
saved_key_.SetKey(ExtractUserKey(iter_->key()),
!iter_->IsKeyPinned() /* copy */);
!iter_->IsKeyPinned() /* copy */,
false /* is_deleted */);
if (FindValueForCurrentKey()) {
valid_ = true;
if (!iter_->Valid()) {
Expand All @@ -556,7 +615,9 @@ void DBIter::PrevInternal() {

// This function checks, if the entry with biggest sequence_number <= sequence_
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
// saved_value_. If skip_deleted_keys_ is not set, this function will treat
// deleted keys as valid keys, except for the fact that saved_value_ will not
// be set.
bool DBIter::FindValueForCurrentKey() {
assert(iter_->Valid());
merge_context_.Clear();
Expand Down Expand Up @@ -587,6 +648,9 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeSingleDeletion:
merge_context_.Clear();
last_not_merge_type = last_key_entry_type;
// TODO: Think about the accounting for internal_deleted_skipped_count
// if we are not skipping deleted keys. Should we update it only the
// first time we encounter a deleted key in this loop?
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeMerge:
Expand All @@ -605,10 +669,6 @@ bool DBIter::FindValueForCurrentKey() {
}

switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
valid_ = false;
return false;
case kTypeMerge:
if (last_not_merge_type == kTypeDeletion) {
StopWatchNano timer(env_, statistics_ != nullptr);
Expand Down Expand Up @@ -636,6 +696,15 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeValue:
// do nothing - we've already has value in saved_value_
break;
case kTypeDeletion:
case kTypeSingleDeletion:
saved_key_.SetDeleted(true);
if (skip_deleted_keys_) {
valid_ = false;
return false;
}
// If we don't intend to skip deleted keys, do nothing.
break;
default:
assert(false);
break;
Expand Down Expand Up @@ -664,8 +733,18 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
valid_ = true;
return true;
}
valid_ = false;
return false;

saved_key_.SetDeleted(true);

if (skip_deleted_keys_) {
valid_ = false;
return false;
}

// Control reaches here if we are dealing with a deleted key, and we do not
// want to ignore them.
valid_ = true;
return true;
}

// kTypeMerge. We need to collect all kTypeMerge values and save them
Expand All @@ -682,6 +761,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
// Handling for the case when we encounter a delete after processing a
// bunch of merges. In that case, we should merge all the value we saw so
// far.
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);
Expand All @@ -700,6 +782,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
return true;
}

// Control reaches here if we encounter a kTypeValue after a bunch of merges.
// In that case, apply all the merges over this value, and keep the value
// in saved_value_.
const Slice& val = iter_->value();
{
StopWatchNano timer(env_, statistics_ != nullptr);
Expand Down Expand Up @@ -773,7 +858,7 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);
saved_key_.Clear();
// now savved_key is used to store internal key.
// now saved_key is used to store internal key.
saved_key_.SetInternalKey(target, sequence_);

{
Expand Down Expand Up @@ -848,7 +933,8 @@ void DBIter::SeekToLast() {
// it will seek to the last key before the
// ReadOptions.iterate_upper_bound
if (iter_->Valid() && iterate_upper_bound_ != nullptr) {
saved_key_.SetKey(*iterate_upper_bound_, false /* copy */);
saved_key_.SetKey(*iterate_upper_bound_, false /* copy */,
false /* is_deleted */);
std::string last_key;
AppendInternalKey(&last_key,
ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber,
Expand Down Expand Up @@ -886,11 +972,12 @@ Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
uint64_t max_sequential_skip_in_iterations,
uint64_t version_number,
const Slice* iterate_upper_bound,
bool prefix_same_as_start, bool pin_data) {
bool prefix_same_as_start, bool pin_data,
bool skip_deleted_keys) {
DBIter* db_iter =
new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations, version_number,
iterate_upper_bound, prefix_same_as_start);
iterate_upper_bound, prefix_same_as_start, skip_deleted_keys);
if (pin_data) {
db_iter->PinData();
}
Expand All @@ -914,6 +1001,9 @@ inline void ArenaWrappedDBIter::Seek(const Slice& target) {
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline bool ArenaWrappedDBIter::key_is_deleted() const {
return db_iter_->key_is_deleted();
}
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
inline Status ArenaWrappedDBIter::PinData() { return db_iter_->PinData(); }
Expand All @@ -934,14 +1024,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound, bool prefix_same_as_start,
bool pin_data) {
bool pin_data, bool skip_deleted_keys) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter =
new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
true, max_sequential_skip_in_iterations, version_number,
iterate_upper_bound, prefix_same_as_start);
iterate_upper_bound, prefix_same_as_start,
skip_deleted_keys);

iter->SetDBIter(db_iter);
if (pin_data) {
Expand Down
7 changes: 5 additions & 2 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ extern Iterator* NewDBIterator(
const Comparator* user_key_comparator, InternalIterator* internal_iter,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false, bool pin_data = false);
bool prefix_same_as_start = false, bool pin_data = false,
bool skip_deleted_keys = true);

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
Expand Down Expand Up @@ -59,6 +60,7 @@ class ArenaWrappedDBIter : public Iterator {
virtual void Next() override;
virtual void Prev() override;
virtual Slice key() const override;
virtual bool key_is_deleted() const override;
virtual Slice value() const override;
virtual Status status() const override;

Expand All @@ -78,6 +80,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
const Comparator* user_key_comparator, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false, bool pin_data = false);
bool prefix_same_as_start = false, bool pin_data = false,
bool skip_deleted_keys = true);

} // namespace rocksdb
1 change: 1 addition & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3236,6 +3236,7 @@ class ModelDB : public DB {
virtual Slice key() const override { return iter_->first; }
virtual Slice value() const override { return iter_->second; }
virtual Status status() const override { return Status::OK(); }
virtual bool key_is_deleted() const override { return false; }

private:
const KVMap* const map_;
Expand Down
6 changes: 6 additions & 0 deletions db/managed_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ Slice ManagedIterator::value() const {
return cached_value_.GetKey();
}

bool ManagedIterator::key_is_deleted() const {
assert(valid_);
return cached_key_is_deleted_;
}

Status ManagedIterator::status() const { return status_; }

void ManagedIterator::RebuildIterator() {
Expand All @@ -219,6 +224,7 @@ void ManagedIterator::UpdateCurrent() {
status_ = Status::OK();
cached_key_.SetKey(mutable_iter_->key());
cached_value_.SetKey(mutable_iter_->value());
cached_key_is_deleted_ = mutable_iter_->key_is_deleted();
}

void ManagedIterator::ReleaseIter(bool only_old) {
Expand Down
2 changes: 2 additions & 0 deletions db/managed_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ManagedIterator : public Iterator {
virtual void Next() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual bool key_is_deleted() const override;
virtual Status status() const override;
void ReleaseIter(bool only_old);
void SetDropOld(bool only_old) {
Expand Down Expand Up @@ -73,6 +74,7 @@ class ManagedIterator : public Iterator {

IterKey cached_key_;
IterKey cached_value_;
bool cached_key_is_deleted_ = false;

bool only_drop_old_ = true;
bool snapshot_created_;
Expand Down
Loading