Skip to content

Commit

Permalink
Merge branch 'unstable' into aleksraiden-patch-6
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Jan 12, 2024
2 parents 842da25 + 4059b43 commit d6a2ee0
Show file tree
Hide file tree
Showing 3 changed files with 614 additions and 0 deletions.
166 changes: 166 additions & 0 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "iterator.h"

#include <cluster/redis_slot.h>

#include "db_util.h"

namespace engine {
DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
: storage_(storage), read_options_(std::move(read_options)), slot_(slot) {
metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName);
metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
}

void DBIterator::Next() {
if (!Valid()) return;

metadata_iter_->Next();
nextUntilValid();
}

void DBIterator::nextUntilValid() {
// slot_ != -1 means we would like to iterate all keys in the slot
// so we can skip the afterwards keys if the slot id doesn't match
if (slot_ != -1 && metadata_iter_->Valid()) {
auto [_, user_key] = ExtractNamespaceKey(metadata_iter_->key(), storage_->IsSlotIdEncoded());
// Release the iterator if the slot id doesn't match
if (GetSlotIdFromKey(user_key.ToString()) != slot_) {
Reset();
return;
}
}

while (metadata_iter_->Valid()) {
Metadata metadata(kRedisNone, false);
// Skip the metadata if it's expired
if (metadata.Decode(metadata_iter_->value()).ok() && !metadata.Expired()) {
metadata_ = metadata;
break;
}
metadata_iter_->Next();
}
}

bool DBIterator::Valid() const { return metadata_iter_ && metadata_iter_->Valid(); }

Slice DBIterator::Key() const { return Valid() ? metadata_iter_->key() : Slice(); }

std::tuple<Slice, Slice> DBIterator::UserKey() const {
if (!Valid()) {
return {};
}
return ExtractNamespaceKey(metadata_iter_->key(), slot_ != -1);
}

Slice DBIterator::Value() const { return Valid() ? metadata_iter_->value() : Slice(); }

RedisType DBIterator::Type() const { return Valid() ? metadata_.Type() : kRedisNone; }

void DBIterator::Reset() {
if (metadata_iter_) metadata_iter_.reset();
}

void DBIterator::Seek(const std::string& target) {
if (!metadata_iter_) return;

// Iterate with the slot id but storage didn't enable slot id encoding
if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
Reset();
return;
}
std::string prefix = target;
if (slot_ != -1) {
// Use the slot id as the prefix if it's specified
prefix = ComposeSlotKeyPrefix(kDefaultNamespace, slot_) + target;
}

metadata_iter_->Seek(prefix);
nextUntilValid();
}

std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
if (!Valid()) {
return nullptr;
}

// The string/json type doesn't have sub keys
RedisType type = metadata_.Type();
if (type == kRedisNone || type == kRedisString || type == kRedisJson) {
return nullptr;
}

auto prefix = InternalKey(Key(), "", metadata_.version, storage_->IsSlotIdEncoded()).Encode();
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, std::move(prefix));
}

SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
: storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
if (type_ == kRedisStream) {
cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName);
} else {
cf_handle_ = storage_->GetCFHandle(kSubkeyColumnFamilyName);
}
iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, cf_handle_));
}

void SubKeyIterator::Next() {
if (!Valid()) return;

iter_->Next();

if (!Valid()) return;

if (!iter_->key().starts_with(prefix_)) {
Reset();
}
}

bool SubKeyIterator::Valid() const { return iter_ && iter_->Valid(); }

Slice SubKeyIterator::Key() const { return Valid() ? iter_->key() : Slice(); }

Slice SubKeyIterator::UserKey() const {
if (!Valid()) return {};

const InternalKey internal_key(iter_->key(), storage_->IsSlotIdEncoded());
return internal_key.GetSubKey();
}

Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); }

void SubKeyIterator::Seek() {
if (!iter_) return;

iter_->Seek(prefix_);
if (!iter_->Valid()) return;
// For the subkey iterator, it MUST contain the prefix key itself
if (!iter_->key().starts_with(prefix_)) {
Reset();
}
}

void SubKeyIterator::Reset() {
if (iter_) iter_.reset();
}

} // namespace engine
82 changes: 82 additions & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#pragma once

#include <rocksdb/iterator.h>
#include <rocksdb/options.h>

#include "storage.h"

namespace engine {

class SubKeyIterator {
public:
explicit SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix);
~SubKeyIterator() = default;
bool Valid() const;
void Seek();
void Next();
// return the raw key in rocksdb
Slice Key() const;
// return the user key without prefix
Slice UserKey() const;
Slice Value() const;
void Reset();

private:
Storage *storage_;
rocksdb::ReadOptions read_options_;
RedisType type_;
std::string prefix_;
std::unique_ptr<rocksdb::Iterator> iter_;
rocksdb::ColumnFamilyHandle *cf_handle_ = nullptr;
};

class DBIterator {
public:
explicit DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot = -1);
~DBIterator() = default;

bool Valid() const;
void Seek(const std::string &target = "");
void Next();
// return the raw key in rocksdb
Slice Key() const;
// return the namespace and user key without prefix
std::tuple<Slice, Slice> UserKey() const;
Slice Value() const;
RedisType Type() const;
void Reset();
std::unique_ptr<SubKeyIterator> GetSubKeyIterator() const;

private:
void nextUntilValid();

Storage *storage_;
rocksdb::ReadOptions read_options_;
int slot_ = -1;
Metadata metadata_ = Metadata(kRedisNone, false);

rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = nullptr;
std::unique_ptr<rocksdb::Iterator> metadata_iter_;
std::unique_ptr<SubKeyIterator> subkey_iter_;
};

} // namespace engine
Loading

0 comments on commit d6a2ee0

Please sign in to comment.