Skip to content

Commit

Permalink
Storage mover port (#2039) (#2127)
Browse files Browse the repository at this point in the history
<!--Example: Fixes #1234. See also #3456.-->
Add the StorageMover class from arcticc. Some changes to the
implementation of the StorageMover were needed because
`batch_read_compressed` used to return not only the keys but also the
segments. In the current ArcticDB version each segment is instead passed
to a continuation, which required keeping an additional vector in which
the segments themselves are stored.
Make sure all unit tests are passing.
No changes are made to the API.
Note that in arcticc the class lived in the tools module, whereas here it
is in the toolbox module.

<details>
  <summary>
   Checklist for code changes...
  </summary>

- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

#### Reference Issues/PRs
<!--Example: Fixes #1234. See also #3456.-->

#### What does this implement or fix?

#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->
  • Loading branch information
vasil-pashov authored Jan 17, 2025
1 parent 36c0a9f commit 41c373c
Show file tree
Hide file tree
Showing 23 changed files with 1,227 additions and 78 deletions.
3 changes: 3 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ set(arcticdb_srcs
storage/storage.hpp
storage/storage_override.hpp
storage/store.hpp
storage/storage_utils.hpp
stream/aggregator.hpp
stream/aggregator-inl.hpp
stream/append_map.hpp
Expand All @@ -325,6 +326,7 @@ set(arcticdb_srcs
stream/stream_utils.hpp
stream/stream_writer.hpp
toolbox/library_tool.hpp
toolbox/storage_mover.hpp
util/allocator.hpp
util/bitset.hpp
util/buffer.hpp
Expand Down Expand Up @@ -481,6 +483,7 @@ set(arcticdb_srcs
storage/s3/s3_storage.cpp
storage/s3/s3_storage_tool.cpp
storage/storage_factory.cpp
storage/storage_utils.cpp
stream/aggregator.cpp
stream/append_map.cpp
stream/index.cpp
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ class AsyncStore : public Store {
public:
AsyncStore(
std::shared_ptr<storage::Library> library,
const arcticdb::proto::encoding::VariantCodec &codec,
const proto::encoding::VariantCodec &codec,
EncodingVersion encoding_version
) :
library_(std::move(library)),
codec_(std::make_shared<arcticdb::proto::encoding::VariantCodec>(codec)),
codec_(std::make_shared<proto::encoding::VariantCodec>(codec)),
encoding_version_(encoding_version) {
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/task_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,13 @@ inline auto& io_executor() {
}

template <typename Task>
inline auto submit_cpu_task(Task&& task) {
auto submit_cpu_task(Task&& task) {
return TaskScheduler::instance()->submit_cpu_task(std::forward<decltype(task)>(task));
}


template <typename Task>
inline auto submit_io_task(Task&& task) {
auto submit_io_task(Task&& task) {
return TaskScheduler::instance()->submit_io_task(std::forward<decltype(task)>(task));
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/atom_key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ class AtomKeyImpl {
}

friend bool operator<(const AtomKeyImpl &l, const AtomKeyImpl &r) {
auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_);
auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_);
const auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_);
const auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_);
return lt < rt;
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/entity/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ KeyClass key_class_from_key_type(KeyType key_type) {
return get_key_data(key_type).key_class_;
}

/// Returns the `description_` field of the key data registered for `key_type`.
const char* get_key_description(KeyType key_type) {
    const auto& key_data = get_key_data(key_type);
    return key_data.description_;
}

/// Tells whether the variant type associated with `key_type` is VariantType::STRING_TYPE.
bool is_string_key_type(KeyType key_type) {
    const VariantType variant_type = variant_type_from_key_type(key_type);
    return VariantType::STRING_TYPE == variant_type;
}
Expand Down
34 changes: 17 additions & 17 deletions cpp/arcticdb/entity/key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include <memory>
#include <algorithm>
#include <variant>
#include <array>
#include <ranges>

namespace rng = std::ranges;

namespace arcticdb::entity {

Expand Down Expand Up @@ -191,10 +195,10 @@ enum class KeyType : int {
UNDEFINED
};

inline std::vector<KeyType> key_types_write_precedence() {
consteval auto key_types_write_precedence() {
// TOMBSTONE[_ALL] keys are not included because they're not written to the storage,
// they just exist inside version keys
return {
return std::array {
KeyType::LIBRARY_CONFIG,
KeyType::TABLE_DATA,
KeyType::TABLE_INDEX,
Expand All @@ -213,9 +217,9 @@ inline std::vector<KeyType> key_types_write_precedence() {
};
}

inline std::vector<KeyType> key_types_read_precedence() {
consteval auto key_types_read_precedence() {
auto output = key_types_write_precedence();
std::reverse(std::begin(output), std::end(output));
rng::reverse(output);
return output;
}

Expand Down Expand Up @@ -245,7 +249,7 @@ enum class VariantType : char {

VariantType variant_type_from_key_type(KeyType key_type);

inline bool is_index_key_type(KeyType key_type) {
constexpr bool is_index_key_type(KeyType key_type) {
// TODO: Change name probably.
return (key_type == KeyType::TABLE_INDEX) || (key_type == KeyType::MULTI_KEY);
}
Expand All @@ -256,30 +260,26 @@ bool is_ref_key_class(KeyType k);

bool is_block_ref_key_class(KeyType k);

inline KeyType get_key_type_for_data_stream(const StreamId &) {
constexpr KeyType get_key_type_for_data_stream(const StreamId &) {
return KeyType::TABLE_DATA;
}

inline KeyType get_key_type_for_index_stream(const StreamId &) {
constexpr KeyType get_key_type_for_index_stream(const StreamId &) {
return KeyType::TABLE_INDEX;
}

const char* get_key_description(KeyType type);

template <typename Function>
auto foreach_key_type_read_precedence(Function&& func) {
auto types = key_types_read_precedence();
for(auto type : types) {
func(KeyType(type));
}
constexpr auto foreach_key_type_read_precedence(Function&& func) {
rng::for_each(key_types_read_precedence(), func);
}

template <typename Function>
auto foreach_key_type_write_precedence(Function&& func) {
auto types = key_types_write_precedence();
for(auto type : types) {
func(KeyType(type));
}
constexpr auto foreach_key_type_write_precedence(Function&& func) {
rng::for_each(key_types_write_precedence(), func);
}

inline KeyType key_type_from_int(int type_num) {
util::check(type_num > 0 && type_num < int(KeyType::UNDEFINED), "Unrecognized key type number {}", type_num);
return KeyType(type_num);
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace arcticdb {

const std::string MONGO_INSTANCE_LABEL = "mongo_instance";
const std::string PROMETHEUS_ENV_LABEL = "env";
const int SUMMARY_MAX_AGE = 30;
const int SUMMARY_AGE_BUCKETS = 5;
constexpr int SUMMARY_MAX_AGE = 30;
constexpr int SUMMARY_AGE_BUCKETS = 5;

class MetricsConfig {
public:
Expand Down
147 changes: 147 additions & 0 deletions cpp/arcticdb/storage/storage_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/pipeline/index_writer.hpp>
#include <arcticdb/stream/index_aggregator.hpp>
#include <arcticdb/async/tasks.hpp>

namespace arcticdb {

std::vector<VariantKey> filter_keys_on_existence(
const std::vector<VariantKey>& keys,
const std::shared_ptr<Store>& store,
bool pred
){
auto key_existence = folly::collect(store->batch_key_exists(keys)).get();
std::vector<VariantKey> res;
for (size_t i = 0; i != keys.size(); i++) {
if (key_existence[i] == pred) {
res.push_back(keys[i]);
}
}
return res;
}

void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
std::vector<VariantKey> var_vector;
var_vector.reserve(keys.size());
rng::copy(keys, std::back_inserter(var_vector));

auto key_existence = store->batch_key_exists(var_vector);

auto keys_itr = keys.begin();
for (size_t i = 0; i != var_vector.size(); i++) {
bool resolved = key_existence[i].wait().value();
if (resolved == pred) {
*keys_itr = std::move(std::get<AtomKey>(var_vector[i]));
++keys_itr;
}
}
keys.erase(keys_itr, keys.end());
}

/// Copies an index key and every key listed in its segment from `source_store` to `target_store`.
///
/// The index segment is read synchronously from the source. Each key it lists is rebuilt with a
/// fresh creation timestamp (and with `new_version_id` when one is supplied) and copied by an IO
/// task, while a new index referencing the rebuilt keys is assembled and finally committed to the
/// target.
///
/// Copy failures reported by the tasks are only logged; the new index is committed regardless.
/// NOTE(review): a committed index may therefore reference keys that were not copied - confirm
/// this is the intended behaviour.
///
/// @param source_store   Store the index and the keys it lists are read from.
/// @param target_store   Store the copied keys and the new index are written to.
/// @param index_key      Index key whose tree is copied.
/// @param new_version_id Version for all written keys; when empty each key keeps its own version.
/// @return The atom key of the index committed to `target_store`.
AtomKey write_table_index_tree_from_source_to_target(
        const std::shared_ptr<Store>& source_store,
        const std::shared_ptr<Store>& target_store,
        const AtomKey& index_key,
        std::optional<VersionId> new_version_id
) {
    ARCTICDB_SAMPLE(WriteIndexSourceToTarget, 0)
    using CopyTask = async::CopyCompressedInterStoreTask;
    using CopyResult = CopyTask::ProcessingResult;

    // Source side: load the index segment and wrap it in a reader.
    auto [_, index_seg] = source_store->read_sync(index_key);
    index::IndexSegmentReader index_segment_reader{std::move(index_seg)};

    // Target side: the writer takes over the reader's timeseries descriptor.
    index::IndexWriter<stream::RowCountIndex> writer(
        target_store,
        {index_key.id(), new_version_id.value_or(index_key.version_id())},
        std::move(index_segment_reader.mutable_tsd()));

    std::vector<folly::Future<CopyResult>> pending_copies;
    for (auto& slice_and_key : index_segment_reader) {
        const auto& source_key = slice_and_key.key();
        std::optional<entity::AtomKey> key_to_write = atom_key_builder()
            .version_id(new_version_id.value_or(source_key.version_id()))
            .creation_ts(util::SysClock::nanos_since_epoch())
            .start_index(source_key.start_index())
            .end_index(source_key.end_index())
            .content_hash(source_key.content_hash())
            .build(source_key.id(), source_key.type());

        // Register the rebuilt key in the new index before handing it to the copy task.
        writer.add(*key_to_write, slice_and_key.slice());
        pending_copies.emplace_back(submit_io_task(CopyTask{
            slice_and_key.key(),
            std::move(key_to_write),
            false,
            false,
            source_store,
            {target_store}}));
    }

    const std::vector<CopyResult> copy_results = collect(pending_copies).get();
    for (const CopyResult& copy_result : copy_results) {
        util::variant_match(
            copy_result,
            [&](const CopyTask::FailedTargets& failed) {
                log::storage().error("Failed to move targets: {} from {} to {}", failed, source_store->name(), target_store->name());
            },
            [](const auto&) {});
    }
    // FUTURE: clean up already written keys if exception
    return to_atom(writer.commit().get());
}

/// Copies a MULTI_KEY and everything it references from `source_store` to `target_store`.
///
/// The multi-key segment is read from the source, every key listed in it is copied through
/// `copy_index_key_recursively`, and a new MULTI_KEY listing the copied keys is written to the
/// target. Metadata and the index descriptor of the source segment are carried over when present.
///
/// @param source_store   Store the multi-key and its referenced keys are read from.
/// @param target_store   Store the copies and the new multi-key are written to.
/// @param index_key      The MULTI_KEY to copy.
/// @param new_version_id Version for the written keys; when empty the source versions are kept.
/// @return The atom key of the MULTI_KEY written to `target_store`.
AtomKey copy_multi_key_from_source_to_target(
        const std::shared_ptr<Store>& source_store,
        const std::shared_ptr<Store>& target_store,
        const AtomKey& index_key,
        std::optional<VersionId> new_version_id) {
    using namespace arcticdb::stream;
    auto [_, index_seg] = source_store->read(index_key).get();

    // First pass: decode every key row of the multi-key segment.
    std::vector<AtomKey> referenced_keys;
    for (size_t row = 0; row < index_seg.row_count(); ++row) {
        referenced_keys.push_back(stream::read_key_row(index_seg, static_cast<ssize_t>(row)));
    }

    // Second pass: recurse on the index keys inside MULTI_KEY.
    std::vector<VariantKey> copied_keys;
    for (const auto& referenced : referenced_keys) {
        copied_keys.emplace_back(copy_index_key_recursively(source_store, target_store, referenced, new_version_id));
    }

    // Write the new MULTI_KEY; the aggregator callback stores the write future once it has been waited on.
    folly::Future<VariantKey> written_key_fut = folly::Future<VariantKey>::makeEmpty();
    IndexAggregator<RowCountIndex> multi_index_agg(
        index_key.id(),
        [&new_version_id, &index_key, &written_key_fut, &target_store](auto&& segment) {
            written_key_fut = target_store->write(
                KeyType::MULTI_KEY,
                new_version_id.value_or(index_key.version_id()), // version_id
                index_key.id(),
                0, // start_index
                0, // end_index
                std::forward<SegmentInMemory>(segment)).wait();
        });
    for (auto& copied : copied_keys) {
        multi_index_agg.add_key(to_atom(copied));
    }

    if (index_seg.has_metadata()) {
        google::protobuf::Any metadata_copy = *index_seg.metadata();
        multi_index_agg.set_metadata(std::move(metadata_copy));
    }
    if (index_seg.has_index_descriptor()) {
        multi_index_agg.set_timeseries_descriptor(index_seg.index_descriptor());
    }

    multi_index_agg.commit();
    return to_atom(written_key_fut.value());
}

/// Copies `index_key` and everything reachable from it from `source_store` to `target_store`.
///
/// TABLE_INDEX keys are handled by `write_table_index_tree_from_source_to_target` and MULTI_KEY
/// keys by `copy_multi_key_from_source_to_target`; any other key type raises E_ASSERTION_FAILURE.
///
/// @param source_store   Store to read from.
/// @param target_store   Store to write to.
/// @param index_key      Key of type TABLE_INDEX or MULTI_KEY.
/// @param new_version_id Version for the written keys; when empty the source versions are kept.
/// @return The atom key of the top-level key written to `target_store`.
AtomKey copy_index_key_recursively(
        const std::shared_ptr<Store>& source_store,
        const std::shared_ptr<Store>& target_store,
        const AtomKey& index_key,
        std::optional<VersionId> new_version_id) {
    ARCTICDB_SAMPLE(RecurseIndexKey, 0)
    switch (index_key.type()) {
    case KeyType::TABLE_INDEX:
        return write_table_index_tree_from_source_to_target(source_store, target_store, index_key, new_version_id);
    case KeyType::MULTI_KEY:
        return copy_multi_key_from_source_to_target(source_store, target_store, index_key, new_version_id);
    default:
        break;
    }
    internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Cannot copy index recursively. Unsupported index key type {}", index_key.type());
}

}
41 changes: 8 additions & 33 deletions cpp/arcticdb/storage/storage_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,13 @@ inline auto stream_id_prefix_matcher(const std::string &prefix) {
std::get<std::string>(id).compare(0u, prefix.size(), prefix) == 0); };
}

inline std::vector<VariantKey> filter_keys_on_existence(
const std::vector<VariantKey>& keys,
const std::shared_ptr<Store>& store,
bool pred
){
auto key_existence = folly::collect(store->batch_key_exists(keys)).get();
std::vector<VariantKey> res;
for (size_t i = 0; i != keys.size(); i++) {
if (key_existence[i] == pred) {
res.push_back(keys[i]);
}
}
return res;
}

inline void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
std::vector<VariantKey> var_vector;
var_vector.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(var_vector),
[](auto&& k) { return VariantKey(std::move(k)); });

auto key_existence = store->batch_key_exists(var_vector);

auto keys_itr = keys.begin();
for (size_t i = 0; i != var_vector.size(); i++) {
bool resolved = key_existence[i].wait().value();
if (resolved == pred) {
*keys_itr = std::move(std::get<AtomKey>(var_vector[i]));
++keys_itr;
}
}
keys.erase(keys_itr, keys.end());
}
/// Returns copies of the `keys` whose existence in `store` (queried via `batch_key_exists`)
/// equals `pred`; the input order is preserved.
std::vector<VariantKey> filter_keys_on_existence(const std::vector<VariantKey>& keys, const std::shared_ptr<Store>& store, bool pred);
/// In-place overload: erases from `keys` every entry whose existence in `store` differs from
/// `pred`, keeping the remaining entries in their original order.
void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred);

/// Copies `index_key` (a TABLE_INDEX or MULTI_KEY) and the keys it references from
/// `source_store` to `target_store`, returning the key written to the target.
/// When `new_version_id` holds a value it is used as the version of the written keys;
/// otherwise the source versions are kept. Other key types raise E_ASSERTION_FAILURE.
AtomKey copy_index_key_recursively(
const std::shared_ptr<Store>& source_store,
const std::shared_ptr<Store>& target_store,
const AtomKey& index_key,
std::optional<VersionId> new_version_id);

} //namespace arcticdb
8 changes: 8 additions & 0 deletions cpp/arcticdb/stream/index_aggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class FlatIndexingPolicy {
segment_.set_timeseries_descriptor(timeseries_descriptor);
}

// Moves `metadata` into the policy's segment via the segment's own set_metadata.
void set_metadata(google::protobuf::Any&& metadata) {
segment_.set_metadata(std::move(metadata));
}

private:
Callback callback_;
FixedSchema schema_;
Expand Down Expand Up @@ -89,6 +93,10 @@ class IndexAggregator {
indexing_policy_.set_timeseries_descriptor(timeseries_descriptor);
}

// Forwards `metadata` (moved) to the indexing policy's set_metadata.
void set_metadata(google::protobuf::Any&& metadata) {
indexing_policy_.set_metadata(std::move(metadata));
}

private:
IndexingPolicy indexing_policy_;
};
Expand Down
1 change: 0 additions & 1 deletion cpp/arcticdb/stream/segment_aggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <arcticdb/pipeline/frame_utils.hpp>
#include <arcticdb/util/format_date.hpp>
#include <arcticdb/util/memory_tracing.hpp>
#include <arcticdb/pipeline/filter_segment.hpp>
#include <arcticdb/stream/merge_utils.hpp>

namespace arcticdb::stream {
Expand Down
Loading

0 comments on commit 41c373c

Please sign in to comment.