Skip to content

Commit

Permalink
[feature](merge-cloud) Decouple rowset id generator and local rowsets…
Browse files Browse the repository at this point in the history
… gc implementation (#25921)
  • Loading branch information
platoneko authored Nov 10, 2023
1 parent 49cffd0 commit d767804
Show file tree
Hide file tree
Showing 61 changed files with 1,066 additions and 895 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,8 +635,6 @@ DEFINE_Bool(path_gc_check, "true");
DEFINE_mInt32(path_gc_check_interval_second, "86400");
DEFINE_mInt32(path_gc_check_step, "1000");
DEFINE_mInt32(path_gc_check_step_interval_ms, "10");
DEFINE_mInt32(path_scan_interval_second, "86400");
DEFINE_mInt32(path_scan_step_interval_ms, "70");

// The following 2 configs limit the max usage of disk capacity of a data dir.
// If both of these 2 threshold reached, no more data can be writen into that data dir.
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,6 @@ DECLARE_Bool(path_gc_check);
DECLARE_mInt32(path_gc_check_interval_second);
DECLARE_mInt32(path_gc_check_step);
DECLARE_mInt32(path_gc_check_step_interval_ms);
DECLARE_mInt32(path_scan_interval_second);
DECLARE_mInt32(path_scan_step_interval_ms);

// The following 2 configs limit the max usage of disk capacity of a data dir.
// If both of these 2 threshold reached, no more data can be writen into that data dir.
Expand Down
19 changes: 8 additions & 11 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -597,18 +597,15 @@ using ResultError = unexpected<Status>;
} \
} while (false)

// clang-format off
#define DORIS_TRY(stmt) \
({ \
auto&& res = (stmt); \
using T = std::decay_t<decltype(res)>; \
static_assert(tl::detail::is_expected<T>::value); \
if (!res.has_value()) [[unlikely]] { \
return std::forward<T>(res).error(); \
} \
std::forward<T>(res).value(); \
#define DORIS_TRY(stmt) \
({ \
auto&& res = (stmt); \
using T = std::decay_t<decltype(res)>; \
if (!res.has_value()) [[unlikely]] { \
return std::forward<T>(res).error(); \
} \
std::forward<T>(res).value(); \
});
// clang-format on

} // namespace doris

Expand Down
3 changes: 1 addition & 2 deletions be/src/http/action/pad_rowset_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,13 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, const Version& versi
return Status::InternalError("Input version {} exists", version.to_string());
}

std::unique_ptr<RowsetWriter> writer;
RowsetWriterContext ctx;
ctx.version = version;
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
ctx.tablet_schema = tablet->tablet_schema();
ctx.newest_write_timestamp = UnixSeconds();
RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
auto writer = DORIS_TRY(tablet->create_rowset_writer(ctx, false));
RowsetSharedPtr rowset;
RETURN_IF_ERROR(writer->build(rowset));
rowset->make_visible(version);
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class BaseTablet {

virtual bool exceed_version_limit(int32_t limit) const = 0;

virtual Status create_rowset_writer(RowsetWriterContext& context,
std::unique_ptr<RowsetWriter>* rowset_writer) = 0;
virtual Result<std::unique_ptr<RowsetWriter>> create_rowset_writer(RowsetWriterContext& context,
bool vertical) = 0;

virtual Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/cold_data_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ Status ColdDataCompaction::modify_rowsets(const Merger::Statistics* stats) {
// TODO(plat1ko): process primary key
_tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id);
}
Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string());
{
std::shared_lock rlock(_tablet->get_header_lock());
_tablet->save_meta();
Expand Down
11 changes: 3 additions & 8 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,6 @@ Status Compaction::do_compaction_impl(int64_t permits) {
RowsetWriterContext ctx;
RETURN_IF_ERROR(construct_input_rowset_readers());
RETURN_IF_ERROR(construct_output_rowset_writer(ctx, vertical_compaction));
if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
Tablet::add_pending_remote_rowset(_output_rs_writer->rowset_id().to_string());
}

// 2. write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Expand Down Expand Up @@ -644,10 +641,9 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
DCHECK(resource.fs->type() != io::FileSystemType::LOCAL);
ctx.fs = std::move(resource.fs);
}
if (is_vertical) {
return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer);
}
return _tablet->create_rowset_writer(ctx, &_output_rs_writer);
_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, is_vertical));
_pending_rs_guard = StorageEngine::instance()->add_pending_rowset(ctx);
return Status::OK();
}

Status Compaction::construct_input_rowset_readers() {
Expand Down Expand Up @@ -806,7 +802,6 @@ bool Compaction::_check_if_includes_input_rowsets(
void Compaction::gc_output_rowset() {
if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) {
if (!_output_rowset->is_local()) {
Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string());
_tablet->record_unused_remote_rowset(_output_rowset->rowset_id(),
_output_rowset->rowset_meta()->resource_id(),
_output_rowset->num_segments());
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/merger.h"
#include "olap/olap_common.h"
#include "olap/rowid_conversion.h"
#include "olap/rowset/pending_rowset_helper.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet.h"
Expand Down Expand Up @@ -115,6 +116,7 @@ class Compaction {
int64_t _input_index_size;

RowsetSharedPtr _output_rowset;
PendingRowsetGuard _pending_rs_guard;
std::unique_ptr<RowsetWriter> _output_rs_writer;

enum CompactionState { INITED = 0, SUCCESS = 1 };
Expand Down
Loading

0 comments on commit d767804

Please sign in to comment.