Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
airborne12 committed Nov 14, 2024
1 parent 7782e05 commit a228d33
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 134 deletions.
6 changes: 4 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,11 @@ Status Compaction::do_inverted_index_compaction() {
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) {
auto* dest_dir =
auto dest_dir =
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
dest_index_dirs[dest_segment_id] = dest_dir;
// dest_index_dirs only holds non-owning raw pointers: the destination directories must not be
// destroyed through it, because their lifetime is managed by inverted_index_file_writers.
dest_index_dirs[dest_segment_id] = dest_dir.get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs, dest_index_dirs,
index_tmp_path.native(), trans_vec, dest_segment_num_rows);
Expand Down
7 changes: 0 additions & 7 deletions be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ Status compact_column(int64_t index_id,
// When index_writer is destroyed and closeDir is set, dir will be closed.
// _CLDECDELETE(dir) decrements the ref count; when it decreases to 1, dir will be destroyed.
_CLDECDELETE(dir)
for (auto* d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize.
//d->close();
//_CLDELETE(d);
}
}

// delete temporary segment_path, only when inverted_index_ram_dir_enable is false
if (!config::inverted_index_ram_dir_enable) {
Expand Down
87 changes: 29 additions & 58 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@

#include <glog/logging.h>

#include <algorithm>
#include <filesystem>

#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"

namespace doris::segment_v2 {

Expand All @@ -38,41 +35,35 @@ Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_di
return Status::OK();
}

Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) {
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
const auto& local_fs = io::global_local_filesystem();
auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
tmp_file_dir.native(), _rowset_id, _seg_id, index_meta->index_id(),
index_meta->get_index_suffix());
bool exists = false;
auto st = local_fs->exists(local_fs_index_path, &exists);
DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_error",
{ st = Status::Error<ErrorCode::IO_ERROR>("debug point: no such file error"); })
if (!st.ok()) {
LOG(ERROR) << "index_path:" << local_fs_index_path << " exists error:" << st;
return ResultError(st);
}
DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_true", { exists = true; })
if (exists) {
LOG(ERROR) << "try to init a directory:" << local_fs_index_path << " already exists";
return ResultError(
Status::InternalError("InvertedIndexFileWriter::open directory already exists"));
}

bool can_use_ram_dir = true;
auto* dir = DorisFSDirectoryFactory::getDirectory(local_fs, local_fs_index_path.c_str(),
can_use_ram_dir);
auto key = std::make_pair(index_meta->index_id(), index_meta->get_index_suffix());
auto [it, inserted] = _indices_dirs.emplace(key, std::unique_ptr<DorisFSDirectory>(dir));
// Registers `dir` in _indices_dirs under the key (index_id, index_suffix).
//
// Returns Status::OK() when the directory was inserted. If an entry with the
// same key is already present, nothing is inserted; the duplicate key and every
// key currently in the map are logged, and an InternalError status is returned.
Status InvertedIndexFileWriter::_insert_directory_into_map(int64_t index_id,
                                                           const std::string& index_suffix,
                                                           std::shared_ptr<DorisFSDirectory> dir) {
    auto key = std::make_pair(index_id, index_suffix);
    // Only the "was it inserted" flag of the emplace result is needed here.
    const bool inserted = _indices_dirs.emplace(key, std::move(dir)).second;
    if (!inserted) {
        LOG(ERROR) << "InvertedIndexFileWriter::open attempted to insert a duplicate key: ("
                   << key.first << ", " << key.second << ")";
        LOG(ERROR) << "Directories already in map: ";
        for (const auto& entry : _indices_dirs) {
            LOG(ERROR) << "Key: (" << entry.first.first << ", " << entry.first.second << ")";
        }
        // This function returns Status, so the failure is reported as a plain
        // Status; wrapping it for a Result<> is left to the caller.
        return Status::InternalError(
                "InvertedIndexFileWriter::open attempted to insert a duplicate dir");
    }
    return Status::OK();
}

Result<std::shared_ptr<DorisFSDirectory>> InvertedIndexFileWriter::open(
const TabletIndex* index_meta) {
auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
_tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), index_meta->get_index_suffix());
bool can_use_ram_dir = true;
auto dir = std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory(
_local_fs, local_fs_index_path.c_str(), can_use_ram_dir));
auto st =
_insert_directory_into_map(index_meta->index_id(), index_meta->get_index_suffix(), dir);
if (!st.ok()) {
return ResultError(st);
}

return dir;
Expand Down Expand Up @@ -222,7 +213,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire
int64_t chunk = bufferLength;

while (remainder > 0) {
int64_t len = std::min(std::min(chunk, length), remainder);
int64_t len = std::min({chunk, length, remainder});
input->readBytes(buffer, len);
output->writeBytes(buffer, len);
remainder -= len;
Expand Down Expand Up @@ -509,40 +500,20 @@ InvertedIndexFileWriter::prepare_file_metadata_v2(int64_t& current_offset) {
for (const auto& entry : _indices_dirs) {
const int64_t index_id = entry.first.first;
const auto& index_suffix = entry.first.second;
const auto& dir = entry.second.get();
auto* dir = entry.second.get();

// Get sorted files
auto sorted_files = get_sorted_files_v2(dir);
auto sorted_files = prepare_sorted_files(dir);

for (const auto& file : sorted_files) {
int64_t file_length = dir->fileLength(file.c_str());
file_metadata.emplace_back(index_id, index_suffix, file, current_offset, file_length,
dir);
current_offset += file_length; // Update the data offset
file_metadata.emplace_back(index_id, index_suffix, file.filename, current_offset,
file.filesize, dir);
current_offset += file.filesize; // Update the data offset
}
}
return file_metadata;
}

// Lists the files in `dir`, removes the write.lock file from the list, and
// returns the remaining file names ordered by ascending file length.
//
// Each file's length is read from the directory once, before sorting, instead
// of inside the comparator, where it would be queried twice per comparison.
std::vector<std::string> InvertedIndexFileWriter::get_sorted_files_v2(
        lucene::store::Directory* dir) {
    std::vector<std::string> files;
    dir->list(&files);

    // Remove the write.lock file so it is not part of the returned list.
    files.erase(std::remove(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE),
                files.end());

    // Pair every file name with its length so the sort never calls back into dir.
    struct SizedFile {
        int64_t length;
        std::string name;
    };
    std::vector<SizedFile> sized_files;
    sized_files.reserve(files.size());
    for (auto& name : files) {
        const int64_t length = dir->fileLength(name.c_str());
        sized_files.push_back({length, std::move(name)});
    }

    // Sort files by file length.
    std::sort(sized_files.begin(), sized_files.end(),
              [](const SizedFile& lhs, const SizedFile& rhs) { return lhs.length < rhs.length; });

    // The names were moved out above; refill the vector in sorted order.
    files.clear();
    for (auto& sized : sized_files) {
        files.push_back(std::move(sized.name));
    }
    return files;
}

void InvertedIndexFileWriter::write_index_headers_and_metadata(
lucene::store::IndexOutput* output, const std::vector<FileMetadata>& file_metadata) {
// Group files by index_id and index_suffix
Expand Down
18 changes: 13 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@

#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "runtime/exec_env.h"

namespace doris {
class TabletIndex;

namespace segment_v2 {
class DorisFSDirectory;
using InvertedIndexDirectoryMap =
std::map<std::pair<int64_t, std::string>, std::unique_ptr<lucene::store::Directory>>;
std::map<std::pair<int64_t, std::string>, std::shared_ptr<lucene::store::Directory>>;

class InvertedIndexFileWriter;
using InvertedIndexFileWriterPtr = std::unique_ptr<InvertedIndexFileWriter>;
Expand All @@ -58,9 +60,13 @@ class InvertedIndexFileWriter {
_rowset_id(std::move(rowset_id)),
_seg_id(seg_id),
_storage_format(storage_format),
_idx_v2_writer(std::move(file_writer)) {}
_local_fs(io::global_local_filesystem()),
_idx_v2_writer(std::move(file_writer)) {
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
_tmp_dir = tmp_file_dir.native();
}

Result<DorisFSDirectory*> open(const TabletIndex* index_meta);
Result<std::shared_ptr<DorisFSDirectory>> open(const TabletIndex* index_meta);
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
~InvertedIndexFileWriter() = default;
Expand Down Expand Up @@ -134,19 +140,21 @@ class InvertedIndexFileWriter {
directory(dir) {}
};
std::vector<FileMetadata> prepare_file_metadata_v2(int64_t& current_offset);
std::vector<std::string> get_sorted_files_v2(lucene::store::Directory* dir);
void write_index_headers_and_metadata(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);
void copy_files_data_v2(lucene::store::IndexOutput* output,
const std::vector<FileMetadata>& file_metadata);

Status _insert_directory_into_map(int64_t index_id, const std::string& index_suffix,
std::shared_ptr<DorisFSDirectory> dir);
// Member variables...
InvertedIndexDirectoryMap _indices_dirs;
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
std::string _rowset_id;
int64_t _seg_id;
InvertedIndexStorageFormatPB _storage_format;
std::string _tmp_dir;
const std::shared_ptr<io::LocalFileSystem>& _local_fs;

// write to disk or stream
io::FileWriterPtr _idx_v2_writer = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
bool create_index = true;
bool close_dir_on_shutdown = true;
auto index_writer = std::make_unique<lucene::index::IndexWriter>(
_dir, _analyzer.get(), create_index, close_dir_on_shutdown);
_dir.get(), _analyzer.get(), create_index, close_dir_on_shutdown);
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error",
{ index_writer->setRAMBufferSizeMB(-100); })
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error",
Expand Down Expand Up @@ -708,7 +708,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
InvertedIndexCtxSPtr _inverted_index_ctx = nullptr;
DorisFSDirectory* _dir = nullptr;
std::shared_ptr<DorisFSDirectory> _dir = nullptr;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
Expand Down
Loading

0 comments on commit a228d33

Please sign in to comment.