Skip to content

Commit

Permalink
[fix](move-memtable) check segment num when closing each tablet
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Jun 25, 2024
1 parent 4d6172d commit 4a12ca8
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 16 deletions.
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ Status DeltaWriterV2::close() {
return _memtable_writer->close();
}

Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile* profile) {
SCOPED_RAW_TIMER(&_close_wait_time);
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
Expand All @@ -190,6 +190,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
_update_profile(profile);
}
RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
num_segments = _rowset_writer->next_segment_id();

_delta_written_success = true;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class DeltaWriterV2 {
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
Status close_wait(RuntimeProfile* profile = nullptr);
Status close_wait(int32_t& num_segments, RuntimeProfile* profile = nullptr);

// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); };

int32_t next_segment_id() { return _segment_creator.next_segment_id(); };

int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }

int64_t segment_writer_ns() override { return _segment_writer_ns; }
Expand Down
13 changes: 12 additions & 1 deletion be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ Status TabletStream::close() {
if (!_failed_st->ok()) {
return *_failed_st;
}
if (_next_segid.load() != _num_segments) {
return Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
}

Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
Expand Down Expand Up @@ -307,11 +312,17 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
for (const auto& tablet : tablets_to_commit) {
if (_id != tablet.index_id()) {
continue;
}
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end() && _id == tablet.index_id()) {
if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id()));
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TabletStream {

Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
Status close();
int64_t id() const { return _id; }

Expand All @@ -63,6 +64,7 @@ class TabletStream {
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bthread::Mutex _lock;
std::shared_ptr<Status> _failed_st;
PUniqueId _load_id;
Expand Down
9 changes: 6 additions & 3 deletions be/src/vec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
return writer;
}

Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
RuntimeProfile* profile) {
int num_use = --_use_cnt;
if (num_use > 0) {
LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use;
Expand All @@ -58,8 +59,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
RETURN_IF_ERROR(writer->close());
}
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
for (auto& [_, writer] : _map) {
RETURN_IF_ERROR(writer->close_wait(profile));
for (auto& [tablet_id, writer] : _map) {
int32_t num_segments;
RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
segments_for_tablet.emplace(tablet_id, num_segments);
}
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/delta_writer_v2_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class DeltaWriterV2Map {
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator);

// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close(RuntimeProfile* profile = nullptr);
Status close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
RuntimeProfile* profile = nullptr);

// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,17 @@ bool LoadStreamMap::release() {
Status LoadStreamMap::close_load(bool incremental) {
return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
const auto& tablets = _tablets_to_commit[dst_id];
bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
RETURN_IF_ERROR(stream->close_load(tablets));
if (first) {
RETURN_IF_ERROR(stream->close_load(tablets));
first = false;
} else {
RETURN_IF_ERROR(stream->close_load({}));
}
}
return Status::OK();
});
Expand Down
48 changes: 47 additions & 1 deletion be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
buffer_lock.unlock();
VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
return _send_with_retry(output);
auto st = _send_with_retry(output);
if (!st.ok()) {
_handle_failure(output, st);
}
return st;
}

void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
while (buf.size() > 0) {
// step 1: parse header
size_t hdr_len = 0;
buf.cutn((void*)&hdr_len, sizeof(size_t));
butil::IOBuf hdr_buf;
PStreamHeader hdr;
buf.cutn(&hdr_buf, hdr_len);
butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf);
hdr.ParseFromZeroCopyStream(&wrapper);

// step 2: cut data
size_t data_len = 0;
buf.cutn((void*)&data_len, sizeof(size_t));
butil::IOBuf data_buf;
buf.cutn(&data_buf, data_len);

// step 3: handle failure
switch (hdr.opcode()) {
case PStreamHeader::ADD_SEGMENT:
case PStreamHeader::APPEND_DATA: {
add_failed_tablet(hdr.tablet_id(), st);
} break;
case PStreamHeader::CLOSE_LOAD: {
brpc::StreamClose(_stream_id);
} break;
case PStreamHeader::GET_SCHEMA: {
// Just log and let wait_for_schema timeout
std::ostringstream oss;
for (const auto& tablet : hdr.tablets()) {
oss << " " << tablet.tablet_id();
}
LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
<< *this;
} break;
default:
LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ", " << *this;
DCHECK(false);
}
}
}

Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
_success_tablets.push_back(tablet_id);
}

// for tests only
void add_failed_tablet(int64_t tablet_id, Status reason) {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
_failed_tablets[tablet_id] = reason;
Expand All @@ -216,6 +215,7 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);
void _handle_failure(butil::IOBuf& buf, Status st);

Status _check_cancel() {
if (!_is_cancelled.load()) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ Status VTabletWriterV2::close(Status exec_status) {
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
auto st = _delta_writer_for_tablet->close(_profile);
auto st = _delta_writer_for_tablet->close(_segments_for_tablet, _profile);
_delta_writer_for_tablet.reset();
if (!st.ok()) {
RETURN_IF_ERROR(_cancel(st));
Expand Down Expand Up @@ -660,7 +660,9 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
if (VLOG_DEBUG_IS_ON) {
partition_ids.push_back(tablet.partition_id());
}
tablets_to_commit.push_back(tablet);
PTabletID t(tablet);
t.set_num_segments(_segments_for_tablet[tablet_id]);
tablets_to_commit.push_back(t);
}
}
if (VLOG_DEBUG_IS_ON) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {

size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
std::unordered_map<int64_t, int32_t> _segments_for_tablet;

VRowDistribution _row_distribution;
// reuse to avoid frequent memory allocation and release.
Expand Down
10 changes: 6 additions & 4 deletions be/test/vec/exec/delta_writer_v2_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
EXPECT_EQ(2, pool.size());
EXPECT_EQ(map, map3);
EXPECT_NE(map, map2);
EXPECT_TRUE(map->close().ok());
EXPECT_TRUE(map2->close().ok());
EXPECT_TRUE(map3->close().ok());
std::unordered_map<int64_t, int32_t> sft;
EXPECT_TRUE(map->close(sft).ok());
EXPECT_TRUE(map2->close(sft).ok());
EXPECT_TRUE(map3->close(sft).ok());
EXPECT_EQ(0, pool.size());
}

Expand Down Expand Up @@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
static_cast<void>(map->close());
std::unordered_map<int64_t, int32_t> sft;
static_cast<void>(map->close(sft));
EXPECT_EQ(0, pool.size());
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ message PTabletID {
optional int64 partition_id = 1;
optional int64 index_id = 2;
optional int64 tablet_id = 3;
optional int64 num_segments = 4;
}

message PTabletInfo {
Expand Down

0 comments on commit 4a12ca8

Please sign in to comment.