Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
csun5285 committed Jun 30, 2024
1 parent 7d9f9a6 commit a26e9de
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 69 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Status DeltaWriterV2::init() {
context.rowset_id = ExecEnv::GetInstance()->storage_engine().next_rowset_id();
context.data_dir = nullptr;
context.partial_update_info = _partial_update_info;
context.memtable_on_sink_support_idx_v2 = true;
context.memtable_on_sink_support_index_v2 = true;

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,11 @@ Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWri
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
return _create_file_writer(index_path, file_writer);
} else if (file_type == FileType::SEGMENT_FILE) {
return _create_file_writer(segment_path, file_writer);
}
return _create_file_writer(segment_path, file_writer);
return Status::Error<ErrorCode::INTERNAL_ERROR>(
fmt::format("failed to create file = {}, file type = {}", segment_path, file_type));
}

Status BetaRowsetWriter::_create_segment_writer_for_segcompaction(
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ struct RowsetWriterContext {
std::shared_ptr<FileWriterCreator> file_writer_creator;
std::shared_ptr<SegmentCollector> segment_collector;

// memtable_on_sink_support_idx_v2 = true, we will create SinkFileWriter to send inverted index file
bool memtable_on_sink_support_idx_v2 = false;
// memtable_on_sink_support_index_v2 = true, we will create SinkFileWriter to send inverted index file
bool memtable_on_sink_support_index_v2 = false;

/// begin file cache opts
bool write_file_cache = false;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::Segmen
if (_context.tablet_schema->has_inverted_index() &&
_context.tablet_schema->get_inverted_index_storage_format() >=
InvertedIndexStorageFormatPB::V2 &&
_context.memtable_on_sink_support_idx_v2) {
_context.memtable_on_sink_support_index_v2) {
RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer,
FileType::INVERTED_INDEX_FILE));
}
Expand Down Expand Up @@ -178,7 +178,7 @@ Status SegmentFlusher::_create_segment_writer(
if (_context.tablet_schema->has_inverted_index() &&
_context.tablet_schema->get_inverted_index_storage_format() >=
InvertedIndexStorageFormatPB::V2 &&
_context.memtable_on_sink_support_idx_v2) {
_context.memtable_on_sink_support_index_v2) {
RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, inverted_file_writer,
FileType::INVERTED_INDEX_FILE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class DorisFSDirectory::FSIndexOutput : public lucene::store::BufferedIndexOutpu

class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOutput {
private:
io::FileWriter* _index_v2_file_writer;
io::FileWriter* _index_v2_file_writer = nullptr;

protected:
void flushBuffer(const uint8_t* b, const int32_t size) override;
Expand Down Expand Up @@ -384,6 +384,7 @@ void DorisFSDirectory::FSIndexOutputV2::flushBuffer(const uint8_t* b, const int3
if (_index_v2_file_writer == nullptr) {
LOG(WARNING) << "File writer is nullptr in DorisFSDirectory::FSIndexOutputV2, "
"ignore flush.";
_CLTHROWA(CL_ERR_IO, "flushBuffer error, _index_v2_file_writer = nullptr");
} else if (b == nullptr) {
LOG(WARNING) << "buffer is nullptr when flushBuffer in "
"DorisFSDirectory::FSIndexOutput";
Expand Down Expand Up @@ -420,6 +421,7 @@ void DorisFSDirectory::FSIndexOutputV2::close() {
}
} else {
LOG(WARNING) << "File writer is nullptr, ignore finalize and close.";
_CLTHROWA(CL_ERR_IO, "close file writer error, _index_v2_file_writer = nullptr");
}
}

Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,13 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
g_load_stream_flush_running_threads << -1;
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf, file_type);
if (eos && st.ok()) {
if (file_type == FileType::INVERTED_INDEX_FILE) {
st = _load_stream_writer->close_inverted_index(new_segid);
if (file_type == FileType::SEGMENT_FILE || file_type == FileType::INVERTED_INDEX_FILE) {
st = _load_stream_writer->close_writer(new_segid, file_type);
} else {
st = _load_stream_writer->close_segment(new_segid);
st = Status::InternalError(
"appent data failed, file type error, file type = {}, "
"segment_id={}",
file_type, new_segid);
}
}
if (!st.ok() && _failed_st->ok()) {
Expand Down
69 changes: 22 additions & 47 deletions be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,70 +133,45 @@ Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOB
return file_writer->append(buf.to_string());
}

Status LoadStreamWriter::close_segment(uint32_t segid) {
Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) {
SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* file_writer = nullptr;
auto& file_writers =
file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers;
{
std::lock_guard lock_guard(_lock);
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.uninited_writer", { _is_init = false; });
DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.uninited_writer", { _is_init = false; });
if (!_is_init) {
return Status::Corruption("close_segment failed, LoadStreamWriter is not inited");
return Status::Corruption("close_writer failed, LoadStreamWriter is not inited");
}
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.bad_segid",
{ segid = _segment_file_writers.size(); });
if (segid >= _segment_file_writers.size()) {
return Status::Corruption("close_segment failed, segment {} is never opened", segid);
DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid",
{ segid = file_writers.size(); });
if (segid >= file_writers.size()) {
return Status::Corruption(
"close_writer failed, file {} is never opened, file type is {}", segid,
file_type);
}
file_writer = _segment_file_writers[segid].get();
file_writer = file_writers[segid].get();
}
DBUG_EXECUTE_IF("LoadStreamWriter.close_segment.null_file_writer", { file_writer = nullptr; });

DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; });
if (file_writer == nullptr) {
return Status::Corruption("close_segment failed, file writer {} is destoryed", segid);
return Status::Corruption(
"close_writer failed, file writer {} is destoryed, fiel type is {}", segid,
file_type);
}
auto st = file_writer->close();
if (!st.ok()) {
_is_canceled = true;
return st;
}
g_load_stream_file_writer_cnt << -1;
LOG(INFO) << "segment " << segid << " path " << file_writer->path().native()
<< "closed, written " << file_writer->bytes_appended() << " bytes";
LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << "closed, written "
<< file_writer->bytes_appended() << " bytes"
<< ", file type is " << file_type;
if (file_writer->bytes_appended() == 0) {
return Status::Corruption("segment {} closed with 0 bytes", file_writer->path().native());
}
return Status::OK();
}

Status LoadStreamWriter::close_inverted_index(uint32_t segid) {
SCOPED_ATTACH_TASK(_query_thread_context);
io::FileWriter* inverted_file_writer = nullptr;
{
std::lock_guard lock_guard(_lock);
if (!_is_init) {
return Status::Corruption(
"inverted file writer failed, LoadStreamWriter is not inited");
}
if (segid >= _inverted_file_writers.size()) {
return Status::Corruption(
"inverted file writer failed, inverted file {} is never opened", segid);
}
inverted_file_writer = _inverted_file_writers[segid].get();
}
if (inverted_file_writer == nullptr) {
return Status::Corruption("inverted file writer failed, file writer {} is destoryed",
segid);
}
auto st = inverted_file_writer->close();
if (!st.ok()) {
_is_canceled = true;
return st;
}
LOG(INFO) << "inverted file writer " << segid << " path "
<< inverted_file_writer->path().native() << "closed, written "
<< inverted_file_writer->bytes_appended() << " bytes";
if (inverted_file_writer->bytes_appended() == 0) {
return Status::Corruption("inverted file writer {} closed with 0 bytes",
inverted_file_writer->path().native());
return Status::Corruption("file {} closed with 0 bytes, file type is {}",
file_writer->path().native(), file_type);
}
return Status::OK();
}
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class LoadStreamWriter {
Status append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf,
FileType file_type = FileType::SEGMENT_FILE);

Status close_inverted_index(uint32_t segid);

Status close_segment(uint32_t segid);
Status close_writer(uint32_t segid, FileType file_type);

Status add_segment(uint32_t segid, const SegmentStatistics& stat, TabletSchemaSPtr flush_chema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,16 @@ suite("load_stream_fault_injection", "nonConcurrent") {
load_with_injection("LocalFileSystem.create_file_impl.open_file_failed", "")
// LoadStreamWriter append_data meet null file writer error
load_with_injection("LoadStreamWriter.append_data.null_file_writer", "")
// LoadStreamWriter close_segment meet not inited error
load_with_injection("LoadStreamWriter.close_segment.uninited_writer", "")
// LoadStreamWriter close_segment meet not bad segid error
load_with_injection("LoadStreamWriter.close_segment.bad_segid", "")
// LoadStreamWriter close_segment meet null file writer error
load_with_injection("LoadStreamWriter.close_segment.null_file_writer", "")
// LoadStreamWriter close_segment meet file writer failed to close error
// LoadStreamWriter close_writer meet not inited error
load_with_injection("LoadStreamWriter.close_writer.uninited_writer", "")
// LoadStreamWriter close_writer meet not bad segid error
load_with_injection("LoadStreamWriter.close_writer.bad_segid", "")
// LoadStreamWriter close_writer meet null file writer error
load_with_injection("LoadStreamWriter.close_writer.null_file_writer", "")
// LoadStreamWriter close_writer meet file writer failed to close error
load_with_injection("LocalFileWriter.close.failed", "")
// LoadStreamWriter close_segment meet bytes_appended and real file size not match error
load_with_injection("FileWriter.close_segment.zero_bytes_appended", "")
// LoadStreamWriter close_writer meet bytes_appended and real file size not match error
load_with_injection("FileWriter.close_writer.zero_bytes_appended", "")
// LoadStreamWriter add_segment meet not inited error
load_with_injection("LoadStreamWriter.add_segment.uninited_writer", "")
// LoadStreamWriter add_segment meet not bad segid error
Expand Down

0 comments on commit a26e9de

Please sign in to comment.