Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wyxxxcat committed Jul 7, 2024
1 parent 23fd525 commit 08196c6
Show file tree
Hide file tree
Showing 15 changed files with 46 additions and 9 deletions.
3 changes: 3 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1392,9 +1392,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.endpoint {},
.region {},
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.bucket {},
.provider = new_s3_conf.client_conf.provider,
};
st = client->reset(conf);
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
{
.expiration_time = expiration_time,
},
.download_done {},
});
}
#endif
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
FileReaderOptions opts {
.cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
.is_doris_table = true,
.cache_base_path {},
.file_size = meta.file_size,
};
auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
Expand Down
6 changes: 5 additions & 1 deletion be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random";

io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
const io::FileDescription& fd) {
io::FileReaderOptions opts {.file_size = fd.file_size, .mtime = fd.mtime};
io::FileReaderOptions opts {
.cache_base_path {},
.file_size = fd.file_size,
.mtime = fd.mtime,
};
if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
state->query_options().enable_file_cache) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path = "",
.file_size = _rowset_meta->segment_file_size(seg_id),
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options,
Expand Down Expand Up @@ -410,8 +411,7 @@ Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_row
RETURN_IF_ERROR(io::global_local_filesystem()->copy_path(
inverted_index_src_file_path, inverted_index_dst_file_path));
LOG(INFO) << "success to copy file. from=" << inverted_index_src_file_path
<< ", "
<< "to=" << inverted_index_dst_file_path;
<< ", " << "to=" << inverted_index_dst_file_path;
}
}
} else {
Expand Down Expand Up @@ -532,6 +532,7 @@ Status BetaRowset::check_current_rowset_segment() {
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path {},
.file_size = _rowset_meta->segment_file_size(seg_id),
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema,
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr
? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE)
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true};
.is_doris_table = true,
.cache_base_path {},
};
auto s = segment_v2::Segment::open(io::global_local_filesystem(), path, segment_id, rowset_id(),
_context.tablet_schema, reader_options, &segment);
if (!s.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct ColumnIteratorOptions {
// for page cache allocation
// page types are divided into DATA_PAGE & INDEX_PAGE
// INDEX_PAGE including index_page, dict_page and short_key_page
PageTypePB type;
PageTypePB type = PageTypePB::UNKNOWN_PAGE_TYPE;
io::FileReader* file_reader = nullptr; // Ref
// reader statistics
OlapReaderStatistics* stats = nullptr; // Ref
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
.load_id = _load_id,
.table_schema_param = schema,
// TODO(plat1ko): write_file_cache
.storage_vault_id {},
};

_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
if (tworkload_group_info.__isset.id) {
tg_id = tworkload_group_info.id;
} else {
return {.valid = false};
return {.name = "", .valid = false};
}

// 2 name
Expand All @@ -266,7 +266,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
if (tworkload_group_info.__isset.version) {
version = tworkload_group_info.version;
} else {
return {.valid = false};
return {.name {}, .valid = false};
}

// 4 cpu_share
Expand Down
1 change: 1 addition & 0 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
.region = info.region(),
.ak = info.ak(),
.sk = info.sk(),
.token {},
.bucket = info.bucket(),
.provider = io::ObjStorageType::AWS,
},
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range) {
.system_type = _params.file_type,
.properties = _params.properties,
.hdfs_params = _params.hdfs_params,
.broker_addresses {},
};
if (range.__isset.file_type) {
// for compatibility
Expand Down
19 changes: 19 additions & 0 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,27 @@ NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent,
_key_ranges(std::move(params.key_ranges)),
_tablet_reader_params({
.tablet = std::move(params.tablet),
.tablet_schema {},
.aggregation = params.aggregation,
.version = {0, params.version},
.start_key {},
.end_key {},
.conditions {},
.bloom_filters {},
.bitmap_filters {},
.in_filters {},
.conditions_except_leafnode_of_andnode {},
.function_filters {},
.delete_predicates {},
.target_cast_type_for_variants {},
.rs_splits {},
.return_columns {},
.output_columns {},
.remaining_conjunct_roots {},
.common_expr_ctxs_push_down {},
.topn_filter_source_node_ids {},
.filter_block_conjuncts {},
.key_group_cluster_key_idxes {},
}) {
_tablet_reader_params.set_read_source(std::move(params.read_source));
_is_init = false;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
.fs_name {}};
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false};
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/writer/vhive_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
.fs_name {}};
// If the destination path contains a schema, use the schema directly.
// If not, use defaultFS.
// Otherwise a write error will occur.
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
.table_schema_param = _schema,
.is_high_priority = _is_high_priority,
.write_file_cache = _write_file_cache,
.storage_vault_id {},
};
bool index_not_found = true;
for (const auto& index : _schema->indexes()) {
Expand Down

0 comments on commit 08196c6

Please sign in to comment.