Skip to content

Commit

Permalink
fix memeory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
airborne12 committed Nov 14, 2024
1 parent a228d33 commit dc539c3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 12 deletions.
16 changes: 11 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire

Status InvertedIndexFileWriter::write_v1() {
int64_t total_size = 0;
lucene::store::Directory* out_dir = nullptr;
std::unique_ptr<lucene::store::IndexOutput> output = nullptr;
for (const auto& entry : _indices_dirs) {
const int64_t index_id = entry.first.first;
const auto& index_suffix = entry.first.second;
Expand All @@ -257,7 +259,9 @@ Status InvertedIndexFileWriter::write_v1() {
calculate_header_length(sorted_files, directory);

// Create output stream
auto [out_dir, output] = create_output_stream_v1(index_id, index_suffix);
auto result = create_output_stream_v1(index_id, index_suffix);
out_dir = result.first;
output = std::move(result.second);

size_t start = output->getFilePointer();
// Write header and data
Expand All @@ -274,6 +278,11 @@ Status InvertedIndexFileWriter::write_v1() {
add_index_info(index_id, index_suffix, compound_file_size);

} catch (CLuceneError& err) {
finalize_output_dir(out_dir);
if (output != nullptr) {
output->close();
output.reset();
}
auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_id, index_suffix);
LOG(ERROR) << "CLuceneError occur when write_v1 idx file " << index_path
Expand Down Expand Up @@ -325,10 +334,7 @@ Status InvertedIndexFileWriter::write_v2() {
compound_file_output->close();
compound_file_output.reset();
}
if (out_dir != nullptr) {
out_dir->close();
_CLDECDELETE(out_dir)
}
finalize_output_dir(out_dir);
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when close idx file: {}, error msg: {}", index_path.c_str(),
err.what());
Expand Down
14 changes: 7 additions & 7 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class InvertedIndexFileWriter {
Result<std::shared_ptr<DorisFSDirectory>> open(const TabletIndex* index_meta);
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
~InvertedIndexFileWriter() = default;
virtual ~InvertedIndexFileWriter() = default;
Status write_v2();
Status write_v1();
Status close();
Expand Down Expand Up @@ -114,10 +114,10 @@ class InvertedIndexFileWriter {
lucene::store::Directory* directory);
std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v1(int64_t index_id, const std::string& index_suffix);
void write_header_and_data_v1(lucene::store::IndexOutput* output,
const std::vector<FileInfo>& sorted_files,
lucene::store::Directory* directory, int64_t header_length,
int32_t header_file_count);
virtual void write_header_and_data_v1(lucene::store::IndexOutput* output,
const std::vector<FileInfo>& sorted_files,
lucene::store::Directory* directory,
int64_t header_length, int32_t header_file_count);
// Helper functions specific to write_v2
std::pair<lucene::store::Directory*, std::unique_ptr<lucene::store::IndexOutput>>
create_output_stream_v2();
Expand All @@ -140,8 +140,8 @@ class InvertedIndexFileWriter {
directory(dir) {}
};
std::vector<FileMetadata> prepare_file_metadata_v2(int64_t& current_offset);
void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
void copy_files_data_v2(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
Status _insert_directory_into_map(int64_t index_id, const std::string& index_suffix,
Expand Down
85 changes: 85 additions & 0 deletions be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,91 @@ TEST_F(InvertedIndexFileWriterTest, CopyFileTest_OpenInputFailure) {
{ writer.copyFile("0.segments", mock_dir.get(), nullptr, buffer, sizeof(buffer)); },
CLuceneError);
}
class InvertedIndexFileWriterMock : public InvertedIndexFileWriter {
public:
InvertedIndexFileWriterMock(const io::FileSystemSPtr& fs, const std::string& index_path_prefix,
const std::string& rowset_id, int32_t segment_id,
InvertedIndexStorageFormatPB storage_format)
: InvertedIndexFileWriter(fs, index_path_prefix, rowset_id, segment_id,
storage_format) {}

MOCK_METHOD(void, write_header_and_data_v1,
(lucene::store::IndexOutput * output, const std::vector<FileInfo>& files,
lucene::store::Directory* dir, int64_t header_length, int32_t file_count),
(override));
};
TEST_F(InvertedIndexFileWriterTest, WriteV1ExceptionHandlingTest) {
InvertedIndexFileWriterMock writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id,
InvertedIndexStorageFormatPB::V1);

int64_t index_id = 1;
std::string index_suffix = "suffix1";
auto index_meta = create_mock_tablet_index(index_id, index_suffix);
ASSERT_NE(index_meta, nullptr);

auto open_result = writer_mock.open(index_meta.get());
ASSERT_TRUE(open_result.has_value());
auto dir = open_result.value();

auto out_file = std::unique_ptr<lucene::store::IndexOutput>(dir->createOutput("test_file"));
out_file->writeString("test data");
out_file->close();
dir->close();
EXPECT_CALL(writer_mock, write_header_and_data_v1(::testing::_, ::testing::_, ::testing::_,
::testing::_, ::testing::_))
.WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));

Status status = writer_mock.write_v1();
ASSERT_FALSE(status.ok());
ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
}
class InvertedIndexFileWriterMockV2 : public InvertedIndexFileWriter {
public:
InvertedIndexFileWriterMockV2(const io::FileSystemSPtr& fs,
const std::string& index_path_prefix,
const std::string& rowset_id, int32_t segment_id,
InvertedIndexStorageFormatPB storage_format,
io::FileWriterPtr file_writer)
: InvertedIndexFileWriter(fs, index_path_prefix, rowset_id, segment_id, storage_format,
std::move(file_writer)) {}

MOCK_METHOD(void, write_index_headers_and_metadata,
(lucene::store::IndexOutput * compound_file_output,
const std::vector<FileMetadata>& file_metadata),
(override));
};

TEST_F(InvertedIndexFileWriterTest, WriteV2ExceptionHandlingTest) {
io::FileWriterPtr file_writer;
std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
io::FileWriterOptions opts;
Status st = _fs->create_file(index_path, &file_writer, &opts);
ASSERT_TRUE(st.ok());
InvertedIndexFileWriterMockV2 writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id,
InvertedIndexStorageFormatPB::V2,
std::move(file_writer));

int64_t index_id = 1;
std::string index_suffix = "suffix1";
auto index_meta = create_mock_tablet_index(index_id, index_suffix);
ASSERT_NE(index_meta, nullptr);

auto open_result = writer_mock.open(index_meta.get());
ASSERT_TRUE(open_result.has_value());
auto dir = open_result.value();

auto out_file = std::unique_ptr<lucene::store::IndexOutput>(dir->createOutput("test_file"));
out_file->writeString("test data");
out_file->close();
dir->close();

EXPECT_CALL(writer_mock, write_index_headers_and_metadata(::testing::_, ::testing::_))
.WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));

Status status = writer_mock.write_v2();
ASSERT_FALSE(status.ok());
ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
}

} // namespace segment_v2
} // namespace doris

0 comments on commit dc539c3

Please sign in to comment.